-
Notifications
You must be signed in to change notification settings - Fork 3
/
marshall_csv.go
203 lines (185 loc) · 5.75 KB
/
marshall_csv.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
package bitflow
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"strconv"
"strings"
"time"
log "github.com/sirupsen/logrus"
)
// WarnObsoleteBinaryFormat controls whether CsvMarshaller logs a warning when
// it parses a CSV header that consists of nothing but the time field. Such a
// header might come from input written in the old binary format. It is
// enabled by default; set it to false to silence the warning.
var WarnObsoleteBinaryFormat = true

const (
	// CsvSeparator is the character separating fields in the marshalled output
	// of CsvMarshaller.
	CsvSeparator = ','

	// CsvNewline is used by CsvMarshaller after outputting the header line and
	// each sample.
	CsvNewline = '\n'

	// CsvDateFormat is the format used by CsvMarshaller to marshall the timestamp
	// of samples. The layout carries no time zone: CsvMarshaller writes
	// timestamps in UTC, and time.Parse interprets zone-less input as UTC.
	CsvDateFormat = "2006-01-02 15:04:05.999999999"
)
// CsvMarshaller marshals Headers and Samples to a CSV format.
//
// A header becomes one comma-separated CSV line. Its first field is 'time'.
// When the following samples carry tags, the second field is 'tags'. All
// metric names follow after that.
//
// A sample becomes one comma-separated line. It starts with the textual
// timestamp (see CsvDateFormat, UTC timezone). If the header contains the
// 'tags' field, the next field is the sample's tags as a space-separated
// key-value list. The metric values follow, in the same order as on the
// preceding header line. To follow the semantics of a correct CSV file, every
// changed header should start a new CSV file.
//
// Every CSV line, including the last line of a file, must be terminated by a
// newline character.
//
// Multiple header declarations may appear in the same file or data stream. A
// line whose first field is "time" is treated as the start of a new header.
// Samples start with a timestamp, which is never formatted as "time".
//
// CsvMarshaller has no configuration options.
type CsvMarshaller struct{}

// ShouldCloseAfterFirstSample always reports false: a CSV stream can keep
// carrying samples without being closed after the first one.
func (CsvMarshaller) ShouldCloseAfterFirstSample() bool { return false }

