Skip to content

Commit

Permalink
debug: logs to sqlite
Browse files Browse the repository at this point in the history
  • Loading branch information
Spas Bojanov committed Oct 12, 2020
1 parent cad8c41 commit 6567b02
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 1 deletion.
6 changes: 5 additions & 1 deletion pkg/cli/debug.go
Expand Up @@ -1152,6 +1152,7 @@ var debugMergeLogsOpts = struct {
prefix string
keepRedactable bool
redactInput bool
dbName string
}{
program: regexp.MustCompile("^cockroach.*$"),
file: regexp.MustCompile(log.FilePattern),
Expand All @@ -1169,7 +1170,8 @@ func runDebugMergeLogs(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
return writeLogStream(s, cmd.OutOrStdout(), o.filter, o.prefix, o.keepRedactable)
//return writeLogStream(s, o.filter, o.prefix, o.keepRedactable)
return writeLogStreamToSQL(s, o.prefix, o.dbName)
}

// DebugCmdsForRocksDB lists debug commands that access rocksdb through the engine
Expand Down Expand Up @@ -1282,4 +1284,6 @@ func init() {
"keep the output log file redactable")
f.BoolVar(&debugMergeLogsOpts.redactInput, "redact", debugMergeLogsOpts.redactInput,
"redact the input files to remove sensitive information")
f.StringVar(&debugMergeLogsOpts.dbName, "db", "./crdb_logs.db",
"the name of the sqlite db to export the logs to")
}
202 changes: 202 additions & 0 deletions pkg/cli/debug_merge_logs.go
Expand Up @@ -15,6 +15,7 @@ import (
"bytes"
"container/heap"
"context"
"database/sql"
"io"
"os"
"path/filepath"
Expand All @@ -24,6 +25,8 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
_ "github.com/mattn/go-sqlite3"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -575,3 +578,202 @@ func seekToFirstAfterFrom(f *os.File, from time.Time, editMode log.EditSensitive
_, err = f.Seek(int64(offset), io.SeekStart)
return err
}

// logStreamToDB pops messages off of s and writes them to out prepending
// prefix per message and filtering messages which match filter.
func writeLogStreamToParquet(s logStream, out io.Writer, prefix string) error {
const chanSize = 1 << 16 // 64k
const maxWriteBufSize = 1 << 18 // 256kB

prefixCache := map[*fileInfo][]byte{}
getPrefix := func(fi *fileInfo) ([]byte, error) {
if prefixBuf, ok := prefixCache[fi]; ok {
return prefixBuf, nil
}
prefixCache[fi] = fi.pattern.ExpandString(nil, prefix, fi.path, fi.matches)
return prefixCache[fi], nil
}

type entryInfo struct {
log.Entry
*fileInfo
}
render := func(ei entryInfo, w io.Writer) (err error) {
var prefixBytes []byte
if prefixBytes, err = getPrefix(ei.fileInfo); err != nil {
return err
}
if _, err = w.Write(prefixBytes); err != nil {
return err
}
return ei.Format(w)
}

g, ctx := errgroup.WithContext(context.Background())
entryChan := make(chan entryInfo, chanSize) // read -> bufferWrites
writeChan := make(chan *bytes.Buffer) // bufferWrites -> write
read := func() error {
defer close(entryChan)
for e, ok := s.peek(); ok; e, ok = s.peek() {
select {
case entryChan <- entryInfo{Entry: e, fileInfo: s.fileInfo()}:
case <-ctx.Done():
return nil
}
s.pop()
}
return s.error()
}
bufferWrites := func() error {
defer close(writeChan)
writing, pending := &bytes.Buffer{}, &bytes.Buffer{}
for {
send, recv := writeChan, entryChan
if pending.Len() == 0 {
send = nil
if recv == nil {
return nil
}
} else if pending.Len() > maxWriteBufSize {
recv = nil
}
select {
case ei, open := <-recv:
if !open {
entryChan = nil
break
}
if err := render(ei, pending); err != nil {
return err
}
case send <- pending:
writing.Reset()
pending, writing = writing, pending
case <-ctx.Done():
return nil
}
}
}
write := func() error {
for buf := range writeChan {
if _, err := out.Write(buf.Bytes()); err != nil {
return err
}
}
return nil
}
g.Go(read)
g.Go(bufferWrites)
g.Go(write)
return g.Wait()
}

// writeLogStream pops messages off of s and writes them to out prepending
// prefix per message and filtering messages which match filter.
func writeLogStreamToSQL(s logStream, prefix, dbName string) error {
const chanSize = 1 << 16 // 64k

prefixCache := map[*fileInfo][]byte{}
getPrefix := func(fi *fileInfo) ([]byte, error) {
if prefixBuf, ok := prefixCache[fi]; ok {
return prefixBuf, nil
}
prefixCache[fi] = fi.pattern.ExpandString(nil, prefix, fi.path, fi.matches)
return prefixCache[fi], nil
}

type entryInfo struct {
log.Entry
*fileInfo
}

g, ctx := errgroup.WithContext(context.Background())
entryChan := make(chan entryInfo, chanSize) // read -> bufferWrites
read := func() error {
defer close(entryChan)
for e, ok := s.peek(); ok; e, ok = s.peek() {
select {
case entryChan <- entryInfo{Entry: e, fileInfo: s.fileInfo()}:
case <-ctx.Done():
return nil
}
s.pop()
}
return s.error()
}
write := func() error {
db, err := sql.Open("sqlite3", dbName)
if err != nil {
return err
}
defer db.Close()
tblName := "logs"
if err := createSQLTable(db, tblName); err != nil {
return errors.Wrap(err, "Failed creating sqlite table")
}
stmt, err := db.Prepare("INSERT INTO " + tblName + `
(prefix, time, severity, goroutine, file, line, tags, message)
VALUES (?,?,?,?,?,?,?,?);
`)
if err != nil {
return errors.Wrap(err, "Failed inserting log entry")
}

var tx *sql.Tx
var cnt int
var toCmt bool
for e := range entryChan {
prefixBytes, err := getPrefix(e.fileInfo)
if err != nil {
return err
}
if cnt == 0 {
tx, err = db.Begin()
if err != nil {
return err
}
toCmt = true
}
cnt++
if _, err := tx.Stmt(stmt).Exec(prefixBytes, e.Time, e.Severity, e.Goroutine, e.File, e.Line, e.Tags, e.Message); err != nil {
return err
}
if cnt > 9999 {
cnt = 0
if err := tx.Commit(); err != nil {
return err
}
toCmt = false
}
}
if toCmt {
if err := tx.Commit(); err != nil {
return err
}
}

return nil
}
g.Go(read)
g.Go(write)
return g.Wait()
}

func createSQLTable(db *sql.DB, name string) error {
stmt, err := db.Prepare("CREATE TABLE IF NOT EXISTS " + name + `(
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"prefix" TEXT,
"time" INTEGER,
"severity" INTEGER,
"goroutine" INTEGER,
"file" TEXT,
"line" INTEGER,
"tags" TEXT,
"message" TEXT
);`)
if err != nil {
return err
}
_, err = stmt.Exec() // Execute SQL Statements
return err
}
7 changes: 7 additions & 0 deletions pkg/util/log/log_entry.go
Expand Up @@ -73,6 +73,13 @@ func (l *loggingT) formatLogEntryInternal(entry Entry, cp ttycolor.Profile) *buf
entry.Severity = Severity_INFO // for safety.
}

buf.WriteString("\nFile: ")
buf.WriteString(entry.File)
buf.WriteString("\nTags: ")
buf.WriteString(entry.Tags)
buf.WriteString("\nMes: ")
buf.WriteString(entry.Message)
return buf
tmp := buf.tmp[:len(buf.tmp)]
var n int
var prefix []byte
Expand Down

0 comments on commit 6567b02

Please sign in to comment.