From 47314c3999d7b7a7f9167c6ed6793da756c411a1 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Wed, 7 Sep 2022 18:48:55 -0400 Subject: [PATCH] ARROW-17646: [Go][CI] Switch C Data to use cgo.Handle (bumps to Go1.17) (#14067) Authored-by: Matt Topol Signed-off-by: Matt Topol --- .env | 2 +- .github/workflows/go.yml | 24 ++++---- ci/docker/debian-10-go.dockerfile | 2 +- ci/docker/debian-11-go.dockerfile | 2 +- ci/scripts/go_build.sh | 2 +- dev/release/verify-release-candidate.sh | 2 +- dev/tasks/tasks.yml | 4 +- go/arrow/cdata/cdata.go | 18 ++++-- go/arrow/cdata/cdata_exports.go | 27 ++++++++- go/arrow/cdata/cdata_test.go | 35 ++++++++++++ go/arrow/cdata/cdata_test_framework.go | 10 +++- go/arrow/cdata/exports.go | 76 +++++++++++++++---------- go/arrow/cdata/interface.go | 9 +++ go/go.mod | 26 ++++++++- 14 files changed, 180 insertions(+), 59 deletions(-) diff --git a/.env b/.env index 4aa04daab040e..8ae036b6b5be9 100644 --- a/.env +++ b/.env @@ -58,7 +58,7 @@ CUDA=11.0.3 DASK=latest DOTNET=6.0 GCC_VERSION="" -GO=1.16 +GO=1.17 STATICCHECK=v0.2.2 HDFS=3.2.1 JDK=8 diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 4112bf3bd4c75..5fccebbca15e4 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -50,9 +50,9 @@ jobs: strategy: fail-fast: false matrix: - go: [1.16, 1.18] + go: [1.17, 1.18] include: - - go: 1.16 + - go: 1.17 staticcheck: v0.2.2 - go: 1.18 staticcheck: latest @@ -86,9 +86,9 @@ jobs: strategy: fail-fast: false matrix: - go: [1.16, 1.18] + go: [1.17, 1.18] include: - - go: 1.16 + - go: 1.17 staticcheck: v0.2.2 - go: 1.18 staticcheck: latest @@ -123,9 +123,9 @@ jobs: strategy: fail-fast: false matrix: - go: [1.16, 1.18] + go: [1.17, 1.18] include: - - go: 1.16 + - go: 1.17 staticcheck: v0.2.2 - go: 1.18 staticcheck: latest @@ -158,9 +158,9 @@ jobs: strategy: fail-fast: false matrix: - go: [1.16, 1.18] + go: [1.17, 1.18] include: - - go: 1.16 + - go: 1.17 staticcheck: v0.2.2 - go: 1.18 staticcheck: latest @@ -193,9 +193,9 @@ jobs: strategy: fail-fast: false matrix: - go: [1.16, 1.18] + go: [1.17, 1.18] include: - - go: 1.16 + - go: 1.17 staticcheck: v0.2.2 - go: 1.18 staticcheck: latest @@ -228,9 +228,9 @@ jobs: strategy: fail-fast: false matrix: - go: [1.16, 1.18] + go: [1.17, 1.18] include: - - go: 1.16 + - go: 1.17 staticcheck: v0.2.2 - go: 1.18 staticcheck: latest diff --git a/ci/docker/debian-10-go.dockerfile b/ci/docker/debian-10-go.dockerfile index dfe81f5a73ced..8d964c76a66fa 100644 --- a/ci/docker/debian-10-go.dockerfile +++ b/ci/docker/debian-10-go.dockerfile @@ -16,7 +16,7 @@ # under the License. ARG arch=amd64 -ARG go=1.16 +ARG go=1.17 ARG staticcheck=v0.2.2 FROM ${arch}/golang:${go}-buster diff --git a/ci/docker/debian-11-go.dockerfile b/ci/docker/debian-11-go.dockerfile index 32d7b3af39018..9f75bf23fddf2 100644 --- a/ci/docker/debian-11-go.dockerfile +++ b/ci/docker/debian-11-go.dockerfile @@ -16,7 +16,7 @@ # under the License. ARG arch=amd64 -ARG go=1.16 +ARG go=1.17 ARG staticcheck=v0.2.2 FROM ${arch}/golang:${go}-bullseye diff --git a/ci/scripts/go_build.sh b/ci/scripts/go_build.sh index 43f348b153814..c113bbd320ef4 100755 --- a/ci/scripts/go_build.sh +++ b/ci/scripts/go_build.sh @@ -22,7 +22,7 @@ set -ex source_dir=${1}/go ARCH=`uname -m` -# Arm64 CI is triggered by travis and run in arm64v8/golang:1.16-bullseye +# Arm64 CI is triggered by travis and run in arm64v8/golang:1.17-bullseye if [ "aarch64" == "$ARCH" ]; then # Install `staticcheck` GO111MODULE=on go install honnef.co/go/tools/cmd/staticcheck@v0.2.2 diff --git a/dev/release/verify-release-candidate.sh b/dev/release/verify-release-candidate.sh index eb44e3e4fecfe..b016988ba91a7 100755 --- a/dev/release/verify-release-candidate.sh +++ b/dev/release/verify-release-candidate.sh @@ -399,7 +399,7 @@ install_go() { return 0 fi - local version=1.16.12 + local version=1.17.13 show_info "Installing go version ${version}..." local arch="$(uname -m)" diff --git a/dev/tasks/tasks.yml b/dev/tasks/tasks.yml index 0816c24589e8b..ae3c613902bac 100644 --- a/dev/tasks/tasks.yml +++ b/dev/tasks/tasks.yml @@ -1449,13 +1449,13 @@ tasks: ci: github template: r/github.linux.revdepcheck.yml - test-debian-11-go-1.16: + test-debian-11-go-1.17: ci: azure template: docker-tests/azure.linux.yml params: env: DEBIAN: 11 - GO: 1.16 + GO: 1.17 image: debian-go test-ubuntu-default-docs: diff --git a/go/arrow/cdata/cdata.go b/go/arrow/cdata/cdata.go index a2b583f268ef2..aec166a0110c0 100644 --- a/go/arrow/cdata/cdata.go +++ b/go/arrow/cdata/cdata.go @@ -27,7 +27,11 @@ package cdata // int stream_get_schema(struct ArrowArrayStream* st, struct ArrowSchema* out) { return st->get_schema(st, out); } // int stream_get_next(struct ArrowArrayStream* st, struct ArrowArray* out) { return st->get_next(st, out); } // const char* stream_get_last_error(struct ArrowArrayStream* st) { return st->get_last_error(st); } -// struct ArrowArray* get_arr() { return (struct ArrowArray*)(malloc(sizeof(struct ArrowArray))); } +// struct ArrowArray* get_arr() { +// struct ArrowArray* out = (struct ArrowArray*)(malloc(sizeof(struct ArrowArray))); +// memset(out, 0, sizeof(struct ArrowArray)); +// return out; +// } // struct ArrowArrayStream* get_stream() { return (struct ArrowArrayStream*)malloc(sizeof(struct ArrowArrayStream)); } // import "C" @@ -655,18 +659,22 @@ func importCArrayAsType(arr *CArrowArray, dt arrow.DataType) (imp *cimporter, er func initReader(rdr *nativeCRecordBatchReader, stream *CArrowArrayStream) { rdr.stream = C.get_stream() C.ArrowArrayStreamMove(stream, rdr.stream) + rdr.arr = C.get_arr() runtime.SetFinalizer(rdr, func(r *nativeCRecordBatchReader) { if r.cur != nil { r.cur.Release() } C.ArrowArrayStreamRelease(r.stream) + C.ArrowArrayRelease(r.arr) C.free(unsafe.Pointer(r.stream)) + C.free(unsafe.Pointer(r.arr)) }) } // Record Batch reader that conforms to arrio.Reader for the ArrowArrayStream interface type nativeCRecordBatchReader struct { stream *CArrowArrayStream + arr *CArrowArray schema *arrow.Schema cur arrow.Record @@ -713,18 +721,16 @@ func (n *nativeCRecordBatchReader) next() error { n.cur = nil } - arr := C.get_arr() - defer C.free(unsafe.Pointer(arr)) - errno := C.stream_get_next(n.stream, arr) + errno := C.stream_get_next(n.stream, n.arr) if errno != 0 { return n.getError(int(errno)) } - if C.ArrowArrayIsReleased(arr) == 1 { + if C.ArrowArrayIsReleased(n.arr) == 1 { return io.EOF } - rec, err := ImportCRecordBatchWithSchema(arr, n.schema) + rec, err := ImportCRecordBatchWithSchema(n.arr, n.schema) if err != nil { return err } diff --git a/go/arrow/cdata/cdata_exports.go b/go/arrow/cdata/cdata_exports.go index a3da68447db22..b69d44d9b50fc 100644 --- a/go/arrow/cdata/cdata_exports.go +++ b/go/arrow/cdata/cdata_exports.go @@ -36,6 +36,7 @@ import ( "encoding/binary" "fmt" "reflect" + "runtime/cgo" "strings" "unsafe" @@ -362,7 +363,9 @@ func exportArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) { out.buffers = (*unsafe.Pointer)(unsafe.Pointer(&buffers[0])) } - out.private_data = unsafe.Pointer(storeData(arr.Data())) + arr.Data().Retain() + h := cgo.NewHandle(arr.Data()) + out.private_data = unsafe.Pointer(&h) out.release = (*[0]byte)(C.goReleaseArray) switch arr := arr.(type) { case *array.List: @@ -400,3 +403,25 @@ func exportArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) { out.children = nil } } + +type cRecordReader struct { + rdr array.RecordReader +} + +func (rr cRecordReader) getSchema(out *CArrowSchema) int { + ExportArrowSchema(rr.rdr.Schema(), out) + return 0 +} + +func (rr cRecordReader) next(out *CArrowArray) int { + if rr.rdr.Next() { + ExportArrowRecordBatch(rr.rdr.Record(), out, nil) + return 0 + } + releaseArr(out) + return 0 +} + +func (rr cRecordReader) release() { + rr.rdr.Release() +} diff --git a/go/arrow/cdata/cdata_test.go b/go/arrow/cdata/cdata_test.go index 0b73a08d6b0c8..a143b5cb79ab9 100644 --- a/go/arrow/cdata/cdata_test.go +++ b/go/arrow/cdata/cdata_test.go @@ -27,6 +27,7 @@ import ( "errors" "io" "runtime" + "runtime/cgo" "testing" "time" "unsafe" @@ -34,6 +35,7 @@ import ( "github.com/apache/arrow/go/v10/arrow" "github.com/apache/arrow/go/v10/arrow/array" "github.com/apache/arrow/go/v10/arrow/decimal128" + "github.com/apache/arrow/go/v10/arrow/internal/arrdata" "github.com/apache/arrow/go/v10/arrow/memory" "github.com/stretchr/testify/assert" ) @@ -659,3 +661,36 @@ func TestRecordReaderStream(t *testing.T) { assert.Equal(t, "baz", rec.Column(1).(*array.String).Value(2)) } } + +func TestExportRecordReaderStream(t *testing.T) { + reclist := arrdata.Records["primitives"] + rdr, _ := array.NewRecordReader(reclist[0].Schema(), reclist) + + out := createTestStreamObj() + ExportRecordReader(rdr, out) + + assert.NotNil(t, out.get_schema) + assert.NotNil(t, out.get_next) + assert.NotNil(t, out.get_last_error) + assert.NotNil(t, out.release) + assert.NotNil(t, out.private_data) + + h := *(*cgo.Handle)(out.private_data) + assert.Same(t, rdr, h.Value().(cRecordReader).rdr) + + importedRdr := ImportCArrayStream(out, nil) + i := 0 + for { + rec, err := importedRdr.Read() + if err != nil { + if errors.Is(err, io.EOF) { + break + } + assert.NoError(t, err) + } + + assert.Truef(t, array.RecordEqual(reclist[i], rec), "expected: %s\ngot: %s", reclist[i], rec) + i++ + } + assert.EqualValues(t, len(reclist), i) +} diff --git a/go/arrow/cdata/cdata_test_framework.go b/go/arrow/cdata/cdata_test_framework.go index bb4db1e339be0..0274b01fb7316 100644 --- a/go/arrow/cdata/cdata_test_framework.go +++ b/go/arrow/cdata/cdata_test_framework.go @@ -26,7 +26,11 @@ package cdata // // void setup_array_stream_test(const int n_batches, struct ArrowArrayStream* out); // struct ArrowArray* get_test_arr() { return (struct ArrowArray*)(malloc(sizeof(struct ArrowArray))); } -// struct ArrowArrayStream* get_test_stream() { return (struct ArrowArrayStream*)malloc(sizeof(struct ArrowArrayStream)); } +// struct ArrowArrayStream* get_test_stream() { +// struct ArrowArrayStream* out = (struct ArrowArrayStream*)malloc(sizeof(struct ArrowArrayStream)); +// memset(out, 0, sizeof(struct ArrowArrayStream)); +// return out; +// } // // void release_test_arr(struct ArrowArray* arr) { // for (int i = 0; i < arr->n_buffers; ++i) { @@ -251,6 +255,10 @@ func createCArr(arr arrow.Array) *CArrowArray { return carr } +func createTestStreamObj() *CArrowArrayStream { + return C.get_test_stream() +} + func arrayStreamTest() *CArrowArrayStream { st := C.get_test_stream() C.setup_array_stream_test(2, st) diff --git a/go/arrow/cdata/exports.go b/go/arrow/cdata/exports.go index 4ad4b7fac3135..c7d77a52a72d8 100644 --- a/go/arrow/cdata/exports.go +++ b/go/arrow/cdata/exports.go @@ -18,42 +18,24 @@ package cdata import ( "reflect" - "sync" - "sync/atomic" + "runtime/cgo" "unsafe" "github.com/apache/arrow/go/v10/arrow" + "github.com/apache/arrow/go/v10/arrow/array" ) // #include // #include "arrow/c/helpers.h" +// +// typedef const char cchar_t; +// extern int streamGetSchema(struct ArrowArrayStream*, struct ArrowSchema*); +// extern int streamGetNext(struct ArrowArrayStream*, struct ArrowArray*); +// extern const char* streamGetError(struct ArrowArrayStream*); +// extern void streamRelease(struct ArrowArrayStream*); +// import "C" -var ( - handles = sync.Map{} - handleIdx uintptr -) - -type dataHandle uintptr - -func storeData(d arrow.ArrayData) dataHandle { - h := atomic.AddUintptr(&handleIdx, 1) - if h == 0 { - panic("cgo: ran out of space") - } - d.Retain() - handles.Store(h, d) - return dataHandle(h) -} - -func (d dataHandle) releaseData() { - arrd, ok := handles.LoadAndDelete(uintptr(d)) - if !ok { - panic("cgo: invalid datahandle") - } - arrd.(arrow.ArrayData).Release() -} - //export releaseExportedSchema func releaseExportedSchema(schema *CArrowSchema) { if C.ArrowSchemaIsReleased(schema) == 1 { @@ -108,6 +90,42 @@ func releaseExportedArray(arr *CArrowArray) { C.free(unsafe.Pointer(arr.children)) } - h := dataHandle(arr.private_data) - h.releaseData() + h := *(*cgo.Handle)(arr.private_data) + h.Value().(arrow.ArrayData).Release() + h.Delete() +} + +//export streamGetSchema +func streamGetSchema(handle *CArrowArrayStream, out *CArrowSchema) C.int { + h := *(*cgo.Handle)(handle.private_data) + rdr := h.Value().(cRecordReader) + return C.int(rdr.getSchema(out)) +} + +//export streamGetNext +func streamGetNext(handle *CArrowArrayStream, out *CArrowArray) C.int { + h := *(*cgo.Handle)(handle.private_data) + rdr := h.Value().(cRecordReader) + return C.int(rdr.next(out)) +} + +//export streamGetError +func streamGetError(*CArrowArrayStream) *C.cchar_t { return nil } + +//export streamRelease +func streamRelease(handle *CArrowArrayStream) { + h := *(*cgo.Handle)(handle.private_data) + h.Value().(cRecordReader).release() + h.Delete() + handle.release = nil + handle.private_data = nil +} + +func exportStream(rdr array.RecordReader, out *CArrowArrayStream) { + out.get_schema = (*[0]byte)(C.streamGetSchema) + out.get_next = (*[0]byte)(C.streamGetNext) + out.get_last_error = (*[0]byte)(C.streamGetError) + out.release = (*[0]byte)(C.streamRelease) + h := cgo.NewHandle(cRecordReader{rdr}) + out.private_data = unsafe.Pointer(&h) } diff --git a/go/arrow/cdata/interface.go b/go/arrow/cdata/interface.go index e567ce599a449..9b80b7c2f0dc7 100644 --- a/go/arrow/cdata/interface.go +++ b/go/arrow/cdata/interface.go @@ -225,6 +225,15 @@ func ExportArrowArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema exportArray(arr, out, outSchema) } +// ExportRecordReader populates the CArrowArrayStream that is passed in with the appropriate +// callbacks to be a working ArrowArrayStream utilizing the passed in RecordReader. The +// CArrowArrayStream takes ownership of the RecordReader until the consumer calls the release +// callback, as such it is unnecesary to call Release on the passed in reader unless it has +// previously been retained. +func ExportRecordReader(reader array.RecordReader, out *CArrowArrayStream) { + exportStream(reader, out) +} + // ReleaseCArrowArray calls ArrowArrayRelease on the passed in cdata array func ReleaseCArrowArray(arr *CArrowArray) { releaseArr(arr) } diff --git a/go/go.mod b/go/go.mod index 9e7054b8d5295..25ca1e084c75d 100644 --- a/go/go.mod +++ b/go/go.mod @@ -16,7 +16,7 @@ module github.com/apache/arrow/go/v10 -go 1.16 +go 1.17 require ( github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c @@ -28,7 +28,6 @@ require ( github.com/google/flatbuffers v2.0.8+incompatible github.com/klauspost/asmfmt v1.3.2 github.com/klauspost/compress v1.15.9 - github.com/kr/pretty v0.3.0 // indirect github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 github.com/pierrec/lz4/v4 v4.1.15 @@ -42,12 +41,33 @@ require ( gonum.org/v1/gonum v0.11.0 google.golang.org/grpc v1.49.0 google.golang.org/protobuf v1.28.1 + modernc.org/sqlite v1.18.1 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/google/uuid v1.3.0 // indirect + github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect + github.com/klauspost/cpuid/v2 v2.0.9 // indirect + github.com/kr/pretty v0.3.0 // indirect + github.com/mattn/go-isatty v0.0.16 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect + github.com/stretchr/objx v0.4.0 // indirect + golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect + golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect + golang.org/x/text v0.3.7 // indirect + google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/uint128 v1.2.0 // indirect modernc.org/cc/v3 v3.36.3 // indirect modernc.org/ccgo/v3 v3.16.9 // indirect modernc.org/libc v1.17.1 // indirect + modernc.org/mathutil v1.5.0 // indirect + modernc.org/memory v1.2.1 // indirect modernc.org/opt v0.1.3 // indirect - modernc.org/sqlite v1.18.1 modernc.org/strutil v1.1.3 // indirect + modernc.org/token v1.0.0 // indirect )