generated from TBD54566975/tbd-project-template
-
Notifications
You must be signed in to change notification settings - Fork 7
/
json.go
76 lines (69 loc) · 1.55 KB
/
json.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package log
import (
"bufio"
"encoding/json"
"errors"
"io"
"time"
)
// Compile-time check that *jsonSink implements the Sink interface.
var _ Sink = (*jsonSink)(nil)
// jsonEntry is the JSON form of a log Entry, as written by jsonSink.Log and
// read back by JSONStreamer.
//
// Time and Error are declared here as strings and take precedence over the
// same-named fields promoted from the embedded Entry.
type jsonEntry struct {
Entry
// Time is the entry's timestamp formatted as time.RFC3339Nano.
Time string `json:"time,omitempty"`
// Error is the message of the entry's error, or empty when there is none.
Error string `json:"error,omitempty"`
}
// newJSONSink builds a jsonSink whose encoder writes JSON-encoded entries
// to w. The writer itself is also retained on the sink.
func newJSONSink(w io.Writer) *jsonSink {
	sink := new(jsonSink)
	sink.w = w
	sink.enc = json.NewEncoder(w)
	return sink
}
// jsonSink is a log sink that emits each entry as JSON.
type jsonSink struct {
// w is the destination writer handed to newJSONSink.
w io.Writer
// enc is the JSON encoder created over w; Log writes entries through it.
enc *json.Encoder
}
// Log encodes entry as JSON through the sink's encoder and returns the
// encoder's error, if any.
//
// The entry's timestamp is rendered as an RFC 3339 string with nanosecond
// precision, and a non-nil entry.Error is flattened to its message text.
func (j *jsonSink) Log(entry Entry) error {
	out := jsonEntry{Entry: entry}
	// out.Time and out.Error select the string fields declared on jsonEntry,
	// not the ones promoted from the embedded Entry.
	out.Time = entry.Time.Format(time.RFC3339Nano)
	if cause := entry.Error; cause != nil {
		out.Error = cause.Error()
	}
	return j.enc.Encode(out)
}
// JSONStreamer reads newline-delimited log output from r and forwards each
// line to log.
//
// A line that decodes as a JSON object is logged as the Entry it describes:
// a non-empty "error" string becomes Entry.Error, and "time" is parsed as
// time.RFC3339Nano, falling back to the current time when it is missing or
// malformed.
//
// Any other line - including the bare JSON literal null, which decodes
// without error but describes no entry - is logged verbatim as the message
// of a new entry at defaultLevel, stamped with the current time. If such a
// line starts with '{' it was probably meant to be a JSON entry, so the
// decode error and the raw line are additionally reported as warnings.
//
// A single line may be at most 1MB; a longer line stops the stream and the
// scanner's error is returned. JSONStreamer returns nil once r is exhausted.
func JSONStreamer(r io.Reader, log *Logger, defaultLevel Level) error {
	scan := bufio.NewScanner(r)
	scan.Buffer(nil, 1024*1024) // 1MB buffer
	for scan.Scan() {
		line := scan.Bytes()

		// Decode through a pointer: a bare JSON null leaves entry nil instead
		// of silently succeeding with an empty jsonEntry and losing the line.
		var entry *jsonEntry
		err := json.Unmarshal(line, &entry)
		if err != nil || entry == nil {
			if err != nil && len(line) > 0 && line[0] == '{' {
				log.Warnf("Invalid JSON log entry: %s", err)
				log.Warnf("Entry: %s", line)
			}
			// string(line) copies the bytes, which matters because the
			// scanner may overwrite its buffer on the next Scan.
			log.Log(Entry{Level: defaultLevel, Time: time.Now(), Message: string(line)})
			continue
		}

		if entry.Error != "" {
			entry.Entry.Error = errors.New(entry.Error)
		}
		ts, err := time.Parse(time.RFC3339Nano, entry.Time)
		if err != nil {
			// Missing or unparseable timestamp: use the time of receipt.
			ts = time.Now()
		}
		entry.Entry.Time = ts
		log.Log(entry.Entry)
	}

	// Scanner.Err reports nil for a plain io.EOF; the errors.Is check also
	// treats an EOF that the reader wrapped as a clean end of stream.
	err := scan.Err()
	if errors.Is(err, io.EOF) {
		return nil
	}
	return err
}