Refactor, add expressions, add filter node.
cube2222 committed Aug 7, 2023
1 parent d49fbe7 commit 1bb9bbc
Showing 7 changed files with 356 additions and 14 deletions.
18 changes: 13 additions & 5 deletions arrowexec/execution/execution.go
@@ -6,10 +6,18 @@ import (
"github.com/apache/arrow/go/v13/arrow"
)

const BatchSize = 16 * 1024

type Context context.Context
type ProduceContext context.Context
// All nodes will try to create batches of approximately this size. Different sizes are allowed.
const IdealBatchSize = 16 * 1024

type Context struct {
Context context.Context
// TODO: We'll also need the variable context here.
// Maybe instead of storing a linked list of lists here, we should store references and indices of the underlying arrays?
// Basically, store a reference to each parent scope, the relevant record, and entry index of the value in that record.
}
type ProduceContext struct {
Context
}

type Node interface {
Run(ctx Context, produce ProduceFunc) error
@@ -20,7 +28,7 @@ type NodeWithMeta struct {
Schema *arrow.Schema
}

type ProduceFunc func(ctx ProduceContext, record Record) error
type ProduceFunc func(produceCtx ProduceContext, record Record) error

type Record struct {
arrow.Record
39 changes: 39 additions & 0 deletions arrowexec/execution/expression.go
@@ -0,0 +1,39 @@
package execution

import (
"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/scalar"
)

type ScalarExpression interface {
EvaluateScalar(ctx Context) (scalar.Scalar, error)
}

type Expression interface {
Evaluate(ctx Context, record Record) (arrow.Array, error)
}

type RecordVariable struct {
index int
}

func (r *RecordVariable) Evaluate(ctx Context, record Record) (arrow.Array, error) {
return record.Column(r.index), nil
}

// TODO: Add ConstArray expression for testing.

type ConstArray struct {
Array arrow.Array
}

func (c *ConstArray) Evaluate(ctx Context, record Record) (arrow.Array, error) {
if c.Array.Len() != int(record.NumRows()) {
panic("const array length doesn't match record length")
}
return c.Array, nil
}

// type ParentScopeVariable struct {
//
// }
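
The commented-out ParentScopeVariable stub above, together with the TODO on execution.Context, hints at how values from enclosing scopes might be resolved. A minimal, hypothetical sketch of one possible shape follows, written as if added to the same execution package; the Variables field on Context and all names below are invented placeholders, not part of this commit:

// Hypothetical sketch, not part of this commit. Assumes execution.Context
// gains a Variables *VariableContext field, as the TODO in execution.go suggests.
type VariableContext struct {
	Parent *VariableContext // enclosing scope, nil at the outermost level
	Record Record           // record the enclosing scope is currently processing
	Row    int              // row index of the current value within that record
}

// ParentScopeVariable reads a single value from the N-th enclosing scope.
type ParentScopeVariable struct {
	ScopeDepth  int // how many scopes up to walk
	ColumnIndex int // column within that scope's record
}

func (v *ParentScopeVariable) EvaluateScalar(ctx Context) (scalar.Scalar, error) {
	vars := ctx.Variables // assumed field, see the note above
	for i := 0; i < v.ScopeDepth; i++ {
		vars = vars.Parent
	}
	return scalar.GetScalar(vars.Record.Column(v.ColumnIndex), vars.Row)
}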
4 changes: 4 additions & 0 deletions arrowexec/materialize/materialize.go
@@ -0,0 +1,4 @@
package materialize

// TODO: Here we'll want some code to turn a physical plan into an execution plan.
// We'll need a plan transformer, but we'll also need to e.g. translate octosql schemas into arrow schemas.
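
As a rough illustration of the schema-translation part of the TODO above, here is a minimal, hypothetical sketch. The octosql-side PhysicalField type is an invented stand-in (the real plan types will differ); only the Arrow calls are real APIs:

package materialize

import (
	"fmt"

	"github.com/apache/arrow/go/v13/arrow"
)

// PhysicalField is a placeholder for whatever the physical plan's schema exposes.
type PhysicalField struct {
	Name string
	Type string // e.g. "Int", "Float", "String", "Boolean"
}

// SchemaToArrow sketches the octosql-to-arrow schema translation mentioned above.
func SchemaToArrow(fields []PhysicalField) (*arrow.Schema, error) {
	arrowFields := make([]arrow.Field, len(fields))
	for i, f := range fields {
		var dt arrow.DataType
		switch f.Type {
		case "Int":
			dt = arrow.PrimitiveTypes.Int64
		case "Float":
			dt = arrow.PrimitiveTypes.Float64
		case "String":
			dt = arrow.BinaryTypes.String
		case "Boolean":
			dt = arrow.FixedWidthTypes.Boolean
		default:
			return nil, fmt.Errorf("unsupported type: %s", f.Type)
		}
		arrowFields[i] = arrow.Field{Name: f.Name, Type: dt, Nullable: false}
	}
	return arrow.NewSchema(arrowFields, nil), nil
}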
127 changes: 127 additions & 0 deletions arrowexec/nodes/filter.go
@@ -0,0 +1,127 @@
package nodes

import (
"fmt"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/compute"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/cube2222/octosql/arrowexec/execution"
"golang.org/x/sync/errgroup"
)

// There are two Filter implementations here.
// The NaiveFilter just uses the Arrow library's filter function.
// Its advantage is that it supports all data formats and is
// a bit (~1.4x) faster if most of the rows are filtered out.
// The RebatchingFilter has a custom routine for filtering records; it
// re-batches the filtered records so that they aren't too far off the
// ideal batch size.
// It actually ends up being *much* (~3x) faster if only a few
// records are being filtered out.
// The break-even point for some naive integer arrays is at ~3.5% of records
// being filtered out.
//
// It's interesting, because the original idea behind the re-batching was
// that downstream operators should be faster if batches aren't too small.
// However, with most of the records filtered out, the workload for the downstream
// operators is so small that it doesn't really matter.

// NaiveFilter uses the Arrow library's selection function.
type NaiveFilter struct {
Source execution.NodeWithMeta
Predicate execution.Expression
}

func (f *NaiveFilter) Run(ctx execution.Context, produce execution.ProduceFunc) error {
return f.Source.Node.Run(ctx, func(produceCtx execution.ProduceContext, record execution.Record) error {
selection, err := f.Predicate.Evaluate(produceCtx.Context, record)
if err != nil {
return fmt.Errorf("couldn't evaluate filter predicate: %w", err)
}

out, err := compute.FilterRecordBatch(ctx.Context, record, selection, &compute.FilterOptions{
NullSelection: compute.SelectionDropNulls,
})
if err != nil {
return fmt.Errorf("couldn't filter record batch: %w", err)
}

if err := produce(produceCtx, execution.Record{Record: out}); err != nil {
return fmt.Errorf("couldn't produce record: %w", err)
}

return nil
})
}

// RebatchingFilter has a custom routine for filtering records; it re-batches the filtered records so that they aren't too far off the ideal batch size.
type RebatchingFilter struct {
Source execution.NodeWithMeta
Predicate execution.Expression
}

func (f *RebatchingFilter) Run(ctx execution.Context, produce execution.ProduceFunc) error {
recordBuilder := array.NewRecordBuilder(memory.NewGoAllocator(), f.Source.Schema) // TODO: Get allocator as argument.
if err := f.Source.Node.Run(ctx, func(produceCtx execution.ProduceContext, record execution.Record) error {
selection, err := f.Predicate.Evaluate(produceCtx.Context, record)
if err != nil {
return fmt.Errorf("couldn't evaluate filter predicate: %w", err)
}

g, _ := errgroup.WithContext(ctx.Context)
columns := record.Columns()
for i, column := range columns {
rewriter := MakeColumnRewriter(recordBuilder.Field(i), column)
g.Go(func() error {
Rewrite(selection, rewriter)
return nil
})
}
if err := g.Wait(); err != nil {
return err
}

// TODO: What if there are no fields...? This is a case that's generally unhandled right now everywhere. Need to add a count to the record struct.
if recordBuilder.Field(0).Len() > execution.IdealBatchSize/2 {
outRecord := recordBuilder.NewRecord()
if err := produce(produceCtx, execution.Record{Record: outRecord}); err != nil {
return fmt.Errorf("couldn't produce record: %w", err)
}
}
return nil
}); err != nil {
return fmt.Errorf("couldn't run source node: %w", err)
}

if recordBuilder.Field(0).Len() > 0 {
outRecord := recordBuilder.NewRecord()
if err := produce(execution.ProduceContext{Context: ctx}, execution.Record{Record: outRecord}); err != nil {
return fmt.Errorf("couldn't produce record: %w", err)
}
}

return nil
}

func MakeColumnRewriter(builder array.Builder, arr arrow.Array) func(rowIndex int) {
// TODO: Should this operate on row ranges instead of single rows? That would make low-selectivity workloads faster, and would also help with nested types.
switch builder.Type().ID() {
case arrow.INT64:
typedBuilder := builder.(*array.Int64Builder)
typedArr := arr.(*array.Int64)
return func(rowIndex int) {
typedBuilder.Append(typedArr.Value(rowIndex))
}
default:
panic(fmt.Errorf("unsupported type for filtering: %v", builder.Type().ID()))
}
}

func Rewrite(selection arrow.Array, rewriteFunc func(rowIndex int)) {
typedSelection := selection.(*array.Boolean)
for i := 0; i < typedSelection.Len(); i++ {
if typedSelection.Value(i) {
rewriteFunc(i)
}
}
}
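
The TODO in MakeColumnRewriter asks whether rewriting should operate on row ranges instead of single rows. A hedged sketch of what a range-based variant could look like (not part of this commit), limited to int64 columns like the existing rewriter:

// RewriteRanges is a hypothetical range-based counterpart to Rewrite: it finds
// maximal runs of selected rows and hands each run to rewriteRangeFunc, so fewer
// per-row calls are made when many consecutive rows pass the predicate.
func RewriteRanges(selection arrow.Array, rewriteRangeFunc func(start, length int)) {
	typedSelection := selection.(*array.Boolean)
	i := 0
	for i < typedSelection.Len() {
		if !typedSelection.Value(i) {
			i++
			continue
		}
		start := i
		for i < typedSelection.Len() && typedSelection.Value(i) {
			i++
		}
		rewriteRangeFunc(start, i-start)
	}
}

// MakeInt64RangeRewriter is the matching int64 rewriter sketch;
// AppendValues copies a whole run of values in one call.
func MakeInt64RangeRewriter(builder *array.Int64Builder, arr *array.Int64) func(start, length int) {
	values := arr.Int64Values()
	return func(start, length int) {
		builder.AppendValues(values[start:start+length], nil)
	}
}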
156 changes: 156 additions & 0 deletions arrowexec/nodes/filter_test.go
@@ -0,0 +1,156 @@
package nodes

import (
"context"
"math/rand"
"testing"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/cube2222/octosql/arrowexec/execution"
)

// selectivity in tenths of a percent (so 1000 means 100%)
const selectivity = 35
const rounds = 1024

var predicateArr = func() arrow.Array {
predicateBuilder := array.NewBooleanBuilder(memory.DefaultAllocator)
for i := 0; i < execution.IdealBatchSize; i++ {
if rand.Intn(1000) < selectivity {
predicateBuilder.Append(true)
} else {
predicateBuilder.Append(false)
}
}
return predicateBuilder.NewArray()
}()

func BenchmarkNaiveFilter(b *testing.B) {
groupBuilder := array.NewInt64Builder(memory.DefaultAllocator)
for i := 0; i < execution.IdealBatchSize; i++ {
groupBuilder.Append(1)
}
groupArr := groupBuilder.NewArray()
numbersBuilder := array.NewInt64Builder(memory.DefaultAllocator)
for i := 0; i < execution.IdealBatchSize; i++ {
numbersBuilder.Append(int64(i))
}
numbersArr := numbersBuilder.NewArray()

schema := arrow.NewSchema(
[]arrow.Field{
{Name: "a", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
{Name: "b", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
},
nil,
)

var node execution.NodeWithMeta
node = execution.NodeWithMeta{
Node: &TestNode{
Records: []execution.Record{{Record: array.NewRecord(schema, []arrow.Array{groupArr, numbersArr}, execution.IdealBatchSize)}},
Repetitions: rounds,
},
Schema: schema,
}
node = execution.NodeWithMeta{
Node: &NaiveFilter{
Source: node,
Predicate: &execution.ConstArray{
Array: predicateArr,
},
},
Schema: schema,
}
node = execution.NodeWithMeta{
Node: &GroupBy{
OutSchema: schema,
Source: node,
KeyExprs: []int{0},
AggregateConstructors: []func(dt arrow.DataType) Aggregate{MakeCount},
AggregateExprs: []int{1},
},
Schema: schema,
}

for i := 0; i < b.N; i++ {
var outRecords []execution.Record
var count int64
if err := node.Node.Run(execution.Context{Context: context.Background()}, func(produceCtx execution.ProduceContext, record execution.Record) error {
// log.Println(record)
outRecords = append(outRecords, record)
count += record.NumRows()
return nil
}); err != nil {
panic(err)
}
_ = outRecords // keep outRecords referenced without logging it
// log.Println("naive count:", count)
}
}

func BenchmarkRebatchingFilter(b *testing.B) {
groupBuilder := array.NewInt64Builder(memory.DefaultAllocator)
for i := 0; i < execution.IdealBatchSize; i++ {
groupBuilder.Append(1)
}
groupArr := groupBuilder.NewArray()
numbersBuilder := array.NewInt64Builder(memory.DefaultAllocator)
for i := 0; i < execution.IdealBatchSize; i++ {
numbersBuilder.Append(int64(i))
}
numbersArr := numbersBuilder.NewArray()

schema := arrow.NewSchema(
[]arrow.Field{
{Name: "a", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
{Name: "b", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
},
nil,
)

var node execution.NodeWithMeta
node = execution.NodeWithMeta{
Node: &TestNode{
Records: []execution.Record{{Record: array.NewRecord(schema, []arrow.Array{groupArr, numbersArr}, execution.IdealBatchSize)}},
Repetitions: rounds,
},
Schema: schema,
}
node = execution.NodeWithMeta{
Node: &RebatchingFilter{
Source: node,
Predicate: &execution.ConstArray{
Array: predicateArr,
},
},
Schema: schema,
}
node = execution.NodeWithMeta{
Node: &GroupBy{
OutSchema: schema,
Source: node,
KeyExprs: []int{0},
AggregateConstructors: []func(dt arrow.DataType) Aggregate{MakeCount},
AggregateExprs: []int{1},
},
Schema: schema,
}

for i := 0; i < b.N; i++ {
var outRecords []execution.Record
var count int64
if err := node.Node.Run(execution.Context{Context: context.Background()}, func(produceCtx execution.ProduceContext, record execution.Record) error {
// log.Println(record)
outRecords = append(outRecords, record)
count += record.NumRows()
return nil
}); err != nil {
panic(err)
}
_ = outRecords // keep outRecords referenced without logging it
// log.Println("rebatching count:", count)
}
}
10 changes: 5 additions & 5 deletions arrowexec/nodes/group_by.go
@@ -85,11 +85,11 @@ func (g *GroupBy) Run(ctx execution.Context, produce execution.ProduceFunc) erro
return err
}

for batchIndex := 0; batchIndex < (entryCount/execution.BatchSize)+1; batchIndex++ {
offset := batchIndex * execution.BatchSize
for batchIndex := 0; batchIndex < (entryCount/execution.IdealBatchSize)+1; batchIndex++ {
offset := batchIndex * execution.IdealBatchSize
length := entryCount - offset
if length > execution.BatchSize {
length = execution.BatchSize
if length > execution.IdealBatchSize {
length = execution.IdealBatchSize
}

columns := make([]arrow.Array, len(g.OutSchema.Fields()))
@@ -102,7 +102,7 @@ func (g *GroupBy) Run(ctx execution.Context, produce execution.ProduceFunc) erro

record := array.NewRecord(g.OutSchema, columns, int64(length))

if err := produce(ctx, execution.Record{Record: record}); err != nil {
if err := produce(execution.ProduceContext{Context: ctx}, execution.Record{Record: record}); err != nil {
return err
}
}