Skip to content

Commit

Permalink
apacheGH-35344: [Go][Format] Implementation of the LIST_VIEW and LARG…
Browse files Browse the repository at this point in the history
…E_LIST_VIEW array formats (apache#37468)

### Rationale for this change

Go implementation of apache#35345.

### What changes are included in this PR?

- [x] Add `LIST_VIEW` and `LARGE_LIST_VIEW` to datatype.go
- [x] Add `ListView` and `LargeListView` to list.go
- [x] Add `ListViewType` and `LargeListViewType` to datatype_nested.go
- [x] Add list-view builders
- [x] Implement list-view comparison in compare.go
- [x] String conversion in both directions
- [x] Validation of list-view arrays
- [x] Generation of random list-view arrays
- [x] Concatenation of list-view arrays in concat.go
- [x] JSON serialization/deserialization
- [x] Add data used for tests in `arrdata.go`
- [x] Add Flatbuffer changes
- [x] Add IPC support

### Are these changes tested?

Yes. Existing tests are being changed to also cover list-view variations as well as new tests focused solely on the list-view format.

### Are there any user-facing changes?

New structs and functions introduced.
* Closes: apache#35344

Authored-by: Felipe Oliveira Carvalho <felipekde@gmail.com>
Signed-off-by: Matt Topol <zotthewizard@gmail.com>
  • Loading branch information
felipecrv authored and dgreiss committed Feb 17, 2024
1 parent ec104ef commit 51cafde
Show file tree
Hide file tree
Showing 19 changed files with 2,809 additions and 127 deletions.
2 changes: 2 additions & 0 deletions go/arrow/array/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ func init() {
arrow.LARGE_LIST: func(data arrow.ArrayData) arrow.Array { return NewLargeListData(data) },
arrow.INTERVAL_MONTH_DAY_NANO: func(data arrow.ArrayData) arrow.Array { return NewMonthDayNanoIntervalData(data) },
arrow.RUN_END_ENCODED: func(data arrow.ArrayData) arrow.Array { return NewRunEndEncodedData(data) },
arrow.LIST_VIEW: func(data arrow.ArrayData) arrow.Array { return NewListViewData(data) },
arrow.LARGE_LIST_VIEW: func(data arrow.ArrayData) arrow.Array { return NewLargeListViewData(data) },

// invalid data types to fill out array to size 2^6 - 1
63: invalidDataType,
Expand Down
6 changes: 6 additions & 0 deletions go/arrow/array/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,12 @@ func NewBuilder(mem memory.Allocator, dtype arrow.DataType) Builder {
case arrow.MAP:
typ := dtype.(*arrow.MapType)
return NewMapBuilderWithType(mem, typ)
case arrow.LIST_VIEW:
typ := dtype.(*arrow.ListViewType)
return NewListViewBuilderWithField(mem, typ.ElemField())
case arrow.LARGE_LIST_VIEW:
typ := dtype.(*arrow.LargeListViewType)
return NewLargeListViewBuilderWithField(mem, typ.ElemField())
case arrow.EXTENSION:
typ := dtype.(arrow.ExtensionType)
bldr := NewExtensionBuilder(mem, typ)
Expand Down
50 changes: 50 additions & 0 deletions go/arrow/array/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,12 @@ func Equal(left, right arrow.Array) bool {
case *LargeList:
r := right.(*LargeList)
return arrayEqualLargeList(l, r)
case *ListView:
r := right.(*ListView)
return arrayEqualListView(l, r)
case *LargeListView:
r := right.(*LargeListView)
return arrayEqualLargeListView(l, r)
case *FixedSizeList:
r := right.(*FixedSizeList)
return arrayEqualFixedSizeList(l, r)
Expand Down Expand Up @@ -536,6 +542,12 @@ func arrayApproxEqual(left, right arrow.Array, opt equalOption) bool {
case *LargeList:
r := right.(*LargeList)
return arrayApproxEqualLargeList(l, r, opt)
case *ListView:
r := right.(*ListView)
return arrayApproxEqualListView(l, r, opt)
case *LargeListView:
r := right.(*LargeListView)
return arrayApproxEqualLargeListView(l, r, opt)
case *FixedSizeList:
r := right.(*FixedSizeList)
return arrayApproxEqualFixedSizeList(l, r, opt)
Expand Down Expand Up @@ -682,6 +694,44 @@ func arrayApproxEqualLargeList(left, right *LargeList, opt equalOption) bool {
return true
}

func arrayApproxEqualListView(left, right *ListView, opt equalOption) bool {
for i := 0; i < left.Len(); i++ {
if left.IsNull(i) {
continue
}
o := func() bool {
l := left.newListValue(i)
defer l.Release()
r := right.newListValue(i)
defer r.Release()
return arrayApproxEqual(l, r, opt)
}()
if !o {
return false
}
}
return true
}

func arrayApproxEqualLargeListView(left, right *LargeListView, opt equalOption) bool {
for i := 0; i < left.Len(); i++ {
if left.IsNull(i) {
continue
}
o := func() bool {
l := left.newListValue(i)
defer l.Release()
r := right.newListValue(i)
defer r.Release()
return arrayApproxEqual(l, r, opt)
}()
if !o {
return false
}
}
return true
}

func arrayApproxEqualFixedSizeList(left, right *FixedSizeList, opt equalOption) bool {
for i := 0; i < left.Len(); i++ {
if left.IsNull(i) {
Expand Down
171 changes: 171 additions & 0 deletions go/arrow/array/concat.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"math"
"math/bits"
"unsafe"

"github.com/apache/arrow/go/v14/arrow"
"github.com/apache/arrow/go/v14/arrow/bitutil"
Expand Down Expand Up @@ -355,6 +356,164 @@ func concatOffsets(buffers []*memory.Buffer, byteWidth int, mem memory.Allocator
}
}

func sumArraySizes(data []arrow.ArrayData) int {
outSize := 0
for _, arr := range data {
outSize += arr.Len()
}
return outSize
}

func getListViewBufferValues[T int32 | int64](data arrow.ArrayData, i int) []T {
bytes := data.Buffers()[i].Bytes()
base := (*T)(unsafe.Pointer(&bytes[0]))
ret := unsafe.Slice(base, data.Offset()+data.Len())
return ret[data.Offset():]
}

func putListViewOffsets32(in arrow.ArrayData, displacement int32, out *memory.Buffer, outOff int) {
debug.Assert(in.DataType().ID() == arrow.LIST_VIEW, "putListViewOffsets32: expected LIST_VIEW data")
inOff, inLen := in.Offset(), in.Len()
if inLen == 0 {
return
}
bitmap := in.Buffers()[0]
srcOffsets := getListViewBufferValues[int32](in, 1)
srcSizes := getListViewBufferValues[int32](in, 2)
isValidAndNonEmpty := func(i int) bool {
return (bitmap == nil || bitutil.BitIsSet(bitmap.Bytes(), inOff+i)) && srcSizes[i] > 0
}

dstOffsets := arrow.Int32Traits.CastFromBytes(out.Bytes())
for i, offset := range srcOffsets {
if isValidAndNonEmpty(i) {
// This is guaranteed by RangeOfValuesUsed returning the smallest offset
// of valid and non-empty list-views.
debug.Assert(offset+displacement >= 0, "putListViewOffsets32: offset underflow while concatenating arrays")
dstOffsets[outOff+i] = offset + displacement
} else {
dstOffsets[outOff+i] = 0
}
}
}

func putListViewOffsets64(in arrow.ArrayData, displacement int64, out *memory.Buffer, outOff int) {
debug.Assert(in.DataType().ID() == arrow.LARGE_LIST_VIEW, "putListViewOffsets64: expected LARGE_LIST_VIEW data")
inOff, inLen := in.Offset(), in.Len()
if inLen == 0 {
return
}
bitmap := in.Buffers()[0]
srcOffsets := getListViewBufferValues[int64](in, 1)
srcSizes := getListViewBufferValues[int64](in, 2)
isValidAndNonEmpty := func(i int) bool {
return (bitmap == nil || bitutil.BitIsSet(bitmap.Bytes(), inOff+i)) && srcSizes[i] > 0
}

dstOffsets := arrow.Int64Traits.CastFromBytes(out.Bytes())
for i, offset := range srcOffsets {
if isValidAndNonEmpty(i) {
// This is guaranteed by RangeOfValuesUsed returning the smallest offset
// of valid and non-empty list-views.
debug.Assert(offset+displacement >= 0, "putListViewOffsets64: offset underflow while concatenating arrays")
dstOffsets[outOff+i] = offset + displacement
} else {
dstOffsets[outOff+i] = 0
}
}
}

// Concatenate buffers holding list-view offsets into a single buffer of offsets
//
// valueRanges contains the relevant ranges of values in the child array actually
// referenced to by the views. Most commonly, these ranges will start from 0,
// but when that is not the case, we need to adjust the displacement of offsets.
// The concatenated child array does not contain values from the beginning
// if they are not referenced to by any view.
func concatListViewOffsets(data []arrow.ArrayData, byteWidth int, valueRanges []rng, mem memory.Allocator) (*memory.Buffer, error) {
outSize := sumArraySizes(data)
if byteWidth == 4 && outSize > math.MaxInt32 {
return nil, fmt.Errorf("%w: offset overflow while concatenating arrays", arrow.ErrInvalid)
}
out := memory.NewResizableBuffer(mem)
out.Resize(byteWidth * outSize)

numChildValues, elementsLength := 0, 0
for i, arr := range data {
displacement := numChildValues - valueRanges[i].offset
if byteWidth == 4 {
putListViewOffsets32(arr, int32(displacement), out, elementsLength)
} else {
putListViewOffsets64(arr, int64(displacement), out, elementsLength)
}
elementsLength += arr.Len()
numChildValues += valueRanges[i].len
}
debug.Assert(elementsLength == outSize, "implementation error")

return out, nil
}

func zeroNullListViewSizes[T int32 | int64](data arrow.ArrayData) {
if data.Len() == 0 || data.Buffers()[0] == nil {
return
}
validity := data.Buffers()[0].Bytes()
sizes := getListViewBufferValues[T](data, 2)

for i := 0; i < data.Len(); i++ {
if !bitutil.BitIsSet(validity, data.Offset()+i) {
sizes[i] = 0
}
}
}

func concatListView(data []arrow.ArrayData, offsetType arrow.FixedWidthDataType, out *Data, mem memory.Allocator) (err error) {
// Calculate the ranges of values that each list-view array uses
valueRanges := make([]rng, len(data))
for i, input := range data {
offset, len := rangeOfValuesUsed(input)
valueRanges[i].offset = offset
valueRanges[i].len = len
}

// Gather the children ranges of each input array
childData := gatherChildrenRanges(data, 0, valueRanges)
for _, c := range childData {
defer c.Release()
}

// Concatenate the values
values, err := concat(childData, mem)
if err != nil {
return err
}

// Concatenate the offsets
offsetBuffer, err := concatListViewOffsets(data, offsetType.Bytes(), valueRanges, mem)
if err != nil {
return err
}

// Concatenate the sizes
sizeBuffers := gatherBuffersFixedWidthType(data, 2, offsetType)
sizeBuffer := concatBuffers(sizeBuffers, mem)

out.childData = []arrow.ArrayData{values}
out.buffers[1] = offsetBuffer
out.buffers[2] = sizeBuffer

// To make sure the sizes don't reference values that are not in the new
// concatenated values array, we zero the sizes of null list-view values.
if offsetType.ID() == arrow.INT32 {
zeroNullListViewSizes[int32](out)
} else {
zeroNullListViewSizes[int64](out)
}

return nil
}

// concat is the implementation for actually performing the concatenation of the arrow.ArrayData
// objects that we can call internally for nested types.
func concat(data []arrow.ArrayData, mem memory.Allocator) (arr arrow.ArrayData, err error) {
Expand Down Expand Up @@ -483,6 +642,18 @@ func concat(data []arrow.ArrayData, mem memory.Allocator) (arr arrow.ArrayData,
if err != nil {
return nil, err
}
case *arrow.ListViewType:
offsetType := arrow.PrimitiveTypes.Int32.(arrow.FixedWidthDataType)
err := concatListView(data, offsetType, out, mem)
if err != nil {
return nil, err
}
case *arrow.LargeListViewType:
offsetType := arrow.PrimitiveTypes.Int64.(arrow.FixedWidthDataType)
err := concatListView(data, offsetType, out, mem)
if err != nil {
return nil, err
}
case *arrow.FixedSizeListType:
childData := gatherChildrenMultiplier(data, 0, int(dt.Len()))
for _, c := range childData {
Expand Down
21 changes: 21 additions & 0 deletions go/arrow/array/concat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func TestConcatenate(t *testing.T) {
{arrow.BinaryTypes.LargeString},
{arrow.ListOf(arrow.PrimitiveTypes.Int8)},
{arrow.LargeListOf(arrow.PrimitiveTypes.Int8)},
{arrow.ListViewOf(arrow.PrimitiveTypes.Int8)},
{arrow.LargeListViewOf(arrow.PrimitiveTypes.Int8)},
{arrow.FixedSizeListOf(3, arrow.PrimitiveTypes.Int8)},
{arrow.StructOf()},
{arrow.MapOf(arrow.PrimitiveTypes.Uint16, arrow.PrimitiveTypes.Int8)},
Expand Down Expand Up @@ -200,6 +202,16 @@ func (cts *ConcatTestSuite) generateArr(size int64, nullprob float64) arrow.Arra
}
}
return bldr.NewArray()
case arrow.LIST_VIEW:
arr := cts.rng.ListView(cts.dt.(arrow.VarLenListLikeType), size, 0, 20, nullprob)
err := arr.ValidateFull()
cts.NoError(err)
return arr
case arrow.LARGE_LIST_VIEW:
arr := cts.rng.LargeListView(cts.dt.(arrow.VarLenListLikeType), size, 0, 20, nullprob)
err := arr.ValidateFull()
cts.NoError(err)
return arr
case arrow.FIXED_SIZE_LIST:
const listsize = 3
valuesSize := size * listsize
Expand Down Expand Up @@ -317,11 +329,20 @@ func (cts *ConcatTestSuite) TestCheckConcat() {

slices := cts.slices(arr, offsets)
for _, s := range slices {
if s.DataType().ID() == arrow.LIST_VIEW {
err := s.(*array.ListView).ValidateFull()
cts.NoError(err)
}
defer s.Release()
}

actual, err := array.Concatenate(slices, cts.mem)
cts.NoError(err)
if arr.DataType().ID() == arrow.LIST_VIEW {
lv := actual.(*array.ListView)
err := lv.ValidateFull()
cts.NoError(err)
}
defer actual.Release()

cts.Truef(array.Equal(expected, actual), "expected: %s\ngot: %s\n", expected, actual)
Expand Down

0 comments on commit 51cafde

Please sign in to comment.