Skip to content
Permalink
Browse files
Implement json apache arrow testing integration code (#18)
* Implement json apache arrow testing integration code

* Fix tests

* Try to fix pre-1.3
  • Loading branch information
quinnj committed Oct 3, 2020
1 parent 422c7a7 commit 68287613af5170323089b00aa49ace983a588561
Show file tree
Hide file tree
Showing 5 changed files with 704 additions and 20 deletions.
@@ -133,6 +133,8 @@ function arrowtype(b, ::Type{NTuple{N, UInt8}}) where {N}
end

default(::Type{NTuple{N, T}}) where {N, T} = ntuple(i -> default(T), N)
# arggh!
Base.write(io::IO, x::NTuple{N, T}) where {N, T} = sum(y -> Base.write(io, y), x)
default(::Type{T}) where {T <: Tuple} = Tuple(default(fieldtype(T, i)) for i = 1:fieldcount(T))

juliaeltype(f::Meta.Field, x::Meta.Bool) = Bool
@@ -143,10 +145,12 @@ function arrowtype(b, ::Type{Bool})
end

struct Decimal{P, S}
bytes::NTuple{16, UInt8}
value::Int128
end

Base.zero(::Type{Decimal{P, S}}) where {P, S} = Decimal{P, S}(ntuple(i->0x00, 16))
Base.zero(::Type{Decimal{P, S}}) where {P, S} = Decimal{P, S}(Int128(0))
==(a::Decimal{P, S}, b::Decimal{P, S}) where {P, S} = ==(a.value, b.value)
Base.isequal(a::Decimal{P, S}, b::Decimal{P, S}) where {P, S} = isequal(a.value, b.value)

function juliaeltype(f::Meta.Field, x::Meta.Decimal)
return Decimal{x.precision, x.scale}
@@ -159,6 +163,8 @@ function arrowtype(b, ::Type{Decimal{P, S}}) where {P, S}
return Meta.Decimal, Meta.decimalEnd(b), nothing
end

Base.write(io::IO, x::Decimal) = Base.write(io, x.value)

abstract type ArrowTimeType end
Base.write(io::IO, x::ArrowTimeType) = Base.write(io, x.x)

@@ -167,7 +173,7 @@ struct Date{U, T} <: ArrowTimeType
end

Base.zero(::Type{Date{U, T}}) where {U, T} = Date{U, T}(T(0))

storagetype(::Type{Date{U, T}}) where {U, T} = T
bitwidth(x::Meta.DateUnit) = x == Meta.DateUnit.DAY ? Int32 : Int64
Date{Meta.DateUnit.DAY}(days) = Date{Meta.DateUnit.DAY, Int32}(Int32(days))
Date{Meta.DateUnit.MILLISECOND}(ms) = Date{Meta.DateUnit.MILLISECOND, Int64}(Int64(ms))
@@ -198,7 +204,7 @@ Base.zero(::Type{Time{U, T}}) where {U, T} = Time{U, T}(T(0))

bitwidth(x::Meta.TimeUnit) = x == Meta.TimeUnit.SECOND || x == Meta.TimeUnit.MILLISECOND ? Int32 : Int64
Time{U}(x) where {U <: Meta.TimeUnit} = Time{U, bitwidth(U)}(bitwidth(U)(x))

storagetype(::Type{Time{U, T}}) where {U, T} = T
juliaeltype(f::Meta.Field, x::Meta.Time) = Time{x.unit, bitwidth(x.unit)}
finaljuliatype(::Type{<:Time}) = Dates.Time
periodtype(U::Meta.TimeUnit) = U === Meta.TimeUnit.SECOND ? Dates.Second :
@@ -325,6 +331,8 @@ struct KeyValue{K, V}
key::K
value::V
end
keyvalueK(::Type{KeyValue{K, V}}) where {K, V} = K
keyvalueV(::Type{KeyValue{K, V}}) where {K, V} = V
Base.length(kv::KeyValue) = 1
Base.iterate(kv::KeyValue, st=1) = st === nothing ? nothing : (kv, nothing)
default(::Type{KeyValue{K, V}}) where {K, V} = KeyValue(default(K), default(V))
@@ -24,6 +24,11 @@ function writearray(io::IO, ::Type{T}, col) where {T}
elseif isbitstype(T) && (col isa Vector{Union{T, Missing}} || col isa SentinelVector{T, T, Missing, Vector{T}})
# need to write the non-selector bytes of isbits Union Arrays
n = Base.unsafe_write(io, pointer(col), sizeof(T) * length(col))
elseif col isa ChainedVector
n = 0
for A in col.arrays
n += writearray(io, T, A)
end
else
n = 0
for x in col
@@ -109,6 +114,12 @@ if !applicable(iterate, missing)
Base.iterate(::Missing, st=1) = st === nothing ? nothing : (missing, nothing)
end

ntupleT(::Type{NTuple{N, T}}) where {N, T} = T
ntnames(::Type{NamedTuple{names, T}}) where {names, T} = names
ntT(::Type{NamedTuple{names, T}}) where {names, T} = T
pairK(::Type{Pair{K, V}}) where {K, V} = K
pairV(::Type{Pair{K, V}}) where {K, V} = V

# need a custom representation of Union types since arrow unions
# are ordered, and possibly indirected via separate typeIds array
# here, T is Meta.UnionMode.Dense or Meta.UnionMode.Sparse,
@@ -236,7 +247,7 @@ struct Converter{T, A} <: AbstractVector{T}
end

converter(::Type{T}, x::A) where {T, A} = Converter{eltype(A) >: Missing ? Union{T, Missing} : T, A}(x)
converter(::Type{T}, x::ChainedVector{A}) where {T, A} = ChainedVector(Vector{A}[converter(T, x) for x in x.arrays])
converter(::Type{T}, x::ChainedVector{A}) where {T, A} = ChainedVector([converter(T, x) for x in x.arrays])

Base.IndexStyle(::Type{<:Converter}) = Base.IndexLinear()
Base.size(x::Converter) = (length(x.data),)
@@ -251,6 +262,10 @@ DataAPI.refpool(x::Converter{T}) where {T} = converter(T, DataAPI.refpool(x.data

maybemissing(::Type{T}) where {T} = T === Missing ? Missing : Base.nonmissingtype(T)

macro miss_or(x, ex)
esc(:($x === missing ? missing : $(ex)))
end

function getfooter(filebytes)
len = readbuffer(filebytes, length(filebytes) - 9, Int32)
FlatBuffers.getrootas(Meta.Footer, filebytes[end-(9 + len):end-10], 0)
@@ -34,11 +34,65 @@ else
parts(x::Tuple) = x
end

@static if VERSION >= v"1.3"
const Cond = Threads.Condition
else
const Cond = Condition
end

struct OrderedChannel{T}
chan::Channel{T}
cond::Cond
i::Ref{Int}
end

OrderedChannel{T}(sz) where {T} = OrderedChannel{T}(Channel{T}(sz), Threads.Condition(), Ref(1))
Base.iterate(ch::OrderedChannel, st...) = iterate(ch.chan, st...)

macro lock(obj, expr)
esc(quote
@static if VERSION >= v"1.3"
lock($obj)
end
try
$expr
finally
@static if VERSION >= v"1.3"
unlock($obj)
end
end
end)
end

function Base.put!(ch::OrderedChannel{T}, x::T, i::Integer, incr::Bool=false) where {T}
@lock ch.cond begin
while ch.i[] < i
wait(ch.cond)
end
put!(ch.chan, x)
if incr
ch.i[] += 1
end
notify(ch.cond)
end
return
end

function Base.close(ch::OrderedChannel)
@lock ch.cond begin
while Base.n_waiters(ch.cond) > 0
wait(ch.cond)
end
close(ch.chan)
end
return
end

function write(io, source, writetofile, debug)
if writetofile
Base.write(io, "ARROW1\0\0")
end
msgs = Channel{Message}(Inf)
msgs = OrderedChannel{Message}(Inf)
# build messages
sch = Ref{Tables.Schema}()
firstcols = Ref{Any}()
@@ -76,19 +130,19 @@ end
end
end
debug && @show sch[]
put!(msgs, makeschemamsg(sch[], cols, dictencodings))
put!(msgs, makeschemamsg(sch[], cols, dictencodings), i)
if !isempty(dictencodings)
for (colidx, (id, T, values)) in dictencodings
dictsch = Tables.Schema((:col,), (eltype(values),))
put!(msgs, makedictionarybatchmsg(dictsch, (col=values,), id, false, debug))
put!(msgs, makedictionarybatchmsg(dictsch, (col=values,), id, false, debug), i)
end
end
put!(msgs, makerecordbatchmsg(sch[], cols, dictencodings, debug))
put!(msgs, makerecordbatchmsg(sch[], cols, dictencodings, debug), i, true)
else
@static if VERSION >= v"1.3-DEV"
Threads.@spawn begin
try
cols = Tables.columns(tbl)
cols = Tables.columns(toarrowtable(tbl))
if !isempty(dictencodings)
for (colidx, (id, T, values)) in dictencodings
dictsch = Tables.Schema((:col,), (eltype(values),))
@@ -101,12 +155,12 @@ end
end
# get new values we haven't seen before for delta update
vals = setdiff(newvals, values)
put!(msgs, makedictionarybatchmsg(dictsch, (col=vals,), id, true, debug))
put!(msgs, makedictionarybatchmsg(dictsch, (col=vals,), id, true, debug), i)
# add new values to existing set for future diffs
union!(values, vals)
end
end
put!(msgs, makerecordbatchmsg(sch[], cols, dictencodings, debug))
put!(msgs, makerecordbatchmsg(sch[], cols, dictencodings, debug), i, true)
catch e
showerror(stdout, e, catch_backtrace())
rethrow(e)
@@ -115,7 +169,7 @@ end
else
@async begin
try
cols = Tables.columns(tbl)
cols = Tables.columns(toarrowtable(tbl))
if !isempty(dictencodings)
for (colidx, (id, T, values)) in dictencodings
dictsch = Tables.Schema((:col,), (eltype(values),))
@@ -128,12 +182,12 @@ else
end
# get new values we haven't seen before for delta update
vals = setdiff(newvals, values)
put!(msgs, makedictionarybatchmsg(dictsch, (col=vals,), id, true, debug))
put!(msgs, makedictionarybatchmsg(dictsch, (col=vals,), id, true, debug), i)
# add new values to existing set for future diffs
union!(values, vals)
end
end
put!(msgs, makerecordbatchmsg(sch[], cols, dictencodings, debug))
put!(msgs, makerecordbatchmsg(sch[], cols, dictencodings, debug), i, true)
catch e
showerror(stdout, e, catch_backtrace())
rethrow(e)
@@ -651,16 +705,16 @@ function makenodesbuffers!(::Type{KeyValue{K, V}}, col, fieldnodes, fieldbuffers
# keys
bufferoffset = makenodesbuffers!(maybemissing(K), (x.key for x in col), fieldnodes, fieldbuffers, bufferoffset)
# values
bufferoffset = makenodesbuffers!(maybemissing(V), (x.value for x in col), fieldnodes, fieldbuffers, bufferoffset)
bufferoffset = makenodesbuffers!(maybemissing(V), (@miss_or(x, x.value) for x in col), fieldnodes, fieldbuffers, bufferoffset)
return bufferoffset
end

function writebuffer(io, ::Type{KeyValue{K, V}}, col) where {K, V}
writebitmap(io, col)
# write keys
writebuffer(io, maybemissing(K),(x.key for x in col))
writebuffer(io, maybemissing(K), (x.key for x in col))
# write values
writebuffer(io, maybemissing(V),(x.value for x in col))
writebuffer(io, maybemissing(V), (@miss_or(x, x.value) for x in col))
return
end

@@ -673,7 +727,7 @@ function makenodesbuffers!(::Type{NamedTuple{names, types}}, col, fieldnodes, fi
push!(fieldbuffers, Buffer(bufferoffset, blen))
bufferoffset += blen
for i = 1:length(names)
bufferoffset = makenodesbuffers!(maybemissing(fieldtype(types, i)), (getfield(x, names[i]) for x in col), fieldnodes, fieldbuffers, bufferoffset)
bufferoffset = makenodesbuffers!(maybemissing(fieldtype(types, i)), (@miss_or(x, getfield(x, names[i])) for x in col), fieldnodes, fieldbuffers, bufferoffset)
end
return bufferoffset
end
@@ -682,7 +736,7 @@ function writebuffer(io, ::Type{NamedTuple{names, types}}, col) where {names, ty
writebitmap(io, col)
# write values arrays
for i = 1:length(names)
writebuffer(io, maybemissing(fieldtype(types, i)), (getfield(x, names[i]) for x in col))
writebuffer(io, maybemissing(fieldtype(types, i)), (@miss_or(x, getfield(x, names[i])) for x in col))
end
return
end

0 comments on commit 6828761

Please sign in to comment.