Cleanup.

cube2222 committed Aug 7, 2023
1 parent 1bb9bbc commit 432e2e2

Showing 4 changed files with 32 additions and 28 deletions.
arrowexec/execution/expression.go (1 addition, 0 deletions)
@@ -18,6 +18,7 @@ type RecordVariable struct {
 }
 
 func (r *RecordVariable) Evaluate(ctx Context, record Record) (arrow.Array, error) {
+	// TODO: Retain array?
 	return record.Column(r.index), nil
 }
 
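The TODO added here touches Arrow's memory model: record.Column hands back the column without increasing its reference count, so the array is only guaranteed to stay valid while the producing record is alive. A minimal sketch of the retaining variant the TODO is asking about, assuming the engine's convention would be that callers take ownership and call Release() when done (hypothetical resolution, not part of this commit):

func (r *RecordVariable) Evaluate(ctx Context, record Record) (arrow.Array, error) {
	arr := record.Column(r.index)
	// Bump the refcount so the column outlives the record;
	// the caller would then be responsible for Release().
	arr.Retain()
	return arr, nil
}
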
arrowexec/nodes/filter_test.go (4 additions, 4 deletions)
@@ -68,9 +68,9 @@ func BenchmarkNaiveFilter(b *testing.B) {
 		Node: &GroupBy{
 			OutSchema: schema,
 			Source:    node,
-			KeyExprs:              []int{0},
+			KeyColumns:            []int{0},
 			AggregateConstructors: []func(dt arrow.DataType) Aggregate{MakeCount},
-			AggregateExprs:        []int{1},
+			AggregateColumns:      []int{1},
 		},
 		Schema: schema,
 	}
@@ -132,9 +132,9 @@ func BenchmarkRebatchingFilter(b *testing.B) {
 		Node: &GroupBy{
 			OutSchema: schema,
 			Source:    node,
-			KeyExprs:              []int{0},
+			KeyColumns:            []int{0},
 			AggregateConstructors: []func(dt arrow.DataType) Aggregate{MakeCount},
-			AggregateExprs:        []int{1},
+			AggregateColumns:      []int{1},
 		},
 		Schema: schema,
 	}
arrowexec/nodes/group_by.go (11 additions, 9 deletions)
@@ -16,37 +16,39 @@ type GroupBy struct {
 	OutSchema *arrow.Schema
 	Source    execution.NodeWithMeta
 
-	KeyExprs              []int // For now, let's just use indices here.
+	// Both keys and aggregate columns have to be calculated by a preceding map.
+
+	KeyColumns            []int
 	AggregateConstructors []func(dt arrow.DataType) Aggregate
-	AggregateExprs        []int // For now, let's just use indices here.
+	AggregateColumns      []int
 }
 
 func (g *GroupBy) Run(ctx execution.Context, produce execution.ProduceFunc) error {
 	entryCount := 0
 	entryIndices := intintmap.New(16, 0.6)
 	aggregates := make([]Aggregate, len(g.AggregateConstructors))
 	for i := range aggregates {
-		aggregates[i] = g.AggregateConstructors[i](g.Source.Schema.Field(g.AggregateExprs[i]).Type)
+		aggregates[i] = g.AggregateConstructors[i](g.Source.Schema.Field(g.AggregateColumns[i]).Type)
 	}
-	key := make([]Key, len(g.KeyExprs))
+	key := make([]Key, len(g.KeyColumns))
 	for i := range key {
-		key[i] = MakeKey(g.Source.Schema.Field(g.KeyExprs[i]).Type)
+		key[i] = MakeKey(g.Source.Schema.Field(g.KeyColumns[i]).Type)
 	}
 
 	if err := g.Source.Node.Run(ctx, func(ctx execution.ProduceContext, record execution.Record) error {
-		getKeyHash := MakeKeyHasher(g.Source.Schema, record, g.KeyExprs)
+		getKeyHash := MakeKeyHasher(g.Source.Schema, record, g.KeyColumns)
 
 		aggColumnConsumers := make([]func(entryIndex uint, rowIndex uint), len(aggregates))
 		for i := range aggColumnConsumers {
-			aggColumnConsumers[i] = aggregates[i].MakeColumnConsumer(record.Column(g.AggregateExprs[i]))
+			aggColumnConsumers[i] = aggregates[i].MakeColumnConsumer(record.Column(g.AggregateColumns[i]))
 		}
 		newKeyAdders := make([]func(rowIndex uint), len(key))
 		for i := range newKeyAdders {
-			newKeyAdders[i] = key[i].MakeNewKeyAdder(record.Column(g.KeyExprs[i]))
+			newKeyAdders[i] = key[i].MakeNewKeyAdder(record.Column(g.KeyColumns[i]))
 		}
 		keyEqualityCheckers := make([]func(entryIndex uint, rowIndex uint) bool, len(key))
 		for i := range keyEqualityCheckers {
-			keyEqualityCheckers[i] = key[i].MakeKeyEqualityChecker(record.Column(g.KeyExprs[i]))
+			keyEqualityCheckers[i] = key[i].MakeKeyEqualityChecker(record.Column(g.KeyColumns[i]))
 		}
 
 		rows := record.NumRows()
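The new struct comment makes the operator's contract explicit: GroupBy evaluates no expressions itself; key and aggregate inputs must already be physical columns, produced by a preceding map node, and are addressed by index, which is exactly what the KeyColumns/AggregateColumns names now say. Mirroring the tests in this commit, a configuration that groups by column 0 and sums column 1 looks like this (variable names are illustrative):

groupBy := &GroupBy{
	OutSchema: outSchema, // schema of the grouped output
	Source:    source,    // upstream node; computed keys are already projected to columns
	KeyColumns:            []int{0},
	AggregateColumns:      []int{1},
	AggregateConstructors: []func(dt arrow.DataType) Aggregate{MakeSum},
}

Keeping the operator index-based keeps expression evaluation out of the hot hashing and aggregation loops; a computed key such as a + b becomes a real column one node earlier.
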
arrowexec/nodes/group_by_test.go (16 additions, 15 deletions)
@@ -13,6 +13,7 @@ import (
 )
 
 func TestGroupBy(t *testing.T) {
+	ctx := context.Background()
 	allocator := memory.NewGoAllocator()
 
 	schema := arrow.NewSchema(
@@ -56,15 +57,15 @@ func TestGroupBy(t *testing.T) {
 			),
 			Source: node,
 
-			KeyExprs:       []int{0},
-			AggregateExprs: []int{1},
+			KeyColumns:       []int{0},
+			AggregateColumns: []int{1},
 			AggregateConstructors: []func(dt arrow.DataType) Aggregate{
 				MakeSum,
 			},
 		},
 	}
 
-	if err := node.Node.Run(context.Background(), func(ctx execution.ProduceContext, record execution.Record) error {
+	if err := node.Node.Run(execution.Context{Context: ctx}, func(ctx execution.ProduceContext, record execution.Record) error {
 		log.Println(record)
 		return nil
 	}); err != nil {
@@ -92,13 +93,13 @@ func BenchmarkGroupBy(b *testing.B) {
 		aBuilder := array.NewInt64Builder(allocator)
 		bBuilder := array.NewInt64Builder(allocator)
 
-		for i := 0; i < execution.BatchSize; i++ {
-			aBuilder.Append(int64((arrayIndex*execution.BatchSize + i) % groups))
-			bBuilder.Append(int64(arrayIndex*execution.BatchSize + i))
+		for i := 0; i < execution.IdealBatchSize; i++ {
+			aBuilder.Append(int64((arrayIndex*execution.IdealBatchSize + i) % groups))
+			bBuilder.Append(int64(arrayIndex*execution.IdealBatchSize + i))
 		}
 
 		records = append(records, execution.Record{
-			Record: array.NewRecord(schema, []arrow.Array{aBuilder.NewArray(), bBuilder.NewArray()}, execution.BatchSize),
+			Record: array.NewRecord(schema, []arrow.Array{aBuilder.NewArray(), bBuilder.NewArray()}, execution.IdealBatchSize),
 		})
 	}

@@ -124,16 +125,16 @@ func BenchmarkGroupBy(b *testing.B) {
 			),
 			Source: node,
 
-			KeyExprs:       []int{0},
-			AggregateExprs: []int{1},
+			KeyColumns:       []int{0},
+			AggregateColumns: []int{1},
 			AggregateConstructors: []func(dt arrow.DataType) Aggregate{
 				MakeSum,
 			},
 		},
 	}
 	b.StartTimer()
 
-	if err := node.Node.Run(ctx, func(ctx execution.ProduceContext, record execution.Record) error {
+	if err := node.Node.Run(execution.Context{Context: ctx}, func(ctx execution.ProduceContext, record execution.Record) error {
 		outArrays = append(outArrays, record.Record.Columns()[0])
 		return nil
 	}); err != nil {
@@ -161,7 +162,7 @@ func BenchmarkGroupByString(b *testing.B) {
 	for arrayIndex := 0; arrayIndex < rounds; arrayIndex++ {
 		aBuilder := array.NewStringBuilder(allocator)
 
-		for i := 0; i < execution.BatchSize; i++ {
+		for i := 0; i < execution.IdealBatchSize; i++ {
 			switch rand.Intn(4) {
 			case 0:
 				aBuilder.Append("aaa")
@@ -175,7 +176,7 @@ func BenchmarkGroupByString(b *testing.B) {
 		}
 
 		records = append(records, execution.Record{
-			Record: array.NewRecord(schema, []arrow.Array{aBuilder.NewArray()}, execution.BatchSize),
+			Record: array.NewRecord(schema, []arrow.Array{aBuilder.NewArray()}, execution.IdealBatchSize),
 		})
 	}
 
@@ -201,16 +202,16 @@ func BenchmarkGroupByString(b *testing.B) {
 			),
 			Source: node,
 
-			KeyExprs:       []int{0},
-			AggregateExprs: []int{0},
+			KeyColumns:       []int{0},
+			AggregateColumns: []int{0},
 			AggregateConstructors: []func(dt arrow.DataType) Aggregate{
 				MakeCount,
 			},
 		},
 	}
 	b.StartTimer()
 
-	if err := node.Node.Run(ctx, func(ctx execution.ProduceContext, record execution.Record) error {
+	if err := node.Node.Run(execution.Context{Context: ctx}, func(ctx execution.ProduceContext, record execution.Record) error {
 		outArrays = append(outArrays, record.Record.Columns()[0])
 		return nil
 	}); err != nil {
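Beyond the column renames and the BatchSize to IdealBatchSize rename, the tests now build the run context as execution.Context{Context: ctx} instead of passing context.Background() directly, matching Run's execution.Context parameter in group_by.go. That struct literal only compiles if execution.Context embeds the standard context; a minimal sketch of what the type presumably looks like (the actual definition lives in the arrowexec/execution package and may carry more fields):

package execution

import "context"

// Context wraps the standard library context so execution-wide state
// (allocators, variables, and the like) can be added later without
// touching every node's Run signature again.
type Context struct {
	context.Context
}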
