Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experiment: Potential speed up strategies #399

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
4 changes: 3 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ CodecLz4 = "5ba52731-8f18-5e0d-9241-30f10d1ec561"
CodecZstd = "6b39b394-51ab-5f42-8807-6242bab2b4c2"
DataAPI = "9a962f9c-6df0-11e9-0e5d-c546b8b5ee8a"
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
InlineStrings = "842dd82b-1e85-43dc-bf29-5d0ee9dffc48"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing compat entry for InlineStrings?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added. I set it to 1.4, as I don't understand its evolution up to this point, but I have a suspicion that "1" would work as well.

To be clear, I'm not sure if this PR could ever be merged.
I've hijacked (copy-and-pasted) the `transcode` method from TranscodingStreams to create a mutating version, which is probably not best practice :-/

LoggingExtras = "e6f89c97-d47a-5376-807f-9c37f3926c36"
Mmap = "a63ad114-7e13-5084-954f-fe012c677804"
PooledArrays = "2dfb63ee-cc39-5dd5-95bd-886bf059d720"
Expand All @@ -42,8 +43,9 @@ BitIntegers = "0.2"
CodecLz4 = "0.4"
CodecZstd = "0.7"
DataAPI = "1"
LoggingExtras = "0.4, 1"
FilePathsBase = "0.9"
InlineStrings = "1.4"
LoggingExtras = "0.4, 1"
PooledArrays = "0.5, 1.0"
SentinelArrays = "1"
Tables = "1.1"
Expand Down
28 changes: 28 additions & 0 deletions src/Arrow.jl
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,23 @@ include("write.jl")
include("append.jl")
include("show.jl")

using InlineStrings
export InlineString, inlinestrings
include("inlinestrings.jl")

const TS = CodecLz4.TranscodingStreams
include("dirtyhacks.jl")


# Per-thread pools of (de)compressor codec objects; one entry per thread is
# pushed in __init__ so each thread can reuse an initialized codec.
const LZ4_FRAME_COMPRESSOR = LZ4FrameCompressor[]
const ZSTD_COMPRESSOR = ZstdCompressor[]
const LZ4_FRAME_DECOMPRESSOR = LZ4FrameDecompressor[]
const ZSTD_DECOMPRESSOR = ZstdDecompressor[]
# Locks guarding the pooled codecs above for multithreaded (de)compression:
# the codecs are indexed by threadid(), which may change under task migration
# (Julia > 1.8), so each slot gets its own ReentrantLock (pushed in __init__).
const LZ4_FRAME_COMPRESSOR_LOCK = ReentrantLock[]
const ZSTD_COMPRESSOR_LOCK = ReentrantLock[]
const LZ4_FRAME_DECOMPRESSOR_LOCK = ReentrantLock[]
const ZSTD_DECOMPRESSOR_LOCK = ReentrantLock[]

