From 34145c9dfd1fd4110e31d74f74c60bc929077e72 Mon Sep 17 00:00:00 2001 From: Robert Buessow Date: Wed, 8 Apr 2026 09:47:18 +0000 Subject: [PATCH 01/10] Add Arrow C Data Interface support (#593) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements both directions of the Arrow C Data Interface spec (https://arrow.apache.org/docs/format/CDataInterface.html): - `Arrow.from_c_data(schema_ptr, array_ptr)` — import an Arrow array from C-owned memory; zero-copy via `unsafe_wrap`; `CDataHandle` finalizer calls the C `release` callbacks automatically. - `Arrow.to_c_data(col)` — export an `ArrowVector` or `Arrow.Table` to C; GC roots kept alive via a token-keyed global dict; `@cfunction` release callbacks (initialised in `__init__`) delete roots on consumer release. New public API: the types `ArrowSchema`, `ArrowArray`, `CImportedArray`, and `CImportedTable`, plus the function `release_c_data` (alongside `from_c_data` and `to_c_data` above). Supports all Arrow column types: primitives, Bool, String/binary, List (generic, large, fixed-size), Struct, Map, DenseUnion, SparseUnion, DictEncoded, Null, and all time/date/duration types. Handles nullable columns, non-zero array offsets, and custom metadata. 68 new tests in `test/cdatainterface.jl` covering format strings, buffer layout, validity bitmaps, round-trips, release semantics, non-zero offsets, and multi-column table import. 
Co-Authored-By: Claude Sonnet 4.6 (1M context) --- src/Arrow.jl | 9 +- src/cdatainterface.jl | 1228 ++++++++++++++++++++++++++++++++++++++++ test/cdatainterface.jl | 365 ++++++++++++ test/runtests.jl | 1 + 4 files changed, 1602 insertions(+), 1 deletion(-) create mode 100644 src/cdatainterface.jl create mode 100644 test/cdatainterface.jl diff --git a/src/Arrow.jl b/src/Arrow.jl index 6f3ccdf8..a78297c7 100644 --- a/src/Arrow.jl +++ b/src/Arrow.jl @@ -30,7 +30,6 @@ This implementation supports the 1.0 version of the specification, including sup It currently doesn't include support for: * Tensors or sparse tensors * Flight RPC - * C data interface Third-party data formats: * csv and parquet support via the existing [CSV.jl](https://github.com/JuliaData/CSV.jl) and [Parquet.jl](https://github.com/JuliaIO/Parquet.jl) packages @@ -56,6 +55,7 @@ using DataAPI, StringViews export ArrowTypes +export ArrowSchema, ArrowArray, CImportedArray, CImportedTable, from_c_data, to_c_data, release_c_data using Base: @propagate_inbounds import Base: == @@ -79,6 +79,7 @@ include("table.jl") include("write.jl") include("append.jl") include("show.jl") +include("cdatainterface.jl") const ZSTD_COMPRESSOR = Lockable{ZstdCompressor}[] const ZSTD_DECOMPRESSOR = Lockable{ZstdDecompressor}[] @@ -138,6 +139,12 @@ function __init__() resize!(empty!(ZSTD_COMPRESSOR), nt) resize!(empty!(LZ4_FRAME_DECOMPRESSOR), nt) resize!(empty!(ZSTD_DECOMPRESSOR), nt) + global _SCHEMA_RELEASE_CFUNC = @cfunction( + _release_exported_schema, Cvoid, (Ptr{ArrowSchema},) + ) + global _ARRAY_RELEASE_CFUNC = @cfunction( + _release_exported_array, Cvoid, (Ptr{ArrowArray},) + ) return end diff --git a/src/cdatainterface.jl b/src/cdatainterface.jl new file mode 100644 index 00000000..22c5cb8d --- /dev/null +++ b/src/cdatainterface.jl @@ -0,0 +1,1228 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
# See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

###############################################################################
# Arrow C Data Interface
# Spec: https://arrow.apache.org/docs/format/CDataInterface.html
###############################################################################

###############################################################################
# C struct definitions                                                        #
###############################################################################

"""
    Arrow.ArrowSchema

Mirrors `struct ArrowSchema` from the Arrow C Data Interface specification.
Layout must be ABI-compatible with the C struct (9 pointer-sized fields).
The field order is significant: the release callbacks in this file locate
`release` by its position in the struct.
"""
mutable struct ArrowSchema
    format::Cstring
    name::Cstring
    metadata::Cstring
    flags::Int64
    n_children::Int64
    children::Ptr{Ptr{ArrowSchema}}
    dictionary::Ptr{ArrowSchema}
    release::Ptr{Cvoid}
    private_data::Ptr{Cvoid}
end

"""
    Arrow.ArrowArray

Mirrors `struct ArrowArray` from the Arrow C Data Interface specification.
Layout must be ABI-compatible with the C struct (10 pointer-sized fields).
The field order is significant: the release callbacks in this file locate
`release` by its position in the struct.
"""
mutable struct ArrowArray
    length::Int64
    null_count::Int64
    offset::Int64
    n_buffers::Int64
    n_children::Int64
    buffers::Ptr{Ptr{Cvoid}}
    children::Ptr{Ptr{ArrowArray}}
    dictionary::Ptr{ArrowArray}
    release::Ptr{Cvoid}
    private_data::Ptr{Cvoid}
end

# Layout sanity checks. These are explicit errors rather than `@assert`s because
# assertions may be disabled, and a layout mismatch would corrupt memory.
sizeof(ArrowSchema) == 9 * 8 ||
    error("ArrowSchema size mismatch; expected $(9*8), got $(sizeof(ArrowSchema))")
sizeof(ArrowArray) == 10 * 8 ||
    error("ArrowArray size mismatch; expected $(10*8), got $(sizeof(ArrowArray))")

# Schema flags
const CDATA_FLAG_NULLABLE = Int64(2)
const CDATA_FLAG_DICT_ORDERED = Int64(1)
const CDATA_FLAG_MAP_KEYS_SORTED = Int64(4)

###############################################################################
# Import path (C → Julia)                                                     #
###############################################################################

"""
    CDataHandle

Holds C-side pointers for an imported Arrow C Data Interface pair and calls
the C `release` callbacks when the Julia wrapper is garbage collected.
`released` records whether the callbacks have already been invoked so that
release happens at most once.
"""
mutable struct CDataHandle
    schema_ptr::Ptr{ArrowSchema}
    array_ptr::Ptr{ArrowArray}
    released::Bool
end

CDataHandle(sp::Ptr{ArrowSchema}, ap::Ptr{ArrowArray}) = CDataHandle(sp, ap, false)

# Invoke the producer's `release` callbacks (array first, then schema) exactly
# once per handle. Safe to call repeatedly: later calls return immediately.
function _release_cdata_handle(h::CDataHandle)
    h.released && return
    h.released = true
    if h.array_ptr != C_NULL
        arr = unsafe_load(h.array_ptr)
        # A NULL `release` means the struct has already been released.
        if arr.release != C_NULL
            ccall(arr.release, Cvoid, (Ptr{ArrowArray},), h.array_ptr)
        end
    end
    if h.schema_ptr != C_NULL
        sch = unsafe_load(h.schema_ptr)
        if sch.release != C_NULL
            ccall(sch.release, Cvoid, (Ptr{ArrowSchema},), h.schema_ptr)
        end
    end
end

"""
    CImportedArray{T}

An `AbstractVector{T}` wrapping an imported Arrow C Data Interface array.
Holds a reference to the `CDataHandle` to keep the C-side memory alive.
Call `Arrow.release_c_data(x)` to release C resources immediately; otherwise
they are released when this object is garbage collected.
"""
struct CImportedArray{T} <: AbstractVector{T}
    data::ArrowVector{T}
    handle::CDataHandle
end

Base.size(x::CImportedArray) = size(x.data)
Base.IndexStyle(::Type{<:CImportedArray}) = Base.IndexLinear()
Base.@propagate_inbounds function Base.getindex(x::CImportedArray, i::Integer)
    @boundscheck checkbounds(x, i)
    return @inbounds x.data[i]
end

"""
    CImportedTable

A column-access table (implements the `Tables.jl` column interface) wrapping
imported Arrow C Data Interface arrays. `names` and `columns` are parallel
vectors; `lookup` maps each name to its column; `metadata` is optional
table-level key/value metadata.
"""
struct CImportedTable
    names::Vector{Symbol}
    columns::Vector{CImportedArray}
    lookup::Dict{Symbol,CImportedArray}
    metadata::Union{Nothing,Base.ImmutableDict{String,String}}
end

Tables.istable(::Type{<:CImportedTable}) = true
Tables.columnaccess(::Type{<:CImportedTable}) = true
Tables.columns(t::CImportedTable) = t
Tables.columnnames(t::CImportedTable) = t.names
Tables.getcolumn(t::CImportedTable, nm::Symbol) = t.lookup[nm]
Tables.getcolumn(t::CImportedTable, i::Int) = t.columns[i]
Tables.schema(t::CImportedTable) = Tables.Schema(t.names, map(eltype, t.columns))

DataAPI.metadatasupport(::Type{CImportedTable}) = (read=true, write=false)

# Look up `key`, yielding `default` when the table has no metadata or no such key.
_cimported_metadata(t::CImportedTable, key, default) =
    t.metadata === nothing ? default : get(t.metadata, key, default)

# Two-argument form: a missing key yields `nothing` (it does not throw).
function DataAPI.metadata(t::CImportedTable, key::AbstractString; style::Bool=false)
    val = _cimported_metadata(t, key, nothing)
    return style ? (val, :default) : val
end

# Three-argument form with an explicit fallback value.
function DataAPI.metadata(
    t::CImportedTable,
    key::AbstractString,
    default;
    style::Bool=false,
)
    val = _cimported_metadata(t, key, default)
    return style ? (val, :default) : val
end

DataAPI.metadatakeys(t::CImportedTable) =
    t.metadata === nothing ? () : keys(t.metadata)

"""
    Arrow.release_c_data(x::CImportedArray)
    Arrow.release_c_data(t::CImportedTable)

Immediately release C-side resources held by an imported array or table.
After calling this, the data in `x` or `t` may become invalid.
Calling it more than once is harmless: each handle is released at most once.
"""
release_c_data(x::CImportedArray) = _release_cdata_handle(x.handle)
function release_c_data(t::CImportedTable)
    # `_release_cdata_handle` is idempotent, so a handle shared by several
    # columns is still released only once.
    for col in t.columns
        _release_cdata_handle(col.handle)
    end
    return nothing
end

# Parse the binary key-value metadata format used by the C Data Interface:
# an Int32 pair count followed, for each pair, by an Int32 key length, the key
# bytes, an Int32 value length and the value bytes. Returns `nothing` when the
# pointer is NULL or the pair count is not positive.
function _parse_c_metadata(ptr::Cstring)
    ptr == C_NULL && return nothing
    p = Ptr{UInt8}(ptr)
    n_pairs = unsafe_load(Ptr{Int32}(p))
    n_pairs <= 0 && return nothing
    pos = 4  # byte offset from p
    dict = Base.ImmutableDict{String,String}()
    for _ in 1:n_pairs
        key_len = unsafe_load(Ptr{Int32}(p + pos))
        pos += 4
        key = unsafe_string(p + pos, key_len)
        pos += key_len
        val_len = unsafe_load(Ptr{Int32}(p + pos))
        pos += 4
        val = unsafe_string(p + pos, val_len)
        pos += val_len
        dict = Base.ImmutableDict(dict, key => val)
    end
    return dict
end

# Load the i-th buffer pointer from an ArrowArray (0-indexed, pointer arithmetic
# in units of sizeof(Ptr)). Returns C_NULL when the array has no such buffer.
_cbuf(arr::ArrowArray, i::Int) = (arr.n_buffers > i && arr.buffers != C_NULL) ?
    unsafe_load(arr.buffers, i + 1) : C_NULL

# Load the i-th child array pointer from an ArrowArray (0-indexed)
_cchild_arr(arr::ArrowArray, i::Int) = unsafe_load(arr.children, i + 1)

# Load the i-th child schema pointer from an ArrowSchema (0-indexed)
_cchild_sch(sch::ArrowSchema, i::Int) = unsafe_load(sch.children, i + 1)

# Count the zero (null) bits in the bit range [off, off + len) of `bytes`
# (0-indexed bit positions, least-significant bit first within each byte).
function _count_unset_bits(bytes::Vector{UInt8}, off::Int, len::Int)
    n = 0
    for pos in off:(off + len - 1)
        n += ((bytes[(pos >> 3) + 1] >> (pos & 7)) & 0x01) == 0x00
    end
    return n
end

# Build a ValidityBitmap from buffer 0 of a C Data Interface array.
#
# * No validity buffer, or a null count of zero: an empty bitmap with zero nulls.
# * A negative `null_count` (the spec allows -1 for "not computed"): the nulls
#   are counted from the bitmap instead of storing the sentinel as the count.
# * A byte-aligned offset wraps the C memory in place; otherwise the bits are
#   copied and repacked so that bit 0 is the first logical element.
function _import_validity(arr::ArrowArray, len::Int, off::Int)
    nc = Int(arr.null_count)
    vptr = Ptr{UInt8}(_cbuf(arr, 0))
    if nc == 0 || vptr == C_NULL
        return ValidityBitmap(UInt8[], 1, len, 0)
    end
    n_bytes = cld(len + off, 8)
    vbytes = unsafe_wrap(Array, vptr, n_bytes; own=false)
    if nc < 0
        nc = _count_unset_bits(vbytes, off, len)
        nc == 0 && return ValidityBitmap(UInt8[], 1, len, 0)
    end
    if off % 8 == 0
        return ValidityBitmap(vbytes, off ÷ 8 + 1, len, nc)
    else
        # non-byte-aligned offset: copy and repack bits
        new_bytes = _copy_bit_range(vbytes, off, len)
        return ValidityBitmap(new_bytes, 1, len, nc)
    end
end

# Copy a range of bits from src starting at bit offset `off` (0-indexed), length `len`.
# The destination starts as all ones and only zero bits are cleared, so padding
# bits past `len` in the last byte remain set.
function _copy_bit_range(src::Vector{UInt8}, off::Int, len::Int)
    nbytes = cld(len, 8)
    dest = fill(0xff, nbytes)
    for i in 0:len-1
        src_pos = off + i
        src_byte = src_pos >> 3
        src_bit = src_pos & 7
        bit = (src[src_byte + 1] >> src_bit) & 1
        if bit == 0
            dst_byte = i >> 3
            dst_bit = i & 7
            dest[dst_byte + 1] &= ~(UInt8(1) << dst_bit)
        end
    end
    return dest
end

# Map the unit character of a C Data Interface time format to a Meta.TimeUnit.
function _char_to_timeunit(c::Char)
    c == 's' && return Meta.TimeUnit.SECOND
    c == 'm' && return Meta.TimeUnit.MILLISECOND
    c == 'u' && return Meta.TimeUnit.MICROSECOND
    c == 'n' && return Meta.TimeUnit.NANOSECOND
    error("Unknown time unit character: $c")
end

# Parse a primitive/simple format string to its Julia storage type.
# Throws for format strings that are not handled here.
function _fmt_to_storage_type(fmt::String)
    fmt == "c" && return Int8
    fmt == "C" && return UInt8
    fmt == "s" && return Int16
    fmt == "S" && return UInt16
    fmt == "i" && return Int32
    fmt == "I" && return UInt32
    fmt == "l" && return Int64
    fmt == "L" && return UInt64
    fmt == "e" && return Float16
    fmt == "f" && return Float32
    fmt == "g" && return Float64
    fmt == "tdD" && return Date{Meta.DateUnit.DAY,Int32}
    fmt == "tdm" && return Date{Meta.DateUnit.MILLISECOND,Int64}
    fmt == "tts" && return Time{Meta.TimeUnit.SECOND,Int32}
    fmt == "ttm" && return Time{Meta.TimeUnit.MILLISECOND,Int32}
    fmt == "ttu" && return Time{Meta.TimeUnit.MICROSECOND,Int64}
    fmt == "ttn" && return Time{Meta.TimeUnit.NANOSECOND,Int64}
    fmt == "tDs" && return Duration{Meta.TimeUnit.SECOND}
    fmt == "tDm" && return Duration{Meta.TimeUnit.MILLISECOND}
    fmt == "tDu" && return Duration{Meta.TimeUnit.MICROSECOND}
    fmt == "tDn" && return Duration{Meta.TimeUnit.NANOSECOND}
    fmt == "tiM" && return Interval{Meta.IntervalUnit.YEAR_MONTH,Int32}
    fmt == "tiD" && return Interval{Meta.IntervalUnit.DAY_TIME,Int64}
    # Timestamp: "ts" * unit * ":" * timezone (timezone may be empty)
    if startswith(fmt, "ts") && length(fmt) >= 4
        U = _char_to_timeunit(fmt[3])
        tz_str = length(fmt) > 4 ? fmt[5:end] : ""
        TZ = isempty(tz_str) ? nothing : Symbol(tz_str)
        return Timestamp{U,TZ}
    end
    # Decimal: "d:precision,scale[,bitwidth]" (bit width defaults to 128)
    if startswith(fmt, "d:")
        parts = split(fmt[3:end], ',')
        p = parse(Int32, parts[1])
        s_val = parse(Int32, parts[2])
        bw = length(parts) >= 3 ? parse(Int, parts[3]) : 128
        bw == 128 && return Decimal{p,s_val,Int128}
        bw == 256 && return Decimal{p,s_val,Int256}
        # Reading another width as Int128 would misinterpret the buffer.
        error("Unsupported decimal bit width $bw in format string: $fmt")
    end
    error("Unsupported format string for primitive type: $fmt")
end

# Main import function: given C pointers (already loaded), build an ArrowVector.
# handle is the top-level CDataHandle to keep C memory alive.
function _import_arrowvec(
    arr_ptr::Ptr{ArrowArray},
    sch_ptr::Ptr{ArrowSchema},
    handle::CDataHandle,
    convert::Bool,
)
    # `handle` and `convert` are only forwarded to recursive calls in this function.
    # NOTE(review): except for the dense-union buffers, a parent's `offset` is not
    # applied to its children (struct, fixed-size list, sparse union) — confirm
    # against the spec before relying on sliced nested arrays.
    arr = unsafe_load(arr_ptr)
    sch = unsafe_load(sch_ptr)
    fmt = unsafe_string(sch.format)
    len = Int(arr.length)
    off = Int(arr.offset)
    nullable = (sch.flags & CDATA_FLAG_NULLABLE) != 0
    meta = _parse_c_metadata(sch.metadata)
    validity = _import_validity(arr, len, off)

    # Null array
    if fmt == "n"
        return NullVector{Missing}(MissingVector(len), meta)
    end

    # Boolean
    if fmt == "b"
        T = nullable ? Union{Bool,Missing} : Bool
        dptr = Ptr{UInt8}(_cbuf(arr, 1))
        if dptr == C_NULL
            return BoolVector{T}(UInt8[], 1, validity, len, meta)
        end
        n_bytes = cld(len + off, 8)
        data_bytes = unsafe_wrap(Array, dptr, n_bytes; own=false)
        if off % 8 == 0
            return BoolVector{T}(data_bytes, off ÷ 8 + 1, validity, len, meta)
        else
            # non-byte-aligned offset: copy and repack bits
            new_bytes = _copy_bit_range(data_bytes, off, len)
            return BoolVector{T}(new_bytes, 1, validity, len, meta)
        end
    end

    # Fixed-size binary "w:N"
    if startswith(fmt, "w:")
        N = parse(Int, fmt[3:end])
        T_inner = NTuple{N,UInt8}
        T = nullable ? Union{T_inner,Missing} : T_inner
        dptr = Ptr{UInt8}(_cbuf(arr, 1))
        n_bytes = (len + off) * N
        data_bytes = dptr == C_NULL ? UInt8[] : unsafe_wrap(Array, dptr, n_bytes; own=false)
        # Apply offset: skip first `off*N` bytes
        data_view = off == 0 ? data_bytes : view(data_bytes, off*N+1:n_bytes)
        return FixedSizeList{T,typeof(data_view)}(UInt8[], validity, data_view, len, meta)
    end

    # String / binary (list with inline data)
    if fmt ∈ ("u", "U", "z", "Z")
        OT = (fmt == "U" || fmt == "Z") ? Int64 : Int32
        T_inner = (fmt == "u" || fmt == "U") ? String : Base.CodeUnits{UInt8,String}
        T = nullable ? Union{T_inner,Missing} : T_inner
        optr = Ptr{OT}(_cbuf(arr, 1))
        n_offs = len + off + 1
        offs_arr = optr == C_NULL ? OT[] : unsafe_wrap(Array, optr, n_offs; own=false)
        offs_view = off == 0 ? offs_arr : view(offs_arr, off+1:n_offs)
        offsets = Offsets(UInt8[], offs_view)
        dptr = Ptr{UInt8}(_cbuf(arr, 2))
        # data length = last offset value
        data_len = n_offs > 0 && optr != C_NULL ? Int(offs_arr[n_offs]) : 0
        data_bytes = dptr == C_NULL ? UInt8[] : unsafe_wrap(Array, dptr, data_len; own=false)
        return List{T,OT,Vector{UInt8}}(UInt8[], validity, offsets, data_bytes, len, meta)
    end

    # Generic list "+l" / "+L"
    if fmt == "+l" || fmt == "+L"
        OT = fmt == "+L" ? Int64 : Int32
        optr = Ptr{OT}(_cbuf(arr, 1))
        n_offs = len + off + 1
        offs_arr = optr == C_NULL ? OT[] : unsafe_wrap(Array, optr, n_offs; own=false)
        offs_view = off == 0 ? offs_arr : view(offs_arr, off+1:n_offs)
        offsets = Offsets(UInt8[], offs_view)
        child_arr_ptr = _cchild_arr(arr, 0)
        child_sch_ptr = _cchild_sch(sch, 0)
        A = _import_arrowvec(child_arr_ptr, child_sch_ptr, handle, convert)
        T_child = eltype(A)
        ST = SubArray{T_child,1,typeof(A),Tuple{UnitRange{Int64}},true}
        T = nullable ? Union{ST,Missing} : ST
        return List{T,OT,typeof(A)}(UInt8[], validity, offsets, A, len, meta)
    end

    # Fixed-size list "+w:N"
    if startswith(fmt, "+w:")
        N = parse(Int, fmt[4:end])
        child_arr_ptr = _cchild_arr(arr, 0)
        child_sch_ptr = _cchild_sch(sch, 0)
        A = _import_arrowvec(child_arr_ptr, child_sch_ptr, handle, convert)
        T_child = eltype(A)
        T_inner = NTuple{N,T_child}
        T = nullable ? Union{T_inner,Missing} : T_inner
        return FixedSizeList{T,typeof(A)}(UInt8[], validity, A, len, meta)
    end

    # Struct "+s"
    if fmt == "+s"
        vecs = AbstractVector[]
        child_names = Symbol[]
        # `Type`, not `DataType`: a nullable child has eltype `Union{T,Missing}`,
        # which is a `Union` and cannot be stored in a `Vector{DataType}`.
        child_types = Type[]
        for i in 0:Int(sch.n_children)-1
            child_av = _import_arrowvec(_cchild_arr(arr, i), _cchild_sch(sch, i), handle, convert)
            push!(vecs, child_av)
            child_sch_i = unsafe_load(_cchild_sch(sch, i))
            # Unnamed children get a positional name "f0", "f1", ...
            nm = child_sch_i.name != C_NULL ? Symbol(unsafe_string(child_sch_i.name)) : Symbol("f$i")
            push!(child_names, nm)
            push!(child_types, eltype(child_av))
        end
        fnames = Tuple(child_names)
        data = Tuple(vecs)
        NT = NamedTuple{fnames,Tuple{child_types...}}
        T = nullable ? Union{NT,Missing} : NT
        return Struct{T,typeof(data),fnames}(validity, data, len, meta)
    end

    # Map "+m"
    if fmt == "+m"
        optr = Ptr{Int32}(_cbuf(arr, 1))
        n_offs = len + off + 1
        offs_arr = optr == C_NULL ? Int32[] : unsafe_wrap(Array, optr, n_offs; own=false)
        offs_view = off == 0 ? offs_arr : view(offs_arr, off+1:n_offs)
        offsets = Offsets(UInt8[], offs_view)
        # child[0] is entries struct (key + value fields)
        A = _import_arrowvec(_cchild_arr(arr, 0), _cchild_sch(sch, 0), handle, convert)
        T_entry = eltype(A)
        # Build Dict type from entry type
        if T_entry <: NamedTuple
            K = fieldtype(T_entry, :key)
            V = fieldtype(T_entry, :value)
            T_inner = Dict{K,V}
        else
            T_inner = Dict{Any,Any}
        end
        T = nullable ? Union{T_inner,Missing} : T_inner
        return Map{T,Int32,typeof(A)}(validity, offsets, A, len, meta)
    end

    # Dense union "+ud:typeIds"
    if startswith(fmt, "+ud:")
        typeids_str = fmt[5:end]
        typeids_parsed = isempty(typeids_str) ? nothing :
            Tuple(parse(Int32, s) for s in split(typeids_str, ','))
        tptr = Ptr{UInt8}(_cbuf(arr, 0))
        n = len + off
        typeids_all = tptr == C_NULL ? UInt8[] : unsafe_wrap(Array, tptr, n; own=false)
        optr = Ptr{Int32}(_cbuf(arr, 1))
        offsets_all = optr == C_NULL ? Int32[] : unsafe_wrap(Array, optr, n; own=false)
        # Honour the array's logical offset: skip the first `off` entries of both
        # buffers. The slice is a copy, so both variables stay plain `Vector`s.
        typeids_vec =
            (off == 0 || isempty(typeids_all)) ? typeids_all : typeids_all[off+1:n]
        offsets_vec =
            (off == 0 || isempty(offsets_all)) ? offsets_all : offsets_all[off+1:n]
        vecs = AbstractVector[]
        child_types = Type[]  # children may have `Union{T,Missing}` eltypes
        for i in 0:Int(sch.n_children)-1
            cv = _import_arrowvec(_cchild_arr(arr, i), _cchild_sch(sch, i), handle, convert)
            push!(vecs, cv)
            push!(child_types, eltype(cv))
        end
        data = Tuple(vecs)
        U_types = Tuple{child_types...}
        UT = UnionT{Meta.UnionMode.Dense,typeids_parsed,U_types}
        T = Union{child_types...}
        return DenseUnion{T,UT,typeof(data)}(UInt8[], UInt8[], typeids_vec, offsets_vec, data, meta)
    end

    # Sparse union "+us:typeIds"
    if startswith(fmt, "+us:")
        typeids_str = fmt[5:end]
        typeids_parsed = isempty(typeids_str) ? nothing :
            Tuple(parse(Int32, s) for s in split(typeids_str, ','))
        tptr = Ptr{UInt8}(_cbuf(arr, 0))
        n = len + off
        # NOTE(review): with a non-zero `off` this wraps `len + off` type ids and
        # the children are not shifted; behaviour is left unchanged here.
        typeids_vec = tptr == C_NULL ? UInt8[] : unsafe_wrap(Array, tptr, n; own=false)
        vecs = AbstractVector[]
        child_types = Type[]  # children may have `Union{T,Missing}` eltypes
        for i in 0:Int(sch.n_children)-1
            cv = _import_arrowvec(_cchild_arr(arr, i), _cchild_sch(sch, i), handle, convert)
            push!(vecs, cv)
            push!(child_types, eltype(cv))
        end
        data = Tuple(vecs)
        U_types = Tuple{child_types...}
        UT = UnionT{Meta.UnionMode.Sparse,typeids_parsed,U_types}
        T = Union{child_types...}
        return SparseUnion{T,UT,typeof(data)}(UInt8[], typeids_vec, data, meta)
    end

    # Dictionary encoded: schema.dictionary != C_NULL
    if sch.dictionary != C_NULL
        # schema.format is the INDEX type
        S = _fmt_to_storage_type(fmt)  # index type (e.g., Int8)
        iptr = Ptr{S}(_cbuf(arr, 1))
        n_idx = len + off
        idx_arr = iptr == C_NULL ? S[] : unsafe_wrap(Array, iptr, n_idx; own=false)
        idx_view = off == 0 ? idx_arr : view(idx_arr, off+1:n_idx)
        idx_vec = Vector{S}(idx_view)  # make a copy since DictEncoded.indices is Vector{S}
        # Import dictionary values
        dict_arr_ptr = arr.dictionary
        dict_sch_ptr = sch.dictionary
        dict_vec = _import_arrowvec(dict_arr_ptr, dict_sch_ptr, handle, convert)
        T_val = eltype(dict_vec)
        ordered = (sch.flags & CDATA_FLAG_DICT_ORDERED) != 0
        encoding = DictEncoding{T_val,S,typeof(dict_vec)}(0, dict_vec, ordered, nothing)
        T = nullable ? Union{T_val,Missing} : T_val
        return DictEncoded{T,S,typeof(dict_vec)}(UInt8[], validity, idx_vec, encoding, meta)
    end

    # Primitive numeric / time types
    S = _fmt_to_storage_type(fmt)
    T = nullable ? Union{S,Missing} : S
    dptr = Ptr{S}(_cbuf(arr, 1))
    if dptr == C_NULL
        return Primitive(T, UInt8[], validity, S[], len, meta)
    end
    n = len + off
    data_arr = unsafe_wrap(Array, dptr, n; own=false)
    data_view = off == 0 ? data_arr : view(data_arr, off+1:n)
    return Primitive(T, UInt8[], validity, data_view, len, meta)
end

"""
    Arrow.from_c_data(schema_ptr, array_ptr; convert=true) -> CImportedArray

Import an Arrow array from the Arrow C Data Interface. `schema_ptr` and `array_ptr`
are pointers (`Ptr{Cvoid}` or `Ptr{Arrow.ArrowSchema}`/`Ptr{Arrow.ArrowArray}`) to
the respective C structs. The caller transfers ownership of the C structs to Julia;
the C `release` callbacks will be called when the returned `CImportedArray` is GC'd
or when `Arrow.release_c_data` is called on it.

The release finalizer is registered before the import runs, so the C structs are
still released if the import throws.

# Example
```julia
# schema_ptr and array_ptr come from a C library call
col = Arrow.from_c_data(schema_ptr, array_ptr)
collect(col)                     # materialise elements
Arrow.release_c_data(col)        # or let GC handle it
```
"""
function from_c_data(
    schema_ptr::Ptr{Cvoid},
    array_ptr::Ptr{Cvoid};
    convert::Bool=true,
)
    sp = Ptr{ArrowSchema}(schema_ptr)
    ap = Ptr{ArrowArray}(array_ptr)
    handle = CDataHandle(sp, ap)
    finalizer(_release_cdata_handle, handle)
    vec = _import_arrowvec(ap, sp, handle, convert)
    T = eltype(vec)
    return CImportedArray{T}(vec, handle)
end

from_c_data(sp::Ptr{ArrowSchema}, ap::Ptr{ArrowArray}; kw...) =
    from_c_data(Ptr{Cvoid}(sp), Ptr{Cvoid}(ap); kw...)

"""
    Arrow.from_c_data(schema_ptrs, array_ptrs; names, convert=true, metadata=nothing) -> CImportedTable

Import multiple Arrow arrays as a table from the Arrow C Data Interface.
`schema_ptrs` and `array_ptrs` are iterables of pointers to C structs; they are
paired up in iteration order and each pair gets its own release handle.
`names` is an optional vector of `Symbol` for column names; if not provided,
names are read from the schema `name` field (falling back to `col<i>` when the
field is NULL). `metadata` is stored unchanged as the table-level metadata.
"""
function from_c_data(
    schema_ptrs,
    array_ptrs;
    names::Union{Nothing,Vector{Symbol}}=nothing,
    convert::Bool=true,
    metadata=nothing,
)
    cols = CImportedArray[]
    col_names = Symbol[]
    for (i, (sp_raw, ap_raw)) in enumerate(zip(schema_ptrs, array_ptrs))
        sp = Ptr{ArrowSchema}(sp_raw)
        ap = Ptr{ArrowArray}(ap_raw)
        handle = CDataHandle(sp, ap)
        finalizer(_release_cdata_handle, handle)
        vec = _import_arrowvec(ap, sp, handle, convert)
        T = eltype(vec)
        push!(cols, CImportedArray{T}(vec, handle))
        if names !== nothing
            push!(col_names, names[i])
        else
            sch = unsafe_load(sp)
            nm = sch.name != C_NULL ? unsafe_string(sch.name) : "col$i"
            push!(col_names, Symbol(nm))
        end
    end
    lookup = Dict{Symbol,CImportedArray}(col_names[i] => cols[i] for i in eachindex(cols))
    return CImportedTable(col_names, cols, lookup, metadata)
end

###############################################################################
# Export path (Julia → C)                                                     #
###############################################################################

# Global roots dict: keeps Julia objects alive while C holds pointers
const _EXPORT_ROOTS = Dict{UInt64,Vector{Any}}()
const _EXPORT_ROOTS_LOCK = ReentrantLock()
const _EXPORT_TOKEN = Threads.Atomic{UInt64}(0)

# Return a fresh token. `atomic_add!` returns the value before the increment.
function _next_export_token()
    return Threads.atomic_add!(_EXPORT_TOKEN, UInt64(1))
end

# Release callbacks (function pointers set in __init__)
# Declared as globals here; assigned in Arrow.__init__
global _SCHEMA_RELEASE_CFUNC::Ptr{Cvoid} = C_NULL
global _ARRAY_RELEASE_CFUNC::Ptr{Cvoid} = C_NULL

# C-callable release callback for exported schemas: drops the GC roots stored
# under the token held in `private_data`, then marks the struct as released.
function _release_exported_schema(ptr::Ptr{ArrowSchema})::Cvoid
    sch = unsafe_load(ptr)
    sch.release == C_NULL && return nothing
    token = UInt64(UInt(sch.private_data))
    @lock _EXPORT_ROOTS_LOCK delete!(_EXPORT_ROOTS, token)
    # Null out release to signal completion (prevents double-free)
    # ArrowSchema.release is the 8th field; all fields are pointer-sized (8 bytes each)
    release_offset = 7 * sizeof(Ptr{Cvoid})  # 0-indexed: field 8 is at offset 7*8
    unsafe_store!(Ptr{Ptr{Cvoid}}(UInt(ptr) + release_offset), C_NULL)
    return nothing
end

# C-callable release callback for exported arrays; mirrors the schema callback.
function _release_exported_array(ptr::Ptr{ArrowArray})::Cvoid
    arr = unsafe_load(ptr)
    arr.release == C_NULL && return nothing
    token = UInt64(UInt(arr.private_data))
    @lock _EXPORT_ROOTS_LOCK delete!(_EXPORT_ROOTS, token)
    # Null out release (prevents double-free)
    # ArrowArray.release is the 9th field; all fields are pointer-sized (8 bytes each)
    release_offset = 8 * sizeof(Ptr{Cvoid})  # 0-indexed: field 9 is at offset 8*8
    unsafe_store!(Ptr{Ptr{Cvoid}}(UInt(ptr) + release_offset), C_NULL)
    return nothing
end

# Serialize Julia metadata dict to Arrow C Data Interface binary format
# (Int32 pair count, then Int32-length-prefixed key and value bytes per pair).
# Returns an empty vector for `nothing` or an empty dict.
function _serialize_c_metadata(
    meta::Union{Nothing,AbstractDict{String,String}},
)::Vector{UInt8}
    (meta === nothing || isempty(meta)) && return UInt8[]
    buf = IOBuffer()
    Base.write(buf, Int32(length(meta)))
    for (k, v) in meta
        kb = codeunits(k)
        Base.write(buf, Int32(length(kb)))
        Base.write(buf, kb)
        vb = codeunits(v)
        Base.write(buf, Int32(length(vb)))
        Base.write(buf, vb)
    end
    return take!(buf)
end

# Map ArrowVector type to C Data Interface format string
function _array_to_format(v::ArrowVector)
    # A dictionary-encoded column is described by its INDEX type. This must be
    # checked before looking at the element type: otherwise dictionary-encoded
    # primitives (e.g. Int64 values with Int8 indices) would report the value format.
    v isa DictEncoded && return _container_to_format(v)
    T = eltype(v)
    T === Missing && return "n"
    S = Base.nonmissingtype(T)
    S === Union{} && return "n"
    return _type_to_format(S, v)
end

_array_to_format(v::NullVector) = "n"

# Format strings for the primitive element types; everything else is delegated.
function _type_to_format(S::Type, v::ArrowVector)
    S === Missing && return "n"
    S === Bool && return "b"
    S === Int8 && return "c"
    S === UInt8 && return "C"
    S === Int16 && return "s"
    S === UInt16 && return "S"
    S === Int32 && return "i"
    S === UInt32 && return "I"
    S === Int64 && return "l"
    S === UInt64 && return "L"
    S === Float16 && return "e"
    S === Float32 && return "f"
    S === Float64 && return "g"
    return _type_to_format_extended(S, v)
end

# Format strings for date/time/decimal element types; anything not matched here
# is treated as a container and dispatched on the vector type.
function _type_to_format_extended(S::Type, v::ArrowVector)
    # Time types
    if S === Date{Meta.DateUnit.DAY,Int32}
        return "tdD"
    elseif S === Date{Meta.DateUnit.MILLISECOND,Int64}
        return "tdm"
    elseif S === Time{Meta.TimeUnit.SECOND,Int32}
        return "tts"
    elseif S === Time{Meta.TimeUnit.MILLISECOND,Int32}
        return "ttm"
    elseif S === Time{Meta.TimeUnit.MICROSECOND,Int64}
        return "ttu"
    elseif S === Time{Meta.TimeUnit.NANOSECOND,Int64}
        return "ttn"
    elseif S <: Timestamp
        U = S.parameters[1]
        TZ = S.parameters[2]
        unit_char = U === Meta.TimeUnit.SECOND ? 's' :
                    U === Meta.TimeUnit.MILLISECOND ? 'm' :
                    U === Meta.TimeUnit.MICROSECOND ? 'u' : 'n'
        tz_str = TZ === nothing ? "" : String(TZ)
        return "ts$(unit_char):$(tz_str)"
    elseif S <: Duration
        U = S.parameters[1]
        unit_char = U === Meta.TimeUnit.SECOND ? 's' :
                    U === Meta.TimeUnit.MILLISECOND ? 'm' :
                    U === Meta.TimeUnit.MICROSECOND ? 'u' : 'n'
        return "tD$(unit_char)"
    elseif S === Interval{Meta.IntervalUnit.YEAR_MONTH,Int32}
        return "tiM"
    elseif S === Interval{Meta.IntervalUnit.DAY_TIME,Int64}
        return "tiD"
    elseif S <: Decimal
        P = S.parameters[1]
        SC = S.parameters[2]
        T_val = S.parameters[3]
        bw = T_val === Int256 ? 256 : 128
        return "d:$(P),$(SC),$(bw)"
    end
    # Nested container types: dispatch on ArrowVector subtype
    return _container_to_format(v)
end

# Int32-offset lists: strings "u", binary "z", otherwise a generic list "+l".
function _container_to_format(v::List{T,Int32,A}) where {T,A}
    S = Base.nonmissingtype(T)
    if S <: AbstractString
        return "u"
    elseif S <: Base.CodeUnits
        return "z"
    else
        return "+l"
    end
end

# Int64-offset lists: large strings "U", large binary "Z", otherwise "+L".
function _container_to_format(v::List{T,Int64,A}) where {T,A}
    S = Base.nonmissingtype(T)
    if S <: AbstractString
        return "U"
    elseif S <: Base.CodeUnits
        return "Z"
    else
        return "+L"
    end
end

# Fixed-size binary ("w:N") when the backing data is raw bytes, else "+w:N".
function _container_to_format(v::FixedSizeList{T,A}) where {T,A}
    S = Base.nonmissingtype(T)
    N = ArrowTypes.getsize(ArrowTypes.ArrowKind(ArrowTypes.ArrowType(S)))
    if eltype(A) == UInt8 && S <: NTuple
        return "w:$(N)"
    else
        return "+w:$(N)"
    end
end

_container_to_format(v::Map) = "+m"
_container_to_format(v::Struct) = "+s"
_container_to_format(v::NullVector) = "n"

# Union formats list the type ids; without explicit ids they default to 0:n-1.
function _container_to_format(v::DenseUnion{T,UnionT{M,typeIds,U}}) where {T,M,typeIds,U}
    ids_str = typeIds === nothing ? join(0:fieldcount(U)-1, ',') : join(typeIds, ',')
    return "+ud:$(ids_str)"
end

function _container_to_format(v::SparseUnion{T,UnionT{M,typeIds,U}}) where {T,M,typeIds,U}
    ids_str = typeIds === nothing ? join(0:fieldcount(U)-1, ',') : join(typeIds, ',')
    return "+us:$(ids_str)"
end

function _container_to_format(v::DictEncoded{T,S,A}) where {T,S,A}
    # Format is the INDEX type
    S === Int8 && return "c"
    S === UInt8 && return "C"
    S === Int16 && return "s"
    S === UInt16 && return "S"
    S === Int32 && return "i"
    S === UInt32 && return "I"
    S === Int64 && return "l"
    S === UInt64 && return "L"
    # Guessing a width here would make the consumer misread the index buffer.
    error("Unsupported dictionary index type $(S) for $(typeof(v))")
end

_container_to_format(v::ArrowVector) = error("Cannot determine format string for $(typeof(v))")

# Compute schema flags from an ArrowVector
function _schema_flags(v::ArrowVector)
    T = eltype(v)
    flags = Int64(0)
    if T >: Missing
        flags |= CDATA_FLAG_NULLABLE
    end
    if v isa DictEncoded && v.encoding.isOrdered
        flags |= CDATA_FLAG_DICT_ORDERED
    end
    return flags
end
_schema_flags(v::NullVector) = CDATA_FLAG_NULLABLE

# Get a pointer to the validity bitmap bytes, or C_NULL if no nulls
function _validity_ptr(v::ArrowVector)
    bm = v.validity
    bm.nc == 0 && return C_NULL
    isempty(bm.bytes) && return C_NULL
    return Ptr{Cvoid}(pointer(bm.bytes, bm.pos))
end

# DenseUnion and SparseUnion have no validity bitmap
_validity_ptr(v::DenseUnion) = C_NULL
_validity_ptr(v::SparseUnion) = C_NULL
_validity_ptr(v::NullVector) = C_NULL

# Fill an ArrowSchema Ref for a given ArrowVector (recursive)
# roots: vector of Julia objects to keep alive (owned by the token entry in _EXPORT_ROOTS)
# token: key into _EXPORT_ROOTS for the top-level array
# Every byte buffer whose pointer is stored in the struct is pushed onto `roots`.
function _fill_schema!(
    out::Ref{ArrowSchema},
    v::ArrowVector,
    name::String,
    token::UInt64,
    roots::Vector{Any},
)
    fmt = _array_to_format(v)
    fmt_bytes = Vector{UInt8}(fmt * "\0")
    push!(roots, fmt_bytes)
    name_bytes = Vector{UInt8}(name * "\0")
    push!(roots, name_bytes)

    meta = getmetadata(v)
    meta_bytes = _serialize_c_metadata(meta)
    meta_ptr = isempty(meta_bytes) ? C_NULL : Cstring(pointer(meta_bytes))
    if !isempty(meta_bytes)
        push!(roots, meta_bytes)
    end

    flags = _schema_flags(v)

    # Build children schemas
    child_schema_refs, n_children, children_ptr =
        _make_child_schemas!(v, token, roots)

    # Dictionary schema
    dict_schema_ref, dict_ptr = _make_dict_schema!(v, token, roots)

    out[] = ArrowSchema(
        Cstring(pointer(fmt_bytes)),
        Cstring(pointer(name_bytes)),
        meta_ptr,
        flags,
        Int64(n_children),
        children_ptr,
        dict_ptr,
        _SCHEMA_RELEASE_CFUNC,
        Ptr{Cvoid}(UInt(token)),
    )
    push!(roots, child_schema_refs)
    push!(roots, dict_schema_ref)
    return out
end

# Build child ArrowSchema refs for a vector that has children.
# Returns (refs, n_children, pointer to the child-pointer array).
# Fallback: vectors without children.
function _make_child_schemas!(v::ArrowVector, token::UInt64, roots::Vector{Any})
    return Ref{ArrowSchema}[], 0, Ptr{Ptr{ArrowSchema}}(C_NULL)
end

function _make_child_schemas!(v::Union{List,FixedSizeList,Map}, token::UInt64, roots::Vector{Any})
    # These types have a single child
    if v isa List && liststringtype(v)
        # String/binary lists have no child array in C Data Interface
        return Ref{ArrowSchema}[], 0, Ptr{Ptr{ArrowSchema}}(C_NULL)
    end
    if v isa FixedSizeList
        S = Base.nonmissingtype(eltype(v))
        if eltype(v.data) == UInt8 && S <: NTuple
            # Fixed-size binary: no children
            return Ref{ArrowSchema}[], 0, Ptr{Ptr{ArrowSchema}}(C_NULL)
        end
    end
    child_ref = Ref{ArrowSchema}()
    _fill_schema!(child_ref, _get_child_vec(v), "", token, roots)
    child_ptr_vec = [Base.unsafe_convert(Ptr{ArrowSchema}, child_ref)]
    push!(roots, child_ptr_vec)
    push!(roots, child_ref)
    return [child_ref], 1, Ptr{Ptr{ArrowSchema}}(pointer(child_ptr_vec))
end

function _make_child_schemas!(v::Struct, token::UInt64, roots::Vector{Any})
    T = eltype(v)
    S = Base.nonmissingtype(T)
    fnames = fieldnames(S)
    child_refs = [Ref{ArrowSchema}() for _ in eachindex(v.data)]
    for i in eachindex(v.data)
        # Children beyond the known field names get a positional name.
        nm = i <= length(fnames) ? String(fnames[i]) : "f$(i-1)"
        _fill_schema!(child_refs[i], v.data[i], nm, token, roots)
    end
    child_ptr_vec = [Base.unsafe_convert(Ptr{ArrowSchema}, r) for r in child_refs]
    push!(roots, child_ptr_vec)
    push!(roots, child_refs)
    return child_refs, length(child_refs), Ptr{Ptr{ArrowSchema}}(pointer(child_ptr_vec))
end

function _make_child_schemas!(
    v::Union{DenseUnion,SparseUnion},
    token::UInt64,
    roots::Vector{Any},
)
    child_refs = [Ref{ArrowSchema}() for _ in eachindex(v.data)]
    for i in eachindex(v.data)
        _fill_schema!(child_refs[i], v.data[i], "", token, roots)
    end
    child_ptr_vec = [Base.unsafe_convert(Ptr{ArrowSchema}, r) for r in child_refs]
    push!(roots, child_ptr_vec)
    push!(roots, child_refs)
    return child_refs, length(child_refs), Ptr{Ptr{ArrowSchema}}(pointer(child_ptr_vec))
end

# Dictionary-encoded vectors expose their values via `dictionary`, not children.
function _make_child_schemas!(v::DictEncoded, token::UInt64, roots::Vector{Any})
    return Ref{ArrowSchema}[], 0, Ptr{Ptr{ArrowSchema}}(C_NULL)
end

# Fallback: no dictionary schema.
function _make_dict_schema!(v::ArrowVector, token::UInt64, roots::Vector{Any})
    return Ref{ArrowSchema}(), Ptr{ArrowSchema}(C_NULL)
end

# Build the schema describing the dictionary values of a DictEncoded vector.
function _make_dict_schema!(v::DictEncoded, token::UInt64, roots::Vector{Any})
    dict_ref = Ref{ArrowSchema}()
    _fill_schema!(dict_ref, v.encoding.data, "", token, roots)
    push!(roots, dict_ref)
    return dict_ref, Base.unsafe_convert(Ptr{ArrowSchema}, dict_ref)
end

# Helper to get the child vector for List/FixedSizeList/Map
_get_child_vec(v::List) = v.data
_get_child_vec(v::FixedSizeList) = v.data
_get_child_vec(v::Map) = v.data

# Specialisation for NullVector (has no validity field):
# every element is null, and there are no buffers, children or dictionary.
function _fill_array!(out::Ref{ArrowArray}, v::NullVector, token::UInt64, roots::Vector{Any})
    len = Int64(length(v))
    out[] = ArrowArray(
        len, len, Int64(0),
        Int64(0), Int64(0),
        Ptr{Ptr{Cvoid}}(C_NULL),
        Ptr{Ptr{ArrowArray}}(C_NULL),
        Ptr{ArrowArray}(C_NULL),
        _ARRAY_RELEASE_CFUNC,
        Ptr{Cvoid}(UInt(token)),
    )
    return out
end

# Fill an
ArrowArray Ref for a given ArrowVector (recursive) +function _fill_array!( + out::Ref{ArrowArray}, + v::ArrowVector, + token::UInt64, + roots::Vector{Any}, +) + len = Int64(length(v)) + nc = Int64(nullcount(v)) + off = Int64(0) + + buffers, n_buffers = _make_buffers(v, roots) + child_array_refs, n_children, children_arr_ptr = _make_child_arrays!(v, token, roots) + dict_array_ref, dict_arr_ptr = _make_dict_array!(v, token, roots) + + out[] = ArrowArray( + len, nc, off, + Int64(n_buffers), + Int64(n_children), + buffers, + children_arr_ptr, + dict_arr_ptr, + _ARRAY_RELEASE_CFUNC, + Ptr{Cvoid}(UInt(token)), + ) + push!(roots, child_array_refs) + push!(roots, dict_array_ref) + return out +end + +# Build the buffers pointer array for a given ArrowVector +function _make_buffers(v::NullVector, roots::Vector{Any}) + return Ptr{Ptr{Cvoid}}(C_NULL), 0 +end + +function _make_buffers(v::BoolVector, roots::Vector{Any}) + vp = _validity_ptr(v) + bytes = isempty(v.arrow) ? UInt8[] : v.arrow + dp = isempty(bytes) ? C_NULL : Ptr{Cvoid}(pointer(bytes, v.pos)) + push!(roots, bytes) + bufs = [vp, dp] + push!(roots, bufs) + if vp != C_NULL + push!(roots, v.validity.bytes) + end + return Ptr{Ptr{Cvoid}}(pointer(bufs)), 2 +end + +function _make_buffers(v::Primitive, roots::Vector{Any}) + vp = _validity_ptr(v) + # data may be a lazy wrapper (ToArrow, ToStruct, SubArray, etc.) + # Materialise to a plain Vector of the storage element type + S = eltype(v.data) + data_vec = v.data isa Vector{S} ? v.data : collect(S, v.data) + push!(roots, data_vec) + dp = isempty(data_vec) ? 
C_NULL : Ptr{Cvoid}(pointer(data_vec)) + bufs = [vp, dp] + push!(roots, bufs) + if vp != C_NULL + push!(roots, v.validity.bytes) + end + return Ptr{Ptr{Cvoid}}(pointer(bufs)), 2 +end + +function _make_buffers(v::List, roots::Vector{Any}) + vp = _validity_ptr(v) + if vp != C_NULL + push!(roots, v.validity.bytes) + end + if liststringtype(v) + # data may be a ToList (lazy) or Vector{UInt8}; materialise to Vector{UInt8} + data_bytes = v.data isa Vector{UInt8} ? v.data : collect(UInt8, v.data) + push!(roots, data_bytes) + dp = isempty(data_bytes) ? C_NULL : Ptr{Cvoid}(pointer(data_bytes)) + # offsets may also need materialising (view → copy) + offs = v.offsets.offsets isa Vector ? v.offsets.offsets : collect(v.offsets.offsets) + push!(roots, offs) + op = Ptr{Cvoid}(pointer(offs)) + bufs = [vp, op, dp] + push!(roots, bufs) + return Ptr{Ptr{Cvoid}}(pointer(bufs)), 3 + else + offs = v.offsets.offsets isa Vector ? v.offsets.offsets : collect(v.offsets.offsets) + push!(roots, offs) + op = Ptr{Cvoid}(pointer(offs)) + bufs = [vp, op] + push!(roots, bufs) + return Ptr{Ptr{Cvoid}}(pointer(bufs)), 2 + end +end + +function _make_buffers(v::FixedSizeList, roots::Vector{Any}) + vp = _validity_ptr(v) + if vp != C_NULL + push!(roots, v.validity.bytes) + end + T = eltype(v) + S = Base.nonmissingtype(T) + # Fixed-size binary: 2 buffers (validity + data) + if eltype(v.data) == UInt8 && S <: NTuple + dp = isempty(v.data) ? C_NULL : Ptr{Cvoid}(pointer(v.data)) + !isempty(v.data) && push!(roots, v.data) + bufs = [vp, dp] + push!(roots, bufs) + return Ptr{Ptr{Cvoid}}(pointer(bufs)), 2 + else + # Fixed-size list: 1 buffer (validity only), child is separate + bufs = [vp] + push!(roots, bufs) + return Ptr{Ptr{Cvoid}}(pointer(bufs)), 1 + end +end + +function _make_buffers(v::Map, roots::Vector{Any}) + vp = _validity_ptr(v) + offs = v.offsets.offsets isa Vector ? 
v.offsets.offsets : collect(v.offsets.offsets) + push!(roots, offs) + op = Ptr{Cvoid}(pointer(offs)) + if vp != C_NULL + push!(roots, v.validity.bytes) + end + bufs = [vp, op] + push!(roots, bufs) + return Ptr{Ptr{Cvoid}}(pointer(bufs)), 2 +end + +function _make_buffers(v::Struct, roots::Vector{Any}) + vp = _validity_ptr(v) + if vp != C_NULL + push!(roots, v.validity.bytes) + end + bufs = [vp] + push!(roots, bufs) + return Ptr{Ptr{Cvoid}}(pointer(bufs)), 1 +end + +function _make_buffers(v::DenseUnion, roots::Vector{Any}) + tp = isempty(v.typeIds) ? C_NULL : Ptr{Cvoid}(pointer(v.typeIds)) + op = isempty(v.offsets) ? C_NULL : Ptr{Cvoid}(pointer(v.offsets)) + !isempty(v.typeIds) && push!(roots, v.typeIds) + !isempty(v.offsets) && push!(roots, v.offsets) + bufs = [tp, op] + push!(roots, bufs) + return Ptr{Ptr{Cvoid}}(pointer(bufs)), 2 +end + +function _make_buffers(v::SparseUnion, roots::Vector{Any}) + tp = isempty(v.typeIds) ? C_NULL : Ptr{Cvoid}(pointer(v.typeIds)) + !isempty(v.typeIds) && push!(roots, v.typeIds) + bufs = [tp] + push!(roots, bufs) + return Ptr{Ptr{Cvoid}}(pointer(bufs)), 1 +end + +function _make_buffers(v::DictEncoded, roots::Vector{Any}) + vp = _validity_ptr(v) + ip = isempty(v.indices) ? 
C_NULL : Ptr{Cvoid}(pointer(v.indices)) + !isempty(v.indices) && push!(roots, v.indices) + if vp != C_NULL + push!(roots, v.validity.bytes) + end + bufs = [vp, ip] + push!(roots, bufs) + return Ptr{Ptr{Cvoid}}(pointer(bufs)), 2 +end + +# Build child ArrowArray refs +function _make_child_arrays!(v::ArrowVector, token::UInt64, roots::Vector{Any}) + return Ref{ArrowArray}[], 0, Ptr{Ptr{ArrowArray}}(C_NULL) +end + +function _make_child_arrays!( + v::Union{List,FixedSizeList,Map}, + token::UInt64, + roots::Vector{Any}, +) + if v isa List && liststringtype(v) + return Ref{ArrowArray}[], 0, Ptr{Ptr{ArrowArray}}(C_NULL) + end + if v isa FixedSizeList + T = eltype(v) + S = Base.nonmissingtype(T) + if eltype(v.data) == UInt8 && S <: NTuple + return Ref{ArrowArray}[], 0, Ptr{Ptr{ArrowArray}}(C_NULL) + end + end + child_ref = Ref{ArrowArray}() + _fill_array!(child_ref, _get_child_vec(v), token, roots) + child_ptr_vec = [Base.unsafe_convert(Ptr{ArrowArray}, child_ref)] + push!(roots, child_ptr_vec) + push!(roots, child_ref) + return [child_ref], 1, Ptr{Ptr{ArrowArray}}(pointer(child_ptr_vec)) +end + +function _make_child_arrays!(v::Struct, token::UInt64, roots::Vector{Any}) + child_refs = [Ref{ArrowArray}() for _ in eachindex(v.data)] + for i in eachindex(v.data) + _fill_array!(child_refs[i], v.data[i], token, roots) + end + child_ptr_vec = [Base.unsafe_convert(Ptr{ArrowArray}, r) for r in child_refs] + push!(roots, child_ptr_vec) + push!(roots, child_refs) + return child_refs, length(child_refs), Ptr{Ptr{ArrowArray}}(pointer(child_ptr_vec)) +end + +function _make_child_arrays!( + v::Union{DenseUnion,SparseUnion}, + token::UInt64, + roots::Vector{Any}, +) + child_refs = [Ref{ArrowArray}() for _ in eachindex(v.data)] + for i in eachindex(v.data) + _fill_array!(child_refs[i], v.data[i], token, roots) + end + child_ptr_vec = [Base.unsafe_convert(Ptr{ArrowArray}, r) for r in child_refs] + push!(roots, child_ptr_vec) + push!(roots, child_refs) + return child_refs, 
length(child_refs), Ptr{Ptr{ArrowArray}}(pointer(child_ptr_vec)) +end + +function _make_child_arrays!(v::DictEncoded, token::UInt64, roots::Vector{Any}) + return Ref{ArrowArray}[], 0, Ptr{Ptr{ArrowArray}}(C_NULL) +end + +function _make_dict_array!(v::ArrowVector, token::UInt64, roots::Vector{Any}) + return Ref{ArrowArray}(), Ptr{ArrowArray}(C_NULL) +end + +function _make_dict_array!(v::DictEncoded, token::UInt64, roots::Vector{Any}) + dict_ref = Ref{ArrowArray}() + _fill_array!(dict_ref, v.encoding.data, token, roots) + push!(roots, dict_ref) + return dict_ref, Base.unsafe_convert(Ptr{ArrowArray}, dict_ref) +end + +""" + Arrow.to_c_data(col::ArrowVector; name="") -> (Ref{ArrowSchema}, Ref{ArrowArray}) + +Export an Arrow array to the Arrow C Data Interface format. Returns a pair of +`Ref`s to `ArrowSchema` and `ArrowArray` structs. The consumer must call +the `release` callback in the `ArrowArray` when done, which frees the Julia +GC roots keeping the data alive. + +!!! warning + The returned `Ref` objects **must** be kept alive by the caller until the C + consumer calls `release`. Do not let them be garbage collected prematurely. 
+ +# Example +```julia +col = Arrow.toarrowvector(Int32[1, 2, 3]) +schema_ref, array_ref = Arrow.to_c_data(col) +schema_ptr = Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, schema_ref) +array_ptr = Base.unsafe_convert(Ptr{Arrow.ArrowArray}, array_ref) +# pass schema_ptr and array_ptr to C consumer +# C consumer calls array_ref[].release when done +``` +""" +function to_c_data(col::ArrowVector; name::String="") + token = _next_export_token() + roots = Any[] + + schema_ref = Ref{ArrowSchema}() + array_ref = Ref{ArrowArray}() + + _fill_schema!(schema_ref, col, name, token, roots) + _fill_array!(array_ref, col, token, roots) + + # Keep the Ref objects themselves alive via roots + push!(roots, schema_ref) + push!(roots, array_ref) + + @lock _EXPORT_ROOTS_LOCK _EXPORT_ROOTS[token] = roots + return schema_ref, array_ref +end + +""" + Arrow.to_c_data(tbl::Arrow.Table; names=Arrow.Table.names) + -> (Vector{Ref{ArrowSchema}}, Vector{Ref{ArrowArray}}) + +Export all columns of an `Arrow.Table` to the Arrow C Data Interface format. +Returns two vectors of `Ref`s (one per column). Each column has its own +`release` token so they can be released independently. +""" +function to_c_data(tbl::Arrow.Table; names::Vector{String}=String.(Arrow.Table.names(tbl))) + schema_refs = Ref{ArrowSchema}[] + array_refs = Ref{ArrowArray}[] + for (i, col) in enumerate(Tables.columns(tbl)) + nm = i <= length(names) ? names[i] : "" + s_ref, a_ref = to_c_data(col; name=nm) + push!(schema_refs, s_ref) + push!(array_refs, a_ref) + end + return schema_refs, array_refs +end diff --git a/test/cdatainterface.jl b/test/cdatainterface.jl new file mode 100644 index 00000000..55007dbb --- /dev/null +++ b/test/cdatainterface.jl @@ -0,0 +1,365 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +@testset "Arrow C Data Interface" begin + + @testset "struct sizes" begin + @test sizeof(Arrow.ArrowSchema) == 9 * 8 + @test sizeof(Arrow.ArrowArray) == 10 * 8 + end + + # Helper: convert a Julia array to ArrowVector for export + function to_arrow(x) + return Arrow.toarrowvector(x) + end + + @testset "export: format strings" begin + for (input, expected) in [ + (Int8[1], "c"), + (UInt8[1], "C"), + (Int16[1], "s"), + (UInt16[1], "S"), + (Int32[1], "i"), + (UInt32[1], "I"), + (Int64[1], "l"), + (UInt64[1], "L"), + (Float32[1.0], "f"), + (Float64[1.0], "g"), + (Bool[true], "b"), + (["hello"], "u"), + ([missing], "n"), + ] + s_ref, a_ref = Arrow.to_c_data(to_arrow(input)) + GC.@preserve s_ref a_ref begin + @test unsafe_string(s_ref[].format) == expected + end + end + end + + @testset "export: nullable flag" begin + s_ref, _ = Arrow.to_c_data(to_arrow(Union{Int32,Missing}[1, missing])) + @test (s_ref[].flags & Arrow.CDATA_FLAG_NULLABLE) != 0 + + s_ref2, _ = Arrow.to_c_data(to_arrow(Int32[1, 2])) + @test (s_ref2[].flags & Arrow.CDATA_FLAG_NULLABLE) == 0 + end + + @testset "export: Int32 buffer contents" begin + data = Int32[10, 20, 30] + s_ref, a_ref = Arrow.to_c_data(to_arrow(data)) + arr = a_ref[] + @test arr.length == 3 + @test arr.null_count == 0 + @test arr.offset == 0 + @test arr.n_buffers == 2 + @test arr.n_children == 0 + # validity buffer should be C_NULL (no nulls) + validity_ptr = 
unsafe_load(arr.buffers) + @test validity_ptr == C_NULL + # data buffer + data_ptr = unsafe_load(arr.buffers + sizeof(Ptr{Cvoid})) + @test data_ptr != C_NULL + result = unsafe_wrap(Array, Ptr{Int32}(data_ptr), 3; own=false) + @test result == Int32[10, 20, 30] + end + + @testset "export: validity bitmap" begin + data = Union{Int32,Missing}[1, missing, 3] + s_ref, a_ref = Arrow.to_c_data(to_arrow(data)) + arr = a_ref[] + @test arr.null_count == 1 + validity_ptr = Ptr{UInt8}(unsafe_load(arr.buffers)) + @test validity_ptr != C_NULL + byte = unsafe_load(validity_ptr) + # bits 0,2 set; bit 1 clear (element 2 is missing) + @test (byte & 0x01) != 0 # element 1: valid + @test (byte & 0x02) == 0 # element 2: null + @test (byte & 0x04) != 0 # element 3: valid + end + + @testset "export: String list" begin + data = ["hello", "world"] + s_ref, a_ref = Arrow.to_c_data(to_arrow(data)) + arr = a_ref[] + sch = s_ref[] + @test unsafe_string(sch.format) == "u" + @test arr.n_buffers == 3 + # offsets buffer + off_ptr = Ptr{Int32}(unsafe_load(arr.buffers + sizeof(Ptr{Cvoid}))) + offsets = unsafe_wrap(Array, off_ptr, 3; own=false) + @test offsets == Int32[0, 5, 10] + # data buffer + data_ptr = Ptr{UInt8}(unsafe_load(arr.buffers + 2*sizeof(Ptr{Cvoid}))) + str_bytes = unsafe_wrap(Array, data_ptr, 10; own=false) + @test String(str_bytes) == "helloworld" + end + + @testset "export: struct" begin + data = [(x=Int32(1), y="a"), (x=Int32(2), y="b")] + s_ref, a_ref = Arrow.to_c_data(to_arrow(data)) + sch = s_ref[] + arr = a_ref[] + @test unsafe_string(sch.format) == "+s" + @test sch.n_children == 2 + @test arr.n_children == 2 + @test arr.n_buffers == 1 + # child 0: x field + c0_sch = unsafe_load(unsafe_load(sch.children)) + @test unsafe_string(c0_sch.format) == "i" + end + + @testset "export: release semantics" begin + data = Int32[1, 2, 3] + s_ref, a_ref = Arrow.to_c_data(to_arrow(data)) + arr = a_ref[] + token = UInt64(UInt(arr.private_data)) + @test haskey(Arrow._EXPORT_ROOTS, token) + + # 
Simulate C calling release on the array + ccall(arr.release, Cvoid, (Ptr{Arrow.ArrowArray},), + Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)) + + # Token should be removed + @test !haskey(Arrow._EXPORT_ROOTS, token) + # release pointer should be nulled out + @test a_ref[].release == C_NULL + end + + @testset "round-trip: Int32" begin + data = Int32[1, 2, 3, 4, 5] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + imported = Arrow.from_c_data( + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s_ref)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), + ) + @test collect(imported) == data + end + + @testset "round-trip: Float64 with missing" begin + data = Union{Float64,Missing}[1.0, missing, 3.14] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + imported = Arrow.from_c_data( + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s_ref)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), + ) + @test isequal(collect(imported), data) + end + + @testset "round-trip: Bool" begin + data = [true, false, true, false] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + imported = Arrow.from_c_data( + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s_ref)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), + ) + @test collect(imported) == data + end + + @testset "round-trip: Bool with missing" begin + data = Union{Bool,Missing}[true, missing, false] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + imported = Arrow.from_c_data( + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s_ref)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), + ) + @test isequal(collect(imported), data) + end + + @testset "round-trip: String" begin + data = ["hello", "world", "foo"] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + imported = Arrow.from_c_data( + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s_ref)), + 
Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), + ) + @test collect(imported) == data + end + + @testset "round-trip: String with missing" begin + data = Union{String,Missing}["hello", missing, "world"] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + imported = Arrow.from_c_data( + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s_ref)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), + ) + @test isequal(collect(imported), data) + end + + @testset "round-trip: Date" begin + data = [Dates.Date(2020, 1, 1), Dates.Date(2021, 6, 15)] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + imported = Arrow.from_c_data( + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s_ref)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)); + convert=false, + ) + @test collect(imported) == collect(av) + end + + @testset "round-trip: Timestamp" begin + data = [Dates.DateTime(2020, 1, 1), Dates.DateTime(2021, 6, 15)] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + imported = Arrow.from_c_data( + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s_ref)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)); + convert=false, + ) + @test collect(imported) == collect(av) + end + + @testset "round-trip: struct" begin + data = [(x=Int32(1), y="a"), (x=Int32(2), y="b")] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + imported = Arrow.from_c_data( + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s_ref)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), + ) + result = collect(imported) + @test length(result) == 2 + @test result[1].x == Int32(1) + @test result[1].y == "a" + @test result[2].x == Int32(2) + @test result[2].y == "b" + end + + @testset "round-trip: dict encoded" begin + data = Arrow.DictEncode(["a", "b", "a", "c", "b"]) + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + imported = Arrow.from_c_data( + 
Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s_ref)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), + ) + @test collect(imported) == ["a", "b", "a", "c", "b"] + end + + @testset "round-trip: null array" begin + data = fill(missing, 5) + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + @test unsafe_string(s_ref[].format) == "n" + imported = Arrow.from_c_data( + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s_ref)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), + ) + @test length(imported) == 5 + @test all(ismissing, imported) + end + + @testset "import: non-zero offset" begin + # Manually construct an ArrowArray with offset=2 + data = Int32[99, 99, 1, 2, 3] # logical elements start at index 3 + buf_ptrs = Ptr{Cvoid}[C_NULL, Ptr{Cvoid}(pointer(data))] + + arr_ref = Ref(Arrow.ArrowArray( + Int64(3), # length = 3 + Int64(0), # null_count = 0 + Int64(2), # offset = 2 + Int64(2), # n_buffers = 2 + Int64(0), # n_children = 0 + Ptr{Ptr{Cvoid}}(pointer(buf_ptrs)), + Ptr{Ptr{Arrow.ArrowArray}}(C_NULL), + Ptr{Arrow.ArrowArray}(C_NULL), + Ptr{Cvoid}(C_NULL), # no release needed (Julia-owned data) + Ptr{Cvoid}(C_NULL), + )) + fmt_bytes = Vector{UInt8}("i\0") + sch_ref = Ref(Arrow.ArrowSchema( + Cstring(pointer(fmt_bytes)), + Cstring(C_NULL), + Cstring(C_NULL), + Int64(0), + Int64(0), + Ptr{Ptr{Arrow.ArrowSchema}}(C_NULL), + Ptr{Arrow.ArrowSchema}(C_NULL), + Ptr{Cvoid}(C_NULL), + Ptr{Cvoid}(C_NULL), + )) + GC.@preserve data buf_ptrs fmt_bytes arr_ref sch_ref begin + imported = Arrow.from_c_data( + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, sch_ref)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, arr_ref)), + ) + @test collect(imported) == Int32[1, 2, 3] + end + end + + @testset "import: C_NULL validity with null_count=0" begin + data = Int32[10, 20, 30] + buf_ptrs = Ptr{Cvoid}[C_NULL, Ptr{Cvoid}(pointer(data))] + arr_ref = Ref(Arrow.ArrowArray( + Int64(3), Int64(0), Int64(0), Int64(2), 
Int64(0), + Ptr{Ptr{Cvoid}}(pointer(buf_ptrs)), + Ptr{Ptr{Arrow.ArrowArray}}(C_NULL), Ptr{Arrow.ArrowArray}(C_NULL), + Ptr{Cvoid}(C_NULL), Ptr{Cvoid}(C_NULL), + )) + fmt_bytes = Vector{UInt8}("i\0") + sch_ref = Ref(Arrow.ArrowSchema( + Cstring(pointer(fmt_bytes)), Cstring(C_NULL), Cstring(C_NULL), + Int64(0), Int64(0), + Ptr{Ptr{Arrow.ArrowSchema}}(C_NULL), Ptr{Arrow.ArrowSchema}(C_NULL), + Ptr{Cvoid}(C_NULL), Ptr{Cvoid}(C_NULL), + )) + GC.@preserve data buf_ptrs fmt_bytes arr_ref sch_ref begin + imported = Arrow.from_c_data( + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, sch_ref)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, arr_ref)), + ) + @test collect(imported) == Int32[10, 20, 30] + end + end + + @testset "metadata serialization round-trip" begin + data = Int32[1, 2, 3] + av = to_arrow(data) + # Manually create a Primitive with metadata + meta = Base.ImmutableDict("key1" => "val1", "key2" => "val2") + av_meta = Arrow.Primitive(eltype(av), av.arrow, av.validity, av.data, length(av), meta) + s_ref, a_ref = Arrow.to_c_data(av_meta) + sch = s_ref[] + @test sch.metadata != C_NULL + parsed = Arrow._parse_c_metadata(sch.metadata) + @test parsed isa Base.ImmutableDict + @test parsed["key1"] == "val1" + @test parsed["key2"] == "val2" + end + + @testset "from_c_data table" begin + col1 = to_arrow(Int32[1, 2, 3]) + col2 = to_arrow(["a", "b", "c"]) + s1, a1 = Arrow.to_c_data(col1; name="x") + s2, a2 = Arrow.to_c_data(col2; name="y") + tbl = Arrow.from_c_data( + [Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s1)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s2))], + [Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a1)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a2))], + ) + @test Tables.columnnames(tbl) == [:x, :y] + @test collect(Tables.getcolumn(tbl, :x)) == Int32[1, 2, 3] + @test collect(Tables.getcolumn(tbl, :y)) == ["a", "b", "c"] + end + +end # @testset "Arrow C Data Interface" diff --git 
a/test/runtests.jl b/test/runtests.jl index 315d1b60..234b3faf 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -39,6 +39,7 @@ include(joinpath(@__DIR__, "testtables.jl")) include(joinpath(@__DIR__, "testappend.jl")) include(joinpath(@__DIR__, "integrationtest.jl")) include(joinpath(@__DIR__, "dates.jl")) +include(joinpath(@__DIR__, "cdatainterface.jl")) struct CustomStruct x::Int From 781579cc6580a1ae25303fd5ef50ce15690a9267 Mon Sep 17 00:00:00 2001 From: Robert Buessow Date: Mon, 11 May 2026 16:37:27 +0000 Subject: [PATCH 02/10] Fix double-free and type narrowing in C Data Interface import Remove duplicate finalizer registrations in `from_c_data`: the `CDataHandle` was getting a finalizer attached but `CImportedArray` already owns and manages the handle's lifetime, causing a potential double-free when the GC collected the handle. Also widen `child_types` from `DataType[]` to `Type[]` so that abstract element types (e.g. Union{...}) are accepted without a type assertion error when building struct arrays. 
Co-Authored-By: Claude Sonnet 4.6 (1M context) --- src/cdatainterface.jl | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/cdatainterface.jl b/src/cdatainterface.jl index 22c5cb8d..15908d5c 100644 --- a/src/cdatainterface.jl +++ b/src/cdatainterface.jl @@ -387,7 +387,7 @@ function _import_arrowvec( if fmt == "+s" vecs = AbstractVector[] child_names = Symbol[] - child_types = DataType[] + child_types = Type[] for i in 0:Int(sch.n_children)-1 child_av = _import_arrowvec(_cchild_arr(arr, i), _cchild_sch(sch, i), handle, convert) push!(vecs, child_av) @@ -530,7 +530,6 @@ function from_c_data( sp = Ptr{ArrowSchema}(schema_ptr) ap = Ptr{ArrowArray}(array_ptr) handle = CDataHandle(sp, ap) - finalizer(_release_cdata_handle, handle) vec = _import_arrowvec(ap, sp, handle, convert) T = eltype(vec) return CImportedArray{T}(vec, handle) @@ -560,7 +559,6 @@ function from_c_data( sp = Ptr{ArrowSchema}(sp_raw) ap = Ptr{ArrowArray}(ap_raw) handle = CDataHandle(sp, ap) - finalizer(_release_cdata_handle, handle) vec = _import_arrowvec(ap, sp, handle, convert) T = eltype(vec) push!(cols, CImportedArray{T}(vec, handle)) From feece16503803275c98baa3f075ec1bb6879e604 Mon Sep 17 00:00:00 2001 From: Robert Buessow Date: Tue, 12 May 2026 15:24:15 +0000 Subject: [PATCH 03/10] Fix BoundsError when dict-encoding CategoricalArrays with missing values CategoricalRefPool uses 0-based indices (0:n) with pool[0] as a missing sentinel. When passed to arrowvector, ToArrow wraps it with 1-based iteration (1..length(pool)). Since length(pool) == n+1, the last iteration calls pool[n+1], which is out of bounds. Fix: when firstindex(pool) != 1, skip the sentinel to give arrowvector a standard 1-based view (pool[1:end]). The existing inds adjustment (inds .-= firstindex(refa)) already produces correct Arrow dict indices (-1 for missing, 0..n-1 for valid values). 
Also add Table(::NamedTuple) constructor for the Arrow C Data path, and add Arrow as an explicit dep in test/Project.toml so that `julia --project=test test/runtests.jl` works in a local dev setup. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- src/arraytypes/dictencoding.jl | 6 +++++- src/table.jl | 9 +++++++++ test/Project.toml | 1 + 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/arraytypes/dictencoding.jl b/src/arraytypes/dictencoding.jl index 3e3576c5..816e9de2 100644 --- a/src/arraytypes/dictencoding.jl +++ b/src/arraytypes/dictencoding.jl @@ -233,8 +233,12 @@ function arrowvector( end # adjust to "offset" instead of index inds .-= firstindex(refa) + # CategoricalRefPool uses 0-based indices with pool[0] as a missing sentinel; + # skip it so arrowvector receives a standard 1-based sequence. + pool_first = firstindex(pool) + dict_pool = pool_first == 1 ? pool : @view(pool[(pool_first + 1):lastindex(pool)]) data = arrowvector( - pool, + dict_pool, i, nl, fi, diff --git a/src/table.jl b/src/table.jl index de8bfc37..40b03a0e 100644 --- a/src/table.jl +++ b/src/table.jl @@ -456,6 +456,15 @@ end Tables.partitions(t::Table) = TablePartitions(t) +# Build a Table directly from a NamedTuple of column vectors (Arrow C Data path). +function Table(nt::NamedTuple{names}) where {names} + nms = collect(Symbol, names) + cols = AbstractVector[nt[nm] for nm in names] + tps = Type[eltype(c) for c in cols] + lup = Dict{Symbol,AbstractVector}(zip(nms, cols)) + return Table(nms, tps, cols, lup, Ref{Meta.Schema}()) +end + # high-level user API functions Table(input, pos::Integer=1, len=nothing; kw...) = Table([ArrowBlob(tobytes(input), pos, len)]; kw...) diff --git a/test/Project.toml b/test/Project.toml index c2e02aa8..042c940d 100644 --- a/test/Project.toml +++ b/test/Project.toml @@ -15,6 +15,7 @@ # limitations under the License. 
[deps] +Arrow = "69666777-d1a9-59fb-9406-91d4454c9d45" ArrowTypes = "31f734f8-188a-4ce0-8406-c8a06bd891cd" CategoricalArrays = "324d7699-5711-5eae-9e2f-1d82baa6b597" DataAPI = "9a962f9c-6df0-11e9-0e5d-c546b8b5ee8a" From ad641dd339d212e7807b84276ee5a310ad7f9870 Mon Sep 17 00:00:00 2001 From: Robert Buessow Date: Thu, 14 May 2026 16:16:31 +0000 Subject: [PATCH 04/10] cleanup --- src/arraytypes/dictencoding.jl | 6 +----- test/Project.toml | 1 - 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/src/arraytypes/dictencoding.jl b/src/arraytypes/dictencoding.jl index 816e9de2..3e3576c5 100644 --- a/src/arraytypes/dictencoding.jl +++ b/src/arraytypes/dictencoding.jl @@ -233,12 +233,8 @@ function arrowvector( end # adjust to "offset" instead of index inds .-= firstindex(refa) - # CategoricalRefPool uses 0-based indices with pool[0] as a missing sentinel; - # skip it so arrowvector receives a standard 1-based sequence. - pool_first = firstindex(pool) - dict_pool = pool_first == 1 ? pool : @view(pool[(pool_first + 1):lastindex(pool)]) data = arrowvector( - dict_pool, + pool, i, nl, fi, diff --git a/test/Project.toml b/test/Project.toml index 042c940d..c2e02aa8 100644 --- a/test/Project.toml +++ b/test/Project.toml @@ -15,7 +15,6 @@ # limitations under the License. 
[deps] -Arrow = "69666777-d1a9-59fb-9406-91d4454c9d45" ArrowTypes = "31f734f8-188a-4ce0-8406-c8a06bd891cd" CategoricalArrays = "324d7699-5711-5eae-9e2f-1d82baa6b597" DataAPI = "9a962f9c-6df0-11e9-0e5d-c546b8b5ee8a" From b2406e313a18193fbae3448764c0c0afea564e79 Mon Sep 17 00:00:00 2001 From: Robert Buessow Date: Fri, 15 May 2026 15:56:22 +0000 Subject: [PATCH 05/10] Add comprehensive C Data Interface tests and fix import bugs Extend test/cdatainterface.jl with 13 new testsets covering all previously untested code paths in src/cdatainterface.jl: - Generic lists (+l): Int32, missing, String - Fixed-size lists (+w:N): Float32 and Int64 tuples - Maps (+m): Dict{String,Int32} - Dense unions (+ud:) and sparse unions (+us:) - All four Duration units (tDs/tDm/tDu/tDn) - Time nanoseconds (ttn) - Timestamp with UTC timezone - Interval year-month (tiM) and day-time (tiD) - Decimal{10,2,Int128} (d:10,2,128) - Arrow.Table export round-trip via to_c_data - Bool import with non-byte-aligned bit offset - release_c_data idempotency (double-release is a no-op) Writing the tests uncovered three bugs, all fixed in src/cdatainterface.jl: 1. Dense and sparse union import used `child_types = DataType[]`, which rejects abstract element types such as `Union{Missing, Int32}`. Fixed to `child_types = Type[]` (same fix already applied to the struct path in a prior commit). 2. Decimal precision and scale were parsed as Int32 in `_fmt_to_storage_type`, producing `Decimal{Int32(10),...}` instead of `Decimal{Int64(10),...}`. Since Julia type parameters carry their integer type, the two Decimal types compared unequal even with identical values. Fixed by using `parse(Int, ...)`. 3. `to_c_data(::Arrow.Table)` called `Arrow.Table.names(tbl)`, which crashes because `Table` is a type, not a module. Fixed to `Tables.columnnames(tbl)`. 
Co-Authored-By: Claude Sonnet 4.6 (1M context) --- src/cdatainterface.jl | 10 +- test/cdatainterface.jl | 249 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 254 insertions(+), 5 deletions(-) diff --git a/src/cdatainterface.jl b/src/cdatainterface.jl index 15908d5c..48a11c9a 100644 --- a/src/cdatainterface.jl +++ b/src/cdatainterface.jl @@ -277,8 +277,8 @@ function _fmt_to_storage_type(fmt::String) end if startswith(fmt, "d:") parts = split(fmt[3:end], ',') - p = parse(Int32, parts[1]) - s_val = parse(Int32, parts[2]) + p = parse(Int, parts[1]) + s_val = parse(Int, parts[2]) bw = length(parts) >= 3 ? parse(Int, parts[3]) : 128 return bw == 256 ? Decimal{p,s_val,Int256} : Decimal{p,s_val,Int128} end @@ -436,7 +436,7 @@ function _import_arrowvec( optr = Ptr{Int32}(_cbuf(arr, 1)) offsets_vec = optr == C_NULL ? Int32[] : unsafe_wrap(Array, optr, n; own=false) vecs = AbstractVector[] - child_types = DataType[] + child_types = Type[] for i in 0:Int(sch.n_children)-1 cv = _import_arrowvec(_cchild_arr(arr, i), _cchild_sch(sch, i), handle, convert) push!(vecs, cv) @@ -458,7 +458,7 @@ function _import_arrowvec( n = len + off typeids_vec = tptr == C_NULL ? UInt8[] : unsafe_wrap(Array, tptr, n; own=false) vecs = AbstractVector[] - child_types = DataType[] + child_types = Type[] for i in 0:Int(sch.n_children)-1 cv = _import_arrowvec(_cchild_arr(arr, i), _cchild_sch(sch, i), handle, convert) push!(vecs, cv) @@ -1213,7 +1213,7 @@ Export all columns of an `Arrow.Table` to the Arrow C Data Interface format. Returns two vectors of `Ref`s (one per column). Each column has its own `release` token so they can be released independently. 
""" -function to_c_data(tbl::Arrow.Table; names::Vector{String}=String.(Arrow.Table.names(tbl))) +function to_c_data(tbl::Arrow.Table; names::Vector{String}=String.(Tables.columnnames(tbl))) schema_refs = Ref{ArrowSchema}[] array_refs = Ref{ArrowArray}[] for (i, col) in enumerate(Tables.columns(tbl)) diff --git a/test/cdatainterface.jl b/test/cdatainterface.jl index 55007dbb..69b43c9d 100644 --- a/test/cdatainterface.jl +++ b/test/cdatainterface.jl @@ -362,4 +362,253 @@ @test collect(Tables.getcolumn(tbl, :y)) == ["a", "b", "c"] end + # Helper: convert a Ref to a Ptr{Cvoid} for from_c_data + _cptr(r::Ref{Arrow.ArrowSchema}) = + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, r)) + _cptr(r::Ref{Arrow.ArrowArray}) = + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, r)) + + # ── Lists ──────────────────────────────────────────────────────────────── + + @testset "round-trip: list of Int32 (+l)" begin + data = [[Int32(1), Int32(2), Int32(3)], [Int32(4), Int32(5)], [Int32(6)]] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + @test unsafe_string(s_ref[].format) == "+l" + imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref)) + @test collect(imported) == collect(av) + end + + @testset "round-trip: list of Int32 with missing (+l)" begin + data = Union{Vector{Int32},Missing}[[Int32(1), Int32(2)], missing, [Int32(3)]] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + @test unsafe_string(s_ref[].format) == "+l" + imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref)) + @test isequal(collect(imported), collect(av)) + end + + @testset "round-trip: list of String (+l)" begin + data = [["a", "bb"], ["ccc"], ["d", "ee", "fff"]] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + @test unsafe_string(s_ref[].format) == "+l" + imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref)) + @test collect(imported) == collect(av) + end + + # ── Fixed-size list ────────────────────────────────────────────────────── + + @testset "round-trip: 
fixed-size list NTuple{2,Float32} (+w:2)" begin + data = [(1.0f0, 2.0f0), (3.0f0, 4.0f0), (5.0f0, 6.0f0)] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + @test unsafe_string(s_ref[].format) == "+w:2" + imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref); convert=false) + @test collect(imported) == collect(av) + end + + @testset "round-trip: fixed-size list NTuple{3,Int64} (+w:3)" begin + data = [(Int64(1), Int64(2), Int64(3)), (Int64(4), Int64(5), Int64(6))] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + @test unsafe_string(s_ref[].format) == "+w:3" + imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref); convert=false) + @test collect(imported) == collect(av) + end + + # ── Map ────────────────────────────────────────────────────────────────── + + @testset "round-trip: map Dict{String,Int32} (+m)" begin + data = [Dict("a" => Int32(1), "b" => Int32(2)), Dict("c" => Int32(3))] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + @test unsafe_string(s_ref[].format) == "+m" + imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref)) + @test collect(imported) == collect(av) + end + + # ── Unions ─────────────────────────────────────────────────────────────── + + @testset "round-trip: dense union Union{Int32,String} (+ud:)" begin + data = Union{Int32,String}[Int32(1), "hello", Int32(3), "world"] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + @test startswith(unsafe_string(s_ref[].format), "+ud:") + imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref)) + @test collect(imported) == collect(av) + end + + @testset "round-trip: sparse union Union{Int32,Float64} (+us:)" begin + data = Union{Int32,Float64}[Int32(1), 2.0, Int32(3), 4.5] + av = Arrow.toarrowvector(data; denseunions=false) + s_ref, a_ref = Arrow.to_c_data(av) + @test startswith(unsafe_string(s_ref[].format), "+us:") + imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref)) + @test collect(imported) == collect(av) + end + + # ── Time-of-day 
────────────────────────────────────────────────────────── + + @testset "round-trip: Time nanoseconds (ttn)" begin + data = [Dates.Time(12, 30, 0), Dates.Time(0, 0, 1, 0, 0, 42)] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + @test unsafe_string(s_ref[].format) == "ttn" + imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref); convert=false) + @test collect(imported) == collect(av) + end + + # ── Duration ───────────────────────────────────────────────────────────── + + @testset "round-trip: Duration seconds (tDs)" begin + data = [Dates.Second(5), Dates.Second(10), Dates.Second(-1)] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + @test unsafe_string(s_ref[].format) == "tDs" + imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref); convert=false) + @test collect(imported) == collect(av) + end + + @testset "round-trip: Duration milliseconds (tDm)" begin + data = [Dates.Millisecond(100), Dates.Millisecond(-50)] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + @test unsafe_string(s_ref[].format) == "tDm" + imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref); convert=false) + @test collect(imported) == collect(av) + end + + @testset "round-trip: Duration microseconds (tDu)" begin + data = [Dates.Microsecond(1000), Dates.Microsecond(2000)] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + @test unsafe_string(s_ref[].format) == "tDu" + imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref); convert=false) + @test collect(imported) == collect(av) + end + + @testset "round-trip: Duration nanoseconds (tDn)" begin + data = [Dates.Nanosecond(999), Dates.Nanosecond(0)] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + @test unsafe_string(s_ref[].format) == "tDn" + imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref); convert=false) + @test collect(imported) == collect(av) + end + + # ── Timestamp with timezone ─────────────────────────────────────────────── + + @testset "round-trip: Timestamp with UTC 
timezone (tsm:UTC)" begin + data = [ + TimeZones.ZonedDateTime(2023, 1, 1, TimeZones.tz"UTC"), + TimeZones.ZonedDateTime(2023, 6, 1, 12, 0, 0, TimeZones.tz"UTC"), + ] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + fmt = unsafe_string(s_ref[].format) + @test startswith(fmt, "ts") && endswith(fmt, ":UTC") + imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref); convert=false) + @test collect(imported) == collect(av) + end + + # ── Interval ───────────────────────────────────────────────────────────── + + @testset "round-trip: Interval year-month (tiM)" begin + IU = Arrow.Meta.Flatbuf.IntervalUnit + YM = Arrow.Interval{IU.YEAR_MONTH,Int32} + data = YM[YM(Int32(3)), YM(Int32(-1)), YM(Int32(0))] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + @test unsafe_string(s_ref[].format) == "tiM" + imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref); convert=false) + @test collect(imported) == collect(av) + end + + @testset "round-trip: Interval day-time (tiD)" begin + IU = Arrow.Meta.Flatbuf.IntervalUnit + DT = Arrow.Interval{IU.DAY_TIME,Int64} + data = DT[DT(Int64(86400)), DT(Int64(0)), DT(Int64(-3600))] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + @test unsafe_string(s_ref[].format) == "tiD" + imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref); convert=false) + @test collect(imported) == collect(av) + end + + # ── Decimal ─────────────────────────────────────────────────────────────── + + @testset "round-trip: Decimal{10,2,Int128} (d:10,2,128)" begin + D = Arrow.Decimal{10,2,Int128} + data = D[D(Int128(314)), D(Int128(-100)), D(Int128(0))] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + @test unsafe_string(s_ref[].format) == "d:10,2,128" + imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref); convert=false) + @test collect(imported) == collect(av) + end + + # ── Table export ────────────────────────────────────────────────────────── + + @testset "to_c_data: Arrow.Table round-trip" begin + io = IOBuffer() + 
Arrow.write(io, (x=Int32[1, 2, 3], y=["a", "b", "c"])) + seekstart(io) + tbl = Arrow.Table(io) + srefs, arefs = Arrow.to_c_data(tbl) + @test length(srefs) == 2 + sptrs = [_cptr(r) for r in srefs] + aptrs = [_cptr(r) for r in arefs] + GC.@preserve srefs arefs begin + tbl2 = Arrow.from_c_data(sptrs, aptrs; names=[:x, :y]) + @test Tables.columnnames(tbl2) == [:x, :y] + @test collect(Tables.getcolumn(tbl2, :x)) == Int32[1, 2, 3] + @test collect(Tables.getcolumn(tbl2, :y)) == ["a", "b", "c"] + end + end + + # ── Bool with non-byte-aligned offset ──────────────────────────────────── + + @testset "import: Bool with non-byte-aligned offset=3" begin + # Packed byte: 0b10110110 (LSB first) + # bits 3,4,5,6,7 → 0,1,1,0,1 (reading from bit-position 3) + bools = UInt8[0b10110110] + validity = UInt8[0xff] + buf_ptrs = Ptr{Cvoid}[Ptr{Cvoid}(pointer(validity)), Ptr{Cvoid}(pointer(bools))] + arr_ref = Ref(Arrow.ArrowArray( + Int64(5), Int64(0), Int64(3), Int64(2), Int64(0), + Ptr{Ptr{Cvoid}}(pointer(buf_ptrs)), + Ptr{Ptr{Arrow.ArrowArray}}(C_NULL), + Ptr{Arrow.ArrowArray}(C_NULL), + Ptr{Cvoid}(C_NULL), Ptr{Cvoid}(C_NULL), + )) + fmt_bytes = Vector{UInt8}("b\0") + sch_ref = Ref(Arrow.ArrowSchema( + Cstring(pointer(fmt_bytes)), Cstring(C_NULL), Cstring(C_NULL), + Int64(0), Int64(0), + Ptr{Ptr{Arrow.ArrowSchema}}(C_NULL), Ptr{Arrow.ArrowSchema}(C_NULL), + Ptr{Cvoid}(C_NULL), Ptr{Cvoid}(C_NULL), + )) + GC.@preserve bools validity buf_ptrs arr_ref sch_ref begin + imported = Arrow.from_c_data(_cptr(sch_ref), _cptr(arr_ref)) + @test collect(imported) == [false, true, true, false, true] + end + end + + # ── release_c_data idempotency ──────────────────────────────────────────── + + @testset "release_c_data: double-release is a no-op" begin + col1 = to_arrow(Int32[1, 2]) + col2 = to_arrow(["a", "b"]) + s1, a1 = Arrow.to_c_data(col1; name="x") + s2, a2 = Arrow.to_c_data(col2; name="y") + sptrs = [_cptr(s1), _cptr(s2)] + aptrs = [_cptr(a1), _cptr(a2)] + GC.@preserve s1 a1 s2 a2 begin + tbl = 
Arrow.from_c_data(sptrs, aptrs) + Arrow.release_c_data(tbl) + @test_nowarn Arrow.release_c_data(tbl) # second call must not throw + end + end + end # @testset "Arrow C Data Interface" From 5dee1f02a39170ed12fb12a67667fb68832c395a Mon Sep 17 00:00:00 2001 From: Robert Buessow Date: Tue, 19 May 2026 19:16:45 +0000 Subject: [PATCH 06/10] formatting --- src/Arrow.jl | 13 ++- src/cdatainterface.jl | 186 ++++++++++++++++++++---------------- test/cdatainterface.jl | 212 ++++++++++++++++++++++++----------------- 3 files changed, 237 insertions(+), 174 deletions(-) diff --git a/src/Arrow.jl b/src/Arrow.jl index a78297c7..dc28e84d 100644 --- a/src/Arrow.jl +++ b/src/Arrow.jl @@ -55,7 +55,8 @@ using DataAPI, StringViews export ArrowTypes -export ArrowSchema, ArrowArray, CImportedArray, CImportedTable, from_c_data, to_c_data, release_c_data +export ArrowSchema, + ArrowArray, CImportedArray, CImportedTable, from_c_data, to_c_data, release_c_data using Base: @propagate_inbounds import Base: == @@ -139,12 +140,10 @@ function __init__() resize!(empty!(ZSTD_COMPRESSOR), nt) resize!(empty!(LZ4_FRAME_DECOMPRESSOR), nt) resize!(empty!(ZSTD_DECOMPRESSOR), nt) - global _SCHEMA_RELEASE_CFUNC = @cfunction( - _release_exported_schema, Cvoid, (Ptr{ArrowSchema},) - ) - global _ARRAY_RELEASE_CFUNC = @cfunction( - _release_exported_array, Cvoid, (Ptr{ArrowArray},) - ) + global _SCHEMA_RELEASE_CFUNC = + @cfunction(_release_exported_schema, Cvoid, (Ptr{ArrowSchema},)) + global _ARRAY_RELEASE_CFUNC = + @cfunction(_release_exported_array, Cvoid, (Ptr{ArrowArray},)) return end diff --git a/src/cdatainterface.jl b/src/cdatainterface.jl index 48a11c9a..1a5c9fec 100644 --- a/src/cdatainterface.jl +++ b/src/cdatainterface.jl @@ -64,9 +64,9 @@ end @assert sizeof(ArrowArray) == 10 * 8 "ArrowArray size mismatch; expected $(10*8), got $(sizeof(ArrowArray))" # Schema flags -const CDATA_FLAG_NULLABLE = Int64(2) -const CDATA_FLAG_DICT_ORDERED = Int64(1) -const CDATA_FLAG_MAP_KEYS_SORTED = Int64(4) +const 
CDATA_FLAG_NULLABLE = Int64(2) +const CDATA_FLAG_DICT_ORDERED = Int64(1) +const CDATA_FLAG_MAP_KEYS_SORTED = Int64(4) ############################################################################### # Import path (C → Julia) # @@ -145,9 +145,8 @@ Tables.schema(t::CImportedTable) = Tables.Schema(t.names, map(eltype, t.columns) DataAPI.metadatasupport(::Type{CImportedTable}) = (read=true, write=false) DataAPI.metadata(t::CImportedTable, key::AbstractString; style::Bool=false) = style ? (get(t.metadata === nothing ? Dict() : t.metadata, key, nothing), :default) : - get(t.metadata === nothing ? Dict() : t.metadata, key, nothing) -DataAPI.metadatakeys(t::CImportedTable) = - t.metadata === nothing ? () : keys(t.metadata) + get(t.metadata === nothing ? Dict() : t.metadata, key, nothing) +DataAPI.metadatakeys(t::CImportedTable) = t.metadata === nothing ? () : keys(t.metadata) """ Arrow.release_c_data(x::CImportedArray) @@ -176,7 +175,7 @@ function _parse_c_metadata(ptr::Cstring) n_pairs <= 0 && return nothing pos = 4 # byte offset from p dict = Base.ImmutableDict{String,String}() - for _ in 1:n_pairs + for _ = 1:n_pairs key_len = unsafe_load(Ptr{Int32}(p + pos)) pos += 4 key = unsafe_string(p + pos, key_len) @@ -191,8 +190,8 @@ function _parse_c_metadata(ptr::Cstring) end # Load the i-th buffer pointer from an ArrowArray (0-indexed, pointer arithmetic in units of sizeof(Ptr)) -_cbuf(arr::ArrowArray, i::Int) = (arr.n_buffers > i && arr.buffers != C_NULL) ? - unsafe_load(arr.buffers, i + 1) : C_NULL +_cbuf(arr::ArrowArray, i::Int) = + (arr.n_buffers > i && arr.buffers != C_NULL) ? 
unsafe_load(arr.buffers, i + 1) : C_NULL # Load the i-th child array pointer from an ArrowArray (0-indexed) _cchild_arr(arr::ArrowArray, i::Int) = unsafe_load(arr.children, i + 1) @@ -222,14 +221,14 @@ end function _copy_bit_range(src::Vector{UInt8}, off::Int, len::Int) nbytes = cld(len, 8) dest = fill(0xff, nbytes) - for i in 0:len-1 + for i = 0:(len - 1) src_pos = off + i src_byte = src_pos >> 3 - src_bit = src_pos & 7 + src_bit = src_pos & 7 bit = (src[src_byte + 1] >> src_bit) & 1 if bit == 0 dst_byte = i >> 3 - dst_bit = i & 7 + dst_bit = i & 7 dest[dst_byte + 1] &= ~(UInt8(1) << dst_bit) end end @@ -246,17 +245,17 @@ end # Parse a primitive/simple format string to its Julia storage type function _fmt_to_storage_type(fmt::String) - fmt == "c" && return Int8 - fmt == "C" && return UInt8 - fmt == "s" && return Int16 - fmt == "S" && return UInt16 - fmt == "i" && return Int32 - fmt == "I" && return UInt32 - fmt == "l" && return Int64 - fmt == "L" && return UInt64 - fmt == "e" && return Float16 - fmt == "f" && return Float32 - fmt == "g" && return Float64 + fmt == "c" && return Int8 + fmt == "C" && return UInt8 + fmt == "s" && return Int16 + fmt == "S" && return UInt16 + fmt == "i" && return Int32 + fmt == "I" && return UInt32 + fmt == "l" && return Int64 + fmt == "L" && return UInt64 + fmt == "e" && return Float16 + fmt == "f" && return Float32 + fmt == "g" && return Float64 fmt == "tdD" && return Date{Meta.DateUnit.DAY,Int32} fmt == "tdm" && return Date{Meta.DateUnit.MILLISECOND,Int64} fmt == "tts" && return Time{Meta.TimeUnit.SECOND,Int32} @@ -333,7 +332,7 @@ function _import_arrowvec( n_bytes = (len + off) * N data_bytes = dptr == C_NULL ? UInt8[] : unsafe_wrap(Array, dptr, n_bytes; own=false) # Apply offset: skip first `off*N` bytes - data_view = off == 0 ? data_bytes : view(data_bytes, off*N+1:n_bytes) + data_view = off == 0 ? 
data_bytes : view(data_bytes, (off * N + 1):n_bytes) return FixedSizeList{T,typeof(data_view)}(UInt8[], validity, data_view, len, meta) end @@ -345,12 +344,13 @@ function _import_arrowvec( optr = Ptr{OT}(_cbuf(arr, 1)) n_offs = len + off + 1 offs_arr = optr == C_NULL ? OT[] : unsafe_wrap(Array, optr, n_offs; own=false) - offs_view = off == 0 ? offs_arr : view(offs_arr, off+1:n_offs) + offs_view = off == 0 ? offs_arr : view(offs_arr, (off + 1):n_offs) offsets = Offsets(UInt8[], offs_view) dptr = Ptr{UInt8}(_cbuf(arr, 2)) # data length = last offset value data_len = n_offs > 0 && optr != C_NULL ? Int(offs_arr[n_offs]) : 0 - data_bytes = dptr == C_NULL ? UInt8[] : unsafe_wrap(Array, dptr, data_len; own=false) + data_bytes = + dptr == C_NULL ? UInt8[] : unsafe_wrap(Array, dptr, data_len; own=false) return List{T,OT,Vector{UInt8}}(UInt8[], validity, offsets, data_bytes, len, meta) end @@ -360,7 +360,7 @@ function _import_arrowvec( optr = Ptr{OT}(_cbuf(arr, 1)) n_offs = len + off + 1 offs_arr = optr == C_NULL ? OT[] : unsafe_wrap(Array, optr, n_offs; own=false) - offs_view = off == 0 ? offs_arr : view(offs_arr, off+1:n_offs) + offs_view = off == 0 ? offs_arr : view(offs_arr, (off + 1):n_offs) offsets = Offsets(UInt8[], offs_view) child_arr_ptr = _cchild_arr(arr, 0) child_sch_ptr = _cchild_sch(sch, 0) @@ -388,11 +388,14 @@ function _import_arrowvec( vecs = AbstractVector[] child_names = Symbol[] child_types = Type[] - for i in 0:Int(sch.n_children)-1 - child_av = _import_arrowvec(_cchild_arr(arr, i), _cchild_sch(sch, i), handle, convert) + for i = 0:(Int(sch.n_children) - 1) + child_av = + _import_arrowvec(_cchild_arr(arr, i), _cchild_sch(sch, i), handle, convert) push!(vecs, child_av) child_sch_i = unsafe_load(_cchild_sch(sch, i)) - nm = child_sch_i.name != C_NULL ? Symbol(unsafe_string(child_sch_i.name)) : Symbol("f$i") + nm = + child_sch_i.name != C_NULL ? 
Symbol(unsafe_string(child_sch_i.name)) : + Symbol("f$i") push!(child_names, nm) push!(child_types, eltype(child_av)) end @@ -408,7 +411,7 @@ function _import_arrowvec( optr = Ptr{Int32}(_cbuf(arr, 1)) n_offs = len + off + 1 offs_arr = optr == C_NULL ? Int32[] : unsafe_wrap(Array, optr, n_offs; own=false) - offs_view = off == 0 ? offs_arr : view(offs_arr, off+1:n_offs) + offs_view = off == 0 ? offs_arr : view(offs_arr, (off + 1):n_offs) offsets = Offsets(UInt8[], offs_view) # child[0] is entries struct (key + value fields) A = _import_arrowvec(_cchild_arr(arr, 0), _cchild_sch(sch, 0), handle, convert) @@ -428,7 +431,8 @@ function _import_arrowvec( # Dense union "+ud:typeIds" if startswith(fmt, "+ud:") typeids_str = fmt[5:end] - typeids_parsed = isempty(typeids_str) ? nothing : + typeids_parsed = + isempty(typeids_str) ? nothing : Tuple(parse(Int32, s) for s in split(typeids_str, ',')) tptr = Ptr{UInt8}(_cbuf(arr, 0)) n = len + off @@ -437,7 +441,7 @@ function _import_arrowvec( offsets_vec = optr == C_NULL ? Int32[] : unsafe_wrap(Array, optr, n; own=false) vecs = AbstractVector[] child_types = Type[] - for i in 0:Int(sch.n_children)-1 + for i = 0:(Int(sch.n_children) - 1) cv = _import_arrowvec(_cchild_arr(arr, i), _cchild_sch(sch, i), handle, convert) push!(vecs, cv) push!(child_types, eltype(cv)) @@ -446,20 +450,28 @@ function _import_arrowvec( U_types = Tuple{child_types...} UT = UnionT{Meta.UnionMode.Dense,typeids_parsed,U_types} T = Union{child_types...} - return DenseUnion{T,UT,typeof(data)}(UInt8[], UInt8[], typeids_vec, offsets_vec, data, meta) + return DenseUnion{T,UT,typeof(data)}( + UInt8[], + UInt8[], + typeids_vec, + offsets_vec, + data, + meta, + ) end # Sparse union "+us:typeIds" if startswith(fmt, "+us:") typeids_str = fmt[5:end] - typeids_parsed = isempty(typeids_str) ? nothing : + typeids_parsed = + isempty(typeids_str) ? 
nothing : Tuple(parse(Int32, s) for s in split(typeids_str, ',')) tptr = Ptr{UInt8}(_cbuf(arr, 0)) n = len + off typeids_vec = tptr == C_NULL ? UInt8[] : unsafe_wrap(Array, tptr, n; own=false) vecs = AbstractVector[] child_types = Type[] - for i in 0:Int(sch.n_children)-1 + for i = 0:(Int(sch.n_children) - 1) cv = _import_arrowvec(_cchild_arr(arr, i), _cchild_sch(sch, i), handle, convert) push!(vecs, cv) push!(child_types, eltype(cv)) @@ -479,7 +491,7 @@ function _import_arrowvec( iptr = Ptr{S}(_cbuf(arr, 1)) n_idx = len + off idx_arr = iptr == C_NULL ? S[] : unsafe_wrap(Array, iptr, n_idx; own=false) - idx_view = off == 0 ? idx_arr : view(idx_arr, off+1:n_idx) + idx_view = off == 0 ? idx_arr : view(idx_arr, (off + 1):n_idx) idx_vec = Vector{S}(idx_view) # make a copy since DictEncoded.indices is Vector{S} # Import dictionary values dict_arr_ptr = arr.dictionary @@ -501,7 +513,7 @@ function _import_arrowvec( end n = len + off data_arr = unsafe_wrap(Array, dptr, n; own=false) - data_view = off == 0 ? data_arr : view(data_arr, off+1:n) + data_view = off == 0 ? 
data_arr : view(data_arr, (off + 1):n) return Primitive(T, UInt8[], validity, data_view, len, meta) end @@ -522,11 +534,7 @@ collect(col) # materialise elements Arrow.release_c_data(col) # or let GC handle it ``` """ -function from_c_data( - schema_ptr::Ptr{Cvoid}, - array_ptr::Ptr{Cvoid}; - convert::Bool=true, -) +function from_c_data(schema_ptr::Ptr{Cvoid}, array_ptr::Ptr{Cvoid}; convert::Bool=true) sp = Ptr{ArrowSchema}(schema_ptr) ap = Ptr{ArrowArray}(array_ptr) handle = CDataHandle(sp, ap) @@ -579,9 +587,9 @@ end ############################################################################### # Global roots dict: keeps Julia objects alive while C holds pointers -const _EXPORT_ROOTS = Dict{UInt64,Vector{Any}}() +const _EXPORT_ROOTS = Dict{UInt64,Vector{Any}}() const _EXPORT_ROOTS_LOCK = ReentrantLock() -const _EXPORT_TOKEN = Threads.Atomic{UInt64}(0) +const _EXPORT_TOKEN = Threads.Atomic{UInt64}(0) function _next_export_token() return Threads.atomic_add!(_EXPORT_TOKEN, UInt64(1)) @@ -590,7 +598,7 @@ end # Release callbacks (function pointers set in __init__) # Declared as globals here; assigned in Arrow.__init__ global _SCHEMA_RELEASE_CFUNC::Ptr{Cvoid} = C_NULL -global _ARRAY_RELEASE_CFUNC::Ptr{Cvoid} = C_NULL +global _ARRAY_RELEASE_CFUNC::Ptr{Cvoid} = C_NULL function _release_exported_schema(ptr::Ptr{ArrowSchema})::Cvoid sch = unsafe_load(ptr) @@ -646,19 +654,19 @@ end _array_to_format(v::NullVector) = "n" function _type_to_format(S::Type, v::ArrowVector) - S === Missing && return "n" - S === Bool && return "b" - S === Int8 && return "c" - S === UInt8 && return "C" - S === Int16 && return "s" - S === UInt16 && return "S" - S === Int32 && return "i" - S === UInt32 && return "I" - S === Int64 && return "l" - S === UInt64 && return "L" - S === Float16 && return "e" - S === Float32 && return "f" - S === Float64 && return "g" + S === Missing && return "n" + S === Bool && return "b" + S === Int8 && return "c" + S === UInt8 && return "C" + S === Int16 && return "s" + 
S === UInt16 && return "S" + S === Int32 && return "i" + S === UInt32 && return "I" + S === Int64 && return "l" + S === UInt64 && return "L" + S === Float16 && return "e" + S === Float32 && return "f" + S === Float64 && return "g" return _type_to_format_extended(S, v) end @@ -679,16 +687,18 @@ function _type_to_format_extended(S::Type, v::ArrowVector) elseif S <: Timestamp U = S.parameters[1] TZ = S.parameters[2] - unit_char = U === Meta.TimeUnit.SECOND ? 's' : - U === Meta.TimeUnit.MILLISECOND ? 'm' : - U === Meta.TimeUnit.MICROSECOND ? 'u' : 'n' + unit_char = + U === Meta.TimeUnit.SECOND ? 's' : + U === Meta.TimeUnit.MILLISECOND ? 'm' : + U === Meta.TimeUnit.MICROSECOND ? 'u' : 'n' tz_str = TZ === nothing ? "" : String(TZ) return "ts$(unit_char):$(tz_str)" elseif S <: Duration U = S.parameters[1] - unit_char = U === Meta.TimeUnit.SECOND ? 's' : - U === Meta.TimeUnit.MILLISECOND ? 'm' : - U === Meta.TimeUnit.MICROSECOND ? 'u' : 'n' + unit_char = + U === Meta.TimeUnit.SECOND ? 's' : + U === Meta.TimeUnit.MILLISECOND ? 'm' : + U === Meta.TimeUnit.MICROSECOND ? 'u' : 'n' return "tD$(unit_char)" elseif S === Interval{Meta.IntervalUnit.YEAR_MONTH,Int32} return "tiM" @@ -737,30 +747,31 @@ function _container_to_format(v::FixedSizeList{T,A}) where {T,A} end end -_container_to_format(v::Map) = "+m" -_container_to_format(v::Struct) = "+s" +_container_to_format(v::Map) = "+m" +_container_to_format(v::Struct) = "+s" _container_to_format(v::NullVector) = "n" function _container_to_format(v::DenseUnion{T,UnionT{M,typeIds,U}}) where {T,M,typeIds,U} - ids_str = typeIds === nothing ? join(0:fieldcount(U)-1, ',') : join(typeIds, ',') + ids_str = typeIds === nothing ? join(0:(fieldcount(U) - 1), ',') : join(typeIds, ',') return "+ud:$(ids_str)" end function _container_to_format(v::SparseUnion{T,UnionT{M,typeIds,U}}) where {T,M,typeIds,U} - ids_str = typeIds === nothing ? join(0:fieldcount(U)-1, ',') : join(typeIds, ',') + ids_str = typeIds === nothing ? 
join(0:(fieldcount(U) - 1), ',') : join(typeIds, ',') return "+us:$(ids_str)" end function _container_to_format(v::DictEncoded{T,S,A}) where {T,S,A} # Format is the INDEX type - S === Int8 && return "c" + S === Int8 && return "c" S === Int16 && return "s" S === Int32 && return "i" S === Int64 && return "l" return "i" # fallback end -_container_to_format(v::ArrowVector) = error("Cannot determine format string for $(typeof(v))") +_container_to_format(v::ArrowVector) = + error("Cannot determine format string for $(typeof(v))") # Compute schema flags from an ArrowVector function _schema_flags(v::ArrowVector) @@ -815,8 +826,7 @@ function _fill_schema!( flags = _schema_flags(v) # Build children schemas - child_schema_refs, n_children, children_ptr = - _make_child_schemas!(v, token, roots) + child_schema_refs, n_children, children_ptr = _make_child_schemas!(v, token, roots) # Dictionary schema dict_schema_ref, dict_ptr = _make_dict_schema!(v, token, roots) @@ -842,7 +852,11 @@ function _make_child_schemas!(v::ArrowVector, token::UInt64, roots::Vector{Any}) return Ref{ArrowSchema}[], 0, Ptr{Ptr{ArrowSchema}}(C_NULL) end -function _make_child_schemas!(v::Union{List,FixedSizeList,Map}, token::UInt64, roots::Vector{Any}) +function _make_child_schemas!( + v::Union{List,FixedSizeList,Map}, + token::UInt64, + roots::Vector{Any}, +) # These types have a single child if v isa List && liststringtype(v) # String/binary lists have no child array in C Data Interface @@ -916,11 +930,19 @@ _get_child_vec(v::FixedSizeList) = v.data _get_child_vec(v::Map) = v.data # Specialisation for NullVector (has no validity field) -function _fill_array!(out::Ref{ArrowArray}, v::NullVector, token::UInt64, roots::Vector{Any}) +function _fill_array!( + out::Ref{ArrowArray}, + v::NullVector, + token::UInt64, + roots::Vector{Any}, +) len = Int64(length(v)) out[] = ArrowArray( - len, len, Int64(0), - Int64(0), Int64(0), + len, + len, + Int64(0), + Int64(0), + Int64(0), Ptr{Ptr{Cvoid}}(C_NULL), 
Ptr{Ptr{ArrowArray}}(C_NULL), Ptr{ArrowArray}(C_NULL), @@ -938,7 +960,7 @@ function _fill_array!( roots::Vector{Any}, ) len = Int64(length(v)) - nc = Int64(nullcount(v)) + nc = Int64(nullcount(v)) off = Int64(0) buffers, n_buffers = _make_buffers(v, roots) @@ -946,7 +968,9 @@ function _fill_array!( dict_array_ref, dict_arr_ptr = _make_dict_array!(v, token, roots) out[] = ArrowArray( - len, nc, off, + len, + nc, + off, Int64(n_buffers), Int64(n_children), buffers, @@ -1192,7 +1216,7 @@ function to_c_data(col::ArrowVector; name::String="") roots = Any[] schema_ref = Ref{ArrowSchema}() - array_ref = Ref{ArrowArray}() + array_ref = Ref{ArrowArray}() _fill_schema!(schema_ref, col, name, token, roots) _fill_array!(array_ref, col, token, roots) @@ -1215,7 +1239,7 @@ Returns two vectors of `Ref`s (one per column). Each column has its own """ function to_c_data(tbl::Arrow.Table; names::Vector{String}=String.(Tables.columnnames(tbl))) schema_refs = Ref{ArrowSchema}[] - array_refs = Ref{ArrowArray}[] + array_refs = Ref{ArrowArray}[] for (i, col) in enumerate(Tables.columns(tbl)) nm = i <= length(names) ? names[i] : "" s_ref, a_ref = to_c_data(col; name=nm) diff --git a/test/cdatainterface.jl b/test/cdatainterface.jl index 69b43c9d..ce25f949 100644 --- a/test/cdatainterface.jl +++ b/test/cdatainterface.jl @@ -15,10 +15,9 @@ # limitations under the License. 
@testset "Arrow C Data Interface" begin - @testset "struct sizes" begin @test sizeof(Arrow.ArrowSchema) == 9 * 8 - @test sizeof(Arrow.ArrowArray) == 10 * 8 + @test sizeof(Arrow.ArrowArray) == 10 * 8 end # Helper: convert a Julia array to ArrowVector for export @@ -28,19 +27,19 @@ @testset "export: format strings" begin for (input, expected) in [ - (Int8[1], "c"), - (UInt8[1], "C"), - (Int16[1], "s"), - (UInt16[1], "S"), - (Int32[1], "i"), - (UInt32[1], "I"), - (Int64[1], "l"), - (UInt64[1], "L"), + (Int8[1], "c"), + (UInt8[1], "C"), + (Int16[1], "s"), + (UInt16[1], "S"), + (Int32[1], "i"), + (UInt32[1], "I"), + (Int64[1], "l"), + (UInt64[1], "L"), (Float32[1.0], "f"), (Float64[1.0], "g"), - (Bool[true], "b"), - (["hello"], "u"), - ([missing], "n"), + (Bool[true], "b"), + (["hello"], "u"), + ([missing], "n"), ] s_ref, a_ref = Arrow.to_c_data(to_arrow(input)) GC.@preserve s_ref a_ref begin @@ -61,9 +60,9 @@ data = Int32[10, 20, 30] s_ref, a_ref = Arrow.to_c_data(to_arrow(data)) arr = a_ref[] - @test arr.length == 3 + @test arr.length == 3 @test arr.null_count == 0 - @test arr.offset == 0 + @test arr.offset == 0 @test arr.n_buffers == 2 @test arr.n_children == 0 # validity buffer should be C_NULL (no nulls) @@ -115,7 +114,7 @@ @test unsafe_string(sch.format) == "+s" @test sch.n_children == 2 @test arr.n_children == 2 - @test arr.n_buffers == 1 + @test arr.n_buffers == 1 # child 0: x field c0_sch = unsafe_load(unsafe_load(sch.children)) @test unsafe_string(c0_sch.format) == "i" @@ -129,8 +128,12 @@ @test haskey(Arrow._EXPORT_ROOTS, token) # Simulate C calling release on the array - ccall(arr.release, Cvoid, (Ptr{Arrow.ArrowArray},), - Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)) + ccall( + arr.release, + Cvoid, + (Ptr{Arrow.ArrowArray},), + Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref), + ) # Token should be removed @test !haskey(Arrow._EXPORT_ROOTS, token) @@ -144,7 +147,7 @@ s_ref, a_ref = Arrow.to_c_data(av) imported = Arrow.from_c_data( 
Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s_ref)), - Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), ) @test collect(imported) == data end @@ -155,7 +158,7 @@ s_ref, a_ref = Arrow.to_c_data(av) imported = Arrow.from_c_data( Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s_ref)), - Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), ) @test isequal(collect(imported), data) end @@ -166,7 +169,7 @@ s_ref, a_ref = Arrow.to_c_data(av) imported = Arrow.from_c_data( Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s_ref)), - Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), ) @test collect(imported) == data end @@ -177,7 +180,7 @@ s_ref, a_ref = Arrow.to_c_data(av) imported = Arrow.from_c_data( Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s_ref)), - Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), ) @test isequal(collect(imported), data) end @@ -188,7 +191,7 @@ s_ref, a_ref = Arrow.to_c_data(av) imported = Arrow.from_c_data( Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s_ref)), - Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), ) @test collect(imported) == data end @@ -199,7 +202,7 @@ s_ref, a_ref = Arrow.to_c_data(av) imported = Arrow.from_c_data( Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s_ref)), - Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), ) @test isequal(collect(imported), data) end @@ -210,7 +213,7 @@ s_ref, a_ref = Arrow.to_c_data(av) imported = Arrow.from_c_data( Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s_ref)), - 
Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)); + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)); convert=false, ) @test collect(imported) == collect(av) @@ -222,7 +225,7 @@ s_ref, a_ref = Arrow.to_c_data(av) imported = Arrow.from_c_data( Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s_ref)), - Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)); + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)); convert=false, ) @test collect(imported) == collect(av) @@ -234,7 +237,7 @@ s_ref, a_ref = Arrow.to_c_data(av) imported = Arrow.from_c_data( Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s_ref)), - Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), ) result = collect(imported) @test length(result) == 2 @@ -250,7 +253,7 @@ s_ref, a_ref = Arrow.to_c_data(av) imported = Arrow.from_c_data( Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s_ref)), - Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), ) @test collect(imported) == ["a", "b", "a", "c", "b"] end @@ -262,7 +265,7 @@ @test unsafe_string(s_ref[].format) == "n" imported = Arrow.from_c_data( Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s_ref)), - Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), ) @test length(imported) == 5 @test all(ismissing, imported) @@ -273,34 +276,38 @@ data = Int32[99, 99, 1, 2, 3] # logical elements start at index 3 buf_ptrs = Ptr{Cvoid}[C_NULL, Ptr{Cvoid}(pointer(data))] - arr_ref = Ref(Arrow.ArrowArray( - Int64(3), # length = 3 - Int64(0), # null_count = 0 - Int64(2), # offset = 2 - Int64(2), # n_buffers = 2 - Int64(0), # n_children = 0 - Ptr{Ptr{Cvoid}}(pointer(buf_ptrs)), - Ptr{Ptr{Arrow.ArrowArray}}(C_NULL), - Ptr{Arrow.ArrowArray}(C_NULL), - Ptr{Cvoid}(C_NULL), # no release 
needed (Julia-owned data) - Ptr{Cvoid}(C_NULL), - )) + arr_ref = Ref( + Arrow.ArrowArray( + Int64(3), # length = 3 + Int64(0), # null_count = 0 + Int64(2), # offset = 2 + Int64(2), # n_buffers = 2 + Int64(0), # n_children = 0 + Ptr{Ptr{Cvoid}}(pointer(buf_ptrs)), + Ptr{Ptr{Arrow.ArrowArray}}(C_NULL), + Ptr{Arrow.ArrowArray}(C_NULL), + Ptr{Cvoid}(C_NULL), # no release needed (Julia-owned data) + Ptr{Cvoid}(C_NULL), + ), + ) fmt_bytes = Vector{UInt8}("i\0") - sch_ref = Ref(Arrow.ArrowSchema( - Cstring(pointer(fmt_bytes)), - Cstring(C_NULL), - Cstring(C_NULL), - Int64(0), - Int64(0), - Ptr{Ptr{Arrow.ArrowSchema}}(C_NULL), - Ptr{Arrow.ArrowSchema}(C_NULL), - Ptr{Cvoid}(C_NULL), - Ptr{Cvoid}(C_NULL), - )) + sch_ref = Ref( + Arrow.ArrowSchema( + Cstring(pointer(fmt_bytes)), + Cstring(C_NULL), + Cstring(C_NULL), + Int64(0), + Int64(0), + Ptr{Ptr{Arrow.ArrowSchema}}(C_NULL), + Ptr{Arrow.ArrowSchema}(C_NULL), + Ptr{Cvoid}(C_NULL), + Ptr{Cvoid}(C_NULL), + ), + ) GC.@preserve data buf_ptrs fmt_bytes arr_ref sch_ref begin imported = Arrow.from_c_data( Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, sch_ref)), - Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, arr_ref)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, arr_ref)), ) @test collect(imported) == Int32[1, 2, 3] end @@ -309,23 +316,38 @@ @testset "import: C_NULL validity with null_count=0" begin data = Int32[10, 20, 30] buf_ptrs = Ptr{Cvoid}[C_NULL, Ptr{Cvoid}(pointer(data))] - arr_ref = Ref(Arrow.ArrowArray( - Int64(3), Int64(0), Int64(0), Int64(2), Int64(0), - Ptr{Ptr{Cvoid}}(pointer(buf_ptrs)), - Ptr{Ptr{Arrow.ArrowArray}}(C_NULL), Ptr{Arrow.ArrowArray}(C_NULL), - Ptr{Cvoid}(C_NULL), Ptr{Cvoid}(C_NULL), - )) + arr_ref = Ref( + Arrow.ArrowArray( + Int64(3), + Int64(0), + Int64(0), + Int64(2), + Int64(0), + Ptr{Ptr{Cvoid}}(pointer(buf_ptrs)), + Ptr{Ptr{Arrow.ArrowArray}}(C_NULL), + Ptr{Arrow.ArrowArray}(C_NULL), + Ptr{Cvoid}(C_NULL), + Ptr{Cvoid}(C_NULL), + ), + ) fmt_bytes = Vector{UInt8}("i\0") 
- sch_ref = Ref(Arrow.ArrowSchema( - Cstring(pointer(fmt_bytes)), Cstring(C_NULL), Cstring(C_NULL), - Int64(0), Int64(0), - Ptr{Ptr{Arrow.ArrowSchema}}(C_NULL), Ptr{Arrow.ArrowSchema}(C_NULL), - Ptr{Cvoid}(C_NULL), Ptr{Cvoid}(C_NULL), - )) + sch_ref = Ref( + Arrow.ArrowSchema( + Cstring(pointer(fmt_bytes)), + Cstring(C_NULL), + Cstring(C_NULL), + Int64(0), + Int64(0), + Ptr{Ptr{Arrow.ArrowSchema}}(C_NULL), + Ptr{Arrow.ArrowSchema}(C_NULL), + Ptr{Cvoid}(C_NULL), + Ptr{Cvoid}(C_NULL), + ), + ) GC.@preserve data buf_ptrs fmt_bytes arr_ref sch_ref begin imported = Arrow.from_c_data( Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, sch_ref)), - Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, arr_ref)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, arr_ref)), ) @test collect(imported) == Int32[10, 20, 30] end @@ -336,7 +358,8 @@ av = to_arrow(data) # Manually create a Primitive with metadata meta = Base.ImmutableDict("key1" => "val1", "key2" => "val2") - av_meta = Arrow.Primitive(eltype(av), av.arrow, av.validity, av.data, length(av), meta) + av_meta = + Arrow.Primitive(eltype(av), av.arrow, av.validity, av.data, length(av), meta) s_ref, a_ref = Arrow.to_c_data(av_meta) sch = s_ref[] @test sch.metadata != C_NULL @@ -352,10 +375,14 @@ s1, a1 = Arrow.to_c_data(col1; name="x") s2, a2 = Arrow.to_c_data(col2; name="y") tbl = Arrow.from_c_data( - [Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s1)), - Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s2))], - [Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a1)), - Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a2))], + [ + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s1)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowSchema}, s2)), + ], + [ + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a1)), + Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a2)), + ], ) @test Tables.columnnames(tbl) == [:x, :y] @test collect(Tables.getcolumn(tbl, :x)) == Int32[1, 2, 
3] @@ -572,23 +599,37 @@ @testset "import: Bool with non-byte-aligned offset=3" begin # Packed byte: 0b10110110 (LSB first) # bits 3,4,5,6,7 → 0,1,1,0,1 (reading from bit-position 3) - bools = UInt8[0b10110110] + bools = UInt8[0b10110110] validity = UInt8[0xff] buf_ptrs = Ptr{Cvoid}[Ptr{Cvoid}(pointer(validity)), Ptr{Cvoid}(pointer(bools))] - arr_ref = Ref(Arrow.ArrowArray( - Int64(5), Int64(0), Int64(3), Int64(2), Int64(0), - Ptr{Ptr{Cvoid}}(pointer(buf_ptrs)), - Ptr{Ptr{Arrow.ArrowArray}}(C_NULL), - Ptr{Arrow.ArrowArray}(C_NULL), - Ptr{Cvoid}(C_NULL), Ptr{Cvoid}(C_NULL), - )) + arr_ref = Ref( + Arrow.ArrowArray( + Int64(5), + Int64(0), + Int64(3), + Int64(2), + Int64(0), + Ptr{Ptr{Cvoid}}(pointer(buf_ptrs)), + Ptr{Ptr{Arrow.ArrowArray}}(C_NULL), + Ptr{Arrow.ArrowArray}(C_NULL), + Ptr{Cvoid}(C_NULL), + Ptr{Cvoid}(C_NULL), + ), + ) fmt_bytes = Vector{UInt8}("b\0") - sch_ref = Ref(Arrow.ArrowSchema( - Cstring(pointer(fmt_bytes)), Cstring(C_NULL), Cstring(C_NULL), - Int64(0), Int64(0), - Ptr{Ptr{Arrow.ArrowSchema}}(C_NULL), Ptr{Arrow.ArrowSchema}(C_NULL), - Ptr{Cvoid}(C_NULL), Ptr{Cvoid}(C_NULL), - )) + sch_ref = Ref( + Arrow.ArrowSchema( + Cstring(pointer(fmt_bytes)), + Cstring(C_NULL), + Cstring(C_NULL), + Int64(0), + Int64(0), + Ptr{Ptr{Arrow.ArrowSchema}}(C_NULL), + Ptr{Arrow.ArrowSchema}(C_NULL), + Ptr{Cvoid}(C_NULL), + Ptr{Cvoid}(C_NULL), + ), + ) GC.@preserve bools validity buf_ptrs arr_ref sch_ref begin imported = Arrow.from_c_data(_cptr(sch_ref), _cptr(arr_ref)) @test collect(imported) == [false, true, true, false, true] @@ -610,5 +651,4 @@ @test_nowarn Arrow.release_c_data(tbl) # second call must not throw end end - end # @testset "Arrow C Data Interface" From 36829d9374383ad2032d6b30badee18887a1a2b5 Mon Sep 17 00:00:00 2001 From: Robert Buessow Date: Thu, 21 May 2026 10:02:27 +0000 Subject: [PATCH 07/10] Add finalizer to CDataHandle to detect missing release_c_data calls Co-Authored-By: Claude Sonnet 4.6 (1M context) --- src/cdatainterface.jl | 22 
++++++++++++-- test/cdatainterface.jl | 68 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 3 deletions(-) diff --git a/src/cdatainterface.jl b/src/cdatainterface.jl index 1a5c9fec..56a86ba7 100644 --- a/src/cdatainterface.jl +++ b/src/cdatainterface.jl @@ -75,8 +75,10 @@ const CDATA_FLAG_MAP_KEYS_SORTED = Int64(4) """ CDataHandle -Holds C-side pointers for an imported Arrow C Data Interface pair and calls -the C `release` callbacks when the Julia wrapper is garbage collected. +Holds C-side pointers for an imported Arrow C Data Interface pair. +Call `Arrow.release_c_data` to release C resources explicitly. +If the handle is garbage-collected without being released, an error is logged +and `Arrow.UNRELEASED_HANDLE_COUNT` is incremented. """ mutable struct CDataHandle schema_ptr::Ptr{ArrowSchema} @@ -84,7 +86,21 @@ mutable struct CDataHandle released::Bool end -CDataHandle(sp::Ptr{ArrowSchema}, ap::Ptr{ArrowArray}) = CDataHandle(sp, ap, false) +# Counts CDataHandles that were GC'd without an explicit release_c_data call. 
+const UNRELEASED_HANDLE_COUNT = Threads.Atomic{Int}(0) + +function _warn_unreleased(h::CDataHandle) + h.released && return + Threads.atomic_add!(UNRELEASED_HANDLE_COUNT, 1) + ccall(:jl_safe_printf, Cvoid, (Cstring,), + "Arrow.CDataHandle GC'd without explicit release_c_data — resource leak detected\n") +end + +function CDataHandle(sp::Ptr{ArrowSchema}, ap::Ptr{ArrowArray}) + h = CDataHandle(sp, ap, false) + finalizer(_warn_unreleased, h) + return h +end function _release_cdata_handle(h::CDataHandle) h.released && return diff --git a/test/cdatainterface.jl b/test/cdatainterface.jl index ce25f949..c2f148e5 100644 --- a/test/cdatainterface.jl +++ b/test/cdatainterface.jl @@ -651,4 +651,72 @@ @test_nowarn Arrow.release_c_data(tbl) # second call must not throw end end + + # ── Finalizer / leak counter ────────────────────────────────────────────── + + @testset "finalizer: increments leak counter when not released" begin + before = Arrow.UNRELEASED_HANDLE_COUNT[] + let + av = to_arrow(Int32[1, 2, 3]) + s_ref, a_ref = Arrow.to_c_data(av) + Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref)) + # CImportedArray and its CDataHandle go out of scope here + end + GC.gc(true) + GC.gc(true) + @test Arrow.UNRELEASED_HANDLE_COUNT[] == before + 1 + end + + @testset "finalizer: does NOT increment counter when released explicitly" begin + before = Arrow.UNRELEASED_HANDLE_COUNT[] + let + av = to_arrow(Int32[1, 2, 3]) + s_ref, a_ref = Arrow.to_c_data(av) + col = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref)) + Arrow.release_c_data(col) + end + GC.gc(true) + GC.gc(true) + @test Arrow.UNRELEASED_HANDLE_COUNT[] == before + end + + # ── Empty arrays ───────────────────────────────────────────────────────── + + @testset "round-trip: empty Int32 array" begin + data = Int32[] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + @test a_ref[].length == 0 + imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref)) + @test collect(imported) == Int32[] + end + + @testset "round-trip: empty String 
array" begin + data = String[] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + @test a_ref[].length == 0 + imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref)) + @test collect(imported) == String[] + end + + # ── Large arrays ───────────────────────────────────────────────────────── + + @testset "round-trip: large Int32 array (1M rows)" begin + data = rand(Int32, 1_000_000) + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + @test a_ref[].length == 1_000_000 + imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref)) + @test collect(imported) == data + end + + @testset "round-trip: large nullable Float64 array (1M rows)" begin + data = Union{Float64,Missing}[isodd(i) ? Float64(i) : missing for i = 1:1_000_000] + av = to_arrow(data) + s_ref, a_ref = Arrow.to_c_data(av) + @test a_ref[].length == 1_000_000 + imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref)) + @test isequal(collect(imported), data) + end end # @testset "Arrow C Data Interface" From d3b6e7348f26119c6edfa843dd6b5112e3eaf49b Mon Sep 17 00:00:00 2001 From: Robert Buessow Date: Thu, 21 May 2026 12:13:46 +0000 Subject: [PATCH 08/10] release handle in test --- src/cdatainterface.jl | 2 ++ test/cdatainterface.jl | 61 +++++++++++++++++++++++++++++++++++++----- 2 files changed, 56 insertions(+), 7 deletions(-) diff --git a/src/cdatainterface.jl b/src/cdatainterface.jl index 56a86ba7..9603de2d 100644 --- a/src/cdatainterface.jl +++ b/src/cdatainterface.jl @@ -92,8 +92,10 @@ const UNRELEASED_HANDLE_COUNT = Threads.Atomic{Int}(0) function _warn_unreleased(h::CDataHandle) h.released && return Threads.atomic_add!(UNRELEASED_HANDLE_COUNT, 1) + # Use jl_safe_printf since task switches are forbidden in finalizers. 
ccall(:jl_safe_printf, Cvoid, (Cstring,), "Arrow.CDataHandle GC'd without explicit release_c_data — resource leak detected\n") + _release_cdata_handle(h) end function CDataHandle(sp::Ptr{ArrowSchema}, ap::Ptr{ArrowArray}) diff --git a/test/cdatainterface.jl b/test/cdatainterface.jl index c2f148e5..b4d1f00f 100644 --- a/test/cdatainterface.jl +++ b/test/cdatainterface.jl @@ -15,6 +15,8 @@ # limitations under the License. @testset "Arrow C Data Interface" begin + _initial_count = Arrow.UNRELEASED_HANDLE_COUNT[] + @testset "struct sizes" begin @test sizeof(Arrow.ArrowSchema) == 9 * 8 @test sizeof(Arrow.ArrowArray) == 10 * 8 @@ -150,6 +152,7 @@ Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), ) @test collect(imported) == data + Arrow.release_c_data(imported) end @testset "round-trip: Float64 with missing" begin @@ -161,6 +164,7 @@ Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), ) @test isequal(collect(imported), data) + Arrow.release_c_data(imported) end @testset "round-trip: Bool" begin @@ -172,6 +176,7 @@ Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), ) @test collect(imported) == data + Arrow.release_c_data(imported) end @testset "round-trip: Bool with missing" begin @@ -183,6 +188,7 @@ Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), ) @test isequal(collect(imported), data) + Arrow.release_c_data(imported) end @testset "round-trip: String" begin @@ -194,6 +200,7 @@ Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), ) @test collect(imported) == data + Arrow.release_c_data(imported) end @testset "round-trip: String with missing" begin @@ -205,6 +212,7 @@ Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), ) @test isequal(collect(imported), data) + Arrow.release_c_data(imported) end @testset "round-trip: Date" begin @@ -217,6 +225,7 @@ convert=false, ) @test collect(imported) == collect(av) + Arrow.release_c_data(imported) end @testset "round-trip: Timestamp" begin @@ -229,6 +238,7 @@ 
convert=false, ) @test collect(imported) == collect(av) + Arrow.release_c_data(imported) end @testset "round-trip: struct" begin @@ -245,6 +255,7 @@ @test result[1].y == "a" @test result[2].x == Int32(2) @test result[2].y == "b" + Arrow.release_c_data(imported) end @testset "round-trip: dict encoded" begin @@ -256,6 +267,7 @@ Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, a_ref)), ) @test collect(imported) == ["a", "b", "a", "c", "b"] + Arrow.release_c_data(imported) end @testset "round-trip: null array" begin @@ -269,6 +281,7 @@ ) @test length(imported) == 5 @test all(ismissing, imported) + Arrow.release_c_data(imported) end @testset "import: non-zero offset" begin @@ -310,6 +323,7 @@ Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, arr_ref)), ) @test collect(imported) == Int32[1, 2, 3] + Arrow.release_c_data(imported) end end @@ -350,6 +364,7 @@ Ptr{Cvoid}(Base.unsafe_convert(Ptr{Arrow.ArrowArray}, arr_ref)), ) @test collect(imported) == Int32[10, 20, 30] + Arrow.release_c_data(imported) end end @@ -387,6 +402,7 @@ @test Tables.columnnames(tbl) == [:x, :y] @test collect(Tables.getcolumn(tbl, :x)) == Int32[1, 2, 3] @test collect(Tables.getcolumn(tbl, :y)) == ["a", "b", "c"] + Arrow.release_c_data(tbl) end # Helper: convert a Ref to a Ptr{Cvoid} for from_c_data @@ -404,6 +420,7 @@ @test unsafe_string(s_ref[].format) == "+l" imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref)) @test collect(imported) == collect(av) + Arrow.release_c_data(imported) end @testset "round-trip: list of Int32 with missing (+l)" begin @@ -413,6 +430,7 @@ @test unsafe_string(s_ref[].format) == "+l" imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref)) @test isequal(collect(imported), collect(av)) + Arrow.release_c_data(imported) end @testset "round-trip: list of String (+l)" begin @@ -422,6 +440,7 @@ @test unsafe_string(s_ref[].format) == "+l" imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref)) @test collect(imported) == collect(av) + Arrow.release_c_data(imported) 
end # ── Fixed-size list ────────────────────────────────────────────────────── @@ -433,6 +452,7 @@ @test unsafe_string(s_ref[].format) == "+w:2" imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref); convert=false) @test collect(imported) == collect(av) + Arrow.release_c_data(imported) end @testset "round-trip: fixed-size list NTuple{3,Int64} (+w:3)" begin @@ -442,6 +462,7 @@ @test unsafe_string(s_ref[].format) == "+w:3" imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref); convert=false) @test collect(imported) == collect(av) + Arrow.release_c_data(imported) end # ── Map ────────────────────────────────────────────────────────────────── @@ -453,6 +474,7 @@ @test unsafe_string(s_ref[].format) == "+m" imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref)) @test collect(imported) == collect(av) + Arrow.release_c_data(imported) end # ── Unions ─────────────────────────────────────────────────────────────── @@ -464,6 +486,7 @@ @test startswith(unsafe_string(s_ref[].format), "+ud:") imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref)) @test collect(imported) == collect(av) + Arrow.release_c_data(imported) end @testset "round-trip: sparse union Union{Int32,Float64} (+us:)" begin @@ -473,6 +496,7 @@ @test startswith(unsafe_string(s_ref[].format), "+us:") imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref)) @test collect(imported) == collect(av) + Arrow.release_c_data(imported) end # ── Time-of-day ────────────────────────────────────────────────────────── @@ -484,6 +508,7 @@ @test unsafe_string(s_ref[].format) == "ttn" imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref); convert=false) @test collect(imported) == collect(av) + Arrow.release_c_data(imported) end # ── Duration ───────────────────────────────────────────────────────────── @@ -495,6 +520,7 @@ @test unsafe_string(s_ref[].format) == "tDs" imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref); convert=false) @test collect(imported) == collect(av) + Arrow.release_c_data(imported) end @testset 
"round-trip: Duration milliseconds (tDm)" begin @@ -504,6 +530,7 @@ @test unsafe_string(s_ref[].format) == "tDm" imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref); convert=false) @test collect(imported) == collect(av) + Arrow.release_c_data(imported) end @testset "round-trip: Duration microseconds (tDu)" begin @@ -513,6 +540,7 @@ @test unsafe_string(s_ref[].format) == "tDu" imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref); convert=false) @test collect(imported) == collect(av) + Arrow.release_c_data(imported) end @testset "round-trip: Duration nanoseconds (tDn)" begin @@ -522,6 +550,7 @@ @test unsafe_string(s_ref[].format) == "tDn" imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref); convert=false) @test collect(imported) == collect(av) + Arrow.release_c_data(imported) end # ── Timestamp with timezone ─────────────────────────────────────────────── @@ -537,6 +566,7 @@ @test startswith(fmt, "ts") && endswith(fmt, ":UTC") imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref); convert=false) @test collect(imported) == collect(av) + Arrow.release_c_data(imported) end # ── Interval ───────────────────────────────────────────────────────────── @@ -550,6 +580,7 @@ @test unsafe_string(s_ref[].format) == "tiM" imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref); convert=false) @test collect(imported) == collect(av) + Arrow.release_c_data(imported) end @testset "round-trip: Interval day-time (tiD)" begin @@ -561,6 +592,7 @@ @test unsafe_string(s_ref[].format) == "tiD" imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref); convert=false) @test collect(imported) == collect(av) + Arrow.release_c_data(imported) end # ── Decimal ─────────────────────────────────────────────────────────────── @@ -573,6 +605,7 @@ @test unsafe_string(s_ref[].format) == "d:10,2,128" imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref); convert=false) @test collect(imported) == collect(av) + Arrow.release_c_data(imported) end # ── Table export 
────────────────────────────────────────────────────────── @@ -591,6 +624,7 @@ @test Tables.columnnames(tbl2) == [:x, :y] @test collect(Tables.getcolumn(tbl2, :x)) == Int32[1, 2, 3] @test collect(Tables.getcolumn(tbl2, :y)) == ["a", "b", "c"] + Arrow.release_c_data(tbl2) end end @@ -633,6 +667,7 @@ GC.@preserve bools validity buf_ptrs arr_ref sch_ref begin imported = Arrow.from_c_data(_cptr(sch_ref), _cptr(arr_ref)) @test collect(imported) == [false, true, true, false, true] + Arrow.release_c_data(imported) end end @@ -656,14 +691,16 @@ @testset "finalizer: increments leak counter when not released" begin before = Arrow.UNRELEASED_HANDLE_COUNT[] - let - av = to_arrow(Int32[1, 2, 3]) - s_ref, a_ref = Arrow.to_c_data(av) - Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref)) - # CImportedArray and its CDataHandle go out of scope here + redirect_stderr(devnull) do + let + av = to_arrow(Int32[1, 2, 3]) + s_ref, a_ref = Arrow.to_c_data(av) + Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref)) + # CImportedArray and its CDataHandle go out of scope here + end + GC.gc(true) + GC.gc(true) end - GC.gc(true) - GC.gc(true) @test Arrow.UNRELEASED_HANDLE_COUNT[] == before + 1 end @@ -689,6 +726,7 @@ @test a_ref[].length == 0 imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref)) @test collect(imported) == Int32[] + Arrow.release_c_data(imported) end @testset "round-trip: empty String array" begin @@ -698,6 +736,7 @@ @test a_ref[].length == 0 imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref)) @test collect(imported) == String[] + Arrow.release_c_data(imported) end # ── Large arrays ───────────────────────────────────────────────────────── @@ -709,6 +748,7 @@ @test a_ref[].length == 1_000_000 imported = Arrow.from_c_data(_cptr(s_ref), _cptr(a_ref)) @test collect(imported) == data + Arrow.release_c_data(imported) end @testset "round-trip: large nullable Float64 array (1M rows)" begin @@ -718,5 +758,12 @@ @test a_ref[].length == 1_000_000 imported = Arrow.from_c_data(_cptr(s_ref), 
_cptr(a_ref)) @test isequal(collect(imported), data) + Arrow.release_c_data(imported) + end + + @testset "no unexpected resource leaks" begin + GC.gc(true) + GC.gc(true) + @test Arrow.UNRELEASED_HANDLE_COUNT[] == _initial_count + 1 # +1 for the intentional leak test end end # @testset "Arrow C Data Interface" From b700090c562f5d73af0d03fd078276cbd7b2776d Mon Sep 17 00:00:00 2001 From: Robert Buessow Date: Thu, 21 May 2026 13:01:56 +0000 Subject: [PATCH 09/10] Verify ArrowSchema/ArrowArray field offsets against C compiler Compile a C probe at test time using offsetof() and compare each field offset to Julia's fieldoffset(), confirming ABI compatibility with the Arrow C Data Interface struct layout. Also release CDataHandle in the finalizer and add release_c_data calls to all tests to prevent resource leaks. Co-Authored-By: Claude Sonnet 4.6 --- src/cdatainterface.jl | 8 +++-- test/cdatainterface.jl | 69 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 2 deletions(-) diff --git a/src/cdatainterface.jl b/src/cdatainterface.jl index 9603de2d..3f376856 100644 --- a/src/cdatainterface.jl +++ b/src/cdatainterface.jl @@ -93,8 +93,12 @@ function _warn_unreleased(h::CDataHandle) h.released && return Threads.atomic_add!(UNRELEASED_HANDLE_COUNT, 1) # Use jl_safe_printf since task switches are forbidden in finalizers. 
- ccall(:jl_safe_printf, Cvoid, (Cstring,), - "Arrow.CDataHandle GC'd without explicit release_c_data — resource leak detected\n") + ccall( + :jl_safe_printf, + Cvoid, + (Cstring,), + "Arrow.CDataHandle GC'd without explicit release_c_data — resource leak detected\n", + ) _release_cdata_handle(h) end diff --git a/test/cdatainterface.jl b/test/cdatainterface.jl index b4d1f00f..319f4366 100644 --- a/test/cdatainterface.jl +++ b/test/cdatainterface.jl @@ -22,6 +22,75 @@ @test sizeof(Arrow.ArrowArray) == 10 * 8 end + @testset "struct field offsets match C ABI" begin + c_src = """ + #include <stddef.h> + #include <stdint.h> + #include <stdio.h> + struct ArrowSchema { + const char* format; + const char* name; + const char* metadata; + int64_t flags; + int64_t n_children; + struct ArrowSchema** children; + struct ArrowSchema* dictionary; + void (*release)(struct ArrowSchema*); + void* private_data; + }; + struct ArrowArray { + int64_t length; + int64_t null_count; + int64_t offset; + int64_t n_buffers; + int64_t n_children; + const void** buffers; + struct ArrowArray** children; + struct ArrowArray* dictionary; + void (*release)(struct ArrowArray*); + void* private_data; + }; + int main() { + printf("%zu %zu %zu %zu %zu %zu %zu %zu %zu\\n", + offsetof(struct ArrowSchema, format), + offsetof(struct ArrowSchema, name), + offsetof(struct ArrowSchema, metadata), + offsetof(struct ArrowSchema, flags), + offsetof(struct ArrowSchema, n_children), + offsetof(struct ArrowSchema, children), + offsetof(struct ArrowSchema, dictionary), + offsetof(struct ArrowSchema, release), + offsetof(struct ArrowSchema, private_data)); + printf("%zu %zu %zu %zu %zu %zu %zu %zu %zu %zu\\n", + offsetof(struct ArrowArray, length), + offsetof(struct ArrowArray, null_count), + offsetof(struct ArrowArray, offset), + offsetof(struct ArrowArray, n_buffers), + offsetof(struct ArrowArray, n_children), + offsetof(struct ArrowArray, buffers), + offsetof(struct ArrowArray, children), + offsetof(struct ArrowArray, dictionary), +
offsetof(struct ArrowArray, release), + offsetof(struct ArrowArray, private_data)); + } + """ + mktempdir() do d + src = joinpath(d, "probe.c") + bin = joinpath(d, "probe") + write(src, c_src) + run(`cc -o $bin $src`) + lines = split(readchomp(`$bin`), '\n') + schema_offsets = parse.(Int, split(lines[1])) + array_offsets = parse.(Int, split(lines[2])) + for (i, off) in enumerate(schema_offsets) + @test fieldoffset(Arrow.ArrowSchema, i) == off + end + for (i, off) in enumerate(array_offsets) + @test fieldoffset(Arrow.ArrowArray, i) == off + end + end + end + # Helper: convert a Julia array to ArrowVector for export function to_arrow(x) return Arrow.toarrowvector(x) From d7ad6b6c9f8df8eea94bd4b024be36dff3595496 Mon Sep 17 00:00:00 2001 From: Robert Buessow Date: Mon, 25 May 2026 14:47:03 +0000 Subject: [PATCH 10/10] Guard CImportedArray getindex against use-after-release Co-Authored-By: Claude Sonnet 4.6 --- src/cdatainterface.jl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/cdatainterface.jl b/src/cdatainterface.jl index 3f376856..b2c35164 100644 --- a/src/cdatainterface.jl +++ b/src/cdatainterface.jl @@ -141,7 +141,10 @@ end Base.size(x::CImportedArray) = size(x.data) Base.IndexStyle(::Type{<:CImportedArray}) = Base.IndexLinear() Base.@propagate_inbounds function Base.getindex(x::CImportedArray, i::Integer) - @boundscheck checkbounds(x, i) + @boundscheck begin + x.handle.released && throw(ArgumentError("CImportedArray accessed after release_c_data")) + checkbounds(x, i) + end return @inbounds x.data[i] end