-
Notifications
You must be signed in to change notification settings - Fork 3.4k
/
functions.go
430 lines (369 loc) · 13.6 KB
/
functions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build go1.18
package compute
import (
"context"
"fmt"
"strings"
"github.com/apache/arrow/go/v16/arrow"
"github.com/apache/arrow/go/v16/arrow/compute/exec"
)
// Function is the interface shared by every compute function, for example
// ScalarFunction, VectorFunction and MetaFunction.
type Function interface {
	// Name returns the name the function was constructed with.
	Name() string
	// Kind returns the function's FuncKind.
	Kind() FuncKind
	// Arity describes how many arguments the function accepts.
	Arity() Arity
	// Doc returns the function's user-facing documentation.
	Doc() FunctionDoc
	// DefaultOptions returns the options to use when none are supplied.
	DefaultOptions() FunctionOptions
	// Validate checks the function's documentation for consistency.
	Validate() error

	// NumKernels returns how many kernels the function owns.
	NumKernels() int
	// DispatchExact looks up a kernel whose signature exactly matches
	// the given argument types.
	DispatchExact(...arrow.DataType) (exec.Kernel, error)
	// DispatchBest looks up the most suitable kernel for the given
	// argument types.
	DispatchBest(...arrow.DataType) (exec.Kernel, error)

	// Execute eagerly runs the function on the given arguments.
	Execute(context.Context, FunctionOptions, ...Datum) (Datum, error)
}
// Arity defines the number of required arguments for a function.
//
// Naming conventions are taken from https://en.wikipedia.org/wiki/Arity
type Arity struct {
	// NArgs is the exact number of arguments or, when IsVarArgs is set,
	// the minimum number of arguments.
	NArgs int
	// IsVarArgs marks functions that accept NArgs or more arguments.
	IsVarArgs bool
}

// Convenience functions for generating Arities.

// Nullary returns the Arity of a function that takes no arguments.
func Nullary() Arity { return Arity{NArgs: 0, IsVarArgs: false} }

// Unary returns the Arity of a function that takes exactly one argument.
func Unary() Arity { return Arity{NArgs: 1, IsVarArgs: false} }

// Binary returns the Arity of a function that takes exactly two arguments.
func Binary() Arity { return Arity{NArgs: 2, IsVarArgs: false} }

// Ternary returns the Arity of a function that takes exactly three arguments.
func Ternary() Arity { return Arity{NArgs: 3, IsVarArgs: false} }

// VarArgs returns the Arity of a variadic function that needs at least
// minArgs arguments.
func VarArgs(minArgs int) Arity { return Arity{NArgs: minArgs, IsVarArgs: true} }
// FunctionDoc holds the user-facing documentation attached to a Function.
//
// When Summary is non-empty, baseFunction.Validate checks the other fields
// against the conventions noted on each field below.
type FunctionDoc struct {
	// A one-line summary of the function, using a verb.
	//
	// For example, "Add two numeric arrays or scalars"
	//
	// Validate rejects a summary that contains a newline or ends with '.'.
	// An empty Summary makes Validate skip all documentation checks.
	Summary string

	// A detailed description of the function, meant to follow the summary.
	//
	// Validate rejects a description that ends with a newline or that has
	// a line longer than 78 bytes.
	Description string

	// Symbolic names (identifiers) for the function arguments.
	//
	// Can be used to generate nicer function signatures.
	//
	// Validate requires len(ArgNames) to equal Arity.NArgs, or
	// Arity.NArgs+1 for varargs functions.
	ArgNames []string

	// Name of the options struct type, if any
	OptionsType string

	// Whether or not options are required for function execution.
	//
	// If false, then either there are no options for this function,
	// or there is a usable default options value. When true, checkOptions
	// rejects a call that passes nil options.
	OptionsRequired bool
}

// EmptyFuncDoc is a reusable empty function doc definition for convenience.
// Its Summary is empty, so functions using it skip documentation checks in
// Validate.
var EmptyFuncDoc FunctionDoc
// FuncKind is an enum representing the type of a function
//
// The constants are assigned with iota, so their order defines their values
// (FuncScalar is the zero value).
//
// NOTE(review): the trailing comment on each constant (e.g. "// Scalar")
// looks like the display name consumed by `stringer -linecomment`; confirm
// before editing or removing those comments.
type FuncKind int8

