-
Notifications
You must be signed in to change notification settings - Fork 6
/
serializer.go
95 lines (84 loc) · 2.28 KB
/
serializer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package influx
import (
"github.com/cnosdb/tsdb-comparisons/pkg/data"
"github.com/cnosdb/tsdb-comparisons/pkg/data/serialize"
"io"
)
// Serializer writes a Point in a serialized form for MongoDB
type Serializer struct{}
// Serialize writes Point data to the given writer, conforming to the
// InfluxDB wire protocol.
//
// This function writes output that looks like:
// <measurement>,<tag key>=<tag value> <field name>=<field value> <timestamp>\n
//
// For example:
// foo,tag0=bar baz=-1.0 100\n
func (s *Serializer) Serialize(p *data.Point, w io.Writer) (err error) {
buf := make([]byte, 0, 1024)
buf = append(buf, p.MeasurementName()...)
fakeTags := make([]int, 0)
tagKeys := p.TagKeys()
tagValues := p.TagValues()
for i := 0; i < len(tagKeys); i++ {
if tagValues[i] == nil {
continue
}
switch v := tagValues[i].(type) {
case string:
buf = append(buf, ',')
buf = append(buf, tagKeys[i]...)
buf = append(buf, '=')
buf = append(buf, []byte(v)...)
default:
fakeTags = append(fakeTags, i)
}
}
fieldKeys := p.FieldKeys()
if len(fakeTags) > 0 || len(fieldKeys) > 0 {
buf = append(buf, ' ')
}
firstFieldFormatted := false
for i := 0; i < len(fakeTags); i++ {
tagIndex := fakeTags[i]
// don't append a comma before the first field
if firstFieldFormatted {
buf = append(buf, ',')
}
firstFieldFormatted = true
buf = appendField(buf, tagKeys[tagIndex], tagValues[tagIndex])
}
fieldValues := p.FieldValues()
for i := 0; i < len(fieldKeys); i++ {
value := fieldValues[i]
if value == nil {
continue
}
// don't append a comma before the first field
if firstFieldFormatted {
buf = append(buf, ',')
}
firstFieldFormatted = true
buf = appendField(buf, fieldKeys[i], value)
}
// first field wasn't formatted, because all the fields were nil, InfluxDB will reject the insert
if !firstFieldFormatted {
return nil
}
buf = append(buf, ' ')
buf = serialize.FastFormatAppend(p.Timestamp().UTC().UnixNano(), buf)
buf = append(buf, '\n')
_, err = w.Write(buf)
return err
}
func appendField(buf, key []byte, v interface{}) []byte {
buf = append(buf, key...)
buf = append(buf, '=')
buf = serialize.FastFormatAppend(v, buf)
// Influx uses 'i' to indicate integers:
switch v.(type) {
case int, int64:
buf = append(buf, 'i')
}
return buf
}