Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Several fixes for writing large Arrow tables #57

Merged
merged 4 commits into from
Nov 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/FlatBuffers/table.jl
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ end

struct Array{T, S, TT} <: AbstractVector{T}
_tab::TT
pos::UOffsetT
pos::Int64
data::Vector{S}
end

Expand Down
32 changes: 16 additions & 16 deletions src/arraytypes/arraytypes.jl
Original file line number Diff line number Diff line change
Expand Up @@ -117,24 +117,24 @@ function ValidityBitmap(x)
blen = cld(len, 8)
bytes = Vector{UInt8}(undef, blen)
st = iterate(x)
i = 0
nc = 0
for k = 1:blen
b = 0x00
for j = 1:8
if (i + j) <= len
y, state = st
if y === missing
nc += 1
b = setbit(b, false, j)
else
b = setbit(b, true, j)
end
st = iterate(x, state)
end
b = 0xff
j = k = 1
for y in x
if y === missing
nc += 1
b = setbit(b, false, j)
end
i += 8
@inbounds bytes[k] = b
j += 1
if j == 9
@inbounds bytes[k] = b
b = 0xff
j = 1
k += 1
end
end
if j > 1
bytes[k] = b
end
return ValidityBitmap(nc == 0 ? UInt8[] : bytes, 1, nc == 0 ? 0 : len, nc)
end
Expand Down
31 changes: 15 additions & 16 deletions src/arraytypes/bool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,22 @@ function arrowvector(::BoolType, x, i, nl, fi, de, ded, meta; kw...)
len = length(x)
blen = cld(len, 8)
bytes = Vector{UInt8}(undef, blen)
st = iterate(x)
i = 0
for k = 1:blen
b = 0x00
for j = 1:8
if (i + j) <= len
y, state = st
if y === missing || !y
b = setbit(b, false, j)
else
b = setbit(b, true, j)
end
st = iterate(x, state)
end
b = 0xff
j = k = 1
for y in x
if y === false
b = setbit(b, false, j)
end
i += 8
@inbounds bytes[k] = b
j += 1
if j == 9
@inbounds bytes[k] = b
b = 0xff
j = 1
k += 1
end
end
if j > 1
bytes[k] = b
end
return BoolVector{eltype(x)}(bytes, 1, validity, len, meta)
end
Expand Down
22 changes: 12 additions & 10 deletions src/arraytypes/dictencoding.jl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mutable struct DictEncoding{T, A} <: ArrowVector{T}
id::Int64
data::A
isOrdered::Bool
metadata::Union{Nothing, Dict{String, String}}
end

