Ensure dict encoded index types match from record batch to record batch (#148)

Fixes #144. The core issue here was that the initial record batch had a
dict-encoded column that ended up with an index type of Int8. In a
subsequent record batch, however, we use a different code path for
dict-encoded columns because we need to check whether a dictionary delta
message needs to be sent (i.e. there are new pooled values that need to
be serialized). The problem was that in this code path, the index type
was computed from the total length of the input column instead of
matching what was already serialized in the initial schema message.
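
To make the mismatch concrete, here is a minimal illustrative sketch;
`smallest_index_type` is a hypothetical helper standing in for the writer's
choice of the smallest signed integer type that fits, not the package's
actual function:

```julia
# Hypothetical helper (illustration only): the smallest signed integer type
# able to represent `n` dictionary index values.
function smallest_index_type(n)
    n <= typemax(Int8)  && return Int8
    n <= typemax(Int16) && return Int16
    n <= typemax(Int32) && return Int32
    return Int64
end

smallest_index_type(3)    # Int8  -- first record batch: 3 unique values,
                          # so the schema message records Int8 indices
smallest_index_type(129)  # Int16 -- buggy delta path: recomputed from the
                          # *total* length of the next batch's column
                          # (129 rows), disagreeing with the Int8 already
                          # serialized in the schema message
```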

This does open up another possible failure: if an initial dict-encoded
column is serialized with an index type of Int8, subsequent record
batches may include enough unique values to overflow that index type.
I've added an error check for this case. Currently it's a fatal error
that stops the `Arrow.write` process completely. I'm not quite sure what
the best recommendation would be in that case; ultimately the user needs
to widen the index type of the first record batch's column, but perhaps
we should allow passing a dict-encoded index type to the overall
`Arrow.write` function so users can easily specify what that type should be.
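
As an illustration of that failure mode (the partition sizes here are made
up; the error text is the one added in this PR's diff):

```julia
using Arrow, Tables

# First partition: small pool, so the schema gets a narrow dictionary index type.
# Second partition: far more unique values than that index type can represent.
t = Tables.partitioner(((a = Arrow.DictEncode([1, 2, 3]),),
                        (a = Arrow.DictEncode(collect(1:100_000)),)))

io = IOBuffer()
# With the check added here, this is expected to throw the fatal
# "exceeds possible index values" error instead of writing invalid indices.
Arrow.write(io, t)
```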

The other change that had to be made in this PR is on the reading side,
since we're now tracking the index type in the DictEncoding type itself,
which, probably not coincidentally, is what the arrow-json struct already
does. For reading, we already have access to the dictionary field, so
it's just a matter of deserializing the index type before constructing
the DictEncoding struct.
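
An end-to-end sketch, essentially the new test added below for issue 144:
after this change, a dict-encoded column written across multiple record
batches reads back cleanly, with the reader deserializing the dictionary's
declared index type (defaulting to Int32 when none is present) before
constructing the DictEncoding:

```julia
using Arrow, Tables

t = Tables.partitioner(((a = Arrow.DictEncode([1, 2, 3]),),
                        (a = Arrow.DictEncode(fill(1, 129)),)))
io = IOBuffer()
Arrow.write(io, t)
seekstart(io)
tt = Arrow.Table(io)
length(tt.a)  # 132 -- both record batches round-trip with matching index types
```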
quinnj committed Mar 12, 2021
1 parent 0f1b350 commit d7a1e32c971611ca9822b296c7587d0da3ba6850
Showing 3 changed files with 38 additions and 11 deletions.
@@ -20,14 +20,27 @@
Represents the "pool" of possible values for a [`DictEncoded`](@ref)
array type. Whether the order of values is significant can be checked
by looking at the `isOrdered` boolean field.
The `S` type parameter, while not tied directly to any field, is the
signed integer "index type" of the parent DictEncoded. We keep track
of this in the DictEncoding in order to validate that the length of the pool
doesn't exceed the index type limit. The general workflow of writing arrow
data means the initial schema will typically be based on the data in the
first record batch, and subsequent record batches need to match the same
schema exactly. For example, if a non-first record batch dict-encoded column
were to cause a DictEncoding pool to overflow on unique values, a fatal error
should be thrown.
"""
mutable struct DictEncoding{T, A} <: ArrowVector{T}
mutable struct DictEncoding{T, S, A} <: ArrowVector{T}
id::Int64
data::A
isOrdered::Bool
metadata::Union{Nothing, Dict{String, String}}
end

indextype(::Type{DictEncoding{T, S, A}}) where {T, S, A} = S
indextype(::T) where {T <: DictEncoding} = indextype(T)

Base.size(d::DictEncoding) = size(d.data)

@propagate_inbounds function Base.getindex(d::DictEncoding{T}, i::Integer) where {T}
@@ -77,11 +90,11 @@ struct DictEncoded{T, S, A} <: ArrowVector{T}
arrow::Vector{UInt8} # need to hold a reference to arrow memory blob
validity::ValidityBitmap
indices::Vector{S}
encoding::DictEncoding{T, A}
encoding::DictEncoding{T, S, A}
metadata::Union{Nothing, Dict{String, String}}
end

DictEncoded(b::Vector{UInt8}, v::ValidityBitmap, inds::Vector{S}, encoding::DictEncoding{T, A}, meta) where {S, T, A} =
DictEncoded(b::Vector{UInt8}, v::ValidityBitmap, inds::Vector{S}, encoding::DictEncoding{T, S, A}, meta) where {S, T, A} =
DictEncoded{T, S, A}(b, v, inds, encoding, meta)

Base.size(d::DictEncoded) = size(d.indices)
@@ -146,7 +159,7 @@ function arrowvector(::DictEncodedType, x, i, nl, fi, de, ded, meta; dictencode:
end
end
data = arrowvector(pool, i, nl, fi, de, ded, nothing; dictencode=dictencodenested, dictencodenested=dictencodenested, dictencoding=true, kw...)
encoding = DictEncoding{eltype(data), typeof(data)}(id, data, false, getmetadata(data))
encoding = DictEncoding{eltype(data), eltype(inds), typeof(data)}(id, data, false, getmetadata(data))
de[id] = Lockable(encoding)
else
# encoding already exists
@@ -157,7 +170,7 @@ function arrowvector(::DictEncodedType, x, i, nl, fi, de, ded, meta; dictencode:
@lock encodinglockable begin
encoding = encodinglockable.x
len = length(x)
ET = encodingtype(len)
ET = indextype(encoding)
pool = Dict{Union{eltype(encoding), eltype(x)}, ET}(a => (b - 1) for (b, a) in enumerate(encoding))
deltas = eltype(x)[]
inds = Vector{ET}(undef, len)
@@ -168,17 +181,20 @@ function arrowvector(::DictEncodedType, x, i, nl, fi, de, ded, meta; dictencode:
end
@inbounds inds[j] = get!(pool, val) do
push!(deltas, val)
length(pool)
return length(pool)
end
end
if !isempty(deltas)
if length(deltas) + length(encoding) > typemax(ET)
error("fatal error serializing dict encoded column with ref index type of $ET; subsequent record batch unique values resulted in $(length(deltas) + length(encoding)) unique values, which exceeds possible index values in $ET")
end
data = arrowvector(deltas, i, nl, fi, de, ded, nothing; dictencode=dictencodenested, dictencodenested=dictencodenested, dictencoding=true, kw...)
push!(ded, DictEncoding{eltype(data), typeof(data)}(id, data, false, getmetadata(data)))
push!(ded, DictEncoding{eltype(data), ET, typeof(data)}(id, data, false, getmetadata(data)))
if typeof(encoding.data) <: ChainedVector
append!(encoding.data, data)
else
data2 = ChainedVector([encoding.data, data])
encoding = DictEncoding{eltype(data2), typeof(data2)}(id, data2, false, getmetadata(encoding))
encoding = DictEncoding{eltype(data2), ET, typeof(data2)}(id, data2, false, getmetadata(encoding))
de[id] = Lockable(encoding)
end
end
@@ -109,7 +109,8 @@ function Base.iterate(x::Stream, (pos, id)=(x.pos, 1))
field = x.dictencoded[id]
values, _, _ = build(field, field.type, batch, recordbatch, x.dictencodings, Int64(1), Int64(1), x.convert)
A = ChainedVector([values])
x.dictencodings[id] = DictEncoding{eltype(A), typeof(A)}(id, A, field.dictionary.isOrdered, values.metadata)
S = field.dictionary.indexType === nothing ? Int32 : juliaeltype(field, field.dictionary.indexType, false)
x.dictencodings[id] = DictEncoding{eltype(A), S, typeof(A)}(id, A, field.dictionary.isOrdered, values.metadata)
@debug 1 "parsed dictionary batch message: id=$id, data=$values\n"
elseif header isa Meta.RecordBatch
@debug 1 "parsing record batch message: compression = $(header.compression)"
@@ -240,15 +241,17 @@ function Table(bytes::Vector{UInt8}, off::Integer=1, tlen::Union{Integer, Nothin
append!(dictencoding.data, values)
else
A = ChainedVector([dictencoding.data, values])
dictencodings[id] = DictEncoding{eltype(A), typeof(A)}(id, A, field.dictionary.isOrdered, values.metadata)
S = field.dictionary.indexType === nothing ? Int32 : juliaeltype(field, field.dictionary.indexType, false)
dictencodings[id] = DictEncoding{eltype(A), S, typeof(A)}(id, A, field.dictionary.isOrdered, values.metadata)
end
continue
end
# new dictencoding or replace
field = dictencoded[id]
values, _, _ = build(field, field.type, batch, recordbatch, dictencodings, Int64(1), Int64(1), convert)
A = values
dictencodings[id] = DictEncoding{eltype(A), typeof(A)}(id, A, field.dictionary.isOrdered, values.metadata)
S = field.dictionary.indexType === nothing ? Int32 : juliaeltype(field, field.dictionary.indexType, false)
dictencodings[id] = DictEncoding{eltype(A), S, typeof(A)}(id, A, field.dictionary.isOrdered, values.metadata)
@debug 1 "parsed dictionary batch message: id=$id, data=$values\n"
elseif header isa Meta.RecordBatch
@debug 1 "parsing record batch message: compression = $(header.compression)"
@@ -252,6 +252,14 @@ seekstart(io)
t2 = Arrow.Table(io)
@test t2.x == t.x

# 144
t = Tables.partitioner(((a=Arrow.DictEncode([1,2,3]),), (a=Arrow.DictEncode(fill(1, 129)),)))
io = IOBuffer()
Arrow.write(io, t)
seekstart(io)
tt = Arrow.Table(io)
@test length(tt.a) == 132

end # @testset "misc"

end
