Skip to content
Permalink
Browse files
Make compressed writing threadsafe (#118)
Fixes #82. The problem when trying to write arrow using multiple threads and compression was there was only a single compressor object that each thread was simultaneously trying to use. This PR ensures there is a compressor object per thread that will be used per thread.
  • Loading branch information
quinnj committed Jan 30, 2021
1 parent 6d76412 commit da73d8201a8915325800dfda9a099539be096e7d
Showing 4 changed files with 18 additions and 11 deletions.
@@ -1,4 +1,5 @@
Manifest.toml
.DS_STORE
*.jl.cov
*.jl.*.cov
*.jl.mem
@@ -91,16 +91,18 @@ include("eltypes.jl")
include("table.jl")
include("write.jl")

const LZ4_FRAME_COMPRESSOR = Ref{LZ4FrameCompressor}()
const ZSTD_COMPRESSOR = Ref{ZstdCompressor}()
const LZ4_FRAME_COMPRESSOR = LZ4FrameCompressor[]
const ZSTD_COMPRESSOR = ZstdCompressor[]

function __init__()
zstd = ZstdCompressor(; level=3)
CodecZstd.TranscodingStreams.initialize(zstd)
ZSTD_COMPRESSOR[] = zstd
lz4 = LZ4FrameCompressor(; compressionlevel=4)
CodecLz4.TranscodingStreams.initialize(lz4)
LZ4_FRAME_COMPRESSOR[] = lz4
for _ = 1:Threads.nthreads()
zstd = ZstdCompressor(; level=3)
CodecZstd.TranscodingStreams.initialize(zstd)
push!(ZSTD_COMPRESSOR, zstd)
lz4 = LZ4FrameCompressor(; compressionlevel=4)
CodecLz4.TranscodingStreams.initialize(lz4)
push!(LZ4_FRAME_COMPRESSOR, lz4)
end
return
end

@@ -30,14 +30,18 @@ validitybitmap(x::ArrowVector) = x.validity
nullcount(x::ArrowVector) = validitybitmap(x).nc
getmetadata(x::ArrowVector) = x.metadata

function toarrowvector(x, i=1, de=Dict{Int64, Any}(), ded=DictEncoding[], meta=getmetadata(x); compression::Union{Nothing, LZ4FrameCompressor, ZstdCompressor}=nothing, kw...)
function toarrowvector(x, i=1, de=Dict{Int64, Any}(), ded=DictEncoding[], meta=getmetadata(x); compression::Union{Nothing, Vector{LZ4FrameCompressor}, LZ4FrameCompressor, Vector{ZstdCompressor}, ZstdCompressor}=nothing, kw...)
@debug 2 "converting top-level column to arrow format: col = $(typeof(x)), compression = $compression, kw = $(kw.data)"
@debug 3 x
A = arrowvector(x, i, 0, 0, de, ded, meta; compression=compression, kw...)
if compression isa LZ4FrameCompressor
A = compress(Meta.CompressionType.LZ4_FRAME, compression, A)
elseif compression isa Vector{LZ4FrameCompressor}
A = compress(Meta.CompressionType.LZ4_FRAME, compression[Threads.threadid()], A)
elseif compression isa ZstdCompressor
A = compress(Meta.CompressionType.ZSTD, compression, A)
elseif compression isa Vector{ZstdCompressor}
A = compress(Meta.CompressionType.ZSTD, compression[Threads.threadid()], A)
end
@debug 2 "converted top-level column to arrow format: $(typeof(A))"
@debug 3 A
@@ -85,9 +85,9 @@ end

function write(io, source, writetofile, largelists, compress, denseunions, dictencode, dictencodenested, alignment)
if compress === :lz4
compress = LZ4_FRAME_COMPRESSOR[]
compress = LZ4_FRAME_COMPRESSOR
elseif compress === :zstd
compress = ZSTD_COMPRESSOR[]
compress = ZSTD_COMPRESSOR
elseif compress isa Symbol
throw(ArgumentError("unsupported compress keyword argument value: $compress. Valid values include `:lz4` or `:zstd`"))
end

0 comments on commit da73d82

Please sign in to comment.