Base.size(d::DictEncoding) = size(d.data)
Expand Down Expand Up @@ -128,7 +129,7 @@ function arrowvector(::DictEncodedType, x, i, nl, fi, de, ded, meta; dictencode:
pool = [get(pool[i]) for i = 1:length(pool)]
end
data = arrowvector(pool, i, nl, fi, de, ded, nothing; dictencode=dictencodenested, dictencodenested=dictencodenested, dictencoding=true, kw...)
encoding = DictEncoding{eltype(data), typeof(data)}(id, data, false)
encoding = DictEncoding{eltype(data), typeof(data)}(id, data, false, getmetadata(data))
de[id] = Lockable(encoding)
else
# encoding already exists
Expand All @@ -138,10 +139,11 @@ function arrowvector(::DictEncodedType, x, i, nl, fi, de, ded, meta; dictencode:
encodinglockable = de[id]
@lock encodinglockable begin
encoding = encodinglockable.x
pool = Dict(a => (b - 1) for (b, a) in enumerate(encoding))
deltas = eltype(x)[]
len = length(x)
inds = Vector{encodingtype(len)}(undef, len)
ET = encodingtype(len)
pool = Dict{Union{eltype(encoding), eltype(x)}, ET}(a => (b - 1) for (b, a) in enumerate(encoding))
deltas = eltype(x)[]
inds = Vector{ET}(undef, len)
categorical = typeof(x).name.name == :CategoricalArray
for (j, val) in enumerate(x)
if categorical
Expand All @@ -154,21 +156,21 @@ function arrowvector(::DictEncodedType, x, i, nl, fi, de, ded, meta; dictencode:
end
if !isempty(deltas)
data = arrowvector(deltas, i, nl, fi, de, ded, nothing; dictencode=dictencodenested, dictencodenested=dictencodenested, dictencoding=true, kw...)
push!(ded, DictEncoding{eltype(data), typeof(data)}(id, data, false))
push!(ded, DictEncoding{eltype(data), typeof(data)}(id, data, false, getmetadata(data)))
if typeof(encoding.data) <: ChainedVector
append!(encoding.data, data)
else
data2 = ChainedVector([encoding.data, data])
encoding = DictEncoding{eltype(data2), typeof(data2)}(id, data2, false)
encoding = DictEncoding{eltype(data2), typeof(data2)}(id, data2, false, getmetadata(encoding))
de[id] = Lockable(encoding)
end
end
end
end
if meta !== nothing && data.metadata !== nothing
merge!(meta, data.metadata)
elseif data.metadata !== nothing
meta = data.metadata
if meta !== nothing && getmetadata(encoding) !== nothing
merge!(meta, getmetadata(encoding))
elseif getmetadata(encoding) !== nothing
meta = getmetadata(encoding)
end
return DictEncoded(UInt8[], validity, inds, encoding, meta)
end
Expand Down
5 changes: 4 additions & 1 deletion src/eltypes.jl
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,12 @@ function juliaeltype(f::Meta.Field, x::Meta.Timestamp, convert)
return Timestamp{x.unit, x.timezone === nothing ? nothing : Symbol(x.timezone)}
end

finaljuliatype(::Type{<:Timestamp}) = ZonedDateTime
finaljuliatype(::Type{Timestamp{U, TZ}}) where {U, TZ} = ZonedDateTime
finaljuliatype(::Type{Timestamp{U, nothing}}) where {U} = DateTime
Base.convert(::Type{ZonedDateTime}, x::Timestamp{U, TZ}) where {U, TZ} =
ZonedDateTime(Dates.DateTime(Dates.UTM(Int64(Dates.toms(periodtype(U)(x.x)) + UNIX_EPOCH_DATETIME))), TimeZone(String(TZ)))
Base.convert(::Type{DateTime}, x::Timestamp{U, nothing}) where {U} =
Dates.DateTime(Dates.UTM(Int64(Dates.toms(periodtype(U)(x.x)) + UNIX_EPOCH_DATETIME)))
Base.convert(::Type{Timestamp{Meta.TimeUnit.MILLISECOND, TZ}}, x::ZonedDateTime) where {TZ} =
Timestamp{Meta.TimeUnit.MILLISECOND, TZ}(Int64(Dates.value(DateTime(x, Local)) - UNIX_EPOCH_DATETIME))

Expand Down
4 changes: 2 additions & 2 deletions src/table.jl
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ function Base.iterate(x::Stream, (pos, id)=(x.pos, 1))
field = x.dictencoded[id]
values, _, _ = build(field, field.type, batch, recordbatch, x.dictencodings, Int64(1), Int64(1), x.convert)
A = ChainedVector([values])
x.dictencodings[id] = DictEncoding{eltype(A), typeof(A)}(id, A, field.dictionary.isOrdered)
x.dictencodings[id] = DictEncoding{eltype(A), typeof(A)}(id, A, field.dictionary.isOrdered, values.metadata)
@debug 1 "parsed dictionary batch message: id=$id, data=$values\n"
elseif header isa Meta.RecordBatch
@debug 1 "parsing record batch message: compression = $(header.compression)"
Expand Down Expand Up @@ -241,7 +241,7 @@ function Table(bytes::Vector{UInt8}, off::Integer=1, tlen::Union{Integer, Nothin
field = dictencoded[id]
values, _, _ = build(field, field.type, batch, recordbatch, dictencodings, Int64(1), Int64(1), convert)
A = ChainedVector([values])
dictencodings[id] = DictEncoding{eltype(A), typeof(A)}(id, A, field.dictionary.isOrdered)
dictencodings[id] = DictEncoding{eltype(A), typeof(A)}(id, A, field.dictionary.isOrdered, values.metadata)
@debug 1 "parsed dictionary batch message: id=$id, data=$values\n"
elseif header isa Meta.RecordBatch
@debug 1 "parsing record batch message: compression = $(header.compression)"
Expand Down
5 changes: 4 additions & 1 deletion src/utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@ function writearray(io::IO, ::Type{T}, col) where {T}
end
else
n = 0
data = Vector{UInt8}(undef, sizeof(col))
buf = IOBuffer(data; write=true)
for x in col
n += Base.write(io, coalesce(x, ArrowTypes.default(T)))
n += Base.write(buf, coalesce(x, ArrowTypes.default(T)))
end
n = Base.write(io, take!(buf))
end
return n
end
Expand Down