Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log viewer #55434

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -1152,6 +1152,7 @@ var debugMergeLogsOpts = struct {
prefix string
keepRedactable bool
redactInput bool
dbName string
}{
program: regexp.MustCompile("^cockroach.*$"),
file: regexp.MustCompile(log.FilePattern),
Expand All @@ -1169,7 +1170,8 @@ func runDebugMergeLogs(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
return writeLogStream(s, cmd.OutOrStdout(), o.filter, o.prefix, o.keepRedactable)
//return writeLogStream(s, o.filter, o.prefix, o.keepRedactable)
return writeLogStreamToSQL(s, o.prefix, o.dbName)
}

// DebugCmdsForRocksDB lists debug commands that access rocksdb through the engine
Expand Down Expand Up @@ -1282,4 +1284,6 @@ func init() {
"keep the output log file redactable")
f.BoolVar(&debugMergeLogsOpts.redactInput, "redact", debugMergeLogsOpts.redactInput,
"redact the input files to remove sensitive information")
f.StringVar(&debugMergeLogsOpts.dbName, "db", "./crdb_logs.db",
"the name of the sqlite db to export the logs to")
}
202 changes: 202 additions & 0 deletions pkg/cli/debug_merge_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"bytes"
"container/heap"
"context"
"database/sql"
"io"
"os"
"path/filepath"
Expand All @@ -24,6 +25,8 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
_ "github.com/mattn/go-sqlite3"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -575,3 +578,202 @@ func seekToFirstAfterFrom(f *os.File, from time.Time, editMode log.EditSensitive
_, err = f.Seek(int64(offset), io.SeekStart)
return err
}

// logStreamToDB pops messages off of s and writes them to out prepending
// prefix per message and filtering messages which match filter.
func writeLogStreamToParquet(s logStream, out io.Writer, prefix string) error {
const chanSize = 1 << 16 // 64k
const maxWriteBufSize = 1 << 18 // 256kB

prefixCache := map[*fileInfo][]byte{}
getPrefix := func(fi *fileInfo) ([]byte, error) {
if prefixBuf, ok := prefixCache[fi]; ok {
return prefixBuf, nil
}
prefixCache[fi] = fi.pattern.ExpandString(nil, prefix, fi.path, fi.matches)
return prefixCache[fi], nil
}

type entryInfo struct {
log.Entry
*fileInfo
}
render := func(ei entryInfo, w io.Writer) (err error) {
var prefixBytes []byte
if prefixBytes, err = getPrefix(ei.fileInfo); err != nil {
return err
}
if _, err = w.Write(prefixBytes); err != nil {
return err
}
return ei.Format(w)
}

g, ctx := errgroup.WithContext(context.Background())
entryChan := make(chan entryInfo, chanSize) // read -> bufferWrites
writeChan := make(chan *bytes.Buffer) // bufferWrites -> write
read := func() error {
defer close(entryChan)
for e, ok := s.peek(); ok; e, ok = s.peek() {
select {
case entryChan <- entryInfo{Entry: e, fileInfo: s.fileInfo()}:
case <-ctx.Done():
return nil
}
s.pop()
}
return s.error()
}
bufferWrites := func() error {
defer close(writeChan)
writing, pending := &bytes.Buffer{}, &bytes.Buffer{}
for {
send, recv := writeChan, entryChan
if pending.Len() == 0 {
send = nil
if recv == nil {
return nil
}
} else if pending.Len() > maxWriteBufSize {
recv = nil
}
select {
case ei, open := <-recv:
if !open {
entryChan = nil
break
}
if err := render(ei, pending); err != nil {
return err
}
case send <- pending:
writing.Reset()
pending, writing = writing, pending
case <-ctx.Done():
return nil
}
}
}
write := func() error {
for buf := range writeChan {
if _, err := out.Write(buf.Bytes()); err != nil {
return err
}
}
return nil
}
g.Go(read)
g.Go(bufferWrites)
g.Go(write)
return g.Wait()
}

// writeLogStream pops messages off of s and writes them to out prepending
// prefix per message and filtering messages which match filter.
func writeLogStreamToSQL(s logStream, prefix, dbName string) error {
const chanSize = 1 << 16 // 64k

prefixCache := map[*fileInfo][]byte{}
getPrefix := func(fi *fileInfo) ([]byte, error) {
if prefixBuf, ok := prefixCache[fi]; ok {
return prefixBuf, nil
}
prefixCache[fi] = fi.pattern.ExpandString(nil, prefix, fi.path, fi.matches)
return prefixCache[fi], nil
}

type entryInfo struct {
log.Entry
*fileInfo
}

g, ctx := errgroup.WithContext(context.Background())
entryChan := make(chan entryInfo, chanSize) // read -> bufferWrites
read := func() error {
defer close(entryChan)
for e, ok := s.peek(); ok; e, ok = s.peek() {
select {
case entryChan <- entryInfo{Entry: e, fileInfo: s.fileInfo()}:
case <-ctx.Done():
return nil
}
s.pop()
}
return s.error()
}
write := func() error {
db, err := sql.Open("sqlite3", dbName)
if err != nil {
return err
}
defer db.Close()
tblName := "logs"
if err := createSQLTable(db, tblName); err != nil {
return errors.Wrap(err, "Failed creating sqlite table")
}
stmt, err := db.Prepare("INSERT INTO " + tblName + `
(prefix, time, severity, goroutine, file, line, tags, message)
VALUES (?,?,?,?,?,?,?,?);
`)
if err != nil {
return errors.Wrap(err, "Failed inserting log entry")
}

var tx *sql.Tx
var cnt int
var toCmt bool
for e := range entryChan {
prefixBytes, err := getPrefix(e.fileInfo)
if err != nil {
return err
}
if cnt == 0 {
tx, err = db.Begin()
if err != nil {
return err
}
toCmt = true
}
cnt++
if _, err := tx.Stmt(stmt).Exec(prefixBytes, e.Time, e.Severity, e.Goroutine, e.File, e.Line, e.Tags, e.Message); err != nil {
return err
}
if cnt > 9999 {
cnt = 0
if err := tx.Commit(); err != nil {
return err
}
toCmt = false
}
}
if toCmt {
if err := tx.Commit(); err != nil {
return err
}
}

return nil
}
g.Go(read)
g.Go(write)
return g.Wait()
}

func createSQLTable(db *sql.DB, name string) error {
stmt, err := db.Prepare("CREATE TABLE IF NOT EXISTS " + name + `(
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"prefix" TEXT,
"time" INTEGER,
"severity" INTEGER,
"goroutine" INTEGER,
"file" TEXT,
"line" INTEGER,
"tags" TEXT,
"message" TEXT
);`)
if err != nil {
return err
}
_, err = stmt.Exec() // Execute SQL Statements
return err
}
7 changes: 7 additions & 0 deletions pkg/util/log/log_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ func (l *loggingT) formatLogEntryInternal(entry Entry, cp ttycolor.Profile) *buf
entry.Severity = Severity_INFO // for safety.
}

buf.WriteString("\nFile: ")
buf.WriteString(entry.File)
buf.WriteString("\nTags: ")
buf.WriteString(entry.Tags)
buf.WriteString("\nMes: ")
buf.WriteString(entry.Message)
return buf
tmp := buf.tmp[:len(buf.tmp)]
var n int
var prefix []byte
Expand Down