function __init__()
for _ = 1:Threads.nthreads()
Expand All @@ -82,6 +97,19 @@ function __init__()
lz4 = LZ4FrameCompressor(; compressionlevel=4)
CodecLz4.TranscodingStreams.initialize(lz4)
push!(LZ4_FRAME_COMPRESSOR, lz4)
# Locks
push!(LZ4_FRAME_COMPRESSOR_LOCK, ReentrantLock())
push!(ZSTD_COMPRESSOR_LOCK, ReentrantLock())
# Decompressors
zstdd = ZstdDecompressor()
CodecZstd.TranscodingStreams.initialize(zstdd)
push!(ZSTD_DECOMPRESSOR, zstdd)
lz4d = LZ4FrameDecompressor()
CodecLz4.TranscodingStreams.initialize(lz4d)
push!(LZ4_FRAME_DECOMPRESSOR, lz4d)
# Locks
push!(LZ4_FRAME_DECOMPRESSOR_LOCK, ReentrantLock())
push!(ZSTD_DECOMPRESSOR_LOCK, ReentrantLock())
end
return
end
Expand Down
9 changes: 7 additions & 2 deletions src/arraytypes/arraytypes.jl
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,19 @@ function toarrowvector(x, i=1, de=Dict{Int64, Any}(), ded=DictEncoding[], meta=g
@debugv 2 "converting top-level column to arrow format: col = $(typeof(x)), compression = $compression, kw = $(kw.data)"
@debugv 3 x
A = arrowvector(x, i, 0, 0, de, ded, meta; compression=compression, kw...)
tid=Threads.threadid()
if compression isa LZ4FrameCompressor
A = compress(Meta.CompressionTypes.LZ4_FRAME, compression, A)
elseif compression isa Vector{LZ4FrameCompressor}
A = compress(Meta.CompressionTypes.LZ4_FRAME, compression[Threads.threadid()], A)
A = lock(LZ4_FRAME_COMPRESSOR_LOCK[tid]) do
compress(Meta.CompressionTypes.LZ4_FRAME, compression[tid], A)
end
elseif compression isa ZstdCompressor
A = compress(Meta.CompressionTypes.ZSTD, compression, A)
elseif compression isa Vector{ZstdCompressor}
A = compress(Meta.CompressionTypes.ZSTD, compression[Threads.threadid()], A)
A = lock(ZSTD_COMPRESSOR_LOCK[tid]) do
compress(Meta.CompressionTypes.ZSTD, compression[tid], A)
end
end
@debugv 2 "converted top-level column to arrow format: $(typeof(A))"
@debugv 3 A
Expand Down
47 changes: 47 additions & 0 deletions src/dirtyhacks.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# The following change needs to be upstreamed to TranscodingStreams.jl:
#
# We already know the size of each output buffer (saved within the Arrow
# metadata), so we can decompress straight into a pre-sized buffer instead of
# letting transcode() grow one incrementally.
#
# Mutates the provided `output` buffer in place and returns its underlying
# data vector, resized to the exact number of decompressed bytes.
# Throws the codec's error on failure.
function _transcode!(codec::Union{LZ4FrameDecompressor,ZstdDecompressor}, data::TS.ByteData, output::TS.Buffer)
    input = TS.Buffer(data)
    error = TS.Error()
    code = TS.startproc(codec, :write, error)
    if code === :error
        @goto error
    end
    @label process
    Δin, Δout, code = TS.process(codec, TS.buffermem(input), TS.marginmem(output), error)
    @debug(
        "called process()",
        code = code,
        # NOTE: these must be qualified with `TS.` — the unqualified names are
        # not in scope here and would throw UndefVarError when debug logging
        # is enabled (latent bug in the copied-over transcode code).
        input_size = TS.buffersize(input),
        output_size = TS.marginsize(output),
        input_delta = Δin,
        output_delta = Δout,
    )
    TS.consumed!(input, Δin)
    TS.supplied!(output, Δout)
    if code === :error
        @goto error
    elseif code === :end
        if TS.buffersize(input) > 0
            # leftover input: restart the codec and keep processing
            if TS.startproc(codec, :write, error) === :error
                @goto error
            end
            @goto process
        end
        # trim the output vector to the bytes actually written
        resize!(output.data, output.marginpos - 1)
        return output.data
    else
        @goto process
    end
    @label error
    if !TS.haserror(error)
        TS.set_default_error!(error)
    end
    throw(error[])
end
89 changes: 89 additions & 0 deletions src/inlinestrings.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# This code should be moved into InlineStrings extensions

### Type extensions
# Build the InlineString directly from the Arrow buffer's pointer and length,
# then defer to the generic fromarrow fallback on the constructed value.
function ArrowTypes.fromarrow(::Type{T}, ptr::Ptr{UInt8}, len::Int) where {T<:InlineString}
    return ArrowTypes.fromarrow(T, T(ptr, len))
end
function ArrowTypes.fromarrow(::Type{Union{T,Missing}}, ptr::Ptr{UInt8}, len::Int) where {T<:InlineString}
    return ArrowTypes.fromarrow(T, T(ptr, len))
end

### Utilities for inlining strings
# Determine the maximum string length implied by an offsets vector by taking
# the largest difference between consecutive offsets (a proxy for string size).
#
# Returns max(first(v), largest consecutive difference). For Arrow offsets the
# first element is 0, so the result is the longest encoded string length.
function _maximum_diff(v::AbstractVector{<:Integer})
    mx = first(v)
    prev = mx
    # Iterate generically over the axes instead of hard-coding `v[2]` and
    # `firstindex(v)+2` — the original mixed generic accessors with a
    # 1-based-indexing assumption, which is wrong for offset-axed vectors.
    @inbounds for i in Iterators.drop(eachindex(v), 1)
        d = v[i] - prev
        mx < d && (mx = d)
        prev = v[i]
    end
    return mx
end
# extract the raw integer offsets vector from an Arrow.List
_offsetsints(arrowlist::Arrow.List) = arrowlist.offsets.offsets

# Reinterpret an Arrow.List of strings as a list of InlineStrings by swapping
# only the element-type parameter; all underlying buffers are reused as-is and
# the contents are NOT validated (hence "unsafe").
function _unsafe_convert(::Type{Arrow.List{S, O, A}}, list::Arrow.List{T, O, A}) where {S<:Union{InlineString, Union{Missing, InlineString}}, T<:Union{AbstractString, Union{Missing, AbstractString}}, O, A}
    return Arrow.List{S, O, A}(list.arrow, list.validity, list.offsets, list.data, list.ℓ, list.metadata)
end

# Pick the element type to materialize a string column as, based on the
# column's Arrow offsets. Non-string types pass through unchanged.
pick_string_type(::Type{T}, offsets) where {T} = T
pick_string_type(::Type{Union{Missing,T}}, offsets) where {T<:AbstractString} = Union{Missing,pick_string_type(T, offsets)}
function pick_string_type(::Type{T}, offsets::AbstractVector{<:Integer}) where {T<:AbstractString}
    max_size = _maximum_diff(offsets)
    # strings shorter than 255 bytes fit an InlineString type; otherwise keep T
    return max_size < 255 ? InlineStringType(max_size) : T
end
# find one joint string type for all chained arrays - vector of vectors
function pick_string_type(::Type{T}, vectoffsets::AbstractVector{<:AbstractVector{<:Integer}}) where {T<:AbstractString}
    # maximum(f, itr) avoids allocating the intermediate vector that
    # `_maximum_diff.(vectoffsets) |> maximum` would create
    max_size = maximum(_maximum_diff, vectoffsets)
    return max_size < 255 ? InlineStringType(max_size) : T
end

# extend inlinestrings to pass through Arrow.Lists
# fallback: any non-string (or otherwise unhandled) column is returned as-is
_inlinestrings(vect::AbstractVector) = vect

## methods for SentinelArrays.ChainedVector (if we have many RecordBatches / partitions)
# element type is already an InlineString (possibly with missing): nothing to do
_inlinestrings(vect::SentinelArrays.ChainedVector{<:Union{T,Union{T,Missing}}}) where {T<:InlineString} = vect
# if we detect a String type, try to inline it -- we need to find one unified type across all chained arrays
# String-typed ChainedVector: try to narrow to one unified InlineString type
# shared by every chained array (all record batches must agree on the type).
function _inlinestrings(vectofvect::SentinelArrays.ChainedVector{T, Arrow.List{T,O,A}}) where {T<:Union{AbstractString,Union{Missing,AbstractString}},O,A}
    # smallest common string type across all chained arrays' offsets
    S = pick_string_type(T, _offsetsints.(vectofvect.arrays))
    # nothing to narrow: hand the input back untouched
    S == T && return vectofvect
    # otherwise rebuild the ChainedVector with each Arrow.List re-typed
    # TODO: look into in-place conversion
    return SentinelArrays.ChainedVector(_unsafe_convert.(Arrow.List{S,O,A}, vectofvect.arrays))
end

# TODO: check that we handle ChainedVector that contains something else than Arrow.List with Strings

## methods for Arrow.List (if we have only 1 RecordBatch, ie, unpartitioned)
# if it's already an InlineString, we can pass it through
_inlinestrings(vect::Arrow.List{T,O,A}) where {T<:InlineString,O,A} = vect
# if we detect that the strings are small enough, we can inline them.
# NOTE: this method was previously defined TWICE with an identical signature;
# the second definition silently overwrote the first and dropped its S == T
# fast path, so every string column was rebuilt even when nothing changed.
# Consolidated into a single method that keeps the fast path.
function _inlinestrings(vect::Arrow.List{T,O,A}) where {T<:Union{AbstractString,Union{AbstractString,Missing}},O,A}
    S = pick_string_type(T, _offsetsints(vect))
    # same type: avoid reconstructing an identical Arrow.List
    S == T && return vect
    # reconstruct the Arrow.List with the new (narrower) string type
    return _unsafe_convert(Arrow.List{S,O,A}, vect)
end
18 changes: 15 additions & 3 deletions src/table.jl
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ Table(input::Vector{UInt8}, pos::Integer=1, len=nothing; kw...) = Table([ArrowBl
Table(inputs::Vector; kw...) = Table([ArrowBlob(tobytes(x), 1, nothing) for x in inputs]; kw...)

# will detect whether we're reading a Table from a file or stream
function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
function Table(blobs::Vector{ArrowBlob}; convert::Bool=true, useinlinestrings::Bool=true)
t = Table()
sch = nothing
dictencodings = Dict{Int64, DictEncoding}() # dictionary id => DictEncoding
Expand Down Expand Up @@ -362,6 +362,8 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
end
close(tsks)
wait(tsk)
# apply inlinestrings to each column if requested
convert && useinlinestrings && (columns(t) .= _inlinestrings.(columns(t)))
lu = lookup(t)
ty = types(t)
# 158; some implementations may send 0 record batches
Expand Down Expand Up @@ -499,10 +501,20 @@ function uncompress(ptr::Ptr{UInt8}, buffer, compression)
len = unsafe_load(convert(Ptr{Int64}, ptr))
ptr += 8 # skip past uncompressed length as Int64
encodedbytes = unsafe_wrap(Array, ptr, buffer.length - 8)
tid=Threads.threadid()
output=TS.Buffer(len) # pre-allocate for output
if compression.codec === Meta.CompressionTypes.LZ4_FRAME
decodedbytes = transcode(LZ4FrameDecompressor, encodedbytes)
# decodedbytes = transcode(LZ4FrameDecompressor, encodedbytes)
lock(LZ4_FRAME_DECOMPRESSOR_LOCK[tid]) do
_transcode!(LZ4_FRAME_DECOMPRESSOR[tid], encodedbytes, output)
end
decodedbytes = output.data
elseif compression.codec === Meta.CompressionTypes.ZSTD
decodedbytes = transcode(ZstdDecompressor, encodedbytes)
# decodedbytes = transcode(ZstdDecompressor, encodedbytes)
lock(ZSTD_DECOMPRESSOR_LOCK[tid]) do
_transcode!(ZSTD_DECOMPRESSOR[tid], encodedbytes, output)
end
decodedbytes = output.data
else
error("unsupported compression type when reading arrow buffers: $(typeof(compression.codec))")
end
Expand Down
18 changes: 14 additions & 4 deletions src/write.jl
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,14 @@ function write end

write(io_or_file; kw...) = x -> write(io_or_file, x; kw...)

function write(file_path, tbl; kwargs...)
# Write `tbl` to `file_path` in the Arrow file format.
# When `chunksize` is an Integer (default 64000) and `tbl` satisfies the
# Tables.jl interface, the rows are partitioned into chunks of that many rows;
# pass `chunksize=nothing` to write the input unpartitioned.
function write(file_path, tbl; chunksize::Union{Nothing,Integer}=64000, kwargs...)
    source = if chunksize !== nothing && Tables.istable(tbl)
        Iterators.partition(Tables.rows(tbl), chunksize)
    else
        tbl
    end
    open(Writer, file_path; file=true, kwargs...) do writer
        write(writer, source)
    end
    return file_path
end
Expand Down Expand Up @@ -278,9 +283,14 @@ function Base.close(writer::Writer)
nothing
end

function write(io::IO, tbl; kwargs...)
# Write `tbl` to `io` as an Arrow stream.
# When `chunksize` is an Integer (default 64000) and `tbl` satisfies the
# Tables.jl interface, the rows are partitioned into chunks of that many rows;
# pass `chunksize=nothing` to write the input unpartitioned.
function write(io::IO, tbl; chunksize::Union{Nothing,Integer}=64000, kwargs...)
    source = if chunksize !== nothing && Tables.istable(tbl)
        Iterators.partition(Tables.rows(tbl), chunksize)
    else
        tbl
    end
    open(Writer, io; file=false, kwargs...) do writer
        write(writer, source)
    end
    return io
end
Expand Down