const (
	// A function that performs scalar data operations on whole arrays
	// of data. Can generally process Array or Scalar values. The size
	// of the output will be the same as the size (or broadcasted size,
	// in the case of mixing Array and Scalar inputs) of the input.
	FuncScalar FuncKind = iota // Scalar
	// A function with array input and output whose behavior depends on
	// the values of the entire arrays passed, rather than the value of
	// each scalar value.
	FuncVector // Vector
	// A function that computes a scalar summary statistic from array input.
	FuncScalarAgg // ScalarAggregate
	// A function that computes grouped summary statistics from array
	// input and an array of group identifiers.
	FuncHashAgg // HashAggregate
	// A function that dispatches to other functions and does not contain
	// its own kernels.
	FuncMeta // Meta
)
// validateFunctionSummary checks FunctionDoc.Summary against the
// documentation conventions: the summary must be a single line and must
// not end with a period. Errors wrap arrow.ErrInvalid.
//
// An empty summary is accepted. The previous implementation indexed the
// last byte directly, which panics on an empty string.
func validateFunctionSummary(summary string) error {
	if strings.Contains(summary, "\n") {
		return fmt.Errorf("%w: summary contains a newline", arrow.ErrInvalid)
	}
	if strings.HasSuffix(summary, ".") {
		return fmt.Errorf("%w: summary ends with a point", arrow.ErrInvalid)
	}
	return nil
}
// validateFunctionDescription checks FunctionDoc.Description. The
// description must not end with a newline, and no line may be longer than
// maxLineSize. Length is measured with len, i.e. in bytes, so multi-byte
// UTF-8 characters count more than once. Errors wrap arrow.ErrInvalid.
func validateFunctionDescription(desc string) error {
	const maxLineSize = 78

	if strings.HasSuffix(desc, "\n") {
		return fmt.Errorf("%w: description ends with a newline", arrow.ErrInvalid)
	}

	// Walk the description one line at a time. An empty description is a
	// single empty line.
	remaining := desc
	for {
		line, rest, found := strings.Cut(remaining, "\n")
		if len(line) > maxLineSize {
			return fmt.Errorf("%w: description line length exceeds %d characters", arrow.ErrInvalid, maxLineSize)
		}
		if !found {
			return nil
		}
		remaining = rest
	}
}
// baseFunction holds the metadata shared by every compute function: its
// name, kind, arity, documentation and default options. Concrete function
// types embed it. Kernel-owning functions add a collection of "kernels",
// which are implementations of the function for specific argument types.
// Selecting a viable kernel for executing the function is referred to as
// "dispatching".
type baseFunction struct {
	name        string
	kind        FuncKind
	arity       Arity
	doc         FunctionDoc
	defaultOpts FunctionOptions
}

// Name returns the name the function was constructed with.
func (b *baseFunction) Name() string {
	return b.name
}

// Kind returns the stored FuncKind of the function.
func (b *baseFunction) Kind() FuncKind {
	return b.kind
}

// Arity returns the number of arguments the function accepts.
func (b *baseFunction) Arity() Arity {
	return b.arity
}

// Doc returns the function's documentation.
func (b *baseFunction) Doc() FunctionDoc {
	return b.doc
}