// String implements the Marshaller interface and names this format "CSV".
func (CsvMarshaller) String() string { return "CSV" }
// WriteHeader implements the Marshaller interface by printing a CSV header line.
//
// The line consists of these parts, joined by CsvSeparator and terminated by
// CsvNewline:
//   - the time column name (csv_time_col);
//   - the tags column name (tags_col), only when withTags is set;
//   - every entry of header.Fields, in order.
//
// Every field name is checked with checkHeaderField before anything is
// written. An invalid name therefore returns its error without leaving a
// truncated, newline-less header line in the output stream. Otherwise the
// first write error recorded by the WriteCascade (if any) is returned.
func (CsvMarshaller) WriteHeader(header *Header, withTags bool, writer io.Writer) error {
	// Validate up front: once bytes reach the writer they cannot be taken back.
	for _, name := range header.Fields {
		if err := checkHeaderField(name); err != nil {
			return err
		}
	}

	w := WriteCascade{Writer: writer}
	w.WriteStr(csv_time_col)
	if withTags {
		w.WriteByte(CsvSeparator)
		w.WriteStr(tags_col)
	}
	for _, name := range header.Fields {
		w.WriteByte(CsvSeparator)
		w.WriteStr(name)
	}
	w.WriteStr(string(CsvNewline))
	return w.Err
}
// WriteSample implements the Marshaller interface by writing a CSV line.
//
// The line consists of these parts, joined by CsvSeparator and terminated by
// CsvNewline:
//   - the sample timestamp, converted to UTC and formatted with CsvDateFormat;
//   - the sample's tag string, only when withTags is set;
//   - every entry of sample.Values, in order.
//
// The header argument is not consulted; values are emitted exactly as they
// appear in sample.Values. The first write error recorded by the WriteCascade
// (if any) is returned.
func (CsvMarshaller) WriteSample(sample *Sample, header *Header, withTags bool, writer io.Writer) error {
	out := WriteCascade{Writer: writer}
	timestamp := sample.Time.UTC().Format(CsvDateFormat)
	out.WriteStr(timestamp)

	if withTags {
		tagField := sample.TagString()
		out.WriteByte(CsvSeparator)
		out.WriteStr(tagField)
	}

	for i := range sample.Values {
		out.WriteByte(CsvSeparator)
		out.WriteAny(sample.Values[i])
	}

	out.WriteStr(string(CsvNewline))
	return out.Err
}
// splitCsvLine cuts a raw CSV line at every CsvSeparator. There is no support
// for quoting or escaping. The result always has at least one element; an
// empty line yields a single empty string.
func splitCsvLine(line []byte) []string {
	separator := string(CsvSeparator)
	text := string(line)
	return strings.Split(text, separator)
}
// Read implements the Unmarshaller interface by reading one CSV line from the
// input stream. The first field of the line decides how it is interpreted:
//   - If no header has been seen yet (previousHeader == nil), the first field
//     must pass checkFirstField against the time column name. The line is
//     then parsed into a Header.
//   - If the first field equals the time column name, the line starts a new
//     header and is parsed into a Header.
//   - Otherwise the line, without its trailing newline, is returned
//     unparsed as sample data (see ParseSample).
//
// If readUntil reports io.EOF with no data, io.EOF is returned on its own. If
// it reports io.EOF together with data, the data is processed as-is, without
// newline stripping. This is presumably an unterminated last line. In that
// case io.EOF is returned alongside the header or sample data. A line made
// up of only the newline character is rejected with an error.
func (c CsvMarshaller) Read(reader *bufio.Reader, previousHeader *UnmarshalledHeader) (*UnmarshalledHeader, []byte, error) {
	line, err := readUntil(reader, CsvNewline)
	switch {
	case err == io.EOF && len(line) == 0:
		return nil, nil, err
	case err == io.EOF:
		// Keep the remaining data untouched; io.EOF is passed on with the result.
	case err != nil:
		return nil, nil, err
	case len(line) == 1:
		return nil, nil, errors.New("Empty CSV line")
	case len(line) > 0:
		line = line[:len(line)-1] // drop the trailing newline
	}

	firstField := string(line) // whole line when there is only one field
	if sep := bytes.IndexByte(line, CsvSeparator); sep >= 0 {
		firstField = string(line[:sep])
	}

	if previousHeader == nil {
		if checkErr := checkFirstField(csv_time_col, firstField); checkErr != nil {
			return nil, nil, checkErr
		}
		return c.parseHeader(line), nil, err
	}
	if firstField == csv_time_col {
		return c.parseHeader(line), nil, err
	}
	return nil, line, err
}
// parseHeader turns a CSV header line into an UnmarshalledHeader.
//
// The first field (the time column) is always skipped. If the second field
// equals the tags column name, HasTags is set and that field is skipped too.
// The remaining fields become the header's Fields, which stays nil when no
// metric fields are left.
//
// If WarnObsoleteBinaryFormat is enabled and the line holds exactly one field,
// a warning is logged. Such a header may indicate the old binary format.
func (CsvMarshaller) parseHeader(line []byte) *UnmarshalledHeader {
	columns := splitCsvLine(line)
	if len(columns) == 1 && WarnObsoleteBinaryFormat {
		log.Warnln("CSV header contains only time field. This might be the old binary format, " +
			"use the 'old_binary_format' tag from the go-bitflow-pipeline repository.")
	}

	withTags := len(columns) > 1 && columns[1] == tags_col
	skip := 1
	if withTags {
		skip = 2
	}

	result := &UnmarshalledHeader{HasTags: withTags}
	if metrics := columns[skip:]; len(metrics) > 0 {
		result.Fields = metrics
	}
	return result
}
// ParseSample implements the Unmarshaller interface by parsing a CSV line,
// such as the raw sample data returned by Read.
//
// Fields are interpreted as follows:
//   - The first field is the timestamp, parsed with CsvDateFormat. The layout
//     has no zone, so time.Parse yields a UTC time.
//   - If header.HasTags is set, the second field must exist and is parsed
//     with Sample.ParseTagString.
//   - Every remaining field is parsed as a float64 value, in order.
//
// minValueCapacity is a lower bound for the capacity of the value slice. The
// slice is also sized to hold every parsed value, so appending never grows it.
// When minValueCapacity <= 0 and there are no values, Values stays nil.
//
// On any error a nil sample is returned together with the error, so callers
// never observe a partially filled Sample.
func (CsvMarshaller) ParseSample(header *UnmarshalledHeader, minValueCapacity int, data []byte) (sample *Sample, err error) {
	fields := splitCsvLine(data)

	timestamp, err := time.Parse(CsvDateFormat, fields[0])
	if err != nil {
		return nil, err
	}

	start := 1
	if header.HasTags {
		if len(fields) < 2 {
			return nil, fmt.Errorf("Sample too short: %v", fields)
		}
		start = 2
	}

	// Reserve room for every value (and at least minValueCapacity) up front.
	capacity := len(fields) - start
	if minValueCapacity > capacity {
		capacity = minValueCapacity
	}
	var values []Value
	if capacity > 0 {
		values = make([]Value, 0, capacity)
	}

	sample = &Sample{
		Values: values,
		Time:   timestamp,
	}
	if header.HasTags {
		if tagErr := sample.ParseTagString(fields[1]); tagErr != nil {
			return nil, tagErr
		}
	}
	for _, field := range fields[start:] {
		val, parseErr := strconv.ParseFloat(field, 64)
		if parseErr != nil {
			return nil, parseErr
		}
		sample.Values = append(sample.Values, Value(val))
	}
	return sample, nil
}