Skip to content

Commit

Permalink
Continued.
Browse files Browse the repository at this point in the history
  • Loading branch information
cube2222 committed Aug 10, 2023
1 parent ba23a7e commit d8e318c
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 1 deletion.
7 changes: 7 additions & 0 deletions arrowexec/helpers/equality_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,20 @@ func MakeKeyEqualityChecker(leftRecord, rightRecord execution.Record, leftKeyInd
columnEqualityCheckers[i] = func(leftRowIndex, rightRowIndex int) bool {
return leftTypedArr[leftRowIndex] == rightTypedArr[rightRowIndex]
}
case arrow.FLOAT64:
leftTypedArr := leftRecord.Column(leftKeyIndices[i]).(*array.Float64).Float64Values()
rightTypedArr := rightRecord.Column(rightKeyIndices[i]).(*array.Float64).Float64Values()
columnEqualityCheckers[i] = func(leftRowIndex, rightRowIndex int) bool {
return leftTypedArr[leftRowIndex] == rightTypedArr[rightRowIndex]
}
case arrow.STRING:
// TODO: Move to large string array.
leftTypedArr := leftRecord.Column(leftKeyIndices[i]).(*array.String)
rightTypedArr := rightRecord.Column(rightKeyIndices[i]).(*array.String)
columnEqualityCheckers[i] = func(leftRowIndex, rightRowIndex int) bool {
return leftTypedArr.Value(leftRowIndex) == rightTypedArr.Value(rightRowIndex)
}
// TODO: Handle all types.
default:
panic("unsupported type for equality checker")
}
Expand Down
7 changes: 7 additions & 0 deletions arrowexec/helpers/key_hasher.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package helpers

import (
"math"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/cube2222/octosql/arrowexec/execution"
Expand All @@ -24,6 +26,11 @@ func MakeRowHasher(columns []arrow.Array) func(rowIndex uint) uint64 {
subHashers[i] = func(hash uint64, rowIndex uint) uint64 {
return fnv1a.AddUint64(hash, uint64(typedArr[rowIndex]))
}
case arrow.FLOAT64:
typedArr := columns[i].(*array.Float64).Float64Values()
subHashers[i] = func(hash uint64, rowIndex uint) uint64 {
return fnv1a.AddUint64(hash, math.Float64bits(typedArr[rowIndex]))
}
case arrow.STRING:
typedArr := columns[i].(*array.String)
subHashers[i] = func(hash uint64, rowIndex uint) uint64 {
Expand Down
2 changes: 1 addition & 1 deletion arrowexec/json/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var parserWorkReceiveChannel = func() chan<- jobIn {
sequenceNumber: job.sequenceNumber,
}
if err == nil {
outJob.record = recordBuilder.NewRecord()
outJob.record = recordBuilder.NewRecord() // TODO: If all fields are optimized out, then this doesn't properly carry the count of rows. Put it into execution.Record.
} else {
outJob.err = err
}
Expand Down
33 changes: 33 additions & 0 deletions arrowexec/nodes/group_by.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ func MakeKey(dt arrow.DataType) Key {
return &KeyInt{
data: memory.NewResizableBuffer(memory.NewGoAllocator()), // TODO: Get allocator as argument.
}
case arrow.FLOAT64:
return &KeyFloat{
data: memory.NewResizableBuffer(memory.NewGoAllocator()), // TODO: Get allocator as argument.
}
case arrow.STRING:
return &KeyString{
stringData: memory.NewResizableBuffer(memory.NewGoAllocator()), // TODO: Get allocator as argument.
Expand Down Expand Up @@ -222,6 +226,35 @@ func (key *KeyInt) GetBatch(length int, offset int) arrow.Array {
return array.NewInt64Data(array.NewData(arrow.PrimitiveTypes.Int64, length, []*memory.Buffer{nil, key.data}, nil, 0 /*TODO: Fixme*/, offset))
}

type KeyFloat struct {
data *memory.Buffer // TODO: Release these buffers at some point.
state []float64 // This uses the above data as the storage underneath.
count int
}

func (key *KeyFloat) MakeNewKeyAdder(arr arrow.Array) func(rowIndex uint) {
typedArr := arr.(*array.Float64).Float64Values()
return func(rowIndex uint) {
if key.count >= len(key.state) {
key.data.Resize(arrow.Float64Traits.BytesRequired(bitutil.NextPowerOf2(key.count + 1)))
key.state = arrow.Float64Traits.CastFromBytes(key.data.Bytes())
}
key.state[key.count] = typedArr[rowIndex]
key.count++
}
}

func (key *KeyFloat) MakeKeyEqualityChecker(arr arrow.Array) func(entryIndex uint, rowIndex uint) bool {
typedArr := arr.(*array.Float64).Float64Values()
return func(entryIndex uint, rowIndex uint) bool {
return typedArr[rowIndex] == key.state[entryIndex]
}
}

func (key *KeyFloat) GetBatch(length int, offset int) arrow.Array {
return array.NewFloat64Data(array.NewData(arrow.PrimitiveTypes.Float64, length, []*memory.Buffer{nil, key.data}, nil, 0 /*TODO: Fixme*/, offset))
}

type KeyString struct {
// TODO: Use a LargeStringArray, instead of a StringArray.
// We use a custom set of buffers, rather than using the builder, because accessing elements of the builder is slow.
Expand Down

0 comments on commit d8e318c

Please sign in to comment.