/
file_data_source.go
130 lines (118 loc) · 3.4 KB
/
file_data_source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package tdengine
import (
"bufio"
"strings"
"github.com/cnosdb/tsdb-comparisons/load"
"github.com/cnosdb/tsdb-comparisons/pkg/data"
"github.com/cnosdb/tsdb-comparisons/pkg/data/usecases/common"
"github.com/cnosdb/tsdb-comparisons/pkg/targets"
)
// fileDataSource is a data source backed by a line scanner. It caches the
// header section once Headers has parsed it from the top of the input.
type fileDataSource struct {
// scanner yields the input one line at a time; both Headers and NextItem advance it.
scanner *bufio.Scanner
// headers is nil until Headers has successfully parsed the header section.
headers *common.GeneratedDataHeaders
}
// newFileDataSource builds a fileDataSource whose scanner reads from the
// buffered reader that load.GetBufferedReader returns for fileName. The
// headers are left unset; Headers must be called before NextItem.
func newFileDataSource(fileName string) targets.DataSource {
	src := new(fileDataSource)
	src.scanner = bufio.NewScanner(load.GetBufferedReader(fileName))
	return src
}
// Headers returns the tag and column metadata found at the top of the input.
//
// The header section is consumed from the scanner on the first call and the
// result is cached, so later calls return the same value without reading
// again. The expected layout is:
//
//	line 1:      tagsKey followed by comma-separated "name type" tag entries
//	lines 2..N-1: one line per table: table name, then its column names
//	line N:      blank, separating the header from the data
//
// Any malformed or truncated header is reported through fatal, and nil is
// returned in case fatal does not terminate the process.
func (d *fileDataSource) Headers() *common.GeneratedDataHeaders {
	// Headers are read from the input file, and must be read before any point.
	if d.headers != nil {
		return d.headers
	}

	var tags string
	var cols []string
	for i := 0; ; i++ {
		if !d.scanner.Scan() {
			if err := d.scanner.Err(); err != nil {
				fatal("scan error: %v", err)
				return nil
			}
			// Nothing scanned and no error means EOF before the blank separator.
			fatal("ended too soon, no tags or cols read")
			return nil
		}
		line := strings.TrimSpace(d.scanner.Text())
		if i == 0 {
			// The first line always carries the tags, even if it is empty.
			tags = line
			continue
		}
		if len(line) == 0 {
			// A blank line terminates the header section.
			break
		}
		cols = append(cols, line)
	}

	tagsarr := strings.Split(tags, ",")
	if tagsarr[0] != tagsKey {
		// Report the offending prefix itself; indexing the string would yield a byte.
		fatal("input header in wrong format. got '%s', expected 'tags'", tagsarr[0])
		return nil
	}
	tagNames, tagTypes := extractTagNamesAndTypes(tagsarr[1:])

	// Each column line is "<table>,<col1>,<col2>,...".
	fieldKeys := make(map[string][]string, len(cols))
	for _, tableDef := range cols {
		columns := strings.Split(tableDef, ",")
		fieldKeys[columns[0]] = columns[1:]
	}

	d.headers = &common.GeneratedDataHeaders{
		TagTypes:  tagTypes,
		TagKeys:   tagNames,
		FieldKeys: fieldKeys,
	}
	return d.headers
}
// NextItem reads the next point from the input and returns it wrapped in a
// data.LoadedPoint.
//
// Each point spans two lines: a tags line of the form "<tagsKey>,<tags...>"
// followed by a data line of the form "<table>,<fields...>". A zero
// data.LoadedPoint is returned when the input is exhausted. Calling NextItem
// before Headers, a scan error, or a malformed or truncated point is reported
// through fatal, and a zero data.LoadedPoint is returned in case fatal does
// not terminate the process.
func (d *fileDataSource) NextItem() data.LoadedPoint {
	if d.headers == nil {
		fatal("headers not read before starting to decode points")
		return data.LoadedPoint{}
	}

	if !d.scanner.Scan() {
		if err := d.scanner.Err(); err != nil {
			fatal("scan error: %v", err)
		}
		// Nothing scanned and no error means a clean EOF: no more points.
		return data.LoadedPoint{}
	}

	// The first line is a CSV line of tags with the first element being tagsKey.
	parts := strings.SplitN(d.scanner.Text(), ",", 2) // prefix & then rest of line
	if parts[0] != tagsKey {
		fatal("data file in invalid format; got %s expected %s", parts[0], tagsKey)
		return data.LoadedPoint{}
	}
	if len(parts) < 2 {
		// Without a comma there is no second element; indexing it would panic.
		fatal("data file in invalid format; tags line has no tag values")
		return data.LoadedPoint{}
	}
	tags := parts[1]

	// Scan again to get the data line that belongs to the tags line.
	if !d.scanner.Scan() {
		if err := d.scanner.Err(); err != nil {
			fatal("scan error: %v", err)
		} else {
			fatal("data file ended after a tags line; expected a data line")
		}
		return data.LoadedPoint{}
	}
	parts = strings.SplitN(d.scanner.Text(), ",", 2) // table name & then rest of line
	if len(parts) < 2 {
		fatal("data file in invalid format; data line %q has no fields", parts[0])
		return data.LoadedPoint{}
	}

	return data.NewLoadedPoint(&point{
		hypertable: parts[0],
		row:        &insertData{tags: tags, fields: parts[1]},
	})
}
// extractTagNamesAndTypes separates header entries of the form "name type"
// into two parallel slices: the tag names and the tag types, in input order.
// It panics if any entry does not consist of exactly two parts separated by a
// single space.
func extractTagNamesAndTypes(tags []string) ([]string, []string) {
	names := make([]string, 0, len(tags))
	kinds := make([]string, 0, len(tags))
	for _, entry := range tags {
		pieces := strings.Split(entry, " ")
		if len(pieces) != 2 {
			panic("tag header has invalid format")
		}
		names = append(names, pieces[0])
		kinds = append(kinds, pieces[1])
	}
	return names, kinds
}