-
Notifications
You must be signed in to change notification settings - Fork 2
/
file_creator_def.go
218 lines (187 loc) · 7.51 KB
/
file_creator_def.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
package sc
import (
"encoding/json"
"fmt"
"go/ast"
"strings"
"github.com/capillariesio/capillaries/pkg/eval"
)
// Output file formats supported by the file creator.
// The type is detected at Deserialize time from the first column's settings.
const (
	CreatorFileTypeUnknown int = 0
	CreatorFileTypeCsv     int = 1
	CreatorFileTypeParquet int = 2
)
// ParquetCodecType identifies the compression codec used when writing Parquet files.
type ParquetCodecType string

// Supported Parquet compression codecs; gzip is the default (see Deserialize).
const (
	ParquetCodecGzip         ParquetCodecType = "gzip"
	ParquetCodecSnappy       ParquetCodecType = "snappy"
	ParquetCodecUncompressed ParquetCodecType = "uncompressed"
)
// WriteCsvColumnSettings holds per-column settings for CSV output.
type WriteCsvColumnSettings struct {
	Format string `json:"format"` // Output format for the column value — presumably printf-style; confirm against the CSV writer
	Header string `json:"header"` // Column header text; a non-empty header on the first column marks the creator as CSV
}
// WriteParquetColumnSettings holds per-column settings for Parquet output.
type WriteParquetColumnSettings struct {
	ColumnName string `json:"column_name"` // Target Parquet column name; non-empty on the first column marks the creator as Parquet
}
// WriteFileColumnDef defines one output file column: the expression that
// produces its value, its declared type, and format-specific settings.
type WriteFileColumnDef struct {
	RawExpression    string                     `json:"expression"`
	Name             string                     `json:"name"` // To be used in Having
	Type             TableFieldType             `json:"type"` // To be checked when checking expressions and to be used in Having
	Csv              WriteCsvColumnSettings     `json:"csv,omitempty"`
	Parquet          WriteParquetColumnSettings `json:"parquet,omitempty"`
	ParsedExpression ast.Expr                   `json:"-"` // Populated by Deserialize from RawExpression
	UsedFields       FieldRefs                  `json:"-"` // Fields harvested from RawExpression by Deserialize
}
// TopDef describes an optional top-N clause: row limit plus ordering spec.
type TopDef struct {
	Limit       int    `json:"limit"` // Defaulted/capped to MaxFileCreatorTopLimit by Deserialize
	RawOrder    string `json:"order"` // Non-blank order spec enables top handling (see HasTop)
	OrderIdxDef IdxDef // Not an index really, we just re-use IdxDef infrastructure
}
// CsvCreatorSettings holds file-level CSV output settings.
type CsvCreatorSettings struct {
	Separator string `json:"separator"` // Field separator; defaults to "," in Deserialize
}
// ParquetCreatorSettings holds file-level Parquet output settings.
type ParquetCreatorSettings struct {
	Codec ParquetCodecType `json:"codec"` // Compression codec; defaults to gzip in Deserialize
}
// FileCreatorDef is the deserialized definition of a file-writing target:
// where to write (UrlTemplate), what columns to produce, an optional
// 'having' filter, and an optional top-N/order clause.
type FileCreatorDef struct {
	UrlTemplate                   string                 `json:"url_template"`
	RawHaving                     string                 `json:"having,omitempty"`
	Top                           TopDef                 `json:"top,omitempty"`
	Csv                           CsvCreatorSettings     `json:"csv,omitempty"`
	Parquet                       ParquetCreatorSettings `json:"parquet,omitempty"`
	Columns                       []WriteFileColumnDef   `json:"columns"`
	Having                        ast.Expr               `json:"-"` // Parsed from RawHaving by Deserialize
	UsedInHavingFields            FieldRefs              `json:"-"` // Fields harvested from RawHaving
	UsedInTargetExpressionsFields FieldRefs              `json:"-"` // Distinct fields used across all column expressions
	CreatorFileType               int                    `json:"-"` // One of CreatorFileTypeCsv/Parquet, detected by Deserialize
}
// MaxFileCreatorTopLimit caps (and defaults) top.limit; 5M is conservative.
const MaxFileCreatorTopLimit int = 5000000
// getFieldRefs builds one FieldRef (creator alias + column name + type)
// per configured output column, in column order.
func (creatorDef *FileCreatorDef) getFieldRefs() *FieldRefs {
	fieldRefs := make(FieldRefs, len(creatorDef.Columns))
	for colIdx, colDef := range creatorDef.Columns {
		fieldRefs[colIdx] = FieldRef{
			TableName: CreatorAlias,
			FieldName: colDef.Name,
			FieldType: colDef.Type}
	}
	return &fieldRefs
}
// GetFieldRefsUsedInAllTargetFileExpressions collects the distinct set of
// fields referenced by the expressions of all target file columns.
// Deduplication key is "tableName.fieldName"; result order is unspecified
// (map iteration order).
func (creatorDef *FileCreatorDef) GetFieldRefsUsedInAllTargetFileExpressions() FieldRefs {
	seen := map[string]FieldRef{}
	for colIdx := range creatorDef.Columns {
		for _, usedField := range creatorDef.Columns[colIdx].UsedFields {
			key := fmt.Sprintf("%s.%s", usedField.TableName, usedField.FieldName)
			if _, found := seen[key]; !found {
				seen[key] = usedField
			}
		}
	}
	// Flatten the dedup map into a FieldRefs slice
	result := make([]FieldRef, 0, len(seen))
	for _, fieldRef := range seen {
		result = append(result, fieldRef)
	}
	return result
}
// HasTop reports whether a top/order clause was configured
// (i.e. the raw order spec is non-blank).
func (creatorDef *FileCreatorDef) HasTop() bool {
	return strings.TrimSpace(creatorDef.Top.RawOrder) != ""
}
// Deserialize parses the raw JSON writer definition into creatorDef:
// detects the target file type (csv vs parquet) from the first column,
// applies codec/separator defaults, parses the optional 'having' condition
// and every column expression, validates column types, and, when a top
// clause is present, defaults/caps the limit and builds the ordering index.
// Returns a descriptive error on the first problem encountered.
func (creatorDef *FileCreatorDef) Deserialize(rawWriter json.RawMessage) error {
	if err := json.Unmarshal(rawWriter, creatorDef); err != nil {
		return fmt.Errorf("cannot unmarshal file creator: [%s]", err.Error())
	}

	// File type detection: a parquet column carries column_name, a csv column carries header.
	if len(creatorDef.Columns) > 0 && creatorDef.Columns[0].Parquet.ColumnName != "" {
		creatorDef.CreatorFileType = CreatorFileTypeParquet
		if creatorDef.Parquet.Codec == "" {
			creatorDef.Parquet.Codec = ParquetCodecGzip
		}
	} else if len(creatorDef.Columns) > 0 && creatorDef.Columns[0].Csv.Header != "" {
		creatorDef.CreatorFileType = CreatorFileTypeCsv
		if len(creatorDef.Csv.Separator) == 0 {
			creatorDef.Csv.Separator = ","
		}
	} else {
		// Fixed duplicated word in the original message ("cannot cannot")
		return fmt.Errorf("cannot detect file creator type: parquet should have column_name, csv should have header etc")
	}

	// Having
	var err error
	creatorDef.Having, err = ParseRawGolangExpressionStringAndHarvestFieldRefs(creatorDef.RawHaving, &creatorDef.UsedInHavingFields)
	if err != nil {
		return fmt.Errorf("cannot parse file creator 'having' condition [%s]: [%s]", creatorDef.RawHaving, err.Error())
	}

	// Columns: parse each target expression and validate the declared type
	for i := 0; i < len(creatorDef.Columns); i++ {
		colDef := &creatorDef.Columns[i]
		if colDef.ParsedExpression, err = ParseRawGolangExpressionStringAndHarvestFieldRefs(colDef.RawExpression, &colDef.UsedFields); err != nil {
			return fmt.Errorf("cannot parse column expression [%s]: [%s]", colDef.RawExpression, err.Error())
		}
		if !IsValidFieldType(colDef.Type) {
			return fmt.Errorf("invalid column type [%s]", colDef.Type)
		}
	}

	// Top: default/cap the limit and build an ordering index from the raw order spec
	if creatorDef.HasTop() {
		if creatorDef.Top.Limit <= 0 {
			creatorDef.Top.Limit = MaxFileCreatorTopLimit
		} else if creatorDef.Top.Limit > MaxFileCreatorTopLimit {
			return fmt.Errorf("top.limit cannot exceed %d", MaxFileCreatorTopLimit)
		}
		idxDefMap := IdxDefMap{}
		rawIndexes := map[string]string{"top": fmt.Sprintf("non_unique(%s)", creatorDef.Top.RawOrder)}
		if err := idxDefMap.parseRawIndexDefMap(rawIndexes, creatorDef.getFieldRefs()); err != nil {
			return fmt.Errorf("cannot parse raw index definition(s) for top: %s", err.Error())
		}
		creatorDef.Top.OrderIdxDef = *idxDefMap["top"]
	}

	creatorDef.UsedInTargetExpressionsFields = creatorDef.GetFieldRefsUsedInAllTargetFileExpressions()
	return nil
}
// CalculateFileRecordFromSrcVars evaluates every column expression against
// srcVars and returns the resulting file record (one value per column, in
// column order). Errors from all columns are accumulated and reported
// together; on any error the record is discarded and nil is returned.
func (creatorDef *FileCreatorDef) CalculateFileRecordFromSrcVars(srcVars eval.VarValuesMap) ([]any, error) {
	errs := make([]string, 0, 2)
	fileRecord := make([]any, len(creatorDef.Columns))
	for colIdx := 0; colIdx < len(creatorDef.Columns); colIdx++ {
		eCtx := eval.NewPlainEvalCtxWithVars(eval.AggFuncDisabled, &srcVars)
		valVolatile, err := eCtx.Eval(creatorDef.Columns[colIdx].ParsedExpression)
		if err != nil {
			errs = append(errs, fmt.Sprintf("cannot evaluate expression for column %s: [%s]", creatorDef.Columns[colIdx].Name, err.Error()))
			// The value is meaningless after a failed Eval; skip the type
			// check so we don't report a spurious second error for this column.
			continue
		}
		if err := CheckValueType(valVolatile, creatorDef.Columns[colIdx].Type); err != nil {
			errs = append(errs, fmt.Sprintf("invalid field %s type: [%s]", creatorDef.Columns[colIdx].Name, err.Error()))
		}
		fileRecord[colIdx] = valVolatile
	}
	if len(errs) > 0 {
		// Explicit %s verb: passing the joined string as the format would
		// mangle any % characters it contains (and fails go vet's printf check).
		return nil, fmt.Errorf("%s", strings.Join(errs, "; "))
	}
	return fileRecord, nil
}
// CheckFileRecordHavingCondition evaluates the optional 'having' expression
// against a calculated file record and reports whether the record passes.
// A record always passes when no 'having' condition was configured.
// Errors if the record length does not match the column list, if evaluation
// fails, or if the expression does not produce a bool.
func (creatorDef *FileCreatorDef) CheckFileRecordHavingCondition(fileRecord []any) (bool, error) {
	if len(fileRecord) != len(creatorDef.Columns) {
		return false, fmt.Errorf("file record length %d does not match file creator column list length %d", len(fileRecord), len(creatorDef.Columns))
	}
	// No condition configured: everything passes
	if creatorDef.Having == nil {
		return true, nil
	}

	// Expose each record value to the evaluator under the creator alias,
	// keyed by column name
	colValues := map[string]any{}
	for colIdx := range creatorDef.Columns {
		colValues[creatorDef.Columns[colIdx].Name] = fileRecord[colIdx]
	}
	vars := eval.VarValuesMap{CreatorAlias: colValues}

	evalCtx := eval.NewPlainEvalCtxWithVars(eval.AggFuncDisabled, &vars)
	rawResult, err := evalCtx.Eval(creatorDef.Having)
	if err != nil {
		return false, fmt.Errorf("cannot evaluate 'having' expression: [%s]", err.Error())
	}
	passes, isBool := rawResult.(bool)
	if !isBool {
		return false, fmt.Errorf("cannot get bool when evaluating having expression, got %v(%T) instead", rawResult, rawResult)
	}
	return passes, nil
}