Skip to content

Commit

Permalink
GH-35652: [Go][Compute] Allow executing Substrait Expressions using G…
Browse files Browse the repository at this point in the history
…o Compute (#35654)

### Rationale for this change
Providing the ability to execute more complex expressions than single operations by leveraging Substrait's expression objects and deprecating the existing separate Expression interfaces in Go Arrow compute. This provides a quick integration with Substrait Expressions and ExtendedExpressions to start building more integrations.

### What changes are included in this PR?
This PR provides:

* an extension registry for Go arrow to provide mappings between Arrow and substrait for functions and for types along with other custom mappings if necessary. 
* Facilities to convert between Arrow data types and Substrait types
* Functions to evaluate Substrait expression objects with Arrow data as the input
* Functions to evaluate Substrait field references against Arrow data and arrow schemas

### Are these changes tested?
Yes, unit tests are included.

### Are there any user-facing changes?
Existing `compute.Expression` and its friends are being marked as deprecated.

* Closes: #35652

Authored-by: Matt Topol <zotthewizard@gmail.com>
Signed-off-by: Matt Topol <zotthewizard@gmail.com>
  • Loading branch information
zeroshade committed Jun 7, 2023
1 parent acf3cba commit 9be7074
Show file tree
Hide file tree
Showing 22 changed files with 3,016 additions and 131 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ swift/Arrow/.build

# Go dependencies
go/vendor
# go debug binaries
__debug_bin

# direnv
.envrc
6 changes: 4 additions & 2 deletions go/arrow/compute/arithmetic.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,8 @@ func RegisterScalarArithmetic(reg FunctionRegistry) {
}{
{"sub_unchecked", kernels.OpSub, decPromoteAdd, subUncheckedDoc},
{"sub", kernels.OpSubChecked, decPromoteAdd, subDoc},
{"subtract_unchecked", kernels.OpSub, decPromoteAdd, subUncheckedDoc},
{"subtract", kernels.OpSubChecked, decPromoteAdd, subDoc},
}

for _, o := range ops {
Expand Down Expand Up @@ -1088,8 +1090,8 @@ func Negate(ctx context.Context, opts ArithmeticOptions, input Datum) (Datum, er
// input. For x in the input:
//
// if x > 0: 1
// if x < 0: -1
// if x == 0: 0
// if x < 0: -1
// if x == 0: 0
func Sign(ctx context.Context, input Datum) (Datum, error) {
return CallFunction(ctx, "sign", nil, input)
}
Expand Down
6 changes: 3 additions & 3 deletions go/arrow/compute/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,20 @@ func execInternal(ctx context.Context, fn Function, opts FunctionOptions, passed

var (
k exec.Kernel
executor kernelExecutor
executor KernelExecutor
)

switch fn.Kind() {
case FuncScalar:
executor = scalarExecPool.Get().(*scalarExecutor)
defer func() {
executor.clear()
executor.Clear()
scalarExecPool.Put(executor.(*scalarExecutor))
}()
case FuncVector:
executor = vectorExecPool.Get().(*vectorExecutor)
defer func() {
executor.clear()
executor.Clear()
vectorExecPool.Put(executor.(*vectorExecutor))
}()
default:
Expand Down
24 changes: 13 additions & 11 deletions go/arrow/compute/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ var (
// then be modified to set into a context.
//
// The default exec context uses the following values:
// - ChunkSize = DefaultMaxChunkSize (MaxInt64)
// - PreallocContiguous = true
// - Registry = GetFunctionRegistry()
// - ExecChannelSize = 10
// - NumParallel = runtime.NumCPU()
// - ChunkSize = DefaultMaxChunkSize (MaxInt64)
// - PreallocContiguous = true
// - Registry = GetFunctionRegistry()
// - ExecChannelSize = 10
// - NumParallel = runtime.NumCPU()
func DefaultExecCtx() ExecCtx { return defaultExecCtx }

func init() {
Expand Down Expand Up @@ -131,7 +131,7 @@ type ExecBatch struct {
Values []Datum
// Guarantee is a predicate Expression guaranteed to evaluate to true for
// all rows in this batch.
Guarantee Expression
// Guarantee Expression
// Len is the semantic length of this ExecBatch. When the values are
// all scalars, the length should be set to 1 for non-aggregate kernels.
// Otherwise the length is taken from the array values. Aggregate kernels
Expand Down Expand Up @@ -384,9 +384,9 @@ func inferBatchLength(values []Datum) (length int64, allSame bool) {
return
}

// kernelExecutor is the interface for all executors to initialize and
// KernelExecutor is the interface for all executors to initialize and
// call kernel execution functions on batches.
type kernelExecutor interface {
type KernelExecutor interface {
// Init must be called *after* the kernel's init method and any
// KernelState must be set into the KernelCtx *before* calling
// this Init method. This is to faciliate the case where
Expand All @@ -407,8 +407,8 @@ type kernelExecutor interface {
// CheckResultType checks the actual result type against the resolved
// output type. If the types don't match an error is returned
CheckResultType(out Datum) error

clear()
// Clear resets the state in the executor so that it can be reused.
Clear()
}

// the base implementation for executing non-aggregate kernels.
Expand All @@ -422,7 +422,7 @@ type nonAggExecImpl struct {
preallocValidity bool
}

func (e *nonAggExecImpl) clear() {
func (e *nonAggExecImpl) Clear() {
e.ctx, e.kernel, e.outType = nil, nil, nil
if e.dataPrealloc != nil {
e.dataPrealloc = e.dataPrealloc[:0]
Expand Down Expand Up @@ -479,6 +479,8 @@ func (e *nonAggExecImpl) CheckResultType(out Datum) error {

type spanIterator func() (exec.ExecSpan, int64, bool)

func NewScalarExecutor() KernelExecutor { return &scalarExecutor{} }

type scalarExecutor struct {
nonAggExecImpl

Expand Down
11 changes: 10 additions & 1 deletion go/arrow/compute/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ var hashSeed = maphash.MakeSeed()

// Expression is an interface for mapping one datum to another. An expression
// is one of:
//
// A literal Datum
// A reference to a single (potentially nested) field of an input Datum
// A reference to a single (potentially nested) field of an input Datum
// A call to a compute function, with arguments specified by other Expressions
//
// Deprecated: use substrait-go expressions instead.
type Expression interface {
fmt.Stringer
// IsBound returns true if this expression has been bound to a particular
Expand Down Expand Up @@ -95,6 +98,8 @@ func printDatum(datum Datum) string {

// Literal is an expression denoting a literal Datum which could be any value
// as a scalar, an array, or so on.
//
// Deprecated: use substrait-go expressions Literal instead.
type Literal struct {
Literal Datum
}
Expand Down Expand Up @@ -144,6 +149,8 @@ func (l *Literal) Release() {

// Parameter represents a field reference and needs to be bound in order to determine
// its type and shape.
//
// Deprecated: use substrait-go field references instead.
type Parameter struct {
ref *FieldRef

Expand Down Expand Up @@ -265,6 +272,8 @@ func optionsToString(fn FunctionOptions) string {
// Call is a function call with specific arguments which are themselves other
// expressions. A call can also have options that are specific to the function
// in question. It must be bound to determine the shape and type.
//
// Deprecated: use substrait-go expression functions instead.
type Call struct {
funcName string
args []Expression
Expand Down

0 comments on commit 9be7074

Please sign in to comment.