Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add locks for de-/compressor + initialize decompressor #397

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/Arrow.jl
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ include("show.jl")

# Pools of reusable codec instances. `__init__` pushes entries inside a loop over
# `1:Threads.nthreads()`, and callers select an entry with `Threads.threadid()`.
const LZ4_FRAME_COMPRESSOR = Vector{LZ4FrameCompressor}()
const ZSTD_COMPRESSOR = Vector{ZstdCompressor}()
const LZ4_FRAME_DECOMPRESSOR = Vector{LZ4FrameDecompressor}()
const ZSTD_DECOMPRESSOR = Vector{ZstdDecompressor}()
# Locks for multithreaded (de)compression: the codecs are indexed by thread id, which
# might not be safe under Julia > 1.8.
const LZ4_FRAME_COMPRESSOR_LOCK = Vector{ReentrantLock}()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A single thread can only do one thing at a time, so maybe we could have a single lock per thread, shared among all compressors and decompressors?

# Locks for the Zstd compressor pool and both decompressor pools; `__init__` pushes one
# `ReentrantLock` into each of them per loop iteration.
const ZSTD_COMPRESSOR_LOCK = Vector{ReentrantLock}()
const LZ4_FRAME_DECOMPRESSOR_LOCK = Vector{ReentrantLock}()
const ZSTD_DECOMPRESSOR_LOCK = Vector{ReentrantLock}()

function __init__()
for _ = 1:Threads.nthreads()
Expand All @@ -82,6 +89,19 @@ function __init__()
lz4 = LZ4FrameCompressor(; compressionlevel=4)
CodecLz4.TranscodingStreams.initialize(lz4)
push!(LZ4_FRAME_COMPRESSOR, lz4)
# Locks
push!(LZ4_FRAME_COMPRESSOR_LOCK, ReentrantLock())
push!(ZSTD_COMPRESSOR_LOCK, ReentrantLock())
# Decompressors
zstdd = ZstdDecompressor()
CodecZstd.TranscodingStreams.initialize(zstdd)
Copy link
Member

@baumgold baumgold Mar 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should be a bit lazier about initializing these as I believe they're quite expensive. Shall we initialize them only as needed? For example, if a user is only decompressing (reading) data then there's no reason to pay the penalty of initializing compressors (and vice versa). Same idea for LZ4 vs ZSTD - if a user only uses one then no need to initialize both.

