Skip to content
Permalink
Browse files
Better handle errors when something goes wrong writing partitions (#154)
* Better handle errors when something goes wrong writing partitions

Follow up to #108. There were actually a few different issues all coming
together in @kjanosz's comment. The first being that `ZipFile.Reader`
doesn't like non-main threads touching its files _at all_. The Arrow.jl
problem there is when processing non-first partitions, we were just
`Threads.@spawn`-ing a task which then went off and sometimes became the
proverbial unheard tree falling in the forest. Like really, no one heard
these tasks and their poor exceptions.

The solution there is to be better shepherds of our spawned tasks; we
introduce an `anyerror` atomic bool that threads can set if they run into
issues and then in the main partition handling loop we'll check that. If
a thread ran into something, we'll log out the thread-specific
exception, then throw a generic writing error. This prevents the
"hanging" behavior people were seeing because the felled threads were
actually causing later tasks to hang on `put!`-ing into the
OrderedChannel.

After addressing this, we have the multithreaded ZipFile issue. With the
new `ntasks` keyword, it seems to make sense to me that if you pass
`ntasks=1`, you're really saying, I don't want any concurrency. So
anywhere we were `Threads.@spawn`ing, we now check if `ntasks == 1` and
if so, do an `@async` instead.

* Fix
  • Loading branch information
quinnj committed Mar 18, 2021
1 parent a8aaf95 commit 2cacbe5384d3494bb49d929a8b386514f0111c74
Showing 3 changed files with 41 additions and 13 deletions.
@@ -133,7 +133,6 @@ function arrowvector(::DictEncodedType, x::DictEncoded, i, nl, fi, de, ded, meta
# in this case, we just need to check if any values in our local pool need to be delta dictionary serialized
deltas = setdiff(x.encoding, encoding)
if !isempty(deltas)
@show deltas
ET = indextype(encoding)
if length(deltas) + length(encoding) > typemax(ET)
error("fatal error serializing dict encoded column with ref index type of $ET; subsequent record batch unique values resulted in $(length(deltas) + length(encoding)) unique values, which exceeds possible index values in $ET")
@@ -69,6 +69,7 @@ Supported keyword arguments to `Arrow.write` include:
* `denseunions::Bool=true`: whether Julia `Vector{<:Union}` arrays should be written using the dense union layout; passing `false` will result in the sparse union layout
* `largelists::Bool=false`: causes list column types to be written with Int64 offset arrays; mainly for testing purposes; by default, Int64 offsets will be used only if needed
* `maxdepth::Int=$DEFAULT_MAX_DEPTH`: deepest allowed nested serialization level; this is provided by default to prevent accidental infinite recursion with mutually recursive data structures
* `ntasks::Int`: number of concurrent threaded tasks to allow while writing input partitions out as arrow record batches; default is no limit; to disable multithreaded writing, pass `ntasks=1`
* `file::Bool=false`: if an `io` argument is being written to, passing `file=true` will cause the arrow file format to be written instead of just IPC streaming
"""
function write end
@@ -94,6 +95,9 @@ function write(io, source, writetofile, largelists, compress, denseunions, dicte
elseif compress isa Symbol
throw(ArgumentError("unsupported compress keyword argument value: $compress. Valid values include `:lz4` or `:zstd`"))
end
if ntasks < 1
throw(ArgumentError("ntasks keyword argument must be > 0; pass `ntasks=1` to disable multithreaded writing"))
end
if writetofile
@debug 1 "starting write of arrow formatted file"
Base.write(io, "ARROW1\0\0")
@@ -105,10 +109,19 @@ function write(io, source, writetofile, largelists, compress, denseunions, dicte
dictencodings = Dict{Int64, Any}() # Lockable{DictEncoding}
blocks = (Block[], Block[])
# start message writing from channel
tsk = Threads.@spawn for msg in msgs
threaded = ntasks > 1
tsk = threaded ? (Threads.@spawn for msg in msgs
Base.write(io, msg, blocks, sch, alignment)
end
end) : (@async for msg in msgs
Base.write(io, msg, blocks, sch, alignment)
end)
anyerror = Threads.Atomic{Bool}(false)
errorref = Ref{Any}()
@sync for (i, tbl) in enumerate(Tables.partitions(source))
if anyerror[]
@error "error writing arrow data on partition = $(errorref[][3])" exception=(errorref[][1], errorref[][2])
error("fatal error writing arrow data")
end
@debug 1 "processing table partition i = $i"
if i == 1
cols = toarrowtable(tbl, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth)
@@ -126,18 +139,17 @@ function write(io, source, writetofile, largelists, compress, denseunions, dicte
end
put!(msgs, makerecordbatchmsg(sch[], cols, alignment), i, true)
else
Threads.@spawn begin
cols = toarrowtable(tbl, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth)
if !isempty(cols.dictencodingdeltas)
for de in cols.dictencodingdeltas
dictsch = Tables.Schema((:col,), (eltype(de.data),))
put!(msgs, makedictionarybatchmsg(dictsch, (col=de.data,), de.id, true, alignment), i)
end
end
put!(msgs, makerecordbatchmsg(sch[], cols, alignment), i, true)
if threaded
Threads.@spawn process_partition(tbl, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror)
else
@async process_partition(tbl, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror)
end
end
end
if anyerror[]
@error "error writing arrow data on partition = $(errorref[][3])" exception=(errorref[][1], errorref[][2])
error("fatal error writing arrow data")
end
# close our message-writing channel, no further put!-ing is allowed
close(msgs)
# now wait for our message-writing task to finish writing
@@ -184,6 +196,23 @@ function write(io, source, writetofile, largelists, compress, denseunions, dicte
return io
end

# Serialize one non-first table partition into arrow messages and enqueue them
# on the ordered message channel `msgs` at position `i`. Any dictionary-encoding
# deltas discovered while converting the partition are emitted as dictionary
# batch messages before the record batch itself. On failure, the exception,
# backtrace, and partition index are stashed in `errorref` and `anyerror` is
# flipped so the main partition loop can surface the error instead of hanging.
function process_partition(tbl, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror)
    try
        cols = toarrowtable(tbl, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth)
        # emit a dictionary batch for each delta so readers see new unique
        # values before the record batch that references them
        for delta in cols.dictencodingdeltas
            deltaschema = Tables.Schema((:col,), (eltype(delta.data),))
            deltamsg = makedictionarybatchmsg(deltaschema, (col=delta.data,), delta.id, true, alignment)
            put!(msgs, deltamsg, i)
        end
        # final `true` marks this as the closing message for partition `i`
        put!(msgs, makerecordbatchmsg(sch[], cols, alignment), i, true)
    catch err
        # record enough context for the main loop to log and rethrow;
        # spawned tasks must not die silently (see #154)
        errorref[] = (err, catch_backtrace(), i)
        anyerror[] = true
    end
    return
end

struct ToArrowTable
sch::Tables.Schema
cols::Vector{Any}
@@ -238,7 +238,7 @@ t = Tables.partitioner(
)
)
io = IOBuffer()
@test_throws CompositeException Arrow.write(io, t)
@test_throws ErrorException Arrow.write(io, t)

end # @testset "misc"

0 comments on commit 2cacbe5

Please sign in to comment.