Skip to content

Commit

Permalink
GH-34171: [Go][Compute] Implement "Unique" kernel (#34172)
Browse files Browse the repository at this point in the history
### Rationale for this change

Implementing a kernel for computing the "unique" values in an arrow array, primarily for use in solving #33466. 

### What changes are included in this PR?
Adds a "unique" function to the compute list and helper convenience functions.

### Are these changes tested?
Yes, unit tests are included.

### Are there any user-facing changes?
Just the new available functions.

* Closes: #34171

Authored-by: Matt Topol <zotthewizard@gmail.com>
Signed-off-by: Matt Topol <zotthewizard@gmail.com>
  • Loading branch information
zeroshade committed Feb 14, 2023
1 parent 266f166 commit 90071cc
Show file tree
Hide file tree
Showing 11 changed files with 1,227 additions and 31 deletions.
6 changes: 3 additions & 3 deletions go/arrow/array/dictionary.go
Expand Up @@ -771,7 +771,7 @@ func (b *dictionaryBuilder) newWithDictOffset(offset int) (indices, dict *Data,
indices.Retain()

b.deltaOffset = b.memoTable.Size()
dict, err = getDictArrayData(b.mem, b.dt.ValueType, b.memoTable, offset)
dict, err = GetDictArrayData(b.mem, b.dt.ValueType, b.memoTable, offset)
b.reset()
return
}
Expand Down Expand Up @@ -1471,7 +1471,7 @@ func (u *unifier) GetResult() (outType arrow.DataType, outDict arrow.Array, err
}
outType = &arrow.DictionaryType{IndexType: indexType, ValueType: u.valueType}

dictData, err := getDictArrayData(u.mem, u.valueType, u.memoTable, 0)
dictData, err := GetDictArrayData(u.mem, u.valueType, u.memoTable, 0)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -1509,7 +1509,7 @@ func (u *unifier) GetResultWithIndexType(indexType arrow.DataType) (arrow.Array,
return nil, errors.New("arrow/array: cannot combine dictionaries. unified dictionary requires a larger index type")
}

dictData, err := getDictArrayData(u.mem, u.valueType, u.memoTable, 0)
dictData, err := GetDictArrayData(u.mem, u.valueType, u.memoTable, 0)
if err != nil {
return nil, err
}
Expand Down
13 changes: 12 additions & 1 deletion go/arrow/array/util.go
Expand Up @@ -250,7 +250,7 @@ func TableFromJSON(mem memory.Allocator, sc *arrow.Schema, recJSON []string, opt
return NewTableFromRecords(sc, batches), nil
}

func getDictArrayData(mem memory.Allocator, valueType arrow.DataType, memoTable hashing.MemoTable, startOffset int) (*Data, error) {
func GetDictArrayData(mem memory.Allocator, valueType arrow.DataType, memoTable hashing.MemoTable, startOffset int) (*Data, error) {
dictLen := memoTable.Size() - startOffset
buffers := []*memory.Buffer{nil, nil}

Expand All @@ -272,6 +272,17 @@ func getDictArrayData(mem memory.Allocator, valueType arrow.DataType, memoTable
offsets := arrow.Int32Traits.CastFromBytes(buffers[1].Bytes())
tbl.CopyOffsetsSubset(startOffset, offsets)

valuesz := offsets[len(offsets)-1] - offsets[0]
buffers[2].Resize(int(valuesz))
tbl.CopyValuesSubset(startOffset, buffers[2].Bytes())
case arrow.LARGE_BINARY, arrow.LARGE_STRING:
buffers = append(buffers, memory.NewResizableBuffer(mem))
defer buffers[2].Release()

buffers[1].Resize(arrow.Int64Traits.BytesRequired(dictLen + 1))
offsets := arrow.Int64Traits.CastFromBytes(buffers[1].Bytes())
tbl.CopyLargeOffsetsSubset(startOffset, offsets)

valuesz := offsets[len(offsets)-1] - offsets[0]
buffers[2].Resize(int(valuesz))
tbl.CopyValuesSubset(startOffset, buffers[2].Bytes())
Expand Down
12 changes: 11 additions & 1 deletion go/arrow/compute/executor.go
Expand Up @@ -962,10 +962,20 @@ func (v *vectorExecutor) Execute(ctx context.Context, batch *ExecBatch, data cha
func (v *vectorExecutor) WrapResults(ctx context.Context, out <-chan Datum, hasChunked bool) Datum {
// if kernel doesn't output chunked, just grab the one output and return it
if !v.kernel.(*exec.VectorKernel).OutputChunked {
var output Datum
select {
case <-ctx.Done():
return nil
case output := <-out:
case output = <-out:
}

// we got an output datum, but let's wait for the channel to
// close so we don't have any race conditions
select {
case <-ctx.Done():
output.Release()
return nil
case <-out:
return output
}
}
Expand Down
42 changes: 22 additions & 20 deletions go/arrow/compute/internal/exec/span.go
Expand Up @@ -437,6 +437,12 @@ func (a *ArraySpan) FillFromScalar(val scalar.Scalar) {
}
}

func (a *ArraySpan) SetDictionary(span *ArraySpan) {
a.resizeChildren(1)
a.Children[0].Release()
a.Children[0] = *span
}

// TakeOwnership is like SetMembers only this takes ownership of
// the buffers by calling Retain on them so that the passed in
// ArrayData can be released without negatively affecting this
Expand Down Expand Up @@ -479,18 +485,13 @@ func (a *ArraySpan) TakeOwnership(data arrow.ArrayData) {
}

if typeID == arrow.DICTIONARY {
if cap(a.Children) >= 1 {
a.Children = a.Children[:1]
} else {
a.Children = make([]ArraySpan, 1)
a.resizeChildren(1)
dict := data.Dictionary()
if dict != (*array.Data)(nil) {
a.Children[0].TakeOwnership(dict)
}
a.Children[0].TakeOwnership(data.Dictionary())
} else {
if cap(a.Children) >= len(data.Children()) {
a.Children = a.Children[:len(data.Children())]
} else {
a.Children = make([]ArraySpan, len(data.Children()))
}
a.resizeChildren(len(data.Children()))
for i, c := range data.Children() {
a.Children[i].TakeOwnership(c)
}
Expand Down Expand Up @@ -538,12 +539,11 @@ func (a *ArraySpan) SetMembers(data arrow.ArrayData) {
}

if typeID == arrow.DICTIONARY {
if cap(a.Children) >= 1 {
a.Children = a.Children[:1]
} else {
a.Children = make([]ArraySpan, 1)
a.resizeChildren(1)
dict := data.Dictionary()
if dict != (*array.Data)(nil) {
a.Children[0].SetMembers(dict)
}
a.Children[0].SetMembers(data.Dictionary())
} else {
if cap(a.Children) >= len(data.Children()) {
a.Children = a.Children[:len(data.Children())]
Expand Down Expand Up @@ -619,6 +619,12 @@ func FillZeroLength(dt arrow.DataType, span *ArraySpan) {
span.Buffers[i].Buf, span.Buffers[i].Owner = nil, nil
}

if dt.ID() == arrow.DICTIONARY {
span.resizeChildren(1)
FillZeroLength(dt.(*arrow.DictionaryType).ValueType, &span.Children[0])
return
}

nt, ok := dt.(arrow.NestedType)
if !ok {
if len(span.Children) > 0 {
Expand All @@ -627,11 +633,7 @@ func FillZeroLength(dt arrow.DataType, span *ArraySpan) {
return
}

if cap(span.Children) >= len(nt.Fields()) {
span.Children = span.Children[:len(nt.Fields())]
} else {
span.Children = make([]ArraySpan, len(nt.Fields()))
}
span.resizeChildren(len(nt.Fields()))
for i, f := range nt.Fields() {
FillZeroLength(f.Type, &span.Children[i])
}
Expand Down
8 changes: 7 additions & 1 deletion go/arrow/compute/internal/exec/utils.go
Expand Up @@ -102,6 +102,11 @@ func GetValues[T FixedWidthTypes](data arrow.ArrayData, i int) []T {
return ret[data.Offset():]
}

func GetOffsets[T int32 | int64](data arrow.ArrayData, i int) []T {
ret := unsafe.Slice((*T)(unsafe.Pointer(&data.Buffers()[i].Bytes()[0])), data.Offset()+data.Len()+1)
return ret[data.Offset():]
}

// GetSpanValues returns a properly typed slice by reinterpreting
// the buffer at index i using unsafe.Slice. This will take into account
// the offset of the given ArraySpan.
Expand Down Expand Up @@ -177,13 +182,14 @@ var typMap = map[reflect.Type]arrow.DataType{
reflect.TypeOf(arrow.Date64(0)): arrow.FixedWidthTypes.Date64,
reflect.TypeOf(true): arrow.FixedWidthTypes.Boolean,
reflect.TypeOf(float16.Num{}): arrow.FixedWidthTypes.Float16,
reflect.TypeOf([]byte{}): arrow.BinaryTypes.Binary,
}

// GetDataType returns the appropriate arrow.DataType for the given type T
// only for non-parametric types. This uses a map and reflection internally
// so don't call this in a tight loop, instead call this once and then use
// a closure with the result.
func GetDataType[T NumericTypes | bool | string | float16.Num]() arrow.DataType {
func GetDataType[T NumericTypes | bool | string | []byte | float16.Num]() arrow.DataType {
var z T
return typMap[reflect.TypeOf(z)]
}
Expand Down
3 changes: 3 additions & 0 deletions go/arrow/compute/internal/kernels/types.go
Expand Up @@ -52,6 +52,9 @@ var (
arrow.BinaryTypes.LargeBinary,
arrow.BinaryTypes.String,
arrow.BinaryTypes.LargeString}
primitiveTypes = append(append([]arrow.DataType{arrow.Null,
arrow.FixedWidthTypes.Date32, arrow.FixedWidthTypes.Date64},
numericTypes...), baseBinaryTypes...)
)

//go:generate stringer -type=CompareOperator -linecomment
Expand Down

0 comments on commit 90071cc

Please sign in to comment.