Skip to content
Permalink
Browse files
fixes for compat
  • Loading branch information
quinnj committed Oct 3, 2020
1 parent f33ed07 commit 15c06e5f6eb89d900d012159c88e2f032ec85110
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 1 deletion.
@@ -29,6 +29,7 @@ if isdefined(Tables, :partitions)
parts = Tables.partitions
else
parts = x -> (x,)
parts(x::Tuple) = x
end

function write(io, source, writetofile, debug)
@@ -41,9 +42,15 @@ function write(io, source, writetofile, debug)
dictid = Ref(0)
dictencodings = Dict{Int, Tuple{Int, Type, Any}}()
# start message writing from channel
@static if VERSION >= v"1.3-DEV"
tsk = Threads.@spawn for msg in msgs
Base.write(io, msg)
end
else
tsk = @async for msg in msgs
Base.write(io, msg)
end
end
@sync for (i, tbl) in enumerate(parts(source))
if i == 1
cols = Tables.columns(tbl)
@@ -65,6 +72,7 @@ function write(io, source, writetofile, debug)
end
put!(msgs, makerecordbatchmsg(sch[], cols, dictencodings, debug))
else
@static if VERSION >= v"1.3-DEV"
Threads.@spawn begin
try
cols = Tables.columns(tbl)
@@ -85,6 +93,28 @@ function write(io, source, writetofile, debug)
rethrow(e)
end
end
else
@async begin
try
cols = Tables.columns(tbl)
if !isempty(dictencodings)
for (colidx, (id, T, values)) in dictencodings
dictsch = Tables.Schema((:col,), (eltype(values),))
col = Tables.getcolumn(cols, colidx)
# get new values we haven't seen before for delta update
vals = setdiff(col, values)
put!(msgs, makedictionarybatchmsg(dictsch, (col=vals,), id, true, debug))
# add new values to existing set for future diffs
union!(values, vals)
end
end
put!(msgs, makerecordbatchmsg(sch[], cols, dictencodings, debug))
catch e
showerror(stdout, e, catch_backtrace())
rethrow(e)
end
end
end
end
end
close(msgs)
@@ -100,7 +100,12 @@ tt = Arrow.Table(io; debug=true)
@test all(isequal.(values(t), values(tt)))

# multiple record batches
t = Tables.partitioner(((col1=Union{Int64, Missing}[1,2,3,4,5,6,7,8,9,missing],), (col1=Union{Int64, Missing}[1,2,3,4,5,6,7,8,9,missing],)))
if isdefined(Tables, :partitioner)
xx = Tables.partitioner
else
xx = Tuple
end
t = xx(((col1=Union{Int64, Missing}[1,2,3,4,5,6,7,8,9,missing],), (col1=Union{Int64, Missing}[1,2,3,4,5,6,7,8,9,missing],)))
io = IOBuffer()
Arrow.write(io, t)
seekstart(io)

0 comments on commit 15c06e5

Please sign in to comment.