-
Notifications
You must be signed in to change notification settings - Fork 28
/
chug.go
187 lines (159 loc) · 3.48 KB
/
chug.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package chug
import (
"bufio"
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"strconv"
"strings"
"time"
"code.cloudfoundry.org/lager"
)
// Entry is one line read from the input, together with the outcome of trying
// to interpret it as a lager log line.
type Entry struct {
// IsLager is set from the boolean returned by convertPrettyLog; it stays
// false when the line contains no "{" or its JSON cannot be decoded.
IsLager bool
// Raw is a copy of the bytes handed to entry, so it does not alias the
// caller's buffer.
Raw []byte
// Log is the structured form of the line as returned by convertPrettyLog;
// it is the zero LogEntry when IsLager is false.
Log LogEntry
}
// LogEntry is the structured representation of a log line, as assembled by
// convertPrettyLog from a decoded prettyFormat value.
type LogEntry struct {
// Timestamp is the line's "timestamp" string converted by toTimestamp.
Timestamp time.Time
// LogLevel is parsed from the "level" name when that field is non-empty,
// otherwise it is the decoded "log_level" value.
LogLevel lager.LogLevel
// Source and Message are copied verbatim from the decoded line.
Source string
Message string
// Session is the string stored under the "session" key of the data, if any.
Session string
// Error is built from the string under the "error" key of the data, and is
// only looked up when LogLevel is lager.ERROR or lager.FATAL.
Error error
// Trace is the string stored under the "trace" key of the data, if any.
Trace string
// Data is the line's data map after the keys extracted above have been
// deleted from it.
Data lager.Data
}
// lagerTime wraps a time.Time value and exposes it through its Time method.
type lagerTime struct {
// t is the wrapped instant.
t time.Time
}
// Time returns the time.Time value held by t.
func (t lagerTime) Time() time.Time {
return t.t
}
// toTimestamp converts the textual timestamp d into a time.Time.
//
// Two encodings are accepted:
//   - a decimal number of seconds since the Unix epoch (fractions allowed),
//   - an RFC 3339 timestamp with optional nanoseconds.
//
// A numeric value is only used when its nanosecond count fits in an int64.
// strconv.ParseFloat also accepts "NaN", "Inf" and very large magnitudes, and
// converting those to int64 yields an implementation-dependent result, so they
// are rejected here and fall through to the RFC 3339 parser, which reports the
// error to the caller.
func toTimestamp(d string) (time.Time, error) {
	const (
		// Bounds of int64 expressed as float64; the upper bound is exclusive
		// because 1<<63 itself does not fit in an int64.
		minNanos = -float64(1 << 63)
		maxNanos = float64(1 << 63)
	)

	if f, err := strconv.ParseFloat(d, 64); err == nil {
		nanos := f * 1e9
		// Both comparisons are false for NaN, so it is rejected as well.
		if nanos >= minNanos && nanos < maxNanos {
			return time.Unix(0, int64(nanos)), nil
		}
	}

	return time.Parse(time.RFC3339Nano, d)
}
// temporarily duplicated to make refactoring in small steps possible
//
// prettyFormat is the JSON shape that entry decodes a log line into before
// convertPrettyLog turns it into a LogEntry.
type prettyFormat struct {
// Timestamp is kept as text here; convertPrettyLog passes it to toTimestamp.
Timestamp string `json:"timestamp"`
// Level is a level name; when non-empty, convertPrettyLog parses it with
// lager.LogLevelFromString and uses it in preference to LogLevel.
Level string `json:"level"`
// LogLevel is the level used when Level is empty.
LogLevel lager.LogLevel `json:"log_level"`
Source string `json:"source"`
Message string `json:"message"`
// Data is the free-form payload; the "trace", "session" and "error" keys
// are read (and deleted) by the *FromData helpers.
Data lager.Data `json:"data"`
// Error is excluded from JSON decoding by the "-" tag.
Error error `json:"-"`
}
// Chug reads newline-delimited lines from reader, converts each one with
// entry, and sends the result on out. The trailing '\n' is stripped before
// conversion. A final line without a newline is still delivered.
//
// Reading stops at the first error returned by the reader (including io.EOF);
// the error itself is not reported. out is closed before Chug returns, so
// callers can range over it. Sends on out block until received (or buffered),
// so Chug is normally run in its own goroutine.
func Chug(reader io.Reader, out chan<- Entry) {
	lines := bufio.NewReader(reader)

	for {
		line, err := lines.ReadBytes('\n')

		// ReadBytes always returns a non-nil slice, even when nothing was
		// read, so check the length: otherwise a spurious empty Entry would
		// be emitted when the input ends. Blank lines ("\n") have length 1
		// and are still delivered.
		if len(line) > 0 {
			out <- entry(bytes.TrimSuffix(line, []byte{'\n'}))
		}

		if err != nil {
			break
		}
	}

	close(out)
}
// entry builds an Entry for a single input line.
//
// The returned Entry always carries its own copy of raw. If the line contains
// a "{", everything from the first "{" onward is decoded as JSON into a
// prettyFormat and handed to convertPrettyLog, whose results populate Log and
// IsLager. Lines without a "{", or whose JSON fails to decode, are returned
// with IsLager false and a zero Log.
func entry(raw []byte) (entry Entry) {
	// Copy so the Entry does not share storage with the reader's buffer.
	entry.Raw = append(make([]byte, 0, len(raw)), raw...)

	text := string(raw)
	start := strings.Index(text, "{")
	if start < 0 {
		return entry
	}

	// Anything before the first brace (e.g. a prefix added by another tool)
	// is skipped; the decoder reads one JSON value and ignores what follows.
	var parsed prettyFormat
	if err := json.NewDecoder(strings.NewReader(text[start:])).Decode(&parsed); err != nil {
		return entry
	}

	entry.Log, entry.IsLager = convertPrettyLog(parsed)
	return entry
}
// convertPrettyLog turns a decoded prettyFormat into a LogEntry.
//
// It returns the entry and true on success. If any step fails (a "trace",
// "session" or "error" value that is not a string, an unknown level name, or
// an unparseable timestamp) it returns a zero LogEntry and false.
//
// The helpers delete the keys they extract from lagerLog.Data, so the Data
// field of the returned entry no longer contains them.
func convertPrettyLog(lagerLog prettyFormat) (LogEntry, bool) {
	data := lagerLog.Data

	trace, traceErr := traceFromData(data)
	if traceErr != nil {
		return LogEntry{}, false
	}

	session, sessionErr := sessionFromData(data)
	if sessionErr != nil {
		return LogEntry{}, false
	}

	// A textual level, when present, overrides the decoded log_level value.
	level := lagerLog.LogLevel
	if name := lagerLog.Level; name != "" {
		parsed, levelErr := lager.LogLevelFromString(name)
		if levelErr != nil {
			return LogEntry{}, false
		}
		level = parsed
	}

	// The "error" key is only consumed for ERROR and FATAL lines; at other
	// levels it is left in Data untouched.
	var loggedErr error
	if level == lager.ERROR || level == lager.FATAL {
		extracted, dataErr := errorFromData(data)
		if dataErr != nil {
			return LogEntry{}, false
		}
		loggedErr = extracted
	}

	when, timeErr := toTimestamp(lagerLog.Timestamp)
	if timeErr != nil {
		return LogEntry{}, false
	}

	converted := LogEntry{
		Timestamp: when,
		LogLevel:  level,
		Source:    lagerLog.Source,
		Message:   lagerLog.Message,
		Session:   session,
		Error:     loggedErr,
		Trace:     trace,
		Data:      data,
	}
	return converted, true
}
// traceFromData extracts the "trace" value from data.
//
// If the key is absent it returns "" and a nil error. If the value is a
// string, the key is deleted from data and the string is returned. Any other
// value type yields an error and leaves data unchanged.
func traceFromData(data lager.Data) (string, error) {
	raw, present := data["trace"]
	if !present {
		return "", nil
	}

	value, isString := raw.(string)
	if !isString {
		return "", fmt.Errorf("unable to convert trace: %v", raw)
	}

	delete(data, "trace")
	return value, nil
}
// sessionFromData extracts the "session" value from data.
//
// If the key is absent it returns "" and a nil error. If the value is a
// string, the key is deleted from data and the string is returned. Any other
// value type yields an error and leaves data unchanged.
func sessionFromData(data lager.Data) (string, error) {
	raw, present := data["session"]
	if !present {
		return "", nil
	}

	value, isString := raw.(string)
	if !isString {
		return "", fmt.Errorf("unable to convert session: %v", raw)
	}

	delete(data, "session")
	return value, nil
}
// errorFromData extracts the "error" value from data as an error.
//
// The first result is the logged error; the second reports a failure of the
// extraction itself. If the key is absent both results are nil. If the value
// is a string, the key is deleted from data and the string is wrapped with
// errors.New. Any other value type yields an extraction error and leaves data
// unchanged.
func errorFromData(data lager.Data) (error, error) {
	raw, present := data["error"]
	if !present {
		return nil, nil
	}

	text, isString := raw.(string)
	if !isString {
		return nil, fmt.Errorf("unable to convert error: %v", raw)
	}

	delete(data, "error")
	return errors.New(text), nil
}