Create custom "string builder" with fast existing value access.
cube2222 committed Aug 7, 2023
1 parent df47770 commit 5372273
Showing 1 changed file with 43 additions and 12 deletions.
55 changes: 43 additions & 12 deletions arrowexec/nodes/group_by.go
@@ -1,6 +1,8 @@
package nodes

import (
"unsafe"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/bitutil"
@@ -10,9 +12,6 @@ import (
"github.com/segmentio/fasthash/fnv1a"
)

// TODO: Separate implementation for variable-size binary key and fixed-size binary key?
// TODO: For now we just implement hash value equality, but it really should store a list of indices underneath and compare for actual value equality.

type GroupBy struct {
OutSchema *arrow.Schema
Source execution.NodeWithMeta
@@ -25,7 +24,6 @@ type GroupBy struct {
func (g *GroupBy) Run(ctx execution.Context, produce execution.ProduceFunc) error {
entryCount := 0
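// intintmap is a specialized int64 -> int64 hash map; the arguments are the initial size hint and the fill factor.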
entryIndices := intintmap.New(16, 0.6)
// entryIndices := make(map[uint64]uint)
aggregates := make([]Aggregate, len(g.AggregateConstructors))
for i := range aggregates {
aggregates[i] = g.AggregateConstructors[i](g.Source.Schema.Field(g.AggregateExprs[i]).Type)
@@ -72,6 +70,7 @@ func (g *GroupBy) Run(ctx execution.Context, produce execution.ProduceFunc) error {
}
}
if !equal {
// TODO: Really, we should instead use an int -> []int map here, and if there's no equality, we should append a new index to the slice.
panic("hash collision")
}
}
@@ -208,7 +207,8 @@ func MakeKey(dt arrow.DataType) Key {
}
case arrow.STRING:
return &KeyString{
builder: array.NewStringBuilder(memory.NewGoAllocator()), // TODO: Get allocator as argument.
stringData: memory.NewResizableBuffer(memory.NewGoAllocator()), // TODO: Get allocator as argument.
offsetsData: memory.NewResizableBuffer(memory.NewGoAllocator()), // TODO: Get allocator as argument.
}
default:
panic("unsupported type for group by key")
@@ -251,31 +251,62 @@ func (key *KeyInt) GetBatch(length int, offset int) arrow.Array {
}

type KeyString struct {
builder *array.StringBuilder
// TODO: Refactor to use custom array data.
// We use a custom set of buffers, rather than using the builder, because accessing elements of the builder is slow.
offsetsData *memory.Buffer // TODO: Release these buffers at some point.
offsetsState []int32 // This uses the above data as the storage underneath.
stringData *memory.Buffer // TODO: Release these buffers at some point.
stringByteState []byte // This uses the above data as the storage underneath.
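// stringState is a zero-copy string view over stringByteState, kept in sync so key comparisons can slice it without allocating.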
stringState string

length int
count int

finishedArray *array.String
}

func (key *KeyString) MakeNewKeyAdder(arr arrow.Array) func(rowIndex uint) {
typedArr := arr.(*array.String)
return func(rowIndex uint) {
key.builder.Append(typedArr.Value(int(rowIndex)))
// TODO: Benchmark vs using the raw bytes and offsets underneath.
// Just benchmark allocations here.
value := typedArr.Value(int(rowIndex))
if key.length+len(value) > len(key.stringByteState) {
key.stringData.Resize(bitutil.NextPowerOf2(key.length + len(value)))
key.stringByteState = key.stringData.Bytes()
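// Reinterpret the byte slice header as a string header to refresh the zero-copy view after the buffer was reallocated.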
key.stringState = *(*string)(unsafe.Pointer(&key.stringByteState))
}
// We reserve a slot for this value's start offset plus a spare slot for the final offset written in GetBatch, hence count + 2.
potentialFinalOffsetCount := key.count + 2
if potentialFinalOffsetCount > len(key.offsetsState) {
key.offsetsData.Resize(arrow.Int32Traits.BytesRequired(bitutil.NextPowerOf2(potentialFinalOffsetCount)))
key.offsetsState = arrow.Int32Traits.CastFromBytes(key.offsetsData.Bytes())
}

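// Record this value's start offset, then append its bytes after the existing data.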
key.offsetsState[key.count] = int32(key.length)
copy(key.stringByteState[key.length:], value)

key.length += len(value)
key.count++
}
}

func (key *KeyString) MakeKeyEqualityChecker(arr arrow.Array) func(entryIndex uint, rowIndex uint) bool {
typedArr := arr.(*array.String)
return func(entryIndex uint, rowIndex uint) bool {
return typedArr.Value(int(rowIndex)) == key.builder.Value(int(entryIndex))
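// An entry spans from its start offset to the next entry's start offset; the last entry ends at the current total length.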
start := key.offsetsState[entryIndex]
end := key.length
if int(entryIndex) < key.count-1 {
end = int(key.offsetsState[entryIndex+1])
}
return typedArr.Value(int(rowIndex)) == key.stringState[start:end]
}
}

func (key *KeyString) GetBatch(length int, offset int) arrow.Array {
if key.finishedArray == nil {
key.finishedArray = key.builder.NewStringArray()
// We reserved an extra spot when resizing, so we can just add the final offset.
key.offsetsState[key.count] = int32(key.length)
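// Arrow string arrays take three buffers: a validity bitmap (nil here, meaning no nulls), int32 offsets, and the raw bytes.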
key.finishedArray = array.NewStringData(array.NewData(arrow.BinaryTypes.String, key.count, []*memory.Buffer{nil, key.offsetsData, key.stringData}, nil, 0 /*TODO: Fixme*/, 0))
}
// TODO: Easier - return the whole array, then use array.NewSlice to slice it into batches.

return array.NewSlice(key.finishedArray, int64(offset), int64(offset+length))
}
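For reference, here is a minimal standalone sketch (not part of this commit; the values are made up) of the layout KeyString maintains by hand: a flat byte buffer plus n+1 int32 offsets for n values. Where this sketch copies the bytes with string(data), the commit instead keeps a zero-copy string view via an unsafe cast.

package main

import "fmt"

func main() {
	data := []byte("foobarbaz")    // all values concatenated back to back
	offsets := []int32{0, 3, 6, 9} // n+1 offsets for n values; the last equals the total length
	view := string(data)           // KeyString avoids this copy with an unsafe cast

	// Value i spans view[offsets[i]:offsets[i+1]].
	for i := 0; i < len(offsets)-1; i++ {
		fmt.Println(view[offsets[i]:offsets[i+1]]) // prints foo, bar, baz
	}
}

GetBatch hands exactly these two buffers (plus a nil validity bitmap) to array.NewData, which is why the final offset has to be written before the array is constructed.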
