85 changes: 55 additions & 30 deletions src/table.jl
@@ -470,28 +470,14 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
sch = nothing
dictencodingslockable = Lockable(Dict{Int64,DictEncoding}()) # dictionary id => DictEncoding
dictencoded = Dict{Int64,Meta.Field}() # dictionary id => field
sync = OrderedSynchronizer()
tsks = Channel{Any}(Inf)
tsk = @wkspawn begin
i = 1
for cols in tsks
if i == 1
foreach(x -> push!(columns(t), x), cols)
elseif i == 2
foreach(1:length(cols)) do i
columns(t)[i] = ChainedVector([columns(t)[i], cols[i]])
end
else
foreach(1:length(cols)) do i
append!(columns(t)[i], cols[i])
end
end
i += 1
end
end
anyrecordbatches = false
# we'll grow/add a record batch set of columns as they're constructed
# must be holding the lock while growing/adding
# starts at 0-length because we don't know how many record batches there will be
rb_cols = []
rb_cols_lock = ReentrantLock()
rbi = 1
@sync for blob in blobs
tasks = Task[]
for blob in blobs
for batch in BatchIterator(blob)
# store custom_metadata of batch.msg?
header = batch.msg.header
@@ -578,30 +564,49 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
end # lock
@debug "parsed dictionary batch message: id=$id, data=$values\n"
elseif header isa Meta.RecordBatch
anyrecordbatches = true
@debug "parsing record batch message: compression = $(header.compression)"
@wkspawn begin
cols =
collect(VectorIterator(sch, $batch, dictencodingslockable, convert))
put!(() -> put!(tsks, cols), sync, $(rbi))
end
push!(
tasks,
collect_cols!(
rbi,
rb_cols_lock,
rb_cols,
sch,
batch,
dictencodingslockable,
convert,
),
)
rbi += 1
else
throw(ArgumentError("unsupported arrow message type: $(typeof(header))"))
end
end
end
close(tsks)
wait(tsk)
_waitall(tasks)
lu = lookup(t)
ty = types(t)
# 158; some implementations may send 0 record batches
if !anyrecordbatches && !isnothing(sch)
# no more multithreading, so no need to take the lock now
if length(rb_cols) == 0 && !isnothing(sch)
for field in sch.fields
T = juliaeltype(field, buildmetadata(field), convert)
push!(columns(t), T[])
end
end
if length(rb_cols) > 0
foreach(x -> push!(columns(t), x), rb_cols[1])
end
if length(rb_cols) > 1
foreach(enumerate(rb_cols[2])) do (i, x)
columns(t)[i] = ChainedVector([columns(t)[i], x])
end
foreach(3:length(rb_cols)) do j
foreach(enumerate(rb_cols[j])) do (i, x)
append!(columns(t)[i], x)
end
end
end
for (nm, col) in zip(names(t), columns(t))
lu[nm] = col
push!(ty, eltype(col))
@@ -610,6 +615,26 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
return t
end
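For orientation, the merge step above combines the per-record-batch column sets in order: the first batch's vectors become the table's columns directly, the second batch is spliced onto them with ChainedVector, and any later batches are appended to those chains. A minimal standalone sketch of the same pattern, under the assumption that batches holds one vector of column vectors per record batch (merge_batch_columns is a made-up name; ChainedVector comes from SentinelArrays, an existing Arrow.jl dependency):

using SentinelArrays: ChainedVector

# Combine per-batch column sets in batch order (illustrative only).
function merge_batch_columns(batches)
    cols = Vector{AbstractVector}()
    isempty(batches) && return cols
    foreach(x -> push!(cols, x), batches[1])          # first batch: adopt its columns as-is
    if length(batches) > 1
        foreach(enumerate(batches[2])) do (i, x)
            cols[i] = ChainedVector([cols[i], x])     # second batch: start a chain per column
        end
        foreach(3:length(batches)) do j
            foreach(enumerate(batches[j])) do (i, x)
                append!(cols[i], x)                   # later batches: extend the existing chain
            end
        end
    end
    return cols
end

For example, merge_batch_columns([[[1, 2], ["a", "b"]], [[3, 4], ["c", "d"]]]) yields two columns equivalent to [1, 2, 3, 4] and ["a", "b", "c", "d"].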

function collect_cols!(
    rbi,
    rb_cols_lock,
    rb_cols,
    sch,
    batch,
    dictencodingslockable,
    convert,
)
    @wkspawn begin
        cols = collect(VectorIterator(sch, batch, dictencodingslockable, convert))
        @lock rb_cols_lock begin
            if length(rb_cols) < rbi
                resize!(rb_cols, rbi)
            end
            rb_cols[rbi] = cols
        end
    end
end
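The collection side can be pictured similarly: each spawned task computes its batch's columns and then, while holding the shared lock, grows the result vector as needed and writes into its own reserved slot, so batches may finish in any order without losing their position. A rough, self-contained approximation of that idea, using Threads.@spawn in place of Arrow's @wkspawn and a closure standing in for collect(VectorIterator(...)) (collect_in_slots and work_items are made up for illustration):

# Fill results[i] from task i behind a lock; slots are reserved by spawn order.
function collect_in_slots(work_items)
    results = Vector{Any}()
    lk = ReentrantLock()
    tasks = map(enumerate(work_items)) do (i, work)
        Threads.@spawn begin
            cols = work()                                    # stand-in for collecting one record batch
            lock(lk) do
                length(results) < i && resize!(results, i)   # grow until this slot exists
                results[i] = cols
            end
        end
    end
    foreach(wait, tasks)                                     # plays the role of _waitall(tasks)
    return results
end

For example, collect_in_slots([() -> fill(i, 3) for i in 1:4]) returns the four 3-element vectors in spawn order even if later tasks finish first.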

function getdictionaries!(dictencoded, field)
d = field.dictionary
if d !== nothing
6 changes: 6 additions & 0 deletions src/utils.jl
@@ -28,6 +28,12 @@ function writezeros(io::IO, n::Integer)
s
end

if isdefined(Base, :waitall)
    const _waitall = waitall
else
    _waitall(tasks) = foreach(wait, tasks)
end
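A brief note on the shim above: Base.waitall only exists on newer Julia releases, so older versions fall back to waiting on each task in turn; either way call sites just write _waitall(tasks). A hypothetical call site:

tasks = [Threads.@spawn(sum(rand(100))) for _ in 1:4]   # made-up tasks for illustration
_waitall(tasks)                                         # Base.waitall when available, foreach(wait, tasks) otherwise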

# efficient writing of arrays
writearray(io, col) = writearray(io, maybemissing(eltype(col)), col)