// DefaultOptions returns the stored default options, which may be nil.
func (b *baseFunction) DefaultOptions() FunctionOptions {
	return b.defaultOpts
}
// Validate checks the function's documentation for internal consistency.
// A function with an empty doc summary is treated as undocumented and
// always passes. Otherwise:
//   - the number of argument names must equal the arity, or for varargs
//     functions it may exceed the arity by one;
//   - the summary must pass validateFunctionSummary;
//   - the description must pass validateFunctionDescription.
//
// Every error returned wraps arrow.ErrInvalid. The arity-mismatch error
// previously did not, unlike every other validation error in this package,
// so errors.Is(err, arrow.ErrInvalid) failed for it.
func (b *baseFunction) Validate() error {
	if b.doc.Summary == "" {
		return nil
	}

	argCount := len(b.doc.ArgNames)
	arityMatches := argCount == b.arity.NArgs ||
		(b.arity.IsVarArgs && argCount == b.arity.NArgs+1)
	if !arityMatches {
		return fmt.Errorf("%w: in function '%s': number of argument names for function doc != function arity",
			arrow.ErrInvalid, b.name)
	}

	if err := validateFunctionSummary(b.doc.Summary); err != nil {
		return err
	}
	return validateFunctionDescription(b.doc.Description)
}
// checkOptions returns an error wrapping arrow.ErrInvalid when fn's
// documentation says options are required but opts is nil. Otherwise it
// returns nil. fn.Doc() is only consulted when opts is nil.
func checkOptions(fn Function, opts FunctionOptions) error {
	if opts != nil || !fn.Doc().OptionsRequired {
		return nil
	}
	return fmt.Errorf("%w: function '%s' cannot be called without options", arrow.ErrInvalid, fn.Name())
}
// checkArity reports whether nargs arguments are acceptable for this
// function. A varargs function needs at least Arity.NArgs arguments; any
// other function needs exactly Arity.NArgs. Violations wrap
// arrow.ErrInvalid.
func (b *baseFunction) checkArity(nargs int) error {
	want := b.arity.NArgs

	if b.arity.IsVarArgs {
		if nargs >= want {
			return nil
		}
		return fmt.Errorf("%w: varargs function '%s' needs at least %d arguments, but only %d passed",
			arrow.ErrInvalid, b.name, want, nargs)
	}

	if nargs == want {
		return nil
	}
	return fmt.Errorf("%w: function '%s' accepts %d arguments but %d passed",
		arrow.ErrInvalid, b.name, want, nargs)
}
// kernelType is a type constraint interface that is used for funcImpl
// generic definitions. It will be extended as other kernel types
// are defined.
//
// Currently exec.ScalarKernel and exec.VectorKernel are allowed to be used.
type kernelType interface {
	exec.ScalarKernel | exec.VectorKernel

	// specifying the Kernel interface here allows us to utilize
	// the methods of the Kernel interface on the generic
	// constrained type
	exec.Kernel
}
// funcImpl is the shared implementation for every function kind that owns
// kernels, i.e. everything except MetaFunction. Kernels are stored by value
// in a slice. DispatchExact and Kernels return pointers into that slice.
// Those pointers are not updated if a later append reallocates the slice.
type funcImpl[KT kernelType] struct {
	baseFunction
	kernels []KT
}

// DispatchExact returns a pointer to the first kernel, in registration
// order, whose signature matches vals exactly. It returns the checkArity
// error if len(vals) does not fit the arity. It returns an error wrapping
// arrow.ErrNotImplemented if no kernel matches.
func (fi *funcImpl[KT]) DispatchExact(vals ...arrow.DataType) (*KT, error) {
	if err := fi.checkArity(len(vals)); err != nil {
		return nil, err
	}

	for idx := range fi.kernels {
		if !fi.kernels[idx].GetSig().MatchesInputs(vals) {
			continue
		}
		return &fi.kernels[idx], nil
	}

	return nil, fmt.Errorf("%w: function '%s' has no kernel matching input types %s",
		arrow.ErrNotImplemented, fi.name, arrow.TypesToString(vals))
}

// NumKernels returns how many kernels have been registered.
func (fi *funcImpl[KT]) NumKernels() int {
	return len(fi.kernels)
}

// Kernels returns a pointer to every registered kernel, in registration
// order. The result is never nil, even when there are no kernels.
func (fi *funcImpl[KT]) Kernels() []*KT {
	out := make([]*KT, 0, len(fi.kernels))
	for idx := range fi.kernels {
		out = append(out, &fi.kernels[idx])
	}
	return out
}
// ScalarFunction is a function that executes element-wise operations on
// arrays or scalars. Its results therefore generally do not depend on the
// order of the values in the arguments. It accepts and returns arrays that
// are all of the same size. These functions roughly correspond to the
// functions used in most SQL expressions.
type ScalarFunction struct {
	funcImpl[exec.ScalarKernel]
}

// NewScalarFunction constructs a ScalarFunction of kind FuncScalar with the
// given name, arity and documentation. It starts with no kernels and no
// default options.
func NewScalarFunction(name string, arity Arity, doc FunctionDoc) *ScalarFunction {
	base := baseFunction{
		name:  name,
		kind:  FuncScalar,
		arity: arity,
		doc:   doc,
	}
	return &ScalarFunction{funcImpl: funcImpl[exec.ScalarKernel]{baseFunction: base}}
}
// SetDefaultOptions sets the options returned by DefaultOptions.
func (s *ScalarFunction) SetDefaultOptions(opts FunctionOptions) {
	s.defaultOpts = opts
}

// DispatchExact returns the first kernel whose signature exactly matches
// vals.
//
// The concrete *exec.ScalarKernel is converted to the exec.Kernel interface
// only on success. Returning the nil pointer directly would produce a
// non-nil interface that holds a typed nil, so `kernel == nil` checks by
// callers would silently fail.
func (s *ScalarFunction) DispatchExact(vals ...arrow.DataType) (exec.Kernel, error) {
	kernel, err := s.funcImpl.DispatchExact(vals...)
	if err != nil {
		return nil, err
	}
	return kernel, nil
}

