Skip to content
Permalink
Browse files
Overhaul type serialization/deserialization machinery (#156)
* Start work on overhauling type serialization architecture

* More work; serialization is pretty much done but not tested

* fix timetype ArrowTypes definitions

* more work to get tests passing

* get tests passing?

* fix

* Fix #75 by supporting Set serialization/deserialization

* Fix #85 by supporting tuple serialization/deserialization

* Lots of cleanup

* few more fixes

* Update src/arrowtypes.jl

Co-authored-by: Jarrett Revels <jarrettrevels@gmail.com>

* Update src/arrowtypes.jl

Co-authored-by: Jarrett Revels <jarrettrevels@gmail.com>

* fix NullKind reading

* Fix #134 by requiring concrete or union of concrete element types for
all columns when serializing

* Add new ArrowTypes.arrowmetadata method for providing additional extension type metadata that can be used in JuliaType

* Update manual

* tests

Co-authored-by: Jarrett Revels <jarrettrevels@gmail.com>
  • Loading branch information
quinnj and jrevels committed Mar 29, 2021
1 parent 2cacbe5 commit ff53d1359c01ae3e98fa3723f9f994c9ba420050
Show file tree
Hide file tree
Showing 19 changed files with 611 additions and 252 deletions.

Large diffs are not rendered by default.

@@ -45,6 +45,8 @@ using Mmap
import Dates
using DataAPI, Tables, SentinelArrays, PooledArrays, CodecLz4, CodecZstd, TimeZones, BitIntegers

export ArrowTypes

using Base: @propagate_inbounds
import Base: ==

@@ -52,58 +52,57 @@ function arrowvector(x, i, nl, fi, de, ded, meta; dictencoding::Bool=false, dict
if nl > maxdepth
error("reached nested serialization level ($nl) deeper than provided max depth argument ($(maxdepth)); to increase allowed nesting level, pass `maxdepth=X`")
end
if !(x isa DictEncode) && !dictencoding && (dictencode || (x isa AbstractArray && DataAPI.refarray(x) !== x))
T = maybemissing(eltype(x))
if !(x isa DictEncode) && !dictencoding && (dictencode || DataAPI.refarray(x) !== x)
x = DictEncode(x, dictencodeid(i, nl, fi))
elseif x isa DictEncoded
return arrowvector(DictEncodeType, x, i, nl, fi, de, ded, meta; dictencode=dictencode, kw...)
elseif !(x isa DictEncode)
x = ToArrow(x)
end
S = maybemissing(eltype(x))
if ArrowTypes.hasarrowname(T)
meta = meta === nothing ? Dict{String, String}() : meta
meta["ARROW:extension:name"] = String(ArrowTypes.arrowname(T))
meta["ARROW:extension:metadata"] = String(ArrowTypes.arrowmetadata(T))
end
return arrowvector(S, x, i, nl, fi, de, ded, meta; dictencode=dictencode, kw...)
end

# defaults for Dates types
# Each method returns a fixed, arbitrary instance of its type, built from
# literal 1s (and the UTC zone for `ZonedDateTime`).
function ArrowTypes.default(::Type{Dates.Date})
    return Dates.Date(1, 1, 1)
end

function ArrowTypes.default(::Type{Dates.Time})
    return Dates.Time(1, 1, 1)
end

function ArrowTypes.default(::Type{Dates.DateTime})
    return Dates.DateTime(1, 1, 1, 1, 1, 1)
end

function ArrowTypes.default(::Type{TimeZones.ZonedDateTime})
    return TimeZones.ZonedDateTime(1, 1, 1, 1, 1, 1, TimeZones.tz"UTC")
end

# conversions to arrow types
# Each method wraps `x` with `converter(<arrow type>, x)` and hands the result
# back to the untyped `arrowvector` entry point with the remaining arguments
# unchanged.
function arrowvector(::Type{Dates.Date}, x, i, nl, fi, de, ded, meta; kw...)
    converted = converter(DATE, x)
    return arrowvector(converted, i, nl, fi, de, ded, meta; kw...)
end

function arrowvector(::Type{Dates.Time}, x, i, nl, fi, de, ded, meta; kw...)
    converted = converter(TIME, x)
    return arrowvector(converted, i, nl, fi, de, ded, meta; kw...)
end

function arrowvector(::Type{Dates.DateTime}, x, i, nl, fi, de, ded, meta; kw...)
    converted = converter(DATETIME, x)
    return arrowvector(converted, i, nl, fi, de, ded, meta; kw...)
end

function arrowvector(::Type{ZonedDateTime}, x, i, nl, fi, de, ded, meta; kw...)
    # The time zone parameter is read from the first element only; this assumes
    # `x` is non-empty and `x[1]` has a `timezone` field.
    target = Timestamp{Meta.TimeUnit.MILLISECOND, Symbol(x[1].timezone)}
    converted = converter(target, x)
    return arrowvector(converted, i, nl, fi, de, ded, meta; kw...)
end

function arrowvector(::Type{P}, x, i, nl, fi, de, ded, meta; kw...) where {P <: Dates.Period}
    target = Duration{arrowperiodtype(P)}
    converted = converter(target, x)
    return arrowvector(converted, i, nl, fi, de, ded, meta; kw...)
end

# fallback that calls ArrowType
# now we check for ArrowType conversions and dispatch on ArrowKind
# Type-dispatched step of `arrowvector`: given the element type `S`, forward `x`
# to the `arrowvector` method selected by the trait value computed from `S`.
# Types reported by `ArrowTypes.istyperegistered` take the deprecated path first.
function arrowvector(::Type{S}, x, i, nl, fi, de, ded, meta; kw...) where {S}
# deprecated and will be removed
if ArrowTypes.istyperegistered(S)
# reuse the caller's metadata dict, or create one so it can be passed to `getarrowtype!`
meta = meta === nothing ? Dict{String, String}() : meta
arrowtype = ArrowTypes.getarrowtype!(meta, S)
if arrowtype === S
return arrowvector(ArrowType(S), x, i, nl, fi, de, ded, meta; kw...)
# NOTE(review): the next statement follows an unconditional `return` and cannot
# execute; it looks like the `ArrowKind` replacement for the `ArrowType` call
# above — confirm which one is intended and drop the other.
return arrowvector(ArrowKind(S), x, i, nl, fi, de, ded, meta; kw...)
else
# registered arrow type differs from `S`: wrap `x` with `converter` and go back
# through the untyped `arrowvector` entry point
return arrowvector(converter(arrowtype, x), i, nl, fi, de, ded, meta; kw...)
end
end
return arrowvector(ArrowType(S), x, i, nl, fi, de, ded, meta; kw...)
# end deprecation
# NOTE(review): unreachable as written (the `return` three lines up always fires);
# presumably the `ArrowKind` version is meant to replace it — confirm.
return arrowvector(ArrowKind(S), x, i, nl, fi, de, ded, meta; kw...)
end

"""
    NullVector{T}

`ArrowVector` that wraps a `MissingVector` together with optional string
key/value metadata. Indexing passes the wrapped element through
`ArrowTypes.fromarrow(T, ...)`.
"""
struct NullVector{T} <: ArrowVector{T}
    data::MissingVector
    metadata::Union{Nothing, Dict{String, String}}
end

# The length is taken from the wrapped `MissingVector`.
function Base.size(v::NullVector)
    return (length(v.data),)
end

function Base.getindex(v::NullVector{T}, i::Int) where {T}
    raw = v.data[i]
    return ArrowTypes.fromarrow(T, raw)
end

arrowvector(::NullType, x, i, nl, fi, de, ded, meta; kw...) = MissingVector(length(x))
compress(Z::Meta.CompressionType, comp, v::MissingVector) =
Compressed{Z, MissingVector}(v, CompressedBuffer[], length(v), length(v), Compressed[])
arrowvector(::NullKind, x, i, nl, fi, de, ded, meta; kw...) = NullVector{eltype(x)}(MissingVector(length(x)), meta)
compress(Z::Meta.CompressionType, comp, v::NullVector) =
Compressed{Z, NullVector}(v, CompressedBuffer[], length(v), length(v), Compressed[])

function makenodesbuffers!(col::MissingVector, fieldnodes, fieldbuffers, bufferoffset, alignment)
function makenodesbuffers!(col::NullVector, fieldnodes, fieldbuffers, bufferoffset, alignment)
push!(fieldnodes, FieldNode(length(col), length(col)))
@debug 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"
return bufferoffset
end

function writebuffer(io, col::MissingVector, alignment)
function writebuffer(io, col::NullVector, alignment)
return
end

@@ -114,7 +113,7 @@ A bit-packed array type where each bit corresponds to an element in an
[`ArrowVector`](@ref), indicating whether that element is "valid" (bit == 1),
or not (bit == 0). Used to indicate element missingness (whether it's null).
If the null count of an array is zero, the `ValidityBitmap` will be "emtpy"
If the null count of an array is zero, the `ValidityBitmap` will be "empty"
and all elements are treated as "valid"/non-null.
"""
struct ValidityBitmap <: ArrowVector{Bool}
@@ -38,7 +38,7 @@ Base.size(p::BoolVector) = (p.ℓ,)
a, b = fldmod1(i, 8)
@inbounds byte = p.arrow[p.pos + a - 1]
# check individual bit of byte
return getbit(byte, b)
return ArrowTypes.fromarrow(T, getbit(byte, b))
end

@propagate_inbounds function Base.setindex!(p::BoolVector, v, i::Integer)
@@ -50,9 +50,9 @@ end
return v
end

arrowvector(::BoolType, x::BoolVector, i, nl, fi, de, ded, meta; kw...) = x
arrowvector(::BoolKind, x::BoolVector, i, nl, fi, de, ded, meta; kw...) = x

function arrowvector(::BoolType, x, i, nl, fi, de, ded, meta; kw...)
function arrowvector(::BoolKind, x, i, nl, fi, de, ded, meta; kw...)
validity = ValidityBitmap(x)
len = length(x)
blen = cld(len, 8)
@@ -71,7 +71,7 @@ Base.IndexStyle(::Type{<:DictEncode}) = Base.IndexLinear()
Base.size(x::DictEncode) = (length(x.data),)
Base.iterate(x::DictEncode, st...) = iterate(x.data, st...)
Base.getindex(x::DictEncode, i::Int) = getindex(x.data, i)
ArrowTypes.ArrowType(::Type{<:DictEncodeType}) = DictEncodedType()
ArrowTypes.ArrowKind(::Type{<:DictEncodeType}) = DictEncodedKind()
Base.copy(x::DictEncode) = DictEncode(x.data, x.id)

"""
@@ -122,7 +122,7 @@ dictencodeid(colidx, nestedlevel, fieldid) = (Int64(nestedlevel) << 48) | (Int64
# Return the `id` stored on the `encoding` field of a `DictEncoded`, either
# directly or through the `data` field of a `Compressed` wrapper.
function getid(d::DictEncoded)
    return d.encoding.id
end

function getid(c::Compressed{Z, A}) where {Z, A <: DictEncoded}
    encoded = c.data
    return encoded.encoding.id
end

function arrowvector(::DictEncodedType, x::DictEncoded, i, nl, fi, de, ded, meta; dictencode::Bool=false, dictencodenested::Bool=false, kw...)
function arrowvector(::DictEncodedKind, x::DictEncoded, i, nl, fi, de, ded, meta; dictencode::Bool=false, dictencodenested::Bool=false, kw...)
id = x.encoding.id
if !haskey(de, id)
de[id] = Lockable(x.encoding)
@@ -152,7 +152,7 @@ function arrowvector(::DictEncodedType, x::DictEncoded, i, nl, fi, de, ded, meta
return x
end

function arrowvector(::DictEncodedType, x, i, nl, fi, de, ded, meta; dictencode::Bool=false, dictencodenested::Bool=false, kw...)
function arrowvector(::DictEncodedKind, x, i, nl, fi, de, ded, meta; dictencode::Bool=false, dictencodenested::Bool=false, kw...)
@assert x isa DictEncode
id = x.id == -1 ? dictencodeid(i, nl, fi) : x.id
x = x.data
@@ -31,19 +31,20 @@ Base.size(l::FixedSizeList) = (l.ℓ,)

@propagate_inbounds function Base.getindex(l::FixedSizeList{T}, i::Integer) where {T}
@boundscheck checkbounds(l, i)
X = Base.nonmissingtype(T)
S = Base.nonmissingtype(T)
X = ArrowTypes.ArrowKind(ArrowTypes.ArrowType(S))
N = ArrowTypes.getsize(X)
Y = ArrowTypes.gettype(X)
if X !== T && !(l.validity[i])
return missing
else
off = (i - 1) * N
if X === T && isbitstype(Y) && l.data isa Vector{UInt8}
tup = _unsafe_load_tuple(NTuple{N,Y}, l.data, off + 1)
if X === T && isbitstype(Y)
tup = _unsafe_load_tuple(NTuple{N, Y}, l.data, off + 1)
else
tup = ntuple(j->l.data[off + j], N)
end
return ArrowTypes.arrowconvert(T, tup)
return ArrowTypes.fromarrow(T, tup)
end
end

@@ -59,7 +60,7 @@ end
if v === missing
@inbounds l.validity[i] = false
else
N = ArrowTypes.getsize(Base.nonmissingtype(T))
N = ArrowTypes.getsize(ArrowTypes.ArrowKind(ArrowTypes.ArrowType(Base.nonmissingtype(T))))
off = (i - 1) * N
foreach(1:N) do j
@inbounds l.data[off + j] = v[j]
@@ -70,11 +71,13 @@ end

# lazy equal-spaced flattener
struct ToFixedSizeList{T, N, A} <: AbstractVector{T}
data::A # A is AbstractVector of AbstractVector or AbstractString
data::A # A is AbstractVector of (AbstractVector or AbstractString)
end

# Element type of the wrapped input container type `A`.
function origtype(::ToFixedSizeList{T, N, A}) where {T, N, A}
    return eltype(A)
end

# Build a `ToFixedSizeList` around `input`. The element type and size parameters
# come from `ArrowTypes.gettype` / `ArrowTypes.getsize` applied to `NT`; the
# third parameter is `typeof(input)`.
function ToFixedSizeList(input)
NT = Base.nonmissingtype(eltype(input)) # typically NTuple{N, T}
# NOTE(review): this second assignment immediately overwrites the one above, so
# only the `ArrowKind(...)` value reaches the `return`; the previous line looks
# like a superseded version — confirm and remove it.
NT = ArrowTypes.ArrowKind(Base.nonmissingtype(eltype(input))) # typically NTuple{N, T}
return ToFixedSizeList{ArrowTypes.gettype(NT), ArrowTypes.getsize(NT), typeof(input)}(input)
end

@@ -102,18 +105,20 @@ end
return x, (i + 1, chunk, chunk_i, len)
end

arrowvector(::FixedSizeListType, x::FixedSizeList, i, nl, fi, de, ded, meta; kw...) = x
arrowvector(::FixedSizeListKind, x::FixedSizeList, i, nl, fi, de, ded, meta; kw...) = x

function arrowvector(::FixedSizeListType, x, i, nl, fi, de, ded, meta; kw...)
function arrowvector(::FixedSizeListKind{N, T}, x, i, nl, fi, de, ded, meta; kw...) where {N, T}
len = length(x)
validity = ValidityBitmap(x)
flat = ToFixedSizeList(x)
if eltype(flat) == UInt8
data = flat
S = origtype(flat)
else
data = arrowvector(flat, i, nl + 1, fi, de, ded, nothing; kw...)
S = withmissing(eltype(x), NTuple{N, eltype(data)})
end
return FixedSizeList{eltype(x), typeof(data)}(UInt8[], validity, data, len, meta)
return FixedSizeList{S, typeof(data)}(UInt8[], validity, data, len, meta)
end

function compress(Z::Meta.CompressionType, comp, x::FixedSizeList{T, A}) where {T, A}
@@ -141,7 +146,7 @@ function makenodesbuffers!(col::FixedSizeList{T, A}, fieldnodes, fieldbuffers, b
@debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
bufferoffset += blen
if eltype(A) === UInt8
blen = ArrowTypes.getsize(Base.nonmissingtype(T)) * len
blen = ArrowTypes.getsize(ArrowTypes.ArrowKind(Base.nonmissingtype(T))) * len
push!(fieldbuffers, Buffer(bufferoffset, blen))
@debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
bufferoffset += padding(blen, alignment)
@@ -47,16 +47,18 @@ Base.size(l::List) = (l.ℓ,)
@propagate_inbounds function Base.getindex(l::List{T}, i::Integer) where {T}
@boundscheck checkbounds(l, i)
@inbounds lo, hi = l.offsets[i]
if ArrowTypes.isstringtype(T)
if Base.nonmissingtype(T) !== T
return l.validity[i] ? ArrowTypes.arrowconvert(T, unsafe_string(pointer(l.data, lo), hi - lo + 1)) : missing
S = Base.nonmissingtype(T)
K = ArrowTypes.ArrowKind(ArrowTypes.ArrowType(S))
if ArrowTypes.isstringtype(K)
if S !== T
return l.validity[i] ? ArrowTypes.fromarrow(T, pointer(l.data, lo), hi - lo + 1) : missing
else
return ArrowTypes.arrowconvert(T, unsafe_string(pointer(l.data, lo), hi - lo + 1))
return ArrowTypes.fromarrow(T, pointer(l.data, lo), hi - lo + 1)
end
elseif Base.nonmissingtype(T) !== T
return l.validity[i] ? ArrowTypes.arrowconvert(T, view(l.data, lo:hi)) : missing
elseif S !== T
return l.validity[i] ? ArrowTypes.fromarrow(T, view(l.data, lo:hi)) : missing
else
return ArrowTypes.arrowconvert(T, view(l.data, lo:hi))
return ArrowTypes.fromarrow(T, view(l.data, lo:hi))
end
end

@@ -71,10 +73,13 @@ struct ToList{T, stringtype, A, I} <: AbstractVector{T}
inds::Vector{I}
end

# Return the `A` type parameter of a `ToList`.
function origtype(::ToList{T, S, A, I}) where {T, S, A, I}
    return A
end

function ToList(input; largelists::Bool=false)
AT = eltype(input)
ST = Base.nonmissingtype(AT)
stringtype = ArrowTypes.isstringtype(ST)
K = ArrowTypes.ArrowKind(ST)
stringtype = ArrowTypes.isstringtype(K)
T = stringtype ? UInt8 : eltype(ST)
len = stringtype ? ncodeunits : length
data = AT[]
@@ -104,7 +109,8 @@ Base.size(x::ToList) = (length(x.inds) == 0 ? 0 : x.inds[end],)

# Pointer to one entry of `A.data`, chosen by a sorted search for `i` in `A.inds`.
function Base.pointer(A::ToList{UInt8}, i::Integer)
# index of the first entry in `A.inds` that is >= i (length + 1 if there is none)
chunk = searchsortedfirst(A.inds, i)
return pointer(A.data[chunk - 1])
# NOTE(review): the two statements below follow an unconditional `return` and
# cannot execute. They look like a replacement that maps a past-the-end search
# result to entry 1 instead of indexing with `chunk - 1` — confirm which version
# is intended and drop the other.
chunk = chunk > length(A.inds) ? 1 : (chunk - 1)
return pointer(A.data[chunk])
end

@inline function index(A::ToList, i::Integer)
@@ -178,19 +184,21 @@ end
return x, (i, chunk, chunk_i, chunk_len, len)
end

arrowvector(::ListType, x::List, i, nl, fi, de, ded, meta; kw...) = x
arrowvector(::ListKind, x::List, i, nl, fi, de, ded, meta; kw...) = x

function arrowvector(::ListType, x, i, nl, fi, de, ded, meta; largelists::Bool=false, kw...)
function arrowvector(::ListKind, x, i, nl, fi, de, ded, meta; largelists::Bool=false, kw...)
len = length(x)
validity = ValidityBitmap(x)
flat = ToList(x; largelists=largelists)
offsets = Offsets(UInt8[], flat.inds)
if eltype(flat) == UInt8 # binary or utf8string
data = flat
T = origtype(flat)
else
data = arrowvector(flat, i, nl + 1, fi, de, ded, nothing; lareglists=largelists, kw...)
T = withmissing(eltype(x), Vector{eltype(data)})
end
return List{eltype(x), eltype(flat.inds), typeof(data)}(UInt8[], validity, offsets, data, len, meta)
return List{T, eltype(flat.inds), typeof(data)}(UInt8[], validity, offsets, data, len, meta)
end

function compress(Z::Meta.CompressionType, comp, x::List{T, O, A}) where {T, O, A}
@@ -33,18 +33,20 @@ Base.size(l::Map) = (l.ℓ,)
@boundscheck checkbounds(l, i)
@inbounds lo, hi = l.offsets[i]
if Base.nonmissingtype(T) !== T
return l.validity[i] ? ArrowTypes.arrowconvert(T, Dict(x.key => x.value for x in view(l.data, lo:hi))) : missing
return l.validity[i] ? ArrowTypes.fromarrow(T, Dict(x.key => x.value for x in view(l.data, lo:hi))) : missing
else
return ArrowTypes.arrowconvert(T, Dict(x.key => x.value for x in view(l.data, lo:hi)))
return ArrowTypes.fromarrow(T, Dict(x.key => x.value for x in view(l.data, lo:hi)))
end
end

# Convert a dictionary into a vector with one `KT(key, value)` entry per pair.
# A `missing` input is passed through unchanged.
function keyvalues(KT, ::Missing)
    return missing
end

function keyvalues(KT, x::AbstractDict)
    return collect(KT(key, val) for (key, val) in pairs(x))
end

arrowvector(::MapType, x::Map, i, nl, fi, de, ded, meta; kw...) = x
# Extract the key type and value type from a `(key, value)` named-tuple type.
function keyvaluetypes(::Type{NamedTuple{(:key, :value), Tuple{K, V}}}) where {K, V}
    return (K, V)
end

function arrowvector(::MapType, x, i, nl, fi, de, ded, meta; largelists::Bool=false, kw...)
arrowvector(::MapKind, x::Map, i, nl, fi, de, ded, meta; kw...) = x

function arrowvector(::MapKind, x, i, nl, fi, de, ded, meta; largelists::Bool=false, kw...)
len = length(x)
validity = ValidityBitmap(x)
ET = eltype(x)
@@ -55,7 +57,8 @@ function arrowvector(::MapType, x, i, nl, fi, de, ded, meta; largelists::Bool=fa
flat = ToList(T[keyvalues(KT, y) for y in x]; largelists=largelists)
offsets = Offsets(UInt8[], flat.inds)
data = arrowvector(flat, i, nl + 1, fi, de, ded, nothing; lareglists=largelists, kw...)
return Map{ET, eltype(flat.inds), typeof(data)}(validity, offsets, data, len, meta)
K, V = keyvaluetypes(eltype(data))
return Map{withmissing(ET, Dict{K, V}), eltype(flat.inds), typeof(data)}(validity, offsets, data, len, meta)
end

function compress(Z::Meta.CompressionType, comp, x::A) where {A <: Map}
@@ -43,9 +43,9 @@ end
# Indexed read for `Primitive{T}`. When `T` admits `Missing`, the validity entry
# for `i` decides between a converted value and `missing`; otherwise the stored
# value is converted unconditionally.
@propagate_inbounds function Base.getindex(p::Primitive{T}, i::Integer) where {T}
@boundscheck checkbounds(p, i)
if T >: Missing
return @inbounds (p.validity[i] ? ArrowTypes.arrowconvert(T, p.data[i]) : missing)
# NOTE(review): unreachable — it follows an unconditional `return`. It looks like
# the `fromarrow` replacement for the `arrowconvert` call above; confirm which is
# intended and drop the other.
return @inbounds (p.validity[i] ? ArrowTypes.fromarrow(T, p.data[i]) : missing)
else
return @inbounds ArrowTypes.arrowconvert(T, p.data[i])
# NOTE(review): unreachable for the same reason as in the branch above.
return @inbounds ArrowTypes.fromarrow(T, p.data[i])
end
end

@@ -63,9 +63,9 @@ end
return v
end

arrowvector(::PrimitiveType, x::Primitive, i, nl, fi, de, ded, meta; kw...) = x
arrowvector(::PrimitiveKind, x::Primitive, i, nl, fi, de, ded, meta; kw...) = x

function arrowvector(::PrimitiveType, x, i, nl, fi, de, ded, meta; kw...)
function arrowvector(::PrimitiveKind, x, i, nl, fi, de, ded, meta; kw...)
validity = ValidityBitmap(x)
return Primitive(eltype(x), UInt8[], validity, x, length(x), meta)
end

0 comments on commit ff53d13

Please sign in to comment.