GH-33466: [Go][Parquet] Add support for Dictionary arrays to pqarrow (#34342)

### Rationale for this change
The Parquet package should properly handle dictionary array types so that consumers can efficiently read and write dictionary-encoded Arrow arrays to and from dictionary-encoded Parquet files.

### What changes are included in this PR?
Updates and fixes to allow reading and writing Parquet directly to and from dictionary arrays. Because it relies on the `Unique` and `Take` compute functions, the dictionary handling requires go1.18+, just like the compute package does.

Updates the schema handling to account for dictionary types when storing the Arrow schema. This also adds new methods to the `ColumnWriter` interface and the `BinaryRecordReader` for handling dictionaries.
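
For illustration, a minimal round-trip sketch of what this enables: writing a dictionary-encoded column to Parquet and reading the table back. The column name, sample values, and in-memory `bytes.Buffer` sink are hypothetical, and the import paths assume the v12 module; `WriteTable`/`ReadTable` and the writer properties are existing `parquet`/`pqarrow` APIs:

```go
package main

import (
	"bytes"
	"context"
	"fmt"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
	"github.com/apache/arrow/go/v12/parquet"
	"github.com/apache/arrow/go/v12/parquet/pqarrow"
)

func main() {
	mem := memory.DefaultAllocator

	// Build a dictionary-encoded string column with a small dictionary.
	dt := &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrow.BinaryTypes.String}
	bldr := array.NewDictionaryBuilder(mem, dt).(*array.BinaryDictionaryBuilder)
	defer bldr.Release()
	for _, s := range []string{"a", "b", "a", "c", "a"} {
		if err := bldr.AppendString(s); err != nil {
			panic(err)
		}
	}
	arr := bldr.NewDictionaryArray()
	defer arr.Release()

	schema := arrow.NewSchema([]arrow.Field{{Name: "col", Type: dt, Nullable: true}}, nil)
	col := arrow.NewColumnFromArr(schema.Field(0), arr)
	defer col.Release()
	tbl := array.NewTable(schema, []arrow.Column{col}, int64(arr.Len()))
	defer tbl.Release()

	// Write with dictionary encoding enabled, then read the table back.
	var buf bytes.Buffer
	wrProps := parquet.NewWriterProperties(parquet.WithDictionaryDefault(true))
	if err := pqarrow.WriteTable(tbl, &buf, 1024, wrProps, pqarrow.DefaultWriterProps()); err != nil {
		panic(err)
	}

	out, err := pqarrow.ReadTable(context.Background(), bytes.NewReader(buf.Bytes()),
		parquet.NewReaderProperties(mem), pqarrow.ArrowReadProperties{}, mem)
	if err != nil {
		panic(err)
	}
	defer out.Release()
	fmt.Println(out.NumRows()) // 5
}
```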

### Are these changes tested?
Yes, unit tests are added in the change.

* Closes: #33466

Lead-authored-by: Matt Topol <zotthewizard@gmail.com>
Co-authored-by: Will Jones <willjones127@gmail.com>
Signed-off-by: Matt Topol <zotthewizard@gmail.com>
zeroshade and wjones127 committed Mar 3, 2023
1 parent 73e2b56 commit afe5514
Showing 39 changed files with 16,141 additions and 10,217 deletions.
2 changes: 1 addition & 1 deletion ci/scripts/js_build.sh
@@ -27,7 +27,7 @@ build_dir=${2}

pushd ${source_dir}

-yarn --frozen-lockfile
+yarn --immutable
yarn lint:ci
yarn build

1 change: 0 additions & 1 deletion go/arrow/array/array_test.go
@@ -111,7 +111,6 @@ func TestMakeFromData(t *testing.T) {
{name: "dense union", d: arrow.DenseUnionOf(nil, nil), child: []arrow.ArrayData{}, size: 3},

// various dictionary index types and value types
{name: "dictionary", d: &testDataType{arrow.DICTIONARY}, expPanic: true, expError: "arrow/array: no dictionary set in Data for Dictionary array"},
{name: "dictionary", d: &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int8, ValueType: &testDataType{arrow.INT64}}, dict: array.NewData(&testDataType{arrow.INT64}, 0 /* length */, make([]*memory.Buffer, 2 /*null bitmap, values*/), nil /* childData */, 0 /* nulls */, 0 /* offset */)},
{name: "dictionary", d: &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint8, ValueType: &testDataType{arrow.INT32}}, dict: array.NewData(&testDataType{arrow.INT32}, 0 /* length */, make([]*memory.Buffer, 2 /*null bitmap, values*/), nil /* childData */, 0 /* nulls */, 0 /* offset */)},
{name: "dictionary", d: &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int16, ValueType: &testDataType{arrow.UINT16}}, dict: array.NewData(&testDataType{arrow.UINT16}, 0 /* length */, make([]*memory.Buffer, 2 /*null bitmap, values*/), nil /* childData */, 0 /* nulls */, 0 /* offset */)},
3 changes: 3 additions & 0 deletions go/arrow/array/binarybuilder.go
@@ -220,6 +220,9 @@ func (b *BinaryBuilder) ReserveData(n int) {
// additional memory will be allocated. If n is smaller, the allocated memory may be reduced.
func (b *BinaryBuilder) Resize(n int) {
b.offsets.resize((n + 1) * b.offsetByteWidth)
if (n * b.offsetByteWidth) < b.offsets.Len() {
b.offsets.SetLength(n * b.offsetByteWidth)
}
b.builder.resize(n, b.init)
}

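For context on the `Resize` hunk above, a small sketch of the shrink path it fixes (allocator and values are made up; previously the offsets buffer's logical length was grown but never truncated):

```go
package main

import (
	"fmt"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
)

func main() {
	bldr := array.NewBinaryBuilder(memory.NewGoAllocator(), arrow.BinaryTypes.Binary)
	defer bldr.Release()

	bldr.AppendValues([][]byte{[]byte("a"), []byte("b"), []byte("c")}, nil)
	// Shrinking now also truncates the offsets buffer's logical length
	// rather than leaving stale trailing offsets behind.
	bldr.Resize(2)

	arr := bldr.NewBinaryArray()
	defer arr.Release()
	fmt.Println(arr.Len()) // expected: 2
}
```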
10 changes: 10 additions & 0 deletions go/arrow/array/bufferbuilder.go
@@ -32,6 +32,7 @@ type bufBuilder interface {
Bytes() []byte
resize(int)
Advance(int)
SetLength(int)
Append([]byte)
Reset()
Finish() *memory.Buffer
@@ -96,6 +97,15 @@ func (b *bufferBuilder) resize(elements int) {
}
}

func (b *bufferBuilder) SetLength(length int) {
if length > b.length {
b.Advance(length)
return
}

b.length = length
}

// Advance increases the buffer by length and initializes the skipped bytes to zero.
func (b *bufferBuilder) Advance(length int) {
if b.capacity < b.length+length {
65 changes: 61 additions & 4 deletions go/arrow/array/dictionary.go
@@ -213,13 +213,15 @@ func (d *Dictionary) Release() {
func (d *Dictionary) setData(data *Data) {
d.array.setData(data)

dictType := data.dtype.(*arrow.DictionaryType)
if data.dictionary == nil {
panic("arrow/array: no dictionary set in Data for Dictionary array")
if data.length > 0 {
panic("arrow/array: no dictionary set in Data for Dictionary array")
}
} else {
debug.Assert(arrow.TypeEqual(dictType.ValueType, data.dictionary.DataType()), "mismatched dictionary value types")
}

-dictType := data.dtype.(*arrow.DictionaryType)
-debug.Assert(arrow.TypeEqual(dictType.ValueType, data.dictionary.DataType()), "mismatched dictionary value types")

indexData := NewData(dictType.IndexType, data.length, data.buffers, data.childData, data.nulls, data.offset)
defer indexData.Release()
d.indices = MakeFromData(indexData)
@@ -400,6 +402,7 @@ type DictionaryBuilder interface {
NewDictionaryArray() *Dictionary
NewDelta() (indices, delta arrow.Array, err error)
AppendArray(arrow.Array) error
AppendIndices([]int, []bool)
ResetFull()
}

@@ -883,6 +886,60 @@ func (b *dictionaryBuilder) AppendArray(arr arrow.Array) error {
return nil
}

func (b *dictionaryBuilder) AppendIndices(indices []int, valid []bool) {
b.length += len(indices)
switch idxbldr := b.idxBuilder.Builder.(type) {
case *Int8Builder:
vals := make([]int8, len(indices))
for i, v := range indices {
vals[i] = int8(v)
}
idxbldr.AppendValues(vals, valid)
case *Int16Builder:
vals := make([]int16, len(indices))
for i, v := range indices {
vals[i] = int16(v)
}
idxbldr.AppendValues(vals, valid)
case *Int32Builder:
vals := make([]int32, len(indices))
for i, v := range indices {
vals[i] = int32(v)
}
idxbldr.AppendValues(vals, valid)
case *Int64Builder:
vals := make([]int64, len(indices))
for i, v := range indices {
vals[i] = int64(v)
}
idxbldr.AppendValues(vals, valid)
case *Uint8Builder:
vals := make([]uint8, len(indices))
for i, v := range indices {
vals[i] = uint8(v)
}
idxbldr.AppendValues(vals, valid)
case *Uint16Builder:
vals := make([]uint16, len(indices))
for i, v := range indices {
vals[i] = uint16(v)
}
idxbldr.AppendValues(vals, valid)
case *Uint32Builder:
vals := make([]uint32, len(indices))
for i, v := range indices {
vals[i] = uint32(v)
}
idxbldr.AppendValues(vals, valid)
case *Uint64Builder:
vals := make([]uint64, len(indices))
for i, v := range indices {
vals[i] = uint64(v)
}
idxbldr.AppendValues(vals, valid)
}
}

type NullDictionaryBuilder struct {
dictionaryBuilder
}
45 changes: 45 additions & 0 deletions go/arrow/array/dictionary_test.go
@@ -1638,3 +1638,48 @@ func TestDictioanryUnifierTableZeroColumns(t *testing.T) {
assert.EqualValues(t, 42, unified.NumRows())
assert.True(t, array.TableEqual(table, unified))
}

func TestDictionaryAppendIndices(t *testing.T) {
indexTypes := []arrow.DataType{
arrow.PrimitiveTypes.Int8,
arrow.PrimitiveTypes.Uint8,
arrow.PrimitiveTypes.Int16,
arrow.PrimitiveTypes.Uint16,
arrow.PrimitiveTypes.Int32,
arrow.PrimitiveTypes.Uint32,
arrow.PrimitiveTypes.Int64,
arrow.PrimitiveTypes.Uint64,
}

mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)

dict, _, err := array.FromJSON(mem, arrow.BinaryTypes.String, strings.NewReader(`["a", "b", "c", "d", "e", "f"]`))
require.NoError(t, err)
defer dict.Release()

indices := []int{3, 4, 0, 3, 1, 4, 4, 5}

for _, typ := range indexTypes {
t.Run(typ.String(), func(t *testing.T) {
scoped := memory.NewCheckedAllocatorScope(mem)
defer scoped.CheckSize(t)

dictType := &arrow.DictionaryType{
IndexType: typ, ValueType: dict.DataType()}
bldr := array.NewDictionaryBuilderWithDict(mem, dictType, dict)
defer bldr.Release()

bldr.AppendIndices(indices, nil)

arr := bldr.NewDictionaryArray()
defer arr.Release()

arrIndices := arr.Indices()
assert.EqualValues(t, len(indices), arr.Len())
assert.EqualValues(t, len(indices), arrIndices.Len())

assert.Equal(t, fmt.Sprint(indices), arrIndices.String())
})
}
}
43 changes: 43 additions & 0 deletions go/arrow/array/table.go
@@ -128,6 +128,49 @@ func NewTable(schema *arrow.Schema, cols []arrow.Column, rows int64) *simpleTable {
return &tbl
}

// NewTableFromSlice is a convenience function to create a table from a slice
// of slices of arrow.Array.
//
// Like other NewTable functions this can panic if:
// - len(schema.Fields) != len(data)
// - the total length of each column's array slice (ie: number of rows
// in the column) aren't the same for all columns.
func NewTableFromSlice(schema *arrow.Schema, data [][]arrow.Array) *simpleTable {
if len(data) != len(schema.Fields()) {
panic("array/table: mismatch in number of columns and data for creating a table")
}

cols := make([]arrow.Column, len(schema.Fields()))
for i, arrs := range data {
field := schema.Field(i)
chunked := arrow.NewChunked(field.Type, arrs)
cols[i] = *arrow.NewColumn(field, chunked)
chunked.Release()
}

tbl := simpleTable{
refCount: 1,
schema: schema,
cols: cols,
rows: int64(cols[0].Len()),
}

defer func() {
if r := recover(); r != nil {
// if validate panics, let's release the columns
// so that we don't leak them, then propagate the panic
for _, c := range cols {
c.Release()
}
panic(r)
}
}()
// validate the table and its constituents.
tbl.validate()

return &tbl
}

// NewTableFromRecords returns a new basic, non-lazy in-memory table.
//
// NewTableFromRecords panics if the records and schema are inconsistent.
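A minimal usage sketch for the new `NewTableFromSlice` (schema, values, and the single-chunk layout are hypothetical):

```go
package main

import (
	"fmt"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
)

func main() {
	mem := memory.NewGoAllocator()
	schema := arrow.NewSchema([]arrow.Field{{Name: "c0", Type: arrow.PrimitiveTypes.Int64}}, nil)

	bldr := array.NewInt64Builder(mem)
	defer bldr.Release()
	bldr.AppendValues([]int64{1, 2, 3}, nil)
	chunk := bldr.NewInt64Array()
	defer chunk.Release()

	// One slice of chunks per column in the schema.
	tbl := array.NewTableFromSlice(schema, [][]arrow.Array{{chunk}})
	defer tbl.Release()
	fmt.Println(tbl.NumRows()) // 3
}
```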
15 changes: 15 additions & 0 deletions go/arrow/array/table_test.go
@@ -456,9 +456,14 @@ func TestTable(t *testing.T) {

cols := []arrow.Column{*col1, *col2}

slices := [][]arrow.Array{col1.Data().Chunks(), col2.Data().Chunks()}

tbl := array.NewTable(schema, cols, -1)
defer tbl.Release()

tbl2 := array.NewTableFromSlice(schema, slices)
defer tbl2.Release()

tbl.Retain()
tbl.Release()

@@ -476,6 +481,16 @@
t.Fatalf("invalid column: got=%q, want=%q", got, want)
}

if got, want := tbl2.NumRows(), int64(10); got != want {
t.Fatalf("invalid number of rows: got=%d, want=%d", got, want)
}
if got, want := tbl2.NumCols(), int64(2); got != want {
t.Fatalf("invalid number of columns: got=%d, want=%d", got, want)
}
if got, want := tbl2.Column(0).Name(), col1.Name(); got != want {
t.Fatalf("invalid column: got=%q, want=%q", got, want)
}

for _, tc := range []struct {
schema *arrow.Schema
cols []arrow.Column
7 changes: 6 additions & 1 deletion go/arrow/compute/internal/kernels/vector_hash.go
@@ -459,12 +459,17 @@ func hashExec(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
return impl.Flush(out)
}

-func uniqueFinalize(ctx *exec.KernelCtx, _ []*exec.ArraySpan) ([]*exec.ArraySpan, error) {
+func uniqueFinalize(ctx *exec.KernelCtx, results []*exec.ArraySpan) ([]*exec.ArraySpan, error) {
impl, ok := ctx.State.(HashState)
if !ok {
return nil, fmt.Errorf("%w: HashState in invalid state", arrow.ErrInvalid)
}

for _, r := range results {
// release any pre-allocation we did
r.Release()
}

uniques, err := impl.GetDictionary()
if err != nil {
return nil, err
15 changes: 15 additions & 0 deletions go/internal/bitutils/bit_set_run_reader.go
@@ -344,3 +344,18 @@ func VisitSetBitRuns(bitmap []byte, bitmapOffset int64, length int64, visitFn VisitFn) error {
}
return nil
}

func VisitSetBitRunsNoErr(bitmap []byte, bitmapOffset int64, length int64, visitFn func(pos, length int64)) {
if bitmap == nil {
visitFn(0, length)
return
}
rdr := NewSetBitRunReader(bitmap, bitmapOffset, length)
for {
run := rdr.NextRun()
if run.Length == 0 {
break
}
visitFn(run.Pos, run.Length)
}
}
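
A short sketch of the new helper's contract (the bitmap literal is a made-up example; `go/internal/bitutils` is internal, so this only compiles from within the arrow module; arrow bitmaps are LSB-first):

```go
bitmap := []byte{0b00111101} // bits 0, 2, 3, 4, 5 set
bitutils.VisitSetBitRunsNoErr(bitmap, 0, 8, func(pos, length int64) {
	fmt.Printf("run: pos=%d len=%d\n", pos, length)
})
// prints: run: pos=0 len=1
//         run: pos=2 len=4

// A nil bitmap is treated as all-valid: a single run covering everything.
bitutils.VisitSetBitRunsNoErr(nil, 0, 8, func(pos, length int64) {
	fmt.Printf("run: pos=%d len=%d\n", pos, length) // pos=0 len=8
})
```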
3 changes: 3 additions & 0 deletions go/parquet/file/column_reader.go
@@ -134,6 +134,8 @@ type columnChunkReader struct {
// is set when an error is encountered
err error
defLvlBuffer []int16

newDictionary bool
}

// NewColumnReader returns a column reader for the provided column initialized with the given pagereader that will
@@ -225,6 +227,7 @@ func (c *columnChunkReader) configureDict(page *DictionaryPage) error {
return xerrors.New("parquet: dictionary index must be plain encoding")
}

c.newDictionary = true
c.curDecoder = c.decoders[enc]
return nil
}