// DispatchBest currently performs no implicit casting and returns the same
// result as DispatchExact.
func (s *ScalarFunction) DispatchBest(vals ...arrow.DataType) (exec.Kernel, error) {
	return s.DispatchExact(vals...)
}
// AddNewKernel builds a ScalarKernel from the given input types, output
// type and exec/init functions, and appends it to this function's kernels.
// This assumes default null handling (intersection of validity bitmaps).
//
// len(inTypes) must satisfy the function's arity. For a varargs function,
// the signature must contain exactly one input type. Both failures wrap
// arrow.ErrInvalid.
func (s *ScalarFunction) AddNewKernel(inTypes []exec.InputType, outType exec.OutputType, execFn exec.ArrayKernelExec, init exec.KernelInitFn) error {
	if err := s.checkArity(len(inTypes)); err != nil {
		return err
	}

	varArgs := s.arity.IsVarArgs
	if varArgs && len(inTypes) != 1 {
		return fmt.Errorf("%w: varargs signatures must have exactly one input type", arrow.ErrInvalid)
	}

	kernel := exec.NewScalarKernelWithSig(&exec.KernelSignature{
		InputTypes: inTypes,
		OutType:    outType,
		IsVarArgs:  varArgs,
	}, execFn, init)
	s.kernels = append(s.kernels, kernel)
	return nil
}
// AddKernel appends k to the kernels this function can dispatch to. k is
// received and stored by value, so one ScalarKernel variable can be added,
// modified and added again to register further kernels.
// NOTE(review): any reference-typed fields of k (e.g. Signature, if it is a
// pointer) are shared between the stored copies. Confirm before mutating
// them after the first add.
//
// The signature's input count must satisfy the function's arity, and a
// varargs function only accepts kernels whose signature is varargs too.
func (s *ScalarFunction) AddKernel(k exec.ScalarKernel) error {
	sig := k.Signature
	if err := s.checkArity(len(sig.InputTypes)); err != nil {
		return err
	}

	if s.arity.IsVarArgs && !sig.IsVarArgs {
		return fmt.Errorf("%w: function accepts varargs but kernel signature does not", arrow.ErrInvalid)
	}

	s.kernels = append(s.kernels, k)
	return nil
}

// Execute eagerly runs the function over args. It uses ctx and opts, and
// relies on kernel dispatch, batch iteration and memory allocation as
// defined by the selected kernel.
//
// If opts is nil, then the DefaultOptions() will be used.
func (s *ScalarFunction) Execute(ctx context.Context, opts FunctionOptions, args ...Datum) (Datum, error) {
	return execInternal(ctx, s, opts, -1, args...)
}
// VectorFunction is a function whose behavior depends on the values of the
// entire input arrays, rather than on each value independently (see
// FuncVector).
type VectorFunction struct {
	funcImpl[exec.VectorKernel]
}

// NewVectorFunction constructs a VectorFunction of kind FuncVector with the
// given name, arity and documentation. It starts with no kernels and no
// default options.
func NewVectorFunction(name string, arity Arity, doc FunctionDoc) *VectorFunction {
	fn := new(VectorFunction)
	fn.name = name
	fn.kind = FuncVector
	fn.arity = arity
	fn.doc = doc
	return fn
}
// SetDefaultOptions sets the options returned by DefaultOptions.
func (f *VectorFunction) SetDefaultOptions(opts FunctionOptions) {
	f.defaultOpts = opts
}

// DispatchExact returns the first kernel whose signature exactly matches
// vals.
//
// The concrete *exec.VectorKernel is converted to the exec.Kernel interface
// only on success. Returning the nil pointer directly would produce a
// non-nil interface that holds a typed nil, so `kernel == nil` checks by
// callers would silently fail.
func (f *VectorFunction) DispatchExact(vals ...arrow.DataType) (exec.Kernel, error) {
	kernel, err := f.funcImpl.DispatchExact(vals...)
	if err != nil {
		return nil, err
	}
	return kernel, nil
}

