/
rowseries.go
227 lines (211 loc) · 6 KB
/
rowseries.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
package io
import (
"time"
)
// RowsInterface is the accessor set for a block of fixed-length binary rows.
// In this file both *Rows and *RowSeries define every method listed here.
type RowsInterface interface {
	// SetCandleAttributes replaces the candle attributes attached to the rows.
	SetCandleAttributes(*CandleAttributes)
	// GetCandleAttributes returns the candle attributes attached to the rows.
	GetCandleAttributes() *CandleAttributes
	// GetRow returns the bytes of the i-th record.
	GetRow(i int) []byte
	// GetData returns the byte slice that starts at the first record.
	GetData() []byte
	// GetNumRows returns the number of records held.
	GetNumRows() int
	// GetRowLen returns the length of one record in bytes.
	GetRowLen() int
	// SetRowLen sets the length of one record in bytes.
	SetRowLen(int)
}
// RowSeriesInterface describes the series-level metadata that accompanies a
// block of rows.
//
// NOTE(review): GetMetadataKey is declared here as returning string, while the
// method of the same name on *RowSeries in this file returns TimeBucketKey, so
// that method does not match this signature — confirm which one is intended.
type RowSeriesInterface interface {
	// GetMetadataKey returns the filesystem metadata key for this data.
	GetMetadataKey() string
	// GetTPrev returns the first timestamp of data just prior to the first row.
	GetTPrev() time.Time
}
// Rows is a packed byte buffer of fixed-length records together with the
// column layout (dataShape) needed to decode it.
//
// ColumnInterface and RowsInterface are embedded; NewRows does not populate
// them, so only the methods defined directly on *Rows are backed by code in
// this file.
type Rows struct {
	ColumnInterface
	RowsInterface
	// dataShape lists the columns (name and type) in the order in which
	// GetColumn accumulates their byte offsets.
	dataShape []DataShape
	// data holds the records back to back, rowLen bytes each.
	data []byte
	// rowLen is the length of one record in bytes. We allow for a rowLen that
	// might differ from the sum of dataShape for alignment, etc. Zero means
	// "not set yet"; GetRowLen then derives it from dataShape.
	rowLen int
	// candleAttributes holds the attributes of the rows: are they discrete
	// (ticks) or continuous (candles).
	candleAttributes *CandleAttributes
}
// NewRows wraps data, laid out according to dataShape, in a *Rows.
//
// Neither argument is copied. The row length starts at zero, so it is derived
// from dataShape on first use, and the candle attributes start as a pointer to
// a fresh zero-valued CandleAttributes.
func NewRows(dataShape []DataShape, data []byte) *Rows {
	rows := new(Rows)
	rows.dataShape = dataShape
	rows.data = data
	rows.candleAttributes = new(CandleAttributes)
	return rows
}
// GetColumn extracts the column named colname from the packed row data.
//
// The byte offset of the column inside a row is the sum of the sizes of the
// data shapes that precede it. The concrete type of the result is whatever the
// matching get*Column helper returns for the column's declared type. nil is
// returned when no data shape carries that name or its type is not one of the
// types handled below.
func (rows *Rows) GetColumn(colname string) (col interface{}) {
	offset := 0
	for _, shape := range rows.GetDataShapes() {
		if shape.Name != colname {
			// Not the requested column: step over its bytes.
			offset += shape.Type.Size()
			continue
		}
		switch shape.Type {
		case FLOAT32:
			return getFloat32Column(offset, rows.GetRowLen(), rows.GetNumRows(), rows.GetData())
		case FLOAT64:
			return getFloat64Column(offset, rows.GetRowLen(), rows.GetNumRows(), rows.GetData())
		case INT32:
			return getInt32Column(offset, rows.GetRowLen(), rows.GetNumRows(), rows.GetData())
		case EPOCH, INT64:
			return getInt64Column(offset, rows.GetRowLen(), rows.GetNumRows(), rows.GetData())
		case BOOL, BYTE:
			return getByteColumn(offset, rows.GetRowLen(), rows.GetNumRows(), rows.GetData())
		}
		// Name matched but the type is not handled: keep scanning without
		// advancing the offset.
	}
	return nil
}
// GetDataShapes returns the column layout of the rows. The internal slice is
// returned directly, not a copy.
func (rows *Rows) GetDataShapes() []DataShape {
	shapes := rows.dataShape
	return shapes
}
// Len returns the number of rows; it is an alias for GetNumRows.
func (rows *Rows) Len() int {
	numRows := rows.GetNumRows()
	return numRows
}
// GetTime builds one timestamp per row from the "Epoch" column (Unix seconds)
// and, when present, the "Nanoseconds" column.
//
// Each value is passed through ToSystemTimezone. nil is returned when there is
// no "Epoch" column decoding to []int64; previously that case panicked on an
// unchecked type assertion. When "Nanoseconds" is missing or does not decode to
// []int32, timestamps are produced with whole-second resolution.
func (rows *Rows) GetTime() []time.Time {
	ep, ok := rows.GetColumn("Epoch").([]int64)
	if !ok {
		return nil
	}
	// A failed assertion leaves ns nil, which the length check below treats as
	// "no sub-second part".
	ns, _ := rows.GetColumn("Nanoseconds").([]int32)

	ts := make([]time.Time, len(ep))
	for i, secs := range ep {
		var nsec int64
		if i < len(ns) {
			nsec = int64(ns[i])
		}
		ts[i] = ToSystemTimezone(time.Unix(secs, nsec))
	}
	return ts
}
// SetCandleAttributes stores ca as the candle attributes of the rows. The
// pointer is kept as given (no copy, no nil check).
func (rows *Rows) SetCandleAttributes(ca *CandleAttributes) {
	rows.candleAttributes = ca
}
// GetCandleAttributes returns the candle attributes pointer currently stored
// on the rows.
func (rows *Rows) GetCandleAttributes() *CandleAttributes {
	attrs := rows.candleAttributes
	return attrs
}
// SetRowLen sets a custom row length for this group of rows. This is needed
// when padding for alignment.
//
// A request smaller than the value currently reported by GetRowLen is ignored
// and the current value is kept. Because the floor is the current length (not
// the sum of the data shapes), the row length can only grow once it is set.
func (rows *Rows) SetRowLen(rowLen int) {
	if current := rows.GetRowLen(); current > rowLen {
		rowLen = current // Make sure the requested rowLen is sane
	}
	rows.rowLen = rowLen
}
// GetRowLen returns the length of one row in bytes.
//
// rowLen can be set directly through SetRowLen to allow for alignment, etc.
// While it is still zero, it is computed as the sum of the data shape sizes,
// cached on the receiver and returned.
func (rows *Rows) GetRowLen() int {
	if rows.rowLen == 0 {
		// Accumulate in a local so the field is written once with the final
		// value rather than being updated on every iteration.
		total := 0
		for _, shape := range rows.dataShape {
			total += shape.Type.Size()
		}
		rows.rowLen = total
	}
	return rows.rowLen
}
// GetNumRows returns how many whole rows fit in the data buffer: the buffer
// length divided by the row length. It returns 0 when either the row length or
// the buffer is empty.
func (rows *Rows) GetNumRows() int {
	rowLen := rows.GetRowLen()
	if rowLen != 0 && len(rows.data) != 0 {
		return len(rows.data) / rowLen
	}
	return 0
}
// GetData returns the underlying packed row buffer. The internal slice is
// returned directly, not a copy.
func (rows *Rows) GetData() []byte {
	buf := rows.data
	return buf
}
// GetRow returns the bytes of the i-th row as a sub-slice of the data buffer
// (no copy). No bounds check is done here; an out-of-range i panics in the
// slice expression.
func (rows *Rows) GetRow(i int) []byte {
	width := rows.GetRowLen()
	return rows.data[i*width : (i+1)*width]
}
// ToColumnSeries converts the rows into a new ColumnSeries.
//
// "Epoch" is added first, then every other data shape in declaration order,
// and finally the candle attributes are carried over. The "Epoch" column is
// type-asserted to []int64 without a check, so this panics if the rows have no
// such column.
func (rows *Rows) ToColumnSeries() *ColumnSeries {
	series := NewColumnSeries()
	epoch := rows.GetColumn("Epoch").([]int64)
	series.AddColumn("Epoch", epoch)
	for _, shape := range rows.GetDataShapes() {
		// Epoch has already been added above.
		if shape.Name != "Epoch" {
			series.AddColumn(shape.Name, rows.GetColumn(shape.Name))
		}
	}
	series.SetCandleAttributes(rows.GetCandleAttributes())
	return series
}
// RowSeries couples a block of rows with the key of the time bucket it belongs
// to and the timestamp that precedes its first row.
//
// RowSeriesInterface, RowsInterface and ColumnInterface are embedded;
// NewRowSeries does not populate them. The methods defined on *RowSeries in
// this file forward to the rows field or return the metadata fields.
type RowSeries struct {
	RowSeriesInterface
	RowsInterface
	ColumnInterface
	// rows holds the row data and its column layout.
	rows *Rows
	// metadataKey is the key passed to NewRowSeries.
	metadataKey TimeBucketKey
	// tPrev is the time returned by GetTPrev; NewRowSeries stores it in UTC.
	tPrev time.Time
}
// NewRowSeries builds a RowSeries over data.
//
// Parameters:
//   - key: time bucket key returned later by GetMetadataKey.
//   - tPrev: Unix seconds of the timestamp preceding the first row; stored as a
//     UTC time.Time.
//   - data: packed row bytes (not copied).
//   - dataShape: column layout of data. The caller's slice is never modified.
//   - rowLen: requested row length in bytes; it is applied through SetRowLen, so
//     a value below the sum of the data shape sizes is raised to that sum.
//   - cat: candle attributes to attach to the rows.
//   - rowType: record type; VARIABLE adds a "Nanoseconds" column (see below).
func NewRowSeries(key TimeBucketKey, tPrev int64, data []byte, dataShape []DataShape, rowLen int, cat *CandleAttributes,
	rowType EnumRecordType) *RowSeries {
	/*
		We have to add a column named _nanoseconds_ to the datashapes for a variable record type
		This is true because the read() function for variable types inserts a 32-bit nanoseconds column
	*/
	if rowType == VARIABLE {
		// Cap the capacity at the length so append always allocates a new
		// backing array. A plain append could write the extra element into the
		// caller's array when it has spare capacity.
		n := len(dataShape)
		dataShape = append(dataShape[:n:n], DataShape{Name: "Nanoseconds", Type: INT32})
	}

	rows := NewRows(dataShape, data)
	rows.SetCandleAttributes(cat)
	rows.SetRowLen(rowLen)

	return &RowSeries{
		metadataKey: key,
		tPrev:       time.Unix(tPrev, 0).UTC(),
		rows:        rows,
	}
}
// GetMetadataKey returns the time bucket key given to NewRowSeries.
func (rs *RowSeries) GetMetadataKey() TimeBucketKey {
	key := rs.metadataKey
	return key
}
// GetTPrev returns the stored timestamp that precedes the first row of the
// series.
func (rs *RowSeries) GetTPrev() time.Time {
	prev := rs.tPrev
	return prev
}
// SetCandleAttributes forwards ca to the underlying rows.
func (rs *RowSeries) SetCandleAttributes(ca *CandleAttributes) {
	inner := rs.rows
	inner.SetCandleAttributes(ca)
}
// GetCandleAttributes returns the candle attributes of the underlying rows.
func (rs *RowSeries) GetCandleAttributes() *CandleAttributes {
	attrs := rs.rows.GetCandleAttributes()
	return attrs
}
// GetRow returns the bytes of the i-th row of the underlying rows.
func (rs *RowSeries) GetRow(i int) []byte {
	row := rs.rows.GetRow(i)
	return row
}
// GetData returns the packed data buffer of the underlying rows.
func (rs *RowSeries) GetData() []byte {
	buf := rs.rows.GetData()
	return buf
}
// GetNumRows returns the number of rows held by the underlying rows.
func (rs *RowSeries) GetNumRows() int {
	count := rs.rows.GetNumRows()
	return count
}
// GetRowLen returns the row length, in bytes, of the underlying rows.
func (rs *RowSeries) GetRowLen() int {
	width := rs.rows.GetRowLen()
	return width
}
// SetRowLen forwards the requested row length to the underlying rows, which
// apply their own lower bound.
func (rs *RowSeries) SetRowLen(rowLen int) {
	inner := rs.rows
	inner.SetRowLen(rowLen)
}
// GetColumn returns the column named colname from the underlying rows, or nil
// when the underlying rows return nil for it.
func (rs *RowSeries) GetColumn(colname string) (col interface{}) {
	col = rs.rows.GetColumn(colname)
	return col
}
// GetDataShapes returns the column layout of the underlying rows.
func (rs *RowSeries) GetDataShapes() (ds []DataShape) {
	ds = rs.rows.GetDataShapes()
	return ds
}
// Len returns the number of rows; it is an alias for GetNumRows.
func (rs *RowSeries) Len() int {
	count := rs.GetNumRows()
	return count
}
// GetTime returns the per-row timestamps computed by the underlying rows.
func (rs *RowSeries) GetTime() []time.Time {
	stamps := rs.rows.GetTime()
	return stamps
}
// GetEpoch reads an int64 column located at byte offset 0 of every row.
//
// The data shapes are not consulted, so this presumably relies on "Epoch"
// being the first field of each row — TODO confirm against the writers.
func (rs *RowSeries) GetEpoch() (col []int64) {
	const epochOffset = 0
	col = getInt64Column(epochOffset, rs.GetRowLen(), rs.GetNumRows(), rs.GetData())
	return col
}
// ToColumnSeries converts the series into a new ColumnSeries and returns it
// together with the metadata key.
//
// "Epoch" (taken from GetEpoch) is added first, then every other data shape in
// declaration order, and finally the candle attributes are carried over.
func (rs *RowSeries) ToColumnSeries() (key TimeBucketKey, cs *ColumnSeries) {
	key = rs.GetMetadataKey()

	cs = NewColumnSeries()
	cs.AddColumn("Epoch", rs.GetEpoch())
	for _, shape := range rs.rows.GetDataShapes() {
		// Epoch has already been added above.
		if shape.Name != "Epoch" {
			cs.AddColumn(shape.Name, rs.GetColumn(shape.Name))
		}
	}
	cs.SetCandleAttributes(rs.GetCandleAttributes())

	return key, cs
}