Skip to content
Permalink
Browse files
Use threaded tasks to read record batches in parallel for Arrow.Table
  • Loading branch information
quinnj committed Oct 23, 2020
1 parent 54ebbc6 commit 5c207eadb53115d9053e87a666d188ddf33c5b2e
Showing 1 changed file with 26 additions and 19 deletions.
@@ -189,6 +189,25 @@ function Table(bytes::Vector{UInt8}, off::Integer=1, tlen::Union{Integer, Nothin
sch = nothing
dictencodings = Dict{Int64, DictEncoding}() # dictionary id => DictEncoding
dictencoded = Dict{Int64, Meta.Field}() # dictionary id => field
tsks = Channel{Task}(Inf)
tsk = Threads.@spawn begin
i = 1
for tsk in tsks
cols = fetch(tsk)
if i == 1
foreach(x -> push!(columns(t), x), cols)
elseif i == 2
foreach(1:length(cols)) do i
columns(t)[i] = ChainedVector([columns(t)[i], cols[i]])
end
else
foreach(1:length(cols)) do i
append!(columns(t)[i], cols[i])
end
end
i += 1
end
end
for batch in BatchIterator(bytes, off)
# store custom_metadata of batch.msg?
header = batch.msg.header
@@ -224,29 +243,15 @@ function Table(bytes::Vector{UInt8}, off::Integer=1, tlen::Union{Integer, Nothin
@debug 1 "parsed dictionary batch message: id=$id, data=$values\n"
elseif header isa Meta.RecordBatch
@debug 1 "parsing record batch message: compression = $(header.compression)"
if isempty(columns(t))
# first RecordBatch
for vec in VectorIterator(sch, batch, dictencodings, convert)
push!(columns(t), vec)
end
@debug 1 "parsed 1st record batch"
elseif !(columns(t)[1] isa ChainedVector)
# second RecordBatch
for (i, vec) in enumerate(VectorIterator(sch, batch, dictencodings, convert))
columns(t)[i] = ChainedVector([columns(t)[i], vec])
end
@debug 1 "parsed 2nd record batch"
else
# 2+ RecordBatch
for (i, vec) in enumerate(VectorIterator(sch, batch, dictencodings, convert))
append!(columns(t)[i], vec)
end
@debug 1 "parsed additional record batch"
end
put!(tsks, Threads.@spawn begin
collect(VectorIterator(sch, batch, dictencodings, convert))
end)
else
throw(ArgumentError("unsupported arrow message type: $(typeof(header))"))
end
end
close(tsks)
wait(tsk)
lu = lookup(t)
ty = types(t)
for (nm, col) in zip(names(t), columns(t))
@@ -327,6 +332,8 @@ function Base.iterate(x::VectorIterator, (columnidx, nodeidx, bufferidx)=(Int64(
return A, (columnidx + 1, nodeidx, bufferidx)
end

# The length of a `VectorIterator` is the number of fields in its schema.
function Base.length(x::VectorIterator)
    return length(x.schema.fields)
end

# Union of the `Meta` types grouped under the name `ListTypes`:
# the Utf8, Binary and List types together with their `Large` counterparts.
const ListTypes = Union{
    Meta.Utf8,
    Meta.LargeUtf8,
    Meta.Binary,
    Meta.LargeBinary,
    Meta.List,
    Meta.LargeList
}

# The members of `ListTypes` whose names carry the `Large` prefix
# (presumably the 64-bit-offset Arrow layouts; confirm against the Arrow spec).
const LargeLists = Union{
    Meta.LargeUtf8,
    Meta.LargeBinary,
    Meta.LargeList
}

0 comments on commit 5c207ea

Please sign in to comment.