// DispatchBest currently performs no implicit casting and returns the same
// result as DispatchExact.
func (f *VectorFunction) DispatchBest(vals ...arrow.DataType) (exec.Kernel, error) {
	return f.DispatchExact(vals...)
}
// AddNewKernel builds a VectorKernel from the given input types, output
// type and exec/init functions, and appends it to this function's kernels.
//
// len(inTypes) must satisfy the function's arity. For a varargs function,
// the signature must contain exactly one input type. Both failures wrap
// arrow.ErrInvalid. The varargs error message matches
// ScalarFunction.AddNewKernel; it previously contained the typo "varags".
func (f *VectorFunction) AddNewKernel(inTypes []exec.InputType, outType exec.OutputType, execFn exec.ArrayKernelExec, init exec.KernelInitFn) error {
	if err := f.checkArity(len(inTypes)); err != nil {
		return err
	}
	if f.arity.IsVarArgs && len(inTypes) != 1 {
		return fmt.Errorf("%w: varargs signatures must have exactly one input type", arrow.ErrInvalid)
	}
	sig := &exec.KernelSignature{
		InputTypes: inTypes,
		OutType:    outType,
		IsVarArgs:  f.arity.IsVarArgs,
	}
	f.kernels = append(f.kernels, exec.NewVectorKernelWithSig(sig, execFn, init))
	return nil
}
// AddKernel appends kernel (received and stored by value) to this
// function's kernels. The signature's input count must satisfy the
// function's arity, and a varargs function only accepts kernels whose
// signature is varargs too. Failures wrap arrow.ErrInvalid.
func (f *VectorFunction) AddKernel(kernel exec.VectorKernel) error {
	sig := kernel.Signature
	if err := f.checkArity(len(sig.InputTypes)); err != nil {
		return err
	}

	if f.arity.IsVarArgs && !sig.IsVarArgs {
		return fmt.Errorf("%w: function accepts varargs but kernel signature does not", arrow.ErrInvalid)
	}

	f.kernels = append(f.kernels, kernel)
	return nil
}

// Execute eagerly runs the function over args. It goes through the same
// execInternal path that ScalarFunction.Execute uses.
func (f *VectorFunction) Execute(ctx context.Context, opts FunctionOptions, args ...Datum) (Datum, error) {
	return execInternal(ctx, f, opts, -1, args...)
}
// MetaFunctionImpl is the signature needed for implementing a MetaFunction,
// which is a function that dispatches to another function instead of
// owning kernels.
type MetaFunctionImpl func(context.Context, FunctionOptions, ...Datum) (Datum, error)

// MetaFunction is a function that dispatches to other functions. Its impl
// must not be nil.
//
// For Array, ChunkedArray and Scalar datums, the impl may rely on executing
// concrete function types, but it must handle any other Datum kinds itself.
type MetaFunction struct {
	baseFunction
	impl MetaFunctionImpl
}

// NewMetaFunction constructs a MetaFunction with the given name, arity and
// documentation. Execute forwards to impl after checking the arguments.
//
// Will panic if impl is nil.
func NewMetaFunction(name string, arity Arity, doc FunctionDoc, impl MetaFunctionImpl) *MetaFunction {
	if impl == nil {
		panic("arrow/compute: cannot construct MetaFunction with nil impl")
	}

	// NOTE(review): kind is not assigned, so it keeps its zero value
	// (FuncScalar) and Kind() does not report FuncMeta. Confirm whether
	// callers depend on this before changing it.
	fn := &MetaFunction{impl: impl}
	fn.name = name
	fn.arity = arity
	fn.doc = doc
	return fn
}
// NumKernels always returns 0, because a MetaFunction owns no kernels.
func (MetaFunction) NumKernels() int { return 0 }

// DispatchExact always fails with an error wrapping arrow.ErrNotImplemented,
// because a MetaFunction has no kernels to dispatch to.
func (m *MetaFunction) DispatchExact(...arrow.DataType) (exec.Kernel, error) {
	return nil, fmt.Errorf("%w: dispatch for metafunction", arrow.ErrNotImplemented)
}

// DispatchBest fails in exactly the same way as DispatchExact.
func (m *MetaFunction) DispatchBest(vals ...arrow.DataType) (exec.Kernel, error) {
	return m.DispatchExact(vals...)
}

// Execute checks the argument count and the options requirement. It then
// substitutes the default options when opts is nil and calls the impl
// supplied to NewMetaFunction.
func (m *MetaFunction) Execute(ctx context.Context, opts FunctionOptions, args ...Datum) (Datum, error) {
	if err := m.checkArity(len(args)); err != nil {
		return nil, err
	}
	if err := checkOptions(m, opts); err != nil {
		return nil, err
	}

	effective := opts
	if effective == nil {
		effective = m.defaultOpts
	}
	return m.impl(ctx, effective, args...)
}