Skip to content

Commit

Permalink
GH-14780: [Go] Fix issues with IPC writing of sliced map/list arrays (#…
Browse files Browse the repository at this point in the history
…14793)

* Closes: #14780

Authored-by: Matt Topol <zotthewizard@gmail.com>
Signed-off-by: Matt Topol <zotthewizard@gmail.com>
  • Loading branch information
zeroshade committed Dec 1, 2022
1 parent 1c8853b commit 02bbcc5
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 11 deletions.
2 changes: 1 addition & 1 deletion go/arrow/array/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (a *List) Release() {

func (a *List) ValueOffsets(i int) (start, end int64) {
debug.Assert(i >= 0 && i < a.array.data.length, "index out of range")
start, end = int64(a.offsets[i]), int64(a.offsets[i+1])
start, end = int64(a.offsets[i+a.data.offset]), int64(a.offsets[i+a.data.offset+1])
return
}

Expand Down
107 changes: 107 additions & 0 deletions go/arrow/ipc/ipc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package ipc_test
import (
"bytes"
"errors"
"fmt"
"io"
"math/rand"
"strconv"
"strings"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -482,3 +484,108 @@ func encodeDecodeIpcStream(t *testing.T,
}
return json, ipcReader, nil
}

func Example_mapSlice() {
mem := memory.DefaultAllocator
dt := arrow.MapOf(arrow.BinaryTypes.String, arrow.BinaryTypes.String)
schema := arrow.NewSchema([]arrow.Field{{
Name: "map",
Type: dt,
}}, nil)

arr, _, err := array.FromJSON(mem, dt, strings.NewReader(`[
[{"key": "index1", "value": "main2"}],
[{"key": "index3", "value": "main4"}, {"key": "tag_int", "value": ""}],
[{"key":"index5","value":"main6"},{"key":"tag_int","value":""}],
[{"key":"index6","value":"main7"},{"key":"tag_int","value":""}],
[{"key":"index7","value":"main8"},{"key":"tag_int","value":""}],
[{"key":"index8","value":"main9"}]
]`))
if err != nil {
panic(err)
}
defer arr.Release()

rec := array.NewRecord(schema, []arrow.Array{arr}, int64(arr.Len()))
defer rec.Release()
rec2 := rec.NewSlice(1, 2)
defer rec2.Release()

var buf bytes.Buffer
w := ipc.NewWriter(&buf, ipc.WithSchema(rec.Schema()))
if err := w.Write(rec2); err != nil {
panic(err)
}
if err := w.Close(); err != nil {
panic(err)
}

r, err := ipc.NewReader(&buf)
if err != nil {
panic(err)
}
defer r.Release()

r.Next()
fmt.Println(r.Record())

// Output:
// record:
// schema:
// fields: 1
// - map: type=map<utf8, utf8>
// rows: 1
// col[0][map]: [{["index3" "tag_int"] ["main4" ""]}]
}

func Example_listSlice() {
mem := memory.DefaultAllocator
dt := arrow.ListOf(arrow.BinaryTypes.String)
schema := arrow.NewSchema([]arrow.Field{{
Name: "list",
Type: dt,
}}, nil)

arr, _, err := array.FromJSON(mem, dt, strings.NewReader(`[
["index1"],
["index3", "tag_int"], ["index5", "tag_int"],
["index6", "tag_int"], ["index7", "tag_int"],
["index7", "tag_int"],
["index8"]
]`))
if err != nil {
panic(err)
}
defer arr.Release()

rec := array.NewRecord(schema, []arrow.Array{arr}, int64(arr.Len()))
defer rec.Release()
rec2 := rec.NewSlice(1, 2)
defer rec2.Release()

var buf bytes.Buffer
w := ipc.NewWriter(&buf, ipc.WithSchema(rec.Schema()))
if err := w.Write(rec2); err != nil {
panic(err)
}
if err := w.Close(); err != nil {
panic(err)
}

r, err := ipc.NewReader(&buf)
if err != nil {
panic(err)
}
defer r.Release()

r.Next()
fmt.Println(r.Record())

// Output:
// record:
// schema:
// fields: 1
// - list: type=list<item: utf8, nullable>
// rows: 1
// col[0][list]: [["index3" "tag_int"]]
}
19 changes: 9 additions & 10 deletions go/arrow/ipc/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error {
values = arr.ListValues()
mustRelease = false
values_offset int64
values_length int64
values_end int64
)
defer func() {
if mustRelease {
Expand All @@ -671,13 +671,13 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error {
}()

if voffsets != nil {
values_offset = int64(arr.Offsets()[0])
values_length = int64(arr.Offsets()[arr.Len()]) - values_offset
values_offset, _ = arr.ValueOffsets(0)
_, values_end = arr.ValueOffsets(arr.Len() - 1)
}

if len(arr.Offsets()) != 0 || values_length < int64(values.Len()) {
if arr.Len() != 0 || values_end < int64(values.Len()) {
// must also slice the values
values = array.NewSlice(values, values_offset, values_length)
values = array.NewSlice(values, values_offset, values_end)
mustRelease = true
}
err = w.visit(p, values)
Expand All @@ -699,7 +699,7 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error {
values = arr.ListValues()
mustRelease = false
values_offset int64
values_length int64
values_end int64
)
defer func() {
if mustRelease {
Expand All @@ -709,13 +709,12 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error {

if arr.Len() > 0 && voffsets != nil {
values_offset, _ = arr.ValueOffsets(0)
_, values_length = arr.ValueOffsets(arr.Len() - 1)
values_length -= values_offset
_, values_end = arr.ValueOffsets(arr.Len() - 1)
}

if arr.Len() != 0 || values_length < int64(values.Len()) {
if arr.Len() != 0 || values_end < int64(values.Len()) {
// must also slice the values
values = array.NewSlice(values, values_offset, values_length)
values = array.NewSlice(values, values_offset, values_end)
mustRelease = true
}
err = w.visit(p, values)
Expand Down

0 comments on commit 02bbcc5

Please sign in to comment.