Skip to content

Commit

Permalink
GH-36669: [Go] Guard against garbage in C Data structures (#36670)
Browse files Browse the repository at this point in the history
### Rationale for this change

Prevent hard to debug crashes when using Go code with other code via C Data Interface.

### What changes are included in this PR?

In the C Stream Interface implementation, jump through a trampoline that zeroes the out parameters before letting Go see them.

Note that this can only guard against the issue when the C Stream Interface is used. 

Also, fix other issues in the C Data Interface tests with invalid pointers and uninitialized memory that were turned up by the new test here (because it calls `runtime.GC` very frequently).

### Are these changes tested?

Yes

### Are there any user-facing changes?

No

**This PR contains a "Critical Fix".**
* Closes: #36669

Lead-authored-by: David Li <li.davidm96@gmail.com>
Co-authored-by: Matt Topol <zotthewizard@gmail.com>
Signed-off-by: David Li <li.davidm96@gmail.com>
  • Loading branch information
lidavidm and zeroshade committed Jul 13, 2023
1 parent de23a7e commit b975977
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 19 deletions.
10 changes: 5 additions & 5 deletions go/arrow/cdata/cdata_exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func (exp *schemaExporter) export(field arrow.Field) {

func allocateArrowSchemaArr(n int) (out []CArrowSchema) {
s := (*reflect.SliceHeader)(unsafe.Pointer(&out))
s.Data = uintptr(C.malloc(C.sizeof_struct_ArrowSchema * C.size_t(n)))
s.Data = uintptr(C.calloc(C.size_t(n), C.sizeof_struct_ArrowSchema))
s.Len = n
s.Cap = n

Expand All @@ -292,7 +292,7 @@ func allocateArrowSchemaArr(n int) (out []CArrowSchema) {

func allocateArrowSchemaPtrArr(n int) (out []*CArrowSchema) {
s := (*reflect.SliceHeader)(unsafe.Pointer(&out))
s.Data = uintptr(C.malloc(C.size_t(unsafe.Sizeof((*CArrowSchema)(nil))) * C.size_t(n)))
s.Data = uintptr(C.calloc(C.size_t(n), C.size_t(unsafe.Sizeof((*CArrowSchema)(nil)))))
s.Len = n
s.Cap = n

Expand All @@ -301,7 +301,7 @@ func allocateArrowSchemaPtrArr(n int) (out []*CArrowSchema) {

func allocateArrowArrayArr(n int) (out []CArrowArray) {
s := (*reflect.SliceHeader)(unsafe.Pointer(&out))
s.Data = uintptr(C.malloc(C.sizeof_struct_ArrowArray * C.size_t(n)))
s.Data = uintptr(C.calloc(C.size_t(n), C.sizeof_struct_ArrowArray))
s.Len = n
s.Cap = n

Expand All @@ -310,7 +310,7 @@ func allocateArrowArrayArr(n int) (out []CArrowArray) {

func allocateArrowArrayPtrArr(n int) (out []*CArrowArray) {
s := (*reflect.SliceHeader)(unsafe.Pointer(&out))
s.Data = uintptr(C.malloc(C.size_t(unsafe.Sizeof((*CArrowArray)(nil))) * C.size_t(n)))
s.Data = uintptr(C.calloc(C.size_t(n), C.size_t(unsafe.Sizeof((*CArrowArray)(nil)))))
s.Len = n
s.Cap = n

Expand All @@ -319,7 +319,7 @@ func allocateArrowArrayPtrArr(n int) (out []*CArrowArray) {

func allocateBufferPtrArr(n int) (out []*C.void) {
s := (*reflect.SliceHeader)(unsafe.Pointer(&out))
s.Data = uintptr(C.malloc(C.size_t(unsafe.Sizeof((*C.void)(nil))) * C.size_t(n)))
s.Data = uintptr(C.calloc(C.size_t(n), C.size_t(unsafe.Sizeof((*C.void)(nil)))))
s.Len = n
s.Cap = n

Expand Down
33 changes: 33 additions & 0 deletions go/arrow/cdata/cdata_fulltest.c
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ void setup_array_stream_test(const int n_batches, struct ArrowArrayStream* out)
int test_exported_stream(struct ArrowArrayStream* stream) {
while (1) {
struct ArrowArray array;
memset(&array, 0, sizeof(array));
// Garbage - implementation should not try to call it, though!
array.release = (void*)0xDEADBEEF;
int rc = stream->get_next(stream, &array);
Expand Down Expand Up @@ -447,3 +448,35 @@ void test_stream_schema_fallible(struct ArrowArrayStream* stream) {
stream->private_data = &kFallibleStream;
stream->release = FallibleRelease;
}

int confuse_go_gc(struct ArrowArrayStream* stream, unsigned int seed) {
struct ArrowSchema schema;
// Try to confuse the Go GC by putting what looks like a Go pointer here.
#ifdef _WIN32
// Thread-safe on Windows with the multithread CRT
#define DORAND rand()
#else
#define DORAND rand_r(&seed)
#endif
schema.name = (char*)(0xc000000000L + (DORAND % 0x2000));
schema.format = (char*)(0xc000000000L + (DORAND % 0x2000));
int rc = stream->get_schema(stream, &schema);
if (rc != 0) return rc;
schema.release(&schema);

while (1) {
struct ArrowArray array;
array.release = (void*)(0xc000000000L + (DORAND % 0x2000));
array.private_data = (void*)(0xc000000000L + (DORAND % 0x2000));
int rc = stream->get_next(stream, &array);
if (rc != 0) return rc;

if (array.release == NULL) {
stream->release(stream);
break;
}
array.release(&array);
}
return 0;
#undef DORAND
}
26 changes: 26 additions & 0 deletions go/arrow/cdata/cdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"io"
"runtime"
"runtime/cgo"
"sync"
"testing"
"time"
"unsafe"
Expand Down Expand Up @@ -940,3 +941,28 @@ func TestRecordReaderImportError(t *testing.T) {
}
assert.Contains(t, err.Error(), "Expected error message")
}

func TestConfuseGoGc(t *testing.T) {
// Regression test for https://github.com/apache/arrow-adbc/issues/729
reclist := arrdata.Records["primitives"]

var wg sync.WaitGroup
concurrency := 32
wg.Add(concurrency)

// XXX: this test is a bit expensive
for i := 0; i < concurrency; i++ {
go func() {
for i := 0; i < 256; i++ {
rdr, err := array.NewRecordReader(reclist[0].Schema(), reclist)
assert.NoError(t, err)
runtime.GC()
assert.NoError(t, confuseGoGc(rdr))
runtime.GC()
}
wg.Done()
}()
}

wg.Wait()
}
34 changes: 27 additions & 7 deletions go/arrow/cdata/cdata_test_framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@ package cdata

// #include <stdlib.h>
// #include <stdint.h>
// #include <string.h>
// #include "arrow/c/abi.h"
// #include "arrow/c/helpers.h"
//
// void setup_array_stream_test(const int n_batches, struct ArrowArrayStream* out);
// struct ArrowArray* get_test_arr() { return (struct ArrowArray*)(malloc(sizeof(struct ArrowArray))); }
// struct ArrowArray* get_test_arr() {
// struct ArrowArray* array = (struct ArrowArray*)malloc(sizeof(struct ArrowArray));
// memset(array, 0, sizeof(*array));
// return array;
// }
// struct ArrowArrayStream* get_test_stream() {
// struct ArrowArrayStream* out = (struct ArrowArrayStream*)malloc(sizeof(struct ArrowArrayStream));
// memset(out, 0, sizeof(struct ArrowArrayStream));
Expand Down Expand Up @@ -56,11 +61,13 @@ package cdata
// struct ArrowSchema** test_union(const char** fmts, const char** names, int64_t* flags, const int n);
// int test_exported_stream(struct ArrowArrayStream* stream);
// void test_stream_schema_fallible(struct ArrowArrayStream* stream);
// int confuse_go_gc(struct ArrowArrayStream* stream, unsigned int seed);
import "C"
import (
"errors"
"fmt"
"io"
"math/rand"
"unsafe"

"github.com/apache/arrow/go/v13/arrow"
Expand Down Expand Up @@ -271,15 +278,17 @@ func createCArr(arr arrow.Array) *CArrowArray {
carr.null_count = C.int64_t(arr.NullN())
carr.offset = C.int64_t(arr.Data().Offset())
buffers := arr.Data().Buffers()
cbuf := []unsafe.Pointer{}
for _, b := range buffers {
cbufs := allocateBufferPtrArr(len(buffers))
for i, b := range buffers {
if b != nil {
cbuf = append(cbuf, C.CBytes(b.Bytes()))
cbufs[i] = (*C.void)(C.CBytes(b.Bytes()))
} else {
cbufs[i] = nil
}
}
carr.n_buffers = C.int64_t(len(cbuf))
if len(cbuf) > 0 {
carr.buffers = &cbuf[0]
carr.n_buffers = C.int64_t(len(cbufs))
if len(cbufs) > 0 {
carr.buffers = (*unsafe.Pointer)(unsafe.Pointer(&cbufs[0]))
}
carr.release = (*[0]byte)(C.release_test_arr)

Expand Down Expand Up @@ -350,3 +359,14 @@ func fallibleSchemaTest() error {
}
return nil
}

func confuseGoGc(reader array.RecordReader) error {
out := C.get_test_stream()
ExportRecordReader(reader, out)
rc := C.confuse_go_gc(out, C.uint(rand.Int()))
C.free(unsafe.Pointer(out))
if rc == 0 {
return nil
}
return fmt.Errorf("Exported stream test failed with return code %d", int(rc))
}
18 changes: 11 additions & 7 deletions go/arrow/cdata/exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@ import (
// #include <stdlib.h>
// #include "arrow/c/helpers.h"
//
// typedef const char cchar_t;
// extern int streamGetSchema(struct ArrowArrayStream*, struct ArrowSchema*);
// extern int streamGetNext(struct ArrowArrayStream*, struct ArrowArray*);
// extern const char* streamGetError(struct ArrowArrayStream*);
// extern void streamRelease(struct ArrowArrayStream*);
// typedef const char cchar_t;
// extern int streamGetSchema(struct ArrowArrayStream*, struct ArrowSchema*);
// extern int streamGetNext(struct ArrowArrayStream*, struct ArrowArray*);
// extern const char* streamGetError(struct ArrowArrayStream*);
// extern void streamRelease(struct ArrowArrayStream*);
// // XXX(https://github.com/apache/arrow-adbc/issues/729)
// int streamGetSchemaTrampoline(struct ArrowArrayStream* stream, struct ArrowSchema* out);
// int streamGetNextTrampoline(struct ArrowArrayStream* stream, struct ArrowArray* out);
//
import "C"

Expand Down Expand Up @@ -154,10 +157,11 @@ func streamRelease(handle *CArrowArrayStream) {
}

func exportStream(rdr array.RecordReader, out *CArrowArrayStream) {
out.get_schema = (*[0]byte)(C.streamGetSchema)
out.get_next = (*[0]byte)(C.streamGetNext)
out.get_schema = (*[0]byte)(C.streamGetSchemaTrampoline)
out.get_next = (*[0]byte)(C.streamGetNextTrampoline)
out.get_last_error = (*[0]byte)(C.streamGetError)
out.release = (*[0]byte)(C.streamRelease)
rdr.Retain()
h := cgo.NewHandle(cRecordReader{rdr: rdr, err: nil})
out.private_data = createHandle(h)
}
20 changes: 20 additions & 0 deletions go/arrow/cdata/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ func ImportCRecordReader(stream *CArrowArrayStream, schema *arrow.Schema) (arrio
// the populating of the struct. Any memory allocated will be allocated using malloc
// which means that it is invisible to the Go Garbage Collector and must be freed manually
// using the callback on the CArrowSchema object.
//
// WARNING: the output ArrowSchema MUST BE ZERO INITIALIZED, or the Go garbage collector
// may error at runtime, due to CGO rules ("the current implementation may sometimes
// cause a runtime error if the contents of the C memory appear to be a Go pointer").
// You have been warned!
func ExportArrowSchema(schema *arrow.Schema, out *CArrowSchema) {
dummy := arrow.Field{Type: arrow.StructOf(schema.Fields()...), Metadata: schema.Metadata()}
exportField(dummy, out)
Expand All @@ -220,6 +225,11 @@ func ExportArrowSchema(schema *arrow.Schema, out *CArrowSchema) {
// The release function on the populated CArrowArray will properly decrease the reference counts,
// and release the memory if the record has already been released. But since this must be explicitly
// done, make sure it is released so that you do not create a memory leak.
//
// WARNING: the output ArrowArray MUST BE ZERO INITIALIZED, or the Go garbage collector
// may error at runtime, due to CGO rules ("the current implementation may sometimes
// cause a runtime error if the contents of the C memory appear to be a Go pointer").
// You have been warned!
func ExportArrowRecordBatch(rb arrow.Record, out *CArrowArray, outSchema *CArrowSchema) {
children := make([]arrow.ArrayData, rb.NumCols())
for i := range rb.Columns() {
Expand All @@ -243,6 +253,11 @@ func ExportArrowRecordBatch(rb arrow.Record, out *CArrowArray, outSchema *CArrow
// being used by the arrow.Array passed in, in order to share with zero-copy across the C
// Data Interface. See the documentation for ExportArrowRecordBatch for details on how to ensure
// you do not leak memory and prevent unwanted, undefined or strange behaviors.
//
// WARNING: the output ArrowArray MUST BE ZERO INITIALIZED, or the Go garbage collector
// may error at runtime, due to CGO rules ("the current implementation may sometimes
// cause a runtime error if the contents of the C memory appear to be a Go pointer").
// You have been warned!
func ExportArrowArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) {
exportArray(arr, out, outSchema)
}
Expand All @@ -252,6 +267,11 @@ func ExportArrowArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema
// CArrowArrayStream takes ownership of the RecordReader until the consumer calls the release
// callback, as such it is unnecesary to call Release on the passed in reader unless it has
// previously been retained.
//
// WARNING: the output ArrowArrayStream MUST BE ZERO INITIALIZED, or the Go garbage
// collector may error at runtime, due to CGO rules ("the current implementation may
// sometimes cause a runtime error if the contents of the C memory appear to be a Go
// pointer"). You have been warned!
func ExportRecordReader(reader array.RecordReader, out *CArrowArrayStream) {
exportStream(reader, out)
}
Expand Down
34 changes: 34 additions & 0 deletions go/arrow/cdata/trampoline.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <string.h>

#include "arrow/c/abi.h"

int streamGetSchema(struct ArrowArrayStream*, struct ArrowSchema*);
int streamGetNext(struct ArrowArrayStream*, struct ArrowArray*);

int streamGetSchemaTrampoline(struct ArrowArrayStream* stream, struct ArrowSchema* out) {
// XXX(https://github.com/apache/arrow-adbc/issues/729)
memset(out, 0, sizeof(*out));
return streamGetSchema(stream, out);
}

int streamGetNextTrampoline(struct ArrowArrayStream* stream, struct ArrowArray* out) {
// XXX(https://github.com/apache/arrow-adbc/issues/729)
memset(out, 0, sizeof(*out));
return streamGetNext(stream, out);
}

0 comments on commit b975977

Please sign in to comment.