ability to append partitions to existing arrow files (#160)
* ability to append partitions to an arrow file

This adds a method to `append` partitions to existing arrow files. Partitions to append are supplied in the form of any [Tables.jl](https://github.com/JuliaData/Tables.jl)-compatible table.

Multiple record batches will be written based on the number of `Tables.partitions(tbl)` that are provided.

Each partition being appended must have the same `Tables.Schema` as the destination arrow file that is being appended to.

Other parameters that `append` accepts are similar to those accepted by `write` (see the usage sketch after these notes).

* remove unused methods

* add more tests and some fixes

* allow appends to both seekable IO and files

* a few changes to Stream, avoid duplication for append

store a few additional stream properties in the `Stream` data type and avoid duplicating code for the append functionality

* call Tables.schema on result of Tables.columns
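
For reference, here is a minimal usage sketch of the workflow described in these notes (the example data is hypothetical; the essential point is that the destination must be written with `file=false` so it is in IPC stream format):

```julia
using Arrow

t1 = (a = [1, 2, 3], b = ["x", "y", "z"])
t2 = (a = [4, 5], b = ["p", "q"])

# the destination must be in IPC stream format (file=false); the arrow
# "file" (feather) format cannot be appended to
Arrow.write("data.arrow", t1; file=false)

# append a table with the same Tables.Schema; each of its partitions
# becomes an additional record batch in the existing stream
Arrow.append("data.arrow", t2)

Arrow.Table("data.arrow")  # reads all record batches back as one table
```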
tanmaykm committed Apr 23, 2021
1 parent 125c1e7 commit 742ea4c4d805e169cdf622b06df0339b6d6cd2f8
Showing 7 changed files with 338 additions and 9 deletions.
@@ -92,6 +92,7 @@ include("arraytypes/arraytypes.jl")
include("eltypes.jl")
include("table.jl")
include("write.jl")
include("append.jl")

const LZ4_FRAME_COMPRESSOR = LZ4FrameCompressor[]
const ZSTD_COMPRESSOR = ZstdCompressor[]
@@ -0,0 +1,165 @@
"""
    Arrow.append(io::IO, tbl)
    Arrow.append(file::String, tbl)
    tbl |> Arrow.append(file)

Append any [Tables.jl](https://github.com/JuliaData/Tables.jl)-compatible `tbl`
to an existing arrow formatted file or IO. The existing arrow data must be in
IPC stream format. Note that appending to the arrow "file" (feather) format is
_not_ allowed, as that format does not support appending. This means files
written via `Arrow.write(filename::String, tbl)` _cannot_ be appended to;
instead, write them with `Arrow.write(filename::String, tbl; file=false)`.

When an IO object is provided to append to, it must support seeking. For
example, a file opened in `r+` mode or an `IOBuffer` that is readable, writable,
and seekable can be appended to, but a network stream cannot.

Multiple record batches will be written based on the number of
`Tables.partitions(tbl)` that are provided; by default, this is just
one for a given table, but some table sources support automatic
partitioning. Note you can turn multiple table objects into partitions
by doing `Tables.partitioner([tbl1, tbl2, ...])`, but each table must
have the exact same `Tables.Schema`.

By default, `Arrow.append` will use multiple threads to write multiple
record batches simultaneously (e.g. if julia is started with `julia -t 8`
or the `JULIA_NUM_THREADS` environment variable is set).

Supported keyword arguments to `Arrow.append` include:
* `alignment::Int=8`: specify the number of bytes to align buffers to when written in messages; strongly recommended to only use alignment values of 8 or 64 for modern memory cache line optimization
* `dictencode::Bool=false`: whether all columns should use dictionary encoding when being written; to dict encode specific columns, wrap the column/array in `Arrow.DictEncode(col)`
* `dictencodenested::Bool=false`: whether nested data type columns should also dict encode nested arrays/buffers; other language implementations [may not support this](https://arrow.apache.org/docs/status.html)
* `denseunions::Bool=true`: whether Julia `Vector{<:Union}` arrays should be written using the dense union layout; passing `false` will result in the sparse union layout
* `largelists::Bool=false`: causes list column types to be written with Int64 offset arrays; mainly for testing purposes; by default, Int64 offsets will be used only if needed
* `maxdepth::Int=$DEFAULT_MAX_DEPTH`: deepest allowed nested serialization level; this is provided by default to prevent accidental infinite recursion with mutually recursive data structures
* `ntasks::Int`: number of concurrent threaded tasks to allow while writing input partitions out as arrow record batches; default is no limit; to disable multithreaded writing, pass `ntasks=1`
* `convert::Bool`: whether certain arrow primitive types in the schema of `file` should be converted to Julia defaults for matching them to the schema of `tbl`; by default, `convert=true`.
* `file::Bool`: applicable when an `IO` is provided, whether it is a file; by default `file=false`.
"""
function append end

append(io_or_file; kw...) = x -> append(io_or_file, x; kw...)

function append(file::String, tbl; kwargs...)
    open(file, "r+") do io
        append(io, tbl; file=true, kwargs...)
    end

    return file
end

function append(io::IO, tbl;
    largelists::Bool=false,
    denseunions::Bool=true,
    dictencode::Bool=false,
    dictencodenested::Bool=false,
    alignment::Int=8,
    maxdepth::Int=DEFAULT_MAX_DEPTH,
    ntasks=Inf,
    convert::Bool=true,
    file::Bool=false)

    if ntasks < 1
        throw(ArgumentError("ntasks keyword argument must be > 0; pass `ntasks=1` to disable multithreaded writing"))
    end

    isstream, arrow_schema, compress = stream_properties(io; convert=convert)
    if !isstream
        throw(ArgumentError("append is supported only to files in arrow stream format"))
    end

    if compress === :lz4
        compress = LZ4_FRAME_COMPRESSOR
    elseif compress === :zstd
        compress = ZSTD_COMPRESSOR
    elseif compress isa Symbol
        throw(ArgumentError("unsupported compress keyword argument value: $compress. Valid values include `:lz4` or `:zstd`"))
    end

    append(io, tbl, arrow_schema, compress, largelists, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks)

    return io
end

function append(io::IO, source, arrow_schema, compress, largelists, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks)
    seekend(io)
    skip(io, -8) # overwrite last 8 bytes of last empty message footer

    sch = Ref{Tables.Schema}(arrow_schema)
    msgs = OrderedChannel{Message}(ntasks)
    dictencodings = Dict{Int64, Any}() # Lockable{DictEncoding}
    # build messages
    blocks = (Block[], Block[])
    # start message writing from channel
    threaded = ntasks > 1
    tsk = threaded ? (Threads.@spawn for msg in msgs
        Base.write(io, msg, blocks, sch, alignment)
    end) : (@async for msg in msgs
        Base.write(io, msg, blocks, sch, alignment)
    end)
    anyerror = Threads.Atomic{Bool}(false)
    errorref = Ref{Any}()
    @sync for (i, tbl) in enumerate(Tables.partitions(source))
        if anyerror[]
            @error "error writing arrow data on partition = $(errorref[][3])" exception=(errorref[][1], errorref[][2])
            error("fatal error writing arrow data")
        end
        @debug 1 "processing table partition i = $i"
        tbl_cols = Tables.columns(tbl)
        tbl_schema = Tables.schema(tbl_cols)

        if !is_equivalent_schema(arrow_schema, tbl_schema)
            throw(ArgumentError("Table schema does not match existing arrow file schema"))
        end

        if threaded
            Threads.@spawn process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror)
        else
            @async process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror)
        end
    end
    if anyerror[]
        @error "error writing arrow data on partition = $(errorref[][3])" exception=(errorref[][1], errorref[][2])
        error("fatal error writing arrow data")
    end
    # close our message-writing channel, no further put!-ing is allowed
    close(msgs)
    # now wait for our message-writing task to finish writing
    wait(tsk)

    Base.write(io, Message(UInt8[], nothing, 0, true, false, Meta.Schema), blocks, sch, alignment)

    return io
end

function stream_properties(io::IO; convert::Bool=true)
    startpos = position(io)
    buff = similar(FILE_FORMAT_MAGIC_BYTES)
    start_magic = read!(io, buff) == FILE_FORMAT_MAGIC_BYTES
    seekend(io)
    len = position(io) - startpos
    skip(io, -length(FILE_FORMAT_MAGIC_BYTES))
    end_magic = read!(io, buff) == FILE_FORMAT_MAGIC_BYTES
    seek(io, startpos) # leave the stream position unchanged

    isstream = !(len > 24 && start_magic && end_magic)
    if isstream
        stream = Stream(io, convert=convert)
        for table in stream
            # no need to scan further once we get compression information
            (stream.compression[] !== nothing) && break
        end
        seek(io, startpos) # leave the stream position unchanged
        return isstream, Tables.Schema(stream.names, stream.types), stream.compression[]
    else
        return isstream, nothing, nothing
    end
end

function is_equivalent_schema(sch1::Tables.Schema, sch2::Tables.Schema)
    (sch1.names == sch2.names) || (return false)
    for (t1, t2) in zip(sch1.types, sch2.types)
        (t1 === t2) || (return false)
    end
    true
end
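
The docstring above notes that one record batch is appended per `Tables.partitions(tbl)` and that every partition must carry the same `Tables.Schema` as the existing data. A hedged sketch of that behavior against a seekable in-memory stream (hypothetical tables; `ntasks=1` merely forces serial writes):

```julia
using Arrow, Tables

tbl1 = (x = [1, 2], y = [1.0, 2.0])
tbl2 = (x = [3, 4], y = [3.0, 4.0])

io = IOBuffer()
Arrow.write(io, tbl1; file=false)   # seed an IPC stream in a readable, writable, seekable IO
seekstart(io)                       # position at the start of the existing arrow data

# each element of the partitioner is appended as its own record batch;
# both tables must share the exact same Tables.Schema
Arrow.append(io, Tables.partitioner([tbl1, tbl2]); ntasks=1)

# the stream now holds three record batches in total
length(collect(Tables.partitions(Arrow.Stream(seekstart(io)))))  # == 3
```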
@@ -44,10 +44,12 @@ struct Stream
batchiterator::BatchIterator
pos::Int
names::Vector{Symbol}
types::Vector{Type}
schema::Meta.Schema
dictencodings::Dict{Int64, DictEncoding} # dictionary id => DictEncoding
dictencoded::Dict{Int64, Meta.Field} # dictionary id => field
convert::Bool
compression::Ref{Union{Symbol,Nothing}}
end

Tables.partitions(x::Stream) = x
@@ -75,19 +77,23 @@ function Stream(bytes::Vector{UInt8}, off::Integer=1, tlen::Union{Integer, Nothi
# assert endianness?
# store custom_metadata?
names = Symbol[]
types = Type[]
for (i, field) in enumerate(schema.fields)
push!(names, Symbol(field.name))
push!(types, juliaeltype(field, buildmetadata(field.custom_metadata), convert))
# recursively find any dictionaries for any fields
getdictionaries!(dictencoded, field)
@debug 1 "parsed column from schema: field = $field"
end
return Stream(batchiterator, pos, names, schema, dictencodings, dictencoded, convert)
return Stream(batchiterator, pos, names, types, schema, dictencodings, dictencoded, convert, Ref{Union{Symbol,Nothing}}(nothing))
end

Base.IteratorSize(::Type{Stream}) = Base.SizeUnknown()

function Base.iterate(x::Stream, (pos, id)=(x.pos, 1))
columns = AbstractVector[]
compression = nothing

while true
state = iterate(x.batchiterator, (pos, id))
state === nothing && return nothing
@@ -97,6 +103,9 @@ function Base.iterate(x::Stream, (pos, id)=(x.pos, 1))
id = header.id
recordbatch = header.data
@debug 1 "parsing dictionary batch message: id = $id, compression = $(recordbatch.compression)"
if recordbatch.compression !== nothing
compression = recordbatch.compression
end
if haskey(x.dictencodings, id) && header.isDelta
# delta
field = x.dictencoded[id]
@@ -114,6 +123,9 @@ function Base.iterate(x::Stream, (pos, id)=(x.pos, 1))
@debug 1 "parsed dictionary batch message: id=$id, data=$values\n"
elseif header isa Meta.RecordBatch
@debug 1 "parsing record batch message: compression = $(header.compression)"
if header.compression !== nothing
compression = header.compression
end
for vec in VectorIterator(x.schema, batch, x.dictencodings, x.convert)
push!(columns, vec)
end
@@ -122,6 +134,17 @@ function Base.iterate(x::Stream, (pos, id)=(x.pos, 1))
throw(ArgumentError("unsupported arrow message type: $(typeof(header))"))
end
end

if compression !== nothing
if compression.codec == Flatbuf.CompressionType.ZSTD
x.compression[] = :zstd
elseif compression.codec == Flatbuf.CompressionType.LZ4_FRAME
x.compression[] = :lz4
else
throw(ArgumentError("unsupported compression codec: $(compression.codec)"))
end
end

lookup = Dict{Symbol, AbstractVector}()
types = Type[]
for (nm, col) in zip(x.names, columns)
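
The new `types` and `compression` fields above let `stream_properties` (in append.jl) report both the schema and the codec already used in the existing data, so appended record batches are compressed the same way. A hedged round-trip sketch, assuming `Arrow.write` accepts `compress=:zstd` as it does for ordinary writes:

```julia
using Arrow

tbl = (a = rand(100), b = rand(Int, 100))

# write a zstd-compressed IPC stream ...
Arrow.write("compressed.arrow", tbl; file=false, compress=:zstd)

# ... then append: stream_properties scans the existing record batches,
# finds :zstd via Stream.compression, and compresses the new batches to match
Arrow.append("compressed.arrow", tbl)
```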
@@ -132,8 +132,9 @@ function write(io, source, writetofile, largelists, compress, denseunions, dicte
error("fatal error writing arrow data")
end
@debug 1 "processing table partition i = $i"
tblcols = Tables.columns(tbl)
if i == 1
cols = toarrowtable(tbl, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth)
cols = toarrowtable(tblcols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth)
sch[] = Tables.schema(cols)
firstcols[] = cols
put!(msgs, makeschemamsg(sch[], cols), i)
@@ -149,9 +150,9 @@ function write(io, source, writetofile, largelists, compress, denseunions, dicte
put!(msgs, makerecordbatchmsg(sch[], cols, alignment), i, true)
else
if threaded
Threads.@spawn process_partition(tbl, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror)
Threads.@spawn process_partition(tblcols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror)
else
@async process_partition(tbl, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror)
@async process_partition(tblcols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror)
end
end
end
@@ -205,9 +206,9 @@ function write(io, source, writetofile, largelists, compress, denseunions, dicte
return io
end

function process_partition(tbl, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror)
function process_partition(cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror)
try
cols = toarrowtable(tbl, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth)
cols = toarrowtable(cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth)
if !isempty(cols.dictencodingdeltas)
for de in cols.dictencodingdeltas
dictsch = Tables.Schema((:col,), (eltype(de.data),))
@@ -229,9 +230,8 @@ struct ToArrowTable
dictencodingdeltas::Vector{DictEncoding}
end

function toarrowtable(x, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth)
function toarrowtable(cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth)
@debug 1 "converting input table to arrow formatted columns"
cols = Tables.columns(x)
meta = getmetadata(cols)
sch = Tables.schema(cols)
types = collect(sch.types)
@@ -559,7 +559,7 @@ function DataFile(source)
dictencodings = Dict{String, Tuple{Base.Type, DictEncoding}}()
dictid = Ref(0)
for (i, tbl1) in enumerate(Tables.partitions(source))
tbl = Arrow.toarrowtable(tbl1)
tbl = Arrow.toarrowtable(Tables.columns(tbl1))
if i == 1
sch = Tables.schema(tbl)
for (nm, T, col) in zip(sch.names, sch.types, Tables.Columns(tbl))
@@ -18,6 +18,7 @@ using Test, Arrow, Tables, Dates, PooledArrays, TimeZones, UUIDs, CategoricalArr

include(joinpath(dirname(pathof(Arrow)), "ArrowTypes/test/tests.jl"))
include(joinpath(dirname(pathof(Arrow)), "../test/testtables.jl"))
include(joinpath(dirname(pathof(Arrow)), "../test/testappend.jl"))
include(joinpath(dirname(pathof(Arrow)), "../test/integrationtest.jl"))
include(joinpath(dirname(pathof(Arrow)), "../test/dates.jl"))

@@ -41,6 +42,20 @@ end

end # @testset "table roundtrips"

@testset "table append" begin

for case in testtables
    testappend(case...)
end

testappend_partitions()

for compression_option in (:lz4, :zstd)
    testappend_compression(compression_option)
end

end # @testset "table append"

@testset "arrow json integration tests" begin

for file in readdir(joinpath(dirname(pathof(Arrow)), "../test/arrowjson"))