push!(ZSTD_DECOMPRESSOR, zstdd)
lz4d = LZ4FrameDecompressor()
CodecLz4.TranscodingStreams.initialize(lz4d)
push!(LZ4_FRAME_DECOMPRESSOR, lz4d)
# Locks
push!(LZ4_FRAME_DECOMPRESSOR_LOCK, ReentrantLock())
push!(ZSTD_DECOMPRESSOR_LOCK, ReentrantLock())
end
return
end
Expand Down
9 changes: 7 additions & 2 deletions src/arraytypes/arraytypes.jl
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,19 @@ function toarrowvector(x, i=1, de=Dict{Int64, Any}(), ded=DictEncoding[], meta=g
@debugv 2 "converting top-level column to arrow format: col = $(typeof(x)), compression = $compression, kw = $(kw.data)"
Copy link
Member

@baumgold baumgold Mar 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function should probably be broken down into a few smaller functions for clarity:

# No compression requested: return the already-converted arrow vector unchanged.
function toarrowvector(a::ArrowVector, compression::Nothing, kw...)
    return a
end

# Compress `a` as LZ4_FRAME using the compressor stored at the calling thread's index in
# `compression`, holding the matching entry of `LZ4_FRAME_COMPRESSOR_LOCK` meanwhile.
function toarrowvector(
    a::ArrowVector, compression::AbstractVector{LZ4FrameCompressor}, kw...
)
    # Read the thread id once so the lock and the compressor come from the same slot.
    tid = Threads.threadid()
    return lock(LZ4_FRAME_COMPRESSOR_LOCK[tid]) do
        compress(Meta.CompressionTypes.LZ4_FRAME, compression[tid], a)
    end
end

# Compress `a` as LZ4_FRAME with a single caller-supplied compressor. The compressor is
# used directly; no per-thread lock is taken and no indexing is performed.
function toarrowvector(a::ArrowVector, compression::LZ4FrameCompressor, kw...)
    return compress(Meta.CompressionTypes.LZ4_FRAME, compression, a)
end

# Compress `a` as ZSTD using the compressor stored at the calling thread's index in
# `compression`, holding the matching entry of `ZSTD_COMPRESSOR_LOCK` meanwhile.
function toarrowvector(a::ArrowVector, compression::AbstractVector{ZstdCompressor}, kw...)
    # Read the thread id once so the lock and the compressor come from the same slot.
    tid = Threads.threadid()
    return lock(ZSTD_COMPRESSOR_LOCK[tid]) do
        compress(Meta.CompressionTypes.ZSTD, compression[tid], a)
    end
end

# Compress `a` as ZSTD with a single caller-supplied compressor. The compressor is used
# directly; no per-thread lock is taken and no indexing is performed.
function toarrowvector(a::ArrowVector, compression::ZstdCompressor, kw...)
    return compress(Meta.CompressionTypes.ZSTD, compression, a)
end

# Convert the top-level column `x` with `arrowvector`, then hand the result to the
# two-argument `toarrowvector(A, compression)` call, which dispatches on `compression`.
function toarrowvector(
    x,
    i=1,
    de=Dict{Int64, Any}(),
    ded=DictEncoding[],
    meta=getmetadata(x);
    compression::Union{
        Nothing,
        Vector{LZ4FrameCompressor},
        LZ4FrameCompressor,
        Vector{ZstdCompressor},
        ZstdCompressor,
    }=nothing,
    kw...,
)
    @debugv 2 "converting top-level column to arrow format: col = $(typeof(x)), compression = $compression, kw = $(kw.data)"
    @debugv 3 x
    A = toarrowvector(
        arrowvector(x, i, 0, 0, de, ded, meta; compression=compression, kw...), compression
    )
    @debugv 2 "converted top-level column to arrow format: $(typeof(A))"
    @debugv 3 A
    return A
end

@debugv 3 x
A = arrowvector(x, i, 0, 0, de, ded, meta; compression=compression, kw...)
tid=Threads.threadid()
if compression isa LZ4FrameCompressor
A = compress(Meta.CompressionTypes.LZ4_FRAME, compression, A)
elseif compression isa Vector{LZ4FrameCompressor}
A = compress(Meta.CompressionTypes.LZ4_FRAME, compression[Threads.threadid()], A)
A = lock(LZ4_FRAME_COMPRESSOR_LOCK[tid]) do
compress(Meta.CompressionTypes.LZ4_FRAME, compression[tid], A)
end
elseif compression isa ZstdCompressor
A = compress(Meta.CompressionTypes.ZSTD, compression, A)
elseif compression isa Vector{ZstdCompressor}
A = compress(Meta.CompressionTypes.ZSTD, compression[Threads.threadid()], A)
A = lock(ZSTD_COMPRESSOR_LOCK[tid]) do
compress(Meta.CompressionTypes.ZSTD, compression[tid], A)
end
end
@debugv 2 "converted top-level column to arrow format: $(typeof(A))"
@debugv 3 A
Expand Down
9 changes: 7 additions & 2 deletions src/table.jl
Original file line number Diff line number Diff line change
Expand Up @@ -500,10 +500,15 @@ function uncompress(ptr::Ptr{UInt8}, buffer, compression)
len = unsafe_load(convert(Ptr{Int64}, ptr))
ptr += 8 # skip past uncompressed length as Int64
encodedbytes = unsafe_wrap(Array, ptr, buffer.length - 8)
tid=Threads.threadid()
if compression.codec === Meta.CompressionTypes.LZ4_FRAME
decodedbytes = transcode(LZ4FrameDecompressor, encodedbytes)
decodedbytes = lock(LZ4_FRAME_DECOMPRESSOR_LOCK[tid]) do
decodedbytes = transcode(LZ4_FRAME_DECOMPRESSOR[tid], encodedbytes)
end
elseif compression.codec === Meta.CompressionTypes.ZSTD
decodedbytes = transcode(ZstdDecompressor, encodedbytes)
decodedbytes = lock(ZSTD_DECOMPRESSOR_LOCK[tid]) do
decodedbytes = transcode(ZSTD_DECOMPRESSOR[tid], encodedbytes)
end
else
error("unsupported compression type when reading arrow buffers: $(typeof(compression.codec))")
end
Expand Down