-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
json_parser.go
141 lines (124 loc) · 3.85 KB
/
json_parser.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package logstorage
import (
"fmt"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/valyala/fastjson"
)
// JSONParser parses a single JSON log message into Fields.
//
// See https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model
//
// Use GetJSONParser() for obtaining the parser and PutJSONParser()
// for returning it to the pool when it is no longer needed.
type JSONParser struct {
// Fields contains the parsed JSON line after ParseLogMessage() call
//
// The Fields are valid until the next call to ParseLogMessage()
// or until the parser is returned to the pool with PutJSONParser() call.
Fields []Field
// p is used for fast JSON parsing
p fastjson.Parser
// buf is used for holding the backing data for Fields
//
// Field names and values are appended to buf during ParseLogMessage();
// its length is reset to zero on every reset() call.
buf []byte
// prefixBuf is used for holding the current key prefix
// when it is composed from multiple keys.
//
// For example, it holds "foo." while the nested object at the "foo" key is flattened.
prefixBuf []byte
}
// reset drops the previously parsed state while keeping the allocated capacity for reuse.
//
// The Fields entries are zeroed before truncation, so the parser does not keep
// references to names and values from the previous message.
func (p *JSONParser) reset() {
	p.buf = p.buf[:0]

	fields := p.Fields
	clear(fields)
	p.Fields = fields[:0]
}
// GetJSONParser returns a JSONParser ready to parse JSON lines.
//
// A pooled parser is reused when one is available; otherwise a new one is allocated.
//
// Return the parser to the pool when it is no longer needed by calling PutJSONParser().
func GetJSONParser() *JSONParser {
	if pooled := parserPool.Get(); pooled != nil {
		return pooled.(*JSONParser)
	}
	// The pool is empty.
	return &JSONParser{}
}
// PutJSONParser resets p and returns it to the pool.
//
// The parser cannot be used after returning to the pool.
// This includes p.Fields, which are cleared by the reset.
func PutJSONParser(p *JSONParser) {
// Reset before Put, so the parser is already clean
// at the moment it becomes available to other pool users.
p.reset()
parserPool.Put(p)
}
// parserPool holds JSONParser instances for reuse via GetJSONParser() and PutJSONParser().
//
// It has no New func, so Get() returns nil when the pool is empty.
var parserPool sync.Pool
// ParseLogMessage parses the given JSON log message msg into p.Fields.
//
// The previously parsed fields are dropped before parsing, so p.Fields is empty
// when an error is returned. An error is returned if msg is not valid JSON
// or if its top-level value is not a JSON object.
//
// The p.Fields remains valid until the next call to ParseLogMessage() or PutJSONParser().
func (p *JSONParser) ParseLogMessage(msg []byte) error {
	p.reset()

	root, err := p.p.Parse(bytesutil.ToUnsafeString(msg))
	if err != nil {
		return fmt.Errorf("cannot parse json: %w", err)
	}

	typ := root.Type()
	if typ != fastjson.TypeObject {
		return fmt.Errorf("expecting json dictionary; got %s", typ)
	}

	// Flatten the object into p.Fields; p.buf and p.prefixBuf are reused across calls.
	p.Fields, p.buf, p.prefixBuf = appendLogFields(p.Fields, p.buf, p.prefixBuf, root)
	return nil
}
// RenameField renames the first field named oldName to newName in p.Fields.
//
// Nothing is changed if oldName is empty or if there is no field with such a name.
func (p *JSONParser) RenameField(oldName, newName string) {
	if oldName == "" {
		return
	}

	for i := range p.Fields {
		if p.Fields[i].Name != oldName {
			continue
		}
		// Only the first match is renamed.
		p.Fields[i].Name = newName
		return
	}
}
// appendLogFields appends the entries of the JSON object v to dst and returns the updated dst, dstBuf and prefixBuf.
//
// Nested objects are flattened by joining keys with '.'.
// For example, {"foo":{"bar":"baz"}} is converted to {"foo.bar":"baz"}.
// Nulls are skipped, strings are stored decoded, while arrays, numbers, true and false
// are stored in their marshaled JSON form.
//
// dstBuf receives the bytes of the appended field names and values.
// prefixBuf must contain the key prefix for the entries of v; it is returned with the same length.
func appendLogFields(dst []Field, dstBuf, prefixBuf []byte, v *fastjson.Value) ([]Field, []byte, []byte) {
	v.GetObject().Visit(func(key []byte, item *fastjson.Value) {
		valueStart := len(dstBuf)

		switch typ := item.Type(); typ {
		case fastjson.TypeNull:
			// Nulls produce no field.
			return
		case fastjson.TypeObject:
			// Extend the prefix with "key." for the nested entries and restore it afterwards.
			parentLen := len(prefixBuf)
			prefixBuf = append(prefixBuf, key...)
			prefixBuf = append(prefixBuf, '.')
			dst, dstBuf, prefixBuf = appendLogFields(dst, dstBuf, prefixBuf, item)
			prefixBuf = prefixBuf[:parentLen]
			return
		case fastjson.TypeString:
			// Store the decoded string contents.
			dstBuf = append(dstBuf, item.GetStringBytes()...)
		case fastjson.TypeArray, fastjson.TypeNumber, fastjson.TypeTrue, fastjson.TypeFalse:
			// Store the JSON representation of the value.
			dstBuf = item.MarshalTo(dstBuf)
		default:
			logger.Panicf("BUG: unexpected JSON type: %s", typ)
			return
		}

		// The value bytes are at dstBuf[valueStart:]; the field name is appended after them.
		dst, dstBuf = appendLogField(dst, dstBuf, prefixBuf, key, dstBuf[valueStart:])
	})
	return dst, dstBuf, prefixBuf
}
// appendLogField appends a field with the name prefixBuf+k and the given value to dst.
//
// The name bytes are appended to dstBuf. The value is referenced as is, without copying,
// so the caller is responsible for keeping its bytes intact while the field is in use.
//
// It returns the updated dst and dstBuf.
func appendLogField(dst []Field, dstBuf, prefixBuf, k, value []byte) ([]Field, []byte) {
	nameStart := len(dstBuf)
	dstBuf = append(append(dstBuf, prefixBuf...), k...)

	field := Field{
		Name:  bytesutil.ToUnsafeString(dstBuf[nameStart:]),
		Value: bytesutil.ToUnsafeString(value),
	}
	return append(dst, field), dstBuf
}