Allow passing custom lz4 & zstd compressors to Arrow.write (#34)
* Allow passing custom lz4 & zstd compressors to Arrow.write

* update docs
quinnj committed Oct 6, 2020
1 parent 7850b98 commit 315d39a9203f1b56eec1408c2c8e244019606ace
Showing 6 changed files with 93 additions and 50 deletions.
@@ -50,6 +50,10 @@ Read an arrow formatted table, from:

Returns an `Arrow.Table` object that allows column access via `table.col1`, `table[:col1]`, or `table[1]`.

NOTE: the columns in an `Arrow.Table` are views into the original arrow memory, and hence are not easily
modifiable (with e.g. `push!`, `append!`, etc.). To mutate arrow columns, call `copy(x)` to materialize
the arrow data as a normal Julia array.

`Arrow.Table` also satisfies the Tables.jl interface, and so can easily be materialized via any supporting
sink function: e.g. `DataFrame(Arrow.Table(file))`, `SQLite.load!(db, "table", Arrow.Table(file))`, etc.
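A minimal sketch of the access patterns described above; the file name `data.arrow` and column name `col1` are placeholders:

```julia
using Arrow, DataFrames

tbl = Arrow.Table("data.arrow")   # hypothetical file written earlier with Arrow.write
col = tbl.col1                    # a view into the arrow memory; push!/append! won't work here
v = copy(col)                     # materialize the column as a normal, mutable Julia vector
df = DataFrame(tbl)               # or materialize the whole table via a Tables.jl sink
```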

@@ -117,7 +121,7 @@ By default, `Arrow.write` will use multiple threads to write multiple
record batches simultaneously (e.g. if julia is started with `julia -t 8`).

Supported keyword arguments to `Arrow.write` include:
* `compress::Symbol`: possible values include `:lz4` or `:zstd`; will cause all buffers in each record batch to use the respective compression encoding
* `compress`: possible values include `:lz4`, `:zstd`, or your own initialized `LZ4FrameCompressor` or `ZstdCompressor` objects; will cause all buffers in each record batch to use the respective compression encoding (see the sketch after this list)
* `dictencode::Bool=false`: whether all columns should use dictionary encoding when being written
* `dictencodenested::Bool=false`: whether nested data type columns should also dict encode nested arrays/buffers; many other implementations don't support this
* `denseunions::Bool=true`: whether Julia `Vector{<:Union}` arrays should be written using the dense union layout; passing `false` will result in the sparse union layout
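Mirroring the tests added by this commit, a compressor can be constructed and initialized up front and then passed via `compress`; a minimal sketch, with illustrative file names:

```julia
using Arrow

t = (col1 = collect(Int64, 1:10),)

# built-in shortcuts map to package-level default compressors
Arrow.write("shortcut.arrow", t; compress=:zstd)

# or construct and initialize your own compressor and pass it directly
zstd = Arrow.CodecZstd.ZstdCompressor(; level=8)
Arrow.CodecZstd.TranscodingStreams.initialize(zstd)
Arrow.write("custom.arrow", t; compress=zstd)
```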
@@ -75,4 +75,17 @@ include("eltypes.jl")
include("table.jl")
include("write.jl")

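# Package-level default compressors used when `compress=:lz4` or `compress=:zstd`
# is passed to `Arrow.write`; constructed and initialized in `__init__` below.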
const LZ4_FRAME_COMPRESSOR = Ref{LZ4FrameCompressor}()
const ZSTD_COMPRESSOR = Ref{ZstdCompressor}()

function __init__()
zstd = ZstdCompressor(; level=3)
CodecZstd.TranscodingStreams.initialize(zstd)
ZSTD_COMPRESSOR[] = zstd
lz4 = LZ4FrameCompressor(; compressionlevel=4)
CodecLz4.TranscodingStreams.initialize(lz4)
LZ4_FRAME_COMPRESSOR[] = lz4
return
end

end # module Arrow
@@ -16,20 +16,14 @@ Base.eltype(c::Compressed{Z, A}) where {Z, A} = eltype(A)
getmetadata(x::Compressed) = getmetadata(x.data)
compressiontype(c::Compressed{Z}) where {Z} = Z

function compress(Z::Meta.CompressionType, x::Array)
function compress(Z::Meta.CompressionType, comp, x::Array)
GC.@preserve x begin
y = unsafe_wrap(Array, convert(Ptr{UInt8}, pointer(x)), sizeof(x))
if Z == Meta.CompressionType.LZ4_FRAME
return CompressedBuffer(transcode(LZ4FrameCompressor, y), length(y))
elseif Z == Meta.CompressionType.ZSTD
return CompressedBuffer(transcode(ZstdCompressor, y), length(y))
else
throw(ArgumentError("unsupported compression type: $Z"))
end
return CompressedBuffer(transcode(comp, y), length(y))
end
end

compress(Z::Meta.CompressionType, x) = compress(Z, convert(Array, x))
compress(Z::Meta.CompressionType, comp, x) = compress(Z, comp, convert(Array, x))

abstract type ArrowVector{T} <: AbstractVector{T} end

@@ -39,14 +33,14 @@ validitybitmap(x::ArrowVector) = x.validity
nullcount(x::ArrowVector) = validitybitmap(x).nc
getmetadata(x::ArrowVector) = x.metadata

function toarrowvector(x, de=DictEncoding[], meta=getmetadata(x); compression::Union{Nothing, Symbol}=nothing, kw...)
function toarrowvector(x, de=DictEncoding[], meta=getmetadata(x); compression::Union{Nothing, LZ4FrameCompressor, ZstdCompressor}=nothing, kw...)
@debug 2 "converting top-level column to arrow format: col = $(typeof(x)), compression = $compression, kw = $kw"
@debug 3 x
A = arrowvector(x, de, meta; compression=compression, kw...)
if compression === :lz4
A = compress(Meta.CompressionType.LZ4_FRAME, A)
elseif compression === :zstd
A = compress(Meta.CompressionType.ZSTD, A)
if compression isa LZ4FrameCompressor
A = compress(Meta.CompressionType.LZ4_FRAME, compression, A)
elseif compression isa ZstdCompressor
A = compress(Meta.CompressionType.ZSTD, compression, A)
end
@debug 2 "converted top-level column to arrow format: $(typeof(A))"
@debug 3 A
@@ -83,7 +77,7 @@ function arrowvector(::Type{S}, ::Type{T}, x, de, meta; kw...) where {S, T}
end

arrowvector(::NullType, ::Type{Missing}, ::Type{Missing}, x, de, meta; kw...) = MissingVector(length(x))
compress(Z::Meta.CompressionType, v::MissingVector) =
compress(Z::Meta.CompressionType, comp, v::MissingVector) =
Compressed{Z, MissingVector}(v, CompressedBuffer[], length(v), length(v), Compressed[])

struct ValidityBitmap <: ArrowVector{Bool}
@@ -93,8 +87,8 @@ struct ValidityBitmap <: ArrowVector{Bool}
nc::Int # null count
end

compress(Z::Meta.CompressionType, v::ValidityBitmap) =
v.nc == 0 ? CompressedBuffer(UInt8[], 0) : compress(Z, view(v.bytes, v.pos:(v.pos + bitpackedbytes(v.ℓ) - 1)))
compress(Z::Meta.CompressionType, comp, v::ValidityBitmap) =
v.nc == 0 ? CompressedBuffer(UInt8[], 0) : compress(Z, comp, view(v.bytes, v.pos:(v.pos + bitpackedbytes(v.ℓ) - 1)))

Base.size(p::ValidityBitmap) = (p.ℓ,)
nullcount(x::ValidityBitmap) = x.nc
@@ -206,11 +200,11 @@ end
return v
end

function compress(Z::Meta.CompressionType, p::P) where {P <: Primitive}
function compress(Z::Meta.CompressionType, comp, p::P) where {P <: Primitive}
len = length(p)
nc = nullcount(p)
validity = compress(Z, p.validity)
data = compress(Z, p.data)
validity = compress(Z, comp, p.validity)
data = compress(Z, comp, p.data)
return Compressed{Z, P}(p, [validity, data], len, nc, Compressed[])
end

@@ -276,17 +270,17 @@ end

# end

function compress(Z::Meta.CompressionType, x::List{T, O, A}) where {T, O, A}
function compress(Z::Meta.CompressionType, comp, x::List{T, O, A}) where {T, O, A}
len = length(x)
nc = nullcount(x)
validity = compress(Z, x.validity)
offsets = compress(Z, x.offsets.offsets)
validity = compress(Z, comp, x.validity)
offsets = compress(Z, comp, x.offsets.offsets)
buffers = [validity, offsets]
children = Compressed[]
if eltype(A) == UInt8
push!(buffers, compress(Z, x.data))
push!(buffers, compress(Z, comp, x.data))
else
push!(children, compress(Z, x.data))
push!(children, compress(Z, comp, x.data))
end
return Compressed{Z, typeof(x)}(x, buffers, len, nc, children)
end
@@ -342,16 +336,16 @@ end
return v
end

function compress(Z::Meta.CompressionType, x::FixedSizeList{T, A}) where {T, A}
function compress(Z::Meta.CompressionType, comp, x::FixedSizeList{T, A}) where {T, A}
len = length(x)
nc = nullcount(x)
validity = compress(Z, x.validity)
validity = compress(Z, comp, x.validity)
buffers = [validity]
children = Compressed[]
if eltype(A) == UInt8
push!(buffers, compress(Z, x.data))
push!(buffers, compress(Z, comp, x.data))
else
push!(children, compress(Z, x.data))
push!(children, compress(Z, comp, x.data))
end
return Compressed{Z, typeof(x)}(x, buffers, len, nc, children)
end
@@ -389,14 +383,14 @@ end
end
end

function compress(Z::Meta.CompressionType, x::A) where {A <: Map}
function compress(Z::Meta.CompressionType, comp, x::A) where {A <: Map}
len = length(x)
nc = nullcount(x)
validity = compress(Z, x.validity)
offsets = compress(Z, x.offsets.offsets)
validity = compress(Z, comp, x.validity)
offsets = compress(Z, comp, x.offsets.offsets)
buffers = [validity, offsets]
children = Compressed[]
push!(children, compress(Z, x.data))
push!(children, compress(Z, comp, x.data))
return Compressed{Z, A}(x, buffers, len, nc, children)
end

@@ -452,14 +446,14 @@ end
return v
end

function compress(Z::Meta.CompressionType, x::A) where {A <: Struct}
function compress(Z::Meta.CompressionType, comp, x::A) where {A <: Struct}
len = length(x)
nc = nullcount(x)
validity = compress(Z, x.validity)
validity = compress(Z, comp, x.validity)
buffers = [validity]
children = Compressed[]
for y in x.data
push!(children, compress(Z, y))
push!(children, compress(Z, comp, y))
end
return Compressed{Z, A}(x, buffers, len, nc, children)
end
@@ -515,15 +509,15 @@ end
return v
end

function compress(Z::Meta.CompressionType, x::A) where {A <: DenseUnion}
function compress(Z::Meta.CompressionType, comp, x::A) where {A <: DenseUnion}
len = length(x)
nc = nullcount(x)
typeIds = compress(Z, x.typeIds)
offsets = compress(Z, x.offsets)
typeIds = compress(Z, comp, x.typeIds)
offsets = compress(Z, comp, x.offsets)
buffers = [typeIds, offsets]
children = Compressed[]
for y in x.data
push!(children, compress(Z, y))
push!(children, compress(Z, comp, y))
end
return Compressed{Z, A}(x, buffers, len, nc, children)
end
@@ -554,14 +548,14 @@ end
return v
end

function compress(Z::Meta.CompressionType, x::A) where {A <: SparseUnion}
function compress(Z::Meta.CompressionType, comp, x::A) where {A <: SparseUnion}
len = length(x)
nc = nullcount(x)
typeIds = compress(Z, x.typeIds)
typeIds = compress(Z, comp, x.typeIds)
buffers = [typeIds]
children = Compressed[]
for y in x.data
push!(children, compress(Z, y))
push!(children, compress(Z, comp, y))
end
return Compressed{Z, A}(x, buffers, len, nc, children)
end
@@ -702,10 +696,10 @@ function Base.copy(x::DictEncoded{T, S}) where {T, S}
return PooledArray(PooledArrays.RefArray(refs), Dict{T, S}(val => i for (i, val) in enumerate(pool)), pool)
end

function compress(Z::Meta.CompressionType, x::A) where {A <: DictEncoded}
function compress(Z::Meta.CompressionType, comp, x::A) where {A <: DictEncoded}
len = length(x)
nc = nullcount(x)
validity = compress(Z, x.validity)
inds = compress(Z, x.indices)
validity = compress(Z, comp, x.validity)
inds = compress(Z, comp, x.indices)
return Compressed{Z, A}(x, [validity, inds], len, nc, Compressed[])
end
@@ -123,6 +123,10 @@ Read an arrow formatted table, from:
Returns an `Arrow.Table` object that allows column access via `table.col1`, `table[:col1]`, or `table[1]`.

NOTE: the columns in an `Arrow.Table` are views into the original arrow memory, and hence are not easily
modifiable (with e.g. `push!`, `append!`, etc.). To mutate arrow columns, call `copy(x)` to materialize
the arrow data as a normal Julia array.

`Arrow.Table` also satisfies the Tables.jl interface, and so can easily be materialized via any supporting
sink function: e.g. `DataFrame(Arrow.Table(file))`, `SQLite.load!(db, "table", Arrow.Table(file))`, etc.
@@ -27,7 +27,7 @@ By default, `Arrow.write` will use multiple threads to write multiple
record batches simultaneously (e.g. if julia is started with `julia -t 8`).
Supported keyword arguments to `Arrow.write` include:
* `compress::Symbol`: possible values include `:lz4` or `:zstd`; will cause all buffers in each record batch to use the respective compression encoding
* `compress`: possible values include `:lz4`, `:zstd`, or your own initialized `LZ4FrameCompressor` or `ZstdCompressor` objects; will cause all buffers in each record batch to use the respective compression encoding
* `dictencode::Bool=false`: whether all columns should use dictionary encoding when being written
* `dictencodenested::Bool=false`: whether nested data type columns should also dict encode nested arrays/buffers; many other implementations don't support this
* `denseunions::Bool=true`: whether Julia `Vector{<:Union}` arrays should be written using the dense union layout; passing `false` will result in the sparse union layout
@@ -36,14 +36,14 @@ Supported keyword arguments to `Arrow.write` include:
"""
function write end

function write(file::String, tbl; largelists::Bool=false, compress::Union{Nothing, Symbol}=nothing, denseunions::Bool=true, dictencode::Bool=false, dictencodenested::Bool=false)
function write(file::String, tbl; largelists::Bool=false, compress::Union{Nothing, Symbol, LZ4FrameCompressor, ZstdCompressor}=nothing, denseunions::Bool=true, dictencode::Bool=false, dictencodenested::Bool=false)
open(file, "w") do io
write(io, tbl, true, largelists, compress, denseunions, dictencode, dictencodenested)
end
return file
end

function write(io::IO, tbl; largelists::Bool=false, compress::Union{Nothing, Symbol}=nothing, denseunions::Bool=true, dictencode::Bool=false, dictencodenested::Bool=false, file::Bool=false)
function write(io::IO, tbl; largelists::Bool=false, compress::Union{Nothing, Symbol, LZ4FrameCompressor, ZstdCompressor}=nothing, denseunions::Bool=true, dictencode::Bool=false, dictencodenested::Bool=false, file::Bool=false)
return write(io, tbl, file, largelists, compress, denseunions, dictencode, dictencodenested)
end

@@ -102,6 +102,13 @@ function Base.close(ch::OrderedChannel)
end

function write(io, source, writetofile, largelists, compress, denseunions, dictencode, dictencodenested)
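# Map the symbol shortcuts to the package-level default compressors;
# an already-initialized compressor object passed by the caller is used as-is.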
if compress === :lz4
compress = LZ4_FRAME_COMPRESSOR[]
elseif compress === :zstd
compress = ZSTD_COMPRESSOR[]
elseif compress isa Symbol
throw(ArgumentError("unsupported compress keyword argument value: $compress. Valid values include `:lz4` or `:zstd`"))
end
if writetofile
@debug 1 "starting write of arrow formatted file"
Base.write(io, "ARROW1\0\0")
@@ -72,6 +72,27 @@ tt = Arrow.Table(io)
@test Arrow.getmetadata(tt) == meta
@test Arrow.getmetadata(tt.col1) == meta2

# custom compressors
lz4 = Arrow.CodecLz4.LZ4FrameCompressor(; compressionlevel=8)
Arrow.CodecLz4.TranscodingStreams.initialize(lz4)
t = (col1=Int64[1,2,3,4,5,6,7,8,9,10],)
io = IOBuffer()
Arrow.write(io, t; compress=lz4)
seekstart(io)
tt = Arrow.Table(io)
@test length(tt) == length(t)
@test all(isequal.(values(t), values(tt)))

zstd = Arrow.CodecZstd.ZstdCompressor(; level=8)
Arrow.CodecZstd.TranscodingStreams.initialize(zstd)
t = (col1=Int64[1,2,3,4,5,6,7,8,9,10],)
io = IOBuffer()
Arrow.write(io, t; compress=zstd)
seekstart(io)
tt = Arrow.Table(io)
@test length(tt) == length(t)
@test all(isequal.(values(t), values(tt)))

end # @testset "misc"

end
