-
Notifications
You must be signed in to change notification settings - Fork 67
/
putter.go
338 lines (312 loc) · 9.45 KB
/
putter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
package expr
import (
"fmt"
"slices"
"github.com/brimdata/zed"
"github.com/brimdata/zed/pkg/field"
"github.com/brimdata/zed/zcode"
)
// Putter is an Evaluator that modifies the record stream with computed values.
// Each new value is called a clause and consists of a field name and
// an expression. Each put clause either replaces an existing value in
// the field specified or appends a value as a new field. Appended
// values appear as new fields in the order that the clause appears
// in the put expression.
type Putter struct {
// zctx is used to look up output record types and to wrap errors.
zctx *zed.Context
// builder is the zcode.Builder handed to putStep.build by Eval.
builder zcode.Builder
// clauses are the assignments evaluated for each input value.
clauses []Assignment
// rules caches derived putRules, keyed first by the input record
// type's ID and then by the string form of the assigned paths.
rules map[int]map[string]putRule
// warned is allocated by NewPutter; it is not otherwise referenced
// by the code in this file.
warned map[string]struct{}
// vals is a slice to avoid re-allocating for every value
vals []zed.Value
// paths is a slice to avoid re-allocating for every path
paths field.List
}
// A putRule describes how a given record type is modified by describing
// which input fields should be replaced with which clause expression and
// which clauses should be appended. The type of each clause expression
// is recorded since a new rule must be created if any of the types change.
// Such changes aren't typically expected but are possible in the expression
// language.
type putRule struct {
// typ is the type of the output record produced by this rule.
typ zed.Type
// clauseTypes holds the type of each clause value the rule was derived
// from; a cached rule is reused only when these still match.
clauseTypes []zed.Type
// step is the root of the step tree that builds the output record.
step putStep
}
// NewPutter returns a Putter that applies the given clauses, using zctx
// for type lookups and error wrapping. The scratch slice for clause
// values is allocated up front with one slot per clause, and the rule
// cache starts out empty.
func NewPutter(zctx *zed.Context, clauses []Assignment) *Putter {
	p := new(Putter)
	p.zctx = zctx
	p.clauses = clauses
	p.vals = make([]zed.Value, len(clauses))
	p.rules = make(map[int]map[string]putRule)
	p.warned = make(map[string]struct{})
	return p
}
// eval evaluates every clause against this and returns the resulting
// values together with the paths they are assigned to, in clause order.
// A clause whose right-hand side evaluates to a quiet value is skipped
// entirely (its left-hand side is not evaluated). If a left-hand side
// fails to evaluate, that error is returned with nil slices.
//
// The returned slices are the Putter's own scratch buffers and are
// overwritten by the next call.
func (p *Putter) eval(ectx Context, this *zed.Value) ([]zed.Value, field.List, error) {
	// Truncate rather than reallocate so the backing arrays are reused.
	p.vals, p.paths = p.vals[:0], p.paths[:0]
	for _, clause := range p.clauses {
		rhs := clause.RHS.Eval(ectx, this)
		if rhs.IsQuiet() {
			continue
		}
		// Copy the value out before evaluating the left-hand side.
		p.vals = append(p.vals, *rhs)
		target, err := clause.LHS.Eval(ectx, this)
		if err != nil {
			return nil, nil, err
		}
		p.paths = append(p.paths, target)
	}
	return p.vals, p.paths, nil
}
// A putStep is a recursive data structure encoding a series of steps to be
// carried out to construct an output record from an input record and
// a slice of evaluated clauses.
type putStep struct {
// op selects what this step does (see putOp).
op putOp
// index is the position of the source value: an input field index for
// putFromInput, a clause index for putFromClause. For putRecord it is
// the input field to recurse into, or negative when the nested record
// has no corresponding input field.
index int
// container records whether the value's type is a container type.
container bool
record []putStep // for op == record
}
// append adds step to the end of p's list of nested steps.
func (p *putStep) append(step putStep) {
p.record = append(p.record, step)
}
// putOp identifies the kind of action a putStep performs.
type putOp int
const (
putFromInput putOp = iota // copy field from input record
putFromClause // copy field from put assignment
putRecord // recurse into record below us
)
// build runs the step tree rooted at p against the input record bytes in
// and the evaluated clause values vals, and returns the encoded output
// record. The builder b is reset before use, and the returned bytes are
// whatever b.Bytes() yields afterwards. If any step fails, build returns
// nil. It panics if p is not a putRecord step.
func (p *putStep) build(in zcode.Bytes, b *zcode.Builder, vals []zed.Value) zcode.Bytes {
	if p.op != putRecord {
		// top-level op must be a record
		panic(fmt.Sprintf("put: unexpected step %v", p.op))
	}
	b.Reset()
	if err := p.buildRecord(in, b, vals); err != nil {
		return nil
	}
	return b.Bytes()
}
// buildRecord appends one value to b for each nested step of p, reading
// input fields from the container in and clause values from vals. Nested
// putRecord steps are emitted as containers and built recursively. It
// returns an error if an input field index cannot be fetched from in.
func (p *putStep) buildRecord(in zcode.Bytes, b *zcode.Builder, vals []zed.Value) error {
	input := newGetter(in)
	for i := range p.record {
		child := &p.record[i]
		switch child.op {
		case putFromClause:
			b.Append(vals[child.index].Bytes())
		case putFromInput:
			field, err := input.nth(child.index)
			if err != nil {
				return err
			}
			b.Append(field)
		case putRecord:
			b.BeginContainer()
			// A negative index means there is no input field to descend
			// into, so the nested build is handed the current container.
			src := in
			if child.index >= 0 {
				var err error
				if src, err = input.nth(child.index); err != nil {
					return err
				}
			}
			if err := child.buildRecord(src, b, vals); err != nil {
				return err
			}
			b.EndContainer()
		}
	}
	return nil
}
// A getter provides random access to values in a zcode container
// using zcode.Iter. It uses a cursor to avoid quadratic re-seeks for
// the common case where values are fetched sequentially.
type getter struct {
// cursor is the index of the value most recently read from it;
// newGetter starts it at -1, meaning nothing has been read yet.
cursor int
// container is the full container, kept so the iterator can be restarted.
container zcode.Bytes
// it is the iterator over container's values.
it zcode.Iter
}
// newGetter returns a getter over the values of cont, positioned before
// the first value.
func newGetter(cont zcode.Bytes) getter {
	var g getter
	g.cursor = -1 // nothing consumed yet
	g.container = cont
	g.it = cont.Iter()
	return g
}
// nth returns the n-th value (zero-based) of the getter's container.
//
// Values are normally requested in ascending order, in which case the
// iterator simply advances from where it left off. If n is at or before
// the value most recently consumed, the iterator is restarted from the
// beginning of the container and the cursor is reset along with it, so
// that backward seeks and repeated requests for the same index work.
//
// It returns an error if the container holds fewer than n+1 values (or
// if n is negative).
func (ig *getter) nth(n int) (zcode.Bytes, error) {
	// The iterator has already moved past index ig.cursor, so anything at
	// or before it requires a rewind. The cursor must be rewound together
	// with the iterator or the index comparison below can never match.
	if n <= ig.cursor {
		ig.it = ig.container.Iter()
		ig.cursor = -1
	}
	for !ig.it.Done() {
		zv := ig.it.Next()
		ig.cursor++
		if ig.cursor == n {
			return zv, nil
		}
	}
	return nil, fmt.Errorf("getter.nth: array index %d out of bounds", n)
}
// findOverwriteClause searches paths for the first entry that either
// equals path or has path as a strict prefix. It returns that entry's
// index, the entry itself, and true; if there is no such entry it
// returns -1, nil, and false.
func findOverwriteClause(path field.Path, paths field.List) (int, field.Path, bool) {
	for i := range paths {
		candidate := paths[i]
		if !path.Equal(candidate) && !candidate.HasStrictPrefix(path) {
			continue
		}
		return i, candidate, true
	}
	return -1, nil, false
}
// deriveSteps computes the step tree and output type for applying the
// clause values vals at paths to a record of type inType. It starts the
// recursive derivation at the top level of the record with an empty
// parent path.
func (p *Putter) deriveSteps(inType *zed.TypeRecord, vals []zed.Value, paths field.List) (putStep, zed.Type) {
	root, outType := p.deriveRecordSteps(field.Path{}, inType.Fields, vals, paths)
	return root, outType
}
// deriveRecordSteps computes, for the record located at parentPath whose
// input fields are inFields, the putRecord step that builds the output
// record and the output record's type. vals and paths are the evaluated
// clause values and the paths they are assigned to; vals[i] is assigned
// to paths[i].
//
// Input fields are processed first, in input order, each being copied,
// replaced by a clause value, or rebuilt recursively. Clauses that name a
// field not yet present in the output are then appended in clause order.
//
// It panics if the output record type cannot be looked up in p.zctx.
func (p *Putter) deriveRecordSteps(parentPath field.Path, inFields []zed.Field, vals []zed.Value, paths field.List) (putStep, *zed.TypeRecord) {
s := putStep{op: putRecord}
// fields accumulates the output record's fields in output order.
var fields []zed.Field
// First look at all input fields to see which should
// be copied over and which should be overwritten by
// assignments.
for i, f := range inFields {
path := append(parentPath, f.Name)
matchIndex, matchPath, found := findOverwriteClause(path, paths)
switch {
// input not overwritten by assignment: copy input value.
case !found:
s.append(putStep{
op: putFromInput,
container: zed.IsContainerType(f.Type),
index: i,
})
fields = append(fields, f)
// input field overwritten by non-nested assignment: copy assignment value.
case len(path) == len(matchPath):
s.append(putStep{
op: putFromClause,
container: zed.IsContainerType(vals[matchIndex].Type),
index: matchIndex,
})
fields = append(fields, zed.NewField(f.Name, vals[matchIndex].Type))
// input record field overwritten by nested assignment: recurse.
case len(path) < len(matchPath) && zed.IsRecordType(f.Type):
nestedStep, typ := p.deriveRecordSteps(path, zed.TypeRecordOf(f.Type).Fields, vals, paths)
// index tells buildRecord which input field holds the nested record.
nestedStep.index = i
s.append(nestedStep)
fields = append(fields, zed.NewField(f.Name, typ))
// input non-record field overwritten by nested assignment(s): recurse.
case len(path) < len(matchPath) && !zed.IsRecordType(f.Type):
// The recursion is given no input fields, so the replacement record
// is made up only of clause values.
nestedStep, typ := p.deriveRecordSteps(path, []zed.Field{}, vals, paths)
nestedStep.index = i
s.append(nestedStep)
fields = append(fields, zed.NewField(f.Name, typ))
default:
panic("put: internal error computing record steps")
}
}
// appendClause reports whether lpath lies under parentPath and names a
// field at this level that is not yet among the output fields.
// NOTE(review): indexing lpath[len(parentPath)] requires lpath to be
// longer than parentPath; presumably CheckPutFields guarantees this by
// rejecting empty and prefix-conflicting paths — confirm.
appendClause := func(lpath field.Path) bool {
if !lpath.HasPrefix(parentPath) {
return false
}
return !hasField(lpath[len(parentPath)], fields)
}
// Then, look at put assignments to see if there are any new fields to append.
for i, lpath := range paths {
if appendClause(lpath) {
switch {
// Append value at this level
case len(lpath) == len(parentPath)+1:
s.append(putStep{
op: putFromClause,
container: zed.IsContainerType(vals[i].Type),
index: i,
})
fields = append(fields, zed.NewField(lpath[len(parentPath)], vals[i].Type))
// Appended and nest. For example, this would happen with "put b.c=1" applied to a record {"a": 1}.
case len(lpath) > len(parentPath)+1:
path := append(parentPath, lpath[len(parentPath)])
nestedStep, typ := p.deriveRecordSteps(path, []zed.Field{}, vals, paths)
// A negative index marks a nested record with no input field behind it.
nestedStep.index = -1
fields = append(fields, zed.NewField(lpath[len(parentPath)], typ))
s.append(nestedStep)
}
}
}
typ, err := p.zctx.LookupTypeRecord(fields)
if err != nil {
panic(err)
}
return s, typ
}
// hasField reports whether fields contains a field called name.
func hasField(name string, fields []zed.Field) bool {
	matchesName := func(f zed.Field) bool { return f.Name == name }
	return slices.IndexFunc(fields, matchesName) >= 0
}
// lookupRule returns the putRule for applying clause values vals at the
// paths in fields to a record of type inType. Rules are cached per input
// type ID and per string form of fields; a cached rule is reused only if
// its recorded clause types still match the types of vals. Otherwise the
// paths are validated with CheckPutFields, a new rule is derived, and it
// replaces any previous cache entry.
//
// The returned error, if any, is the validation error wrapped with a
// "put: " prefix.
func (p *Putter) lookupRule(inType *zed.TypeRecord, vals []zed.Value, fields field.List) (putRule, error) {
	typeID := inType.ID()
	byFields, ok := p.rules[typeID]
	if !ok {
		byFields = make(map[string]putRule)
		p.rules[typeID] = byFields
	}
	key := fields.String()
	if cached, hit := byFields[key]; hit && sameTypes(cached.clauseTypes, vals) {
		return cached, nil
	}
	// first check fields
	if err := CheckPutFields(fields); err != nil {
		return putRule{}, fmt.Errorf("put: %w", err)
	}
	step, outType := p.deriveSteps(inType, vals, fields)
	var clauseTypes []zed.Type
	for i := range vals {
		clauseTypes = append(clauseTypes, vals[i].Type)
	}
	derived := putRule{typ: outType, clauseTypes: clauseTypes, step: step}
	byFields[key] = derived
	return derived, nil
}
// CheckPutFields validates the left-hand-side paths of a put expression.
// It returns an error if any path is empty, if the same path is assigned
// more than once, or if one path is a strict prefix of another. Paths are
// examined in order and the first problem found is reported; a nil
// return means the paths are acceptable.
func CheckPutFields(fields field.List) error {
	for i := range fields {
		cur := fields[i]
		if cur.IsEmpty() {
			return fmt.Errorf("left-hand side cannot be 'this' (use 'yield' operator)")
		}
		// Compare against every later path; earlier pairs were already checked.
		for j := i + 1; j < len(fields); j++ {
			other := fields[j]
			switch {
			case cur.Equal(other):
				return fmt.Errorf("multiple assignments to %s", cur)
			case other.HasStrictPrefix(cur):
				return fmt.Errorf("conflicting nested assignments to %s and %s", cur, other)
			case cur.HasStrictPrefix(other):
				return fmt.Errorf("conflicting nested assignments to %s and %s", other, cur)
			}
		}
	}
	return nil
}
// sameTypes reports whether types and vals have the same length and each
// value's type is equal to the type at the same position.
func sameTypes(types []zed.Type, vals []zed.Value) bool {
	if len(types) != len(vals) {
		return false
	}
	for i, typ := range types {
		if typ != vals[i].Type {
			return false
		}
	}
	return true
}
// Eval applies the put clauses to this and returns the resulting value.
//
// If this is not a record, an error value is passed through unchanged
// and anything else is wrapped in a "put: not a record" error. A failure
// while evaluating the clauses, or while validating their paths, is
// returned as a wrapped error value. If no clause produces a value, this
// is returned unchanged. Otherwise the output record is built with the
// rule matching the input type and clause paths.
func (p *Putter) Eval(ectx Context, this *zed.Value) *zed.Value {
	recType := zed.TypeRecordOf(this.Type)
	if recType == nil {
		if !this.IsError() {
			return ectx.CopyValue(*p.zctx.WrapError("put: not a record", this))
		}
		// propagate errors
		return this
	}
	vals, paths, err := p.eval(ectx, this)
	switch {
	case err != nil:
		return ectx.CopyValue(*p.zctx.WrapError(fmt.Sprintf("put: %s", err), this))
	case len(vals) == 0:
		// Nothing to assign, so the input passes through untouched.
		return this
	}
	rule, err := p.lookupRule(recType, vals, paths)
	if err != nil {
		return ectx.CopyValue(*p.zctx.WrapError(err.Error(), this))
	}
	out := rule.step.build(this.Bytes(), &p.builder, vals)
	return ectx.NewValue(rule.typ, out)
}