Skip to content

Commit

Permalink
this is working, shocking.
Browse files Browse the repository at this point in the history
  • Loading branch information
zeroshade committed Aug 17, 2022
1 parent a270afc commit 2f767fd
Show file tree
Hide file tree
Showing 25 changed files with 3,725 additions and 16 deletions.
6 changes: 6 additions & 0 deletions go.work
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
go 1.18

use (
./go
./go/arrow/compute
)
2 changes: 2 additions & 0 deletions go/arrow/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type ArrayData interface {
DataType() DataType
// NullN returns the number of nulls for this data instance.
NullN() int
// SetNullN allows updating the number of nulls
SetNullN(n int)
// Len returns the length of this data instance
Len() int
// Offset returns the offset into the raw buffers where this data begins
Expand Down
2 changes: 2 additions & 0 deletions go/arrow/array/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ func (d *Data) DataType() arrow.DataType { return d.dtype }
// NullN returns the number of nulls.
func (d *Data) NullN() int { return d.nulls }

func (d *Data) SetNullN(n int) { d.nulls = n }

// Len returns the length.
func (d *Data) Len() int { return d.length }

Expand Down
152 changes: 152 additions & 0 deletions go/arrow/array/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,3 +299,155 @@ func DictArrayFromJSON(mem memory.Allocator, dt *arrow.DictionaryType, indicesJS

return NewDictionaryArray(dt, indices, dict), nil
}

func getMaxBufferLen(dt arrow.DataType, length int) int {
bufferLen := int(bitutil.BytesForBits(int64(length)))

maxOf := func(bl int) int {
if bl > bufferLen {
return bl
}
return bufferLen
}

switch dt := dt.(type) {
case *arrow.DictionaryType:
bufferLen = maxOf(getMaxBufferLen(dt.ValueType, length))
return maxOf(getMaxBufferLen(dt.IndexType, length))
case *arrow.FixedSizeBinaryType:
return maxOf(dt.ByteWidth * length)
case arrow.FixedWidthDataType:
return maxOf(int(bitutil.BytesForBits(int64(dt.BitWidth()))) * length)
case *arrow.StructType:
for _, f := range dt.Fields() {
bufferLen = maxOf(getMaxBufferLen(f.Type, length))
}
return bufferLen
case *arrow.SparseUnionType:
// type codes
bufferLen = maxOf(length)
// creates children of the same length of the union
for _, f := range dt.Fields() {
bufferLen = maxOf(getMaxBufferLen(f.Type, length))
}
return bufferLen
case *arrow.DenseUnionType:
// type codes
bufferLen = maxOf(length)
// offsets
bufferLen = maxOf(arrow.Int32SizeBytes * length)
// create children of length 1
for _, f := range dt.Fields() {
bufferLen = maxOf(getMaxBufferLen(f.Type, 1))
}
return bufferLen
case arrow.OffsetsDataType:
return maxOf(dt.OffsetTypeTraits().BytesRequired(length + 1))
case *arrow.FixedSizeListType:
return maxOf(getMaxBufferLen(dt.Elem(), int(dt.Len())*length))
case arrow.ExtensionType:
return maxOf(getMaxBufferLen(dt.StorageType(), length))
default:
panic(arrow.ErrNotImplemented)
}
}

type nullArrayFactory struct {
mem memory.Allocator
dt arrow.DataType
len int
buf *memory.Buffer
}

func (n *nullArrayFactory) create() *Data {
if n.buf == nil {
bufLen := getMaxBufferLen(n.dt, n.len)
n.buf = memory.NewResizableBuffer(n.mem)
n.buf.Resize(bufLen)
defer n.buf.Release()
}

var (
dt = n.dt
bufs = []*memory.Buffer{memory.SliceBuffer(n.buf, 0, int(bitutil.BytesForBits(int64(n.len))))}
childData []arrow.ArrayData
dictData arrow.ArrayData
)
defer bufs[0].Release()

if ex, ok := dt.(arrow.ExtensionType); ok {
dt = ex.StorageType()
}

if nf, ok := dt.(arrow.NestedType); ok {
childData = make([]arrow.ArrayData, len(nf.Fields()))
}

switch dt := dt.(type) {
case *arrow.NullType:
case *arrow.DictionaryType:
bufs = append(bufs, n.buf)
arr := MakeArrayOfNull(n.mem, dt.ValueType, 0)
defer arr.Release()
dictData = arr.Data()
case arrow.FixedWidthDataType:
bufs = append(bufs, n.buf)
case arrow.BinaryDataType:
bufs = append(bufs, n.buf, n.buf)
case arrow.OffsetsDataType:
bufs = append(bufs, n.buf)
childData[0] = n.createChild(dt, 0, 0)
defer childData[0].Release()
case *arrow.FixedSizeListType:
childData[0] = n.createChild(dt, 0, n.len*int(dt.Len()))
defer childData[0].Release()
case *arrow.StructType:
for i := range dt.Fields() {
childData[i] = n.createChild(dt, i, n.len)
defer childData[i].Release()
}
case arrow.UnionType:
bufs[0].Release()
bufs[0] = nil
bufs = append(bufs, n.buf)
// buffer is zeroed, but 0 may not be a valid type code
if dt.TypeCodes()[0] != 0 {
bufs[1] = memory.NewResizableBuffer(n.mem)
bufs[1].Resize(n.len)
defer bufs[1].Release()
memory.Set(bufs[1].Bytes(), byte(dt.TypeCodes()[0]))
}

// for sparse unions we create children with the same length
childLen := n.len
if dt.Mode() == arrow.DenseMode {
// for dense unions, offsets are all 0 and make children
// with length 1
bufs = append(bufs, n.buf)
childLen = 1
}
for i := range dt.Fields() {
childData[i] = n.createChild(dt, i, childLen)
defer childData[i].Release()
}
}

out := NewData(n.dt, n.len, bufs, childData, n.len, 0)
if dictData != nil {
out.SetDictionary(dictData)
}
return out
}

func (n *nullArrayFactory) createChild(dt arrow.DataType, i, length int) *Data {
childFactory := &nullArrayFactory{
mem: n.mem, dt: n.dt.(arrow.NestedType).Fields()[i].Type,
len: length, buf: n.buf}
return childFactory.create()
}

func MakeArrayOfNull(mem memory.Allocator, dt arrow.DataType, length int) arrow.Array {
data := (&nullArrayFactory{mem: mem, dt: dt, len: length}).create()
defer data.Release()
return MakeFromData(data)
}
11 changes: 11 additions & 0 deletions go/arrow/compute/datum.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ const (

const UnknownLength int64 = -1

func DatumIsValue(d Datum) bool {
switch d.Kind() {
case KindScalar, KindArray, KindChunked:
return true
}
return false
}

// Datum is a variant interface for wrapping the various Arrow data structures
// for now the various Datum types just hold a Value which is the type they
// are wrapping, but it might make sense in the future for those types
Expand Down Expand Up @@ -318,6 +326,9 @@ func NewDatum(value interface{}) Datum {
case arrow.Array:
v.Data().Retain()
return &ArrayDatum{v.Data().(*array.Data)}
case arrow.ArrayData:
v.Retain()
return &ArrayDatum{v.(*array.Data)}
case *arrow.Chunked:
v.Retain()
return &ChunkedDatum{v}
Expand Down
Loading

0 comments on commit 2f767fd

Please sign in to comment.