-
Notifications
You must be signed in to change notification settings - Fork 4.2k
/
bind.go
342 lines (303 loc) · 10.6 KB
/
bind.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package graph
import (
"reflect"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/funcx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
)
// TODO(herohde) 4/21/2017: Bind is where most user mistakes will likely show
// up. We should verify that common mistakes yield reasonable errors.
// Bind resolves the loosely-typed signature of fn against the concrete input
// types in. It returns, in order: the inbound types expected by fn, the
// InputKind of each input, the outbound types as declared by fn, and the
// outbound types after substituting the type variables bound by the inputs
// (and by typedefs).
//
// The difficulty is that a user function does not spell out the shape of its
// inputs. For example,
//
//	func (t EventTime, k typex.X, v int, emit func(string, typex.X))
//
// or
//
//	func (context.Context, k typex.X, v int) (string, typex.X, error)
//
// can each accept either one incoming fulltype, KV<X,int>, or two: X as the
// main input plus a singleton side input of type int. As far as the shape of
// the data processing is concerned the two forms are equivalent; the non-data
// types context.Context and error play no part in the data signature and only
// matter at runtime.
//
// Binding either function to the input [KV<string,int>] yields:
//
//	inbound:  [Main: KV<X,int>]
//	outbound: [KV<string,X>]
//	output:   [KV<string,string>]
//
// Note how the binding of X to string carries over into the output type.
//
// Binding either function to the inputs [float, int] instead yields:
//
//	inbound:  [Main: X, Singleton: int]
//	outbound: [KV<string,X>]
//	output:   [KV<string,float>]
//
// so both the inbound shape and the output type differ from the first case.
//
// Entries in typedefs are added to the substitution derived from the inputs;
// it is an error for a typedef to name a type that the inputs already bound.
// Every error returned is annotated with the name of fn.
func Bind(fn *funcx.Fn, typedefs map[string]reflect.Type, in ...typex.FullType) ([]typex.FullType, []InputKind, []typex.FullType, []typex.FullType, error) {
	// fail tags err with the name of the function being bound and produces
	// the all-nil failure result.
	fail := func(err error) ([]typex.FullType, []InputKind, []typex.FullType, []typex.FullType, error) {
		return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name())
	}

	inbound, kinds, err := findInbound(fn, in...)
	if err != nil {
		return fail(err)
	}
	outbound, err := findOutbound(fn)
	if err != nil {
		return fail(err)
	}
	bindings, err := typex.Bind(inbound, in)
	if err != nil {
		return fail(err)
	}

	// Merge the explicit typedefs into the inferred bindings, refusing to
	// override anything the inputs already determined.
	for name, typ := range typedefs {
		if bound, taken := bindings[name]; taken {
			return fail(errors.Errorf("cannot substitute type %v with %v, already defined as %v", name, typ, bound))
		}
		bindings[name] = typ
	}

	out, err := typex.Substitute(outbound, bindings)
	if err != nil {
		return fail(err)
	}
	return inbound, kinds, outbound, out, nil
}
// findOutbound derives the outbound types declared by fn.
//
// The direct return values of kind funcx.RetValue, after trimIllegal has
// removed the types it rejects, form the "main" output: one remaining value
// becomes that type, two become a KV of the pair, and none means there is no
// direct output. More than two is an error.
//
// Each parameter of kind funcx.FnEmit then contributes one further outbound
// type, in parameter order: a KV if the emitter carries two legal types,
// otherwise its first legal type. An emitter that carries no legal type at
// all is reported as an error.
func findOutbound(fn *funcx.Fn) ([]typex.FullType, error) {
	ret := trimIllegal(returnTypes(funcx.SubReturns(fn.Ret, fn.Returns(funcx.RetValue)...)))
	params := funcx.SubParams(fn.Param, fn.Params(funcx.FnEmit)...)

	var outbound []typex.FullType

	// The direct output is the "main" output, if any.
	switch len(ret) {
	case 0:
		// ok: no direct output.
	case 1:
		outbound = append(outbound, typex.New(ret[0]))
	case 2:
		outbound = append(outbound, typex.NewKV(typex.New(ret[0]), typex.New(ret[1])))
	default:
		return nil, errors.Errorf("too many return values: %v", ret)
	}

	for _, param := range params {
		// The boolean result of UnfoldEmit is not consulted directly; an
		// emitter that yields no legal data types is rejected just below.
		values, _ := funcx.UnfoldEmit(param.T)
		trimmed := trimIllegal(values)

		switch len(trimmed) {
		case 0:
			// Without this guard the trimmed[0] access below would panic
			// with an index-out-of-range instead of reporting the problem.
			return nil, errors.Errorf("emitter %v has no valid output types", param)
		case 2:
			outbound = append(outbound, typex.NewKV(typex.New(trimmed[0]), typex.New(trimmed[1])))
		default:
			outbound = append(outbound, typex.New(trimmed[0]))
		}
	}
	return outbound, nil
}
// returnTypes projects a list of return parameters onto their reflect types,
// preserving order. An empty list yields a nil slice.
func returnTypes(list []funcx.ReturnParam) []reflect.Type {
	if len(list) == 0 {
		return nil
	}
	types := make([]reflect.Type, len(list))
	for i := range list {
		types[i] = list[i].T
	}
	return types
}
// findInbound matches the data parameters of fn (values, iterators,
// re-iterators and multimaps) against the input types in, consuming
// parameters left to right. For every input it returns the full type the
// consumed parameters expect and the InputKind describing how the input is
// delivered.
//
// The number of parameters an input consumes is given by inboundArity. An
// input is treated as the main input while no parameter has been consumed
// yet. Errors are annotated with the parameters and input involved:
//
//   - "too many inputs" if an input remains after every parameter is consumed,
//   - "too few params" if an input needs more parameters than are left,
//   - "too few inputs: ..." if parameters remain after every input is bound.
func findInbound(fn *funcx.Fn, in ...typex.FullType) ([]typex.FullType, []InputKind, error) {
	addContext := func(err error, p []funcx.FnParam, src any) error {
		return errors.WithContextf(err, "binding params %v to input %v", p, src)
	}

	var inbound []typex.FullType
	var kinds []InputKind
	params := funcx.SubParams(fn.Param, fn.Params(funcx.FnValue|funcx.FnIter|funcx.FnReIter|funcx.FnMultiMap)...)

	index := 0 // number of params consumed so far
	for _, input := range in {
		isMain := index == 0

		arity, err := inboundArity(input, isMain)
		if err != nil {
			return nil, nil, addContext(err, params, input)
		}

		if remaining := len(params) - index; remaining < arity {
			if remaining == 0 {
				// Every parameter is already spoken for, so the surplus is
				// on the input side. This check keeps index <= len(params),
				// which is why no separate check is needed after the loop.
				return nil, nil, addContext(errors.New("too many inputs"), params, in)
			}
			return nil, nil, addContext(errors.New("too few params"), params[index:], input)
		}

		paramsToBind := params[index : index+arity]
		elm, kind, err := tryBindInbound(input, paramsToBind, isMain)
		if err != nil {
			return nil, nil, addContext(err, paramsToBind, input)
		}
		inbound = append(inbound, elm)
		kinds = append(kinds, kind)
		index += arity
	}

	if index < len(params) {
		return nil, nil, addContext(errors.New("too few inputs: forgot an input or to annotate options?"), params, in)
	}
	return inbound, kinds, nil
}
// tryBindInbound works out which full type the parameters args expect when
// fed the input type t, and how that input is delivered (its InputKind). The
// result is accepted only if t is structurally assignable to the expected
// type; otherwise an error is returned.
//
// The caller supplies exactly the parameters that this input consumes and
// says whether it is the main input:
//
//   - Concrete and container inputs bind to a single parameter. As main
//     input the parameter type is used as is. As side input the parameter
//     may be a plain value (Singleton, or Slice when the parameter is a
//     slice whose element type equals the input type), an iterator (Iter) or
//     a re-iterator (ReIter) over exactly one legal type.
//   - KV inputs bind, as main input, to two value parameters (key, value).
//     As side input they bind to one iterator, re-iterator or multimap
//     parameter carrying exactly two legal types.
//   - CoGBK inputs bind to a value parameter for the key followed by
//     iterator or re-iterator parameters carrying one legal type each.
func tryBindInbound(t typex.FullType, args []funcx.FnParam, isMain bool) (typex.FullType, InputKind, error) {
	// unfold extracts the types carried by an iterator-like parameter and
	// keeps only those trimIllegal accepts. Parameters of any other kind
	// yield no types.
	unfold := func(p funcx.FnParam) []reflect.Type {
		var elems []reflect.Type
		switch p.Kind {
		case funcx.FnIter:
			elems, _ = funcx.UnfoldIter(p.T)
		case funcx.FnReIter:
			elems, _ = funcx.UnfoldReIter(p.T)
		case funcx.FnMultiMap:
			elems, _ = funcx.UnfoldMultiMap(p.T)
		}
		return trimIllegal(elems)
	}
	// iterKind maps an iterator or re-iterator parameter to its InputKind.
	iterKind := func(p funcx.FnParam) InputKind {
		if p.Kind == funcx.FnReIter {
			return ReIter
		}
		return Iter
	}

	kind := Main
	var bound typex.FullType

	switch t.Class() {
	case typex.Concrete, typex.Container:
		p := args[0]
		switch {
		case isMain:
			bound = typex.New(p.T)

		case p.Kind == funcx.FnValue:
			// Side inputs come in several forms. A []string parameter is
			// ambiguous: it is a Slice of string when the incoming type is
			// string, and a Singleton of []string otherwise.
			// TODO(herohde) 6/29/2017: we do not allow universal slices, for now.
			if p.T.Kind() == reflect.Slice && t.Type() == p.T.Elem() {
				kind, bound = Slice, typex.New(p.T.Elem())
			} else {
				kind, bound = Singleton, typex.New(p.T)
			}

		case p.Kind == funcx.FnIter || p.Kind == funcx.FnReIter:
			elems := unfold(p)
			if len(elems) != 1 {
				return nil, kind, errors.Errorf("%v cannot bind to %v", t, p)
			}
			kind = iterKind(p)
			bound = typex.New(elems[0])

		case p.Kind == funcx.FnMultiMap:
			return nil, kind, errors.Errorf("input to MultiMap side input must be KV, got %v", t)

		default:
			return nil, kind, errors.Errorf("unexpected param kind: %v", p)
		}

	case typex.Composite:
		switch t.Type() {
		case typex.KVType:
			if !isMain {
				side := args[0]
				switch side.Kind {
				case funcx.FnIter, funcx.FnReIter:
					pair := unfold(side)
					if len(pair) != 2 {
						return nil, kind, errors.Errorf("%v cannot bind to %v", t, side)
					}
					kind = iterKind(side)
					bound = typex.NewKV(typex.New(pair[0]), typex.New(pair[1]))

				case funcx.FnMultiMap:
					// kind is switched before the arity check, so a mismatch
					// is reported together with MultiMap.
					kind = MultiMap
					pair := unfold(side)
					if len(pair) != 2 {
						return nil, kind, errors.Errorf("%v cannot bind to %v", t, side)
					}
					bound = typex.NewKV(typex.New(pair[0]), typex.New(pair[1]))

				default:
					return nil, kind, errors.Errorf("%v cannot bind to %v", t, side)
				}
			} else {
				// Main KV input: key and value must both be plain values.
				if args[0].Kind != funcx.FnValue {
					return nil, kind, errors.Errorf("key of %v cannot bind to %v", t, args[0])
				}
				if args[1].Kind != funcx.FnValue {
					return nil, kind, errors.Errorf("value of %v cannot bind to %v", t, args[1])
				}
				bound = typex.NewKV(typex.New(args[0].T), typex.New(args[1].T))
			}

		case typex.CoGBKType:
			key := args[0]
			if key.Kind != funcx.FnValue {
				return nil, kind, errors.Errorf("key of %v cannot bind to %v", t, key)
			}
			components := make([]typex.FullType, 0, len(args))
			components = append(components, typex.New(key.T))

			// Each remaining parameter must be an iterator or re-iterator
			// over exactly one legal type; anything else is rejected with the
			// same message.
			for _, val := range args[1:] {
				var elems []reflect.Type
				if val.Kind == funcx.FnIter || val.Kind == funcx.FnReIter {
					elems = unfold(val)
				}
				if len(elems) != 1 {
					return nil, kind, errors.Errorf("values of %v cannot bind to %v", t, val)
				}
				components = append(components, typex.New(elems[0]))
			}
			bound = typex.NewCoGBK(components...)

		default:
			return nil, kind, errors.Errorf("unexpected inbound type: %v", t.Type())
		}

	default:
		return nil, kind, errors.Errorf("unexpected inbound class: %v", t.Class())
	}

	if !typex.IsStructurallyAssignable(t, bound) {
		return nil, kind, errors.Errorf("%v is not assignable to %v", t, bound)
	}
	return bound, kind, nil
}
// inboundArity reports how many function parameters the input type t
// consumes. Non-composite inputs take one parameter. A KV takes two as the
// main input (key and value) and one as a side input, where it must arrive
// through a single iterator or map. A CoGBK takes one parameter per
// component. Any other composite type is an error.
func inboundArity(t typex.FullType, isMain bool) (int, error) {
	if t.Class() != typex.Composite {
		return 1, nil
	}

	switch typ := t.Type(); {
	case typ == typex.KVType && isMain:
		return 2, nil
	case typ == typex.KVType:
		// A KV side input must be a single iterator/map.
		return 1, nil
	case typ == typex.CoGBKType:
		return len(t.Components()), nil
	default:
		return 0, errors.Errorf("unexpected composite inbound type: %v", t.Type())
	}
}
// trimIllegal returns the types in list whose class is Concrete, Universal
// or Container, in their original order. Types of any other class are
// dropped. The result is nil when nothing is kept.
func trimIllegal(list []reflect.Type) []reflect.Type {
	var kept []reflect.Type
	for _, t := range list {
		if c := typex.ClassOf(t); c == typex.Concrete || c == typex.Universal || c == typex.Container {
			kept = append(kept, t)
		}
	}
	return kept
}