Skip to content
Permalink
Browse files
remove global metadata cache, refactor custom_metadata API (#238)
Co-authored-by: Eric Hanson <5846501+ericphanson@users.noreply.github.com>
Co-authored-by: Jacob Quinn <quinn.jacobd@gmail.com>
  • Loading branch information
3 people committed Sep 14, 2021
1 parent 7d59427 commit 967e3c67437219698afae889a13e881b60033fe3
Showing 20 changed files with 120 additions and 106 deletions.
@@ -1,7 +1,7 @@
name = "Arrow"
uuid = "69666777-d1a9-59fb-9406-91d4454c9d45"
authors = ["quinnj <quinn.jacobd@gmail.com>"]
version = "1.6.2"
version = "2.0.0"

[deps]
ArrowTypes = "31f734f8-188a-4ce0-8406-c8a06bd891cd"
@@ -159,9 +159,13 @@ This stuff can definitely make your eyes glaze over if you stare at it long enou

In addition to `Arrow.Table`, the Arrow.jl package also provides `Arrow.Stream` for processing arrow data. While `Arrow.Table` will iterate all record batches in an arrow file/stream, concatenating columns, `Arrow.Stream` provides a way to *iterate* through record batches, one at a time. Each iteration yields an `Arrow.Table` instance, with columns/data for a single record batch. This allows, if so desired, "batch processing" of arrow data, one record batch at a time, instead of creating a single long table via `Arrow.Table`.

### Table and column metadata
### Custom application metadata

The arrow format allows attaching arbitrary metadata in the form of a `Dict{String, String}` to tables and individual columns. The Arrow.jl package supports retrieving serialized metadata by calling `Arrow.getmetadata(table)` or `Arrow.getmetadata(column)`.
The Arrow format allows data producers to [attach custom metadata](https://arrow.apache.org/docs/format/Columnar.html#custom-application-metadata) to various Arrow objects.

Arrow.jl provides a convenient accessor for this metadata via [`Arrow.getmetadata`](@ref). `Arrow.getmetadata(t::Arrow.Table)` will return an immutable `AbstractDict{String,String}` that represents the [`custom_metadata` of the table's associated `Schema`](https://github.com/apache/arrow/blob/85d8175ea24b4dd99f108a673e9b63996d4f88cc/format/Schema.fbs#L515) (or `nothing` if no such metadata exists), while `Arrow.getmetadata(c::Arrow.ArrowVector)` will return a similar representation of [the column's associated `Field` `custom_metadata`](https://github.com/apache/arrow/blob/85d8175ea24b4dd99f108a673e9b63996d4f88cc/format/Schema.fbs#L480) (or `nothing` if no such metadata exists).

To attach custom schema/column metadata to Arrow tables at serialization time, see the `metadata` and `colmetadata` keyword arguments to [`Arrow.write`](@ref).

## Writing arrow data

@@ -41,6 +41,7 @@ See docs for official Arrow.jl API with the [User Manual](@ref) and reference do
"""
module Arrow

using Base.Iterators
using Mmap
import Dates
using DataAPI, Tables, SentinelArrays, PooledArrays, CodecLz4, CodecZstd, TimeZones, BitIntegers
@@ -106,7 +107,6 @@ function __init__()
CodecLz4.TranscodingStreams.initialize(lz4)
push!(LZ4_FRAME_COMPRESSOR, lz4)
end
OBJ_METADATA_LOCK[] = ReentrantLock()
return
end

@@ -1,7 +1,7 @@
name = "ArrowTypes"
uuid = "31f734f8-188a-4ce0-8406-c8a06bd891cd"
authors = ["quinnj <quinn.jacobd@gmail.com>"]
version = "1.2.1"
version = "1.2.2"


[deps]
@@ -366,13 +366,6 @@ const JULIA_TO_ARROW_TYPE_MAPPING = Dict{Type, Tuple{String, Type}}()

istyperegistered(::Type{T}) where {T} = haskey(JULIA_TO_ARROW_TYPE_MAPPING, T)

function getarrowtype!(meta, ::Type{T}) where {T}
arrowname, arrowtype = JULIA_TO_ARROW_TYPE_MAPPING[T]
meta["ARROW:extension:name"] = arrowname
meta["ARROW:extension:metadata"] = ""
return arrowtype
end

const ARROW_TO_JULIA_TYPE_MAPPING = Dict{String, Tuple{Type, Type}}()

function extensiontype(f, meta)
@@ -27,11 +27,13 @@ or the `JULIA_NUM_THREADS` environment variable is set).
Supported keyword arguments to `Arrow.append` include:
* `alignment::Int=8`: specify the number of bytes to align buffers to when written in messages; strongly recommended to only use alignment values of 8 or 64 for modern memory cache line optimization
* `colmetadata=nothing`: the metadata that should be written as the table's columns' `custom_metadata` fields; must either be `nothing` or an `AbstractDict` of `column_name::Symbol => column_metadata` where `column_metadata` is an iterable of `<:AbstractString` pairs.
* `dictencode::Bool=false`: whether all columns should use dictionary encoding when being written; to dict encode specific columns, wrap the column/array in `Arrow.DictEncode(col)`
* `dictencodenested::Bool=false`: whether nested data type columns should also dict encode nested arrays/buffers; other language implementations [may not support this](https://arrow.apache.org/docs/status.html)
* `denseunions::Bool=true`: whether Julia `Vector{<:Union}` arrays should be written using the dense union layout; passing `false` will result in the sparse union layout
* `largelists::Bool=false`: causes list column types to be written with Int64 offset arrays; mainly for testing purposes; by default, Int64 offsets will be used only if needed
* `maxdepth::Int=$DEFAULT_MAX_DEPTH`: deepest allowed nested serialization level; this is provided by default to prevent accidental infinite recursion with mutually recursive data structures
* `metadata=Arrow.getmetadata(tbl)`: the metadata that should be written as the table's schema's `custom_metadata` field; must either be `nothing` or an iterable of `<:AbstractString` pairs.
* `ntasks::Int`: number of concurrent threaded tasks to allow while writing input partitions out as arrow record batches; default is no limit; to disable multithreaded writing, pass `ntasks=1`
* `convert::Bool`: whether certain arrow primitive types in the schema of `file` should be converted to Julia defaults for matching them to the schema of `tbl`; by default, `convert=true`.
* `file::Bool`: applicable when an `IO` is provided, whether it is a file; by default `file=false`.
@@ -49,6 +51,8 @@ function append(file::String, tbl; kwargs...)
end

function append(io::IO, tbl;
metadata=getmetadata(tbl),
colmetadata=nothing,
largelists::Bool=false,
denseunions::Bool=true,
dictencode::Bool=false,
@@ -76,12 +80,12 @@ function append(io::IO, tbl;
throw(ArgumentError("unsupported compress keyword argument value: $compress. Valid values include `:lz4` or `:zstd`"))
end

append(io, tbl, arrow_schema, compress, largelists, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks)
append(io, tbl, arrow_schema, compress, largelists, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks, metadata, colmetadata)

return io
end

function append(io::IO, source, arrow_schema, compress, largelists, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks)
function append(io::IO, source, arrow_schema, compress, largelists, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks, meta, colmeta)
seekend(io)
skip(io, -8) # overwrite last 8 bytes of last empty message footer

@@ -113,9 +117,9 @@ function append(io::IO, source, arrow_schema, compress, largelists, denseunions,
end

if threaded
Threads.@spawn process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror)
Threads.@spawn process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror, meta, colmeta)
else
@async process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror)
@async process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror, meta, colmeta)
end
end
if anyerror[]
@@ -62,19 +62,32 @@ function arrowvector(x, i, nl, fi, de, ded, meta; dictencoding::Bool=false, dict
end
S = maybemissing(eltype(x))
if ArrowTypes.hasarrowname(T)
meta = meta === nothing ? Dict{String, String}() : meta
meta["ARROW:extension:name"] = String(ArrowTypes.arrowname(T))
meta["ARROW:extension:metadata"] = String(ArrowTypes.arrowmetadata(T))
meta = _arrowtypemeta(_normalizemeta(meta), String(ArrowTypes.arrowname(T)), String(ArrowTypes.arrowmetadata(T)))
end
return arrowvector(S, x, i, nl, fi, de, ded, meta; dictencode=dictencode, maxdepth=maxdepth, kw...)
end

_normalizemeta(::Nothing) = nothing
_normalizemeta(meta) = toidict(String(k) => String(v) for (k, v) in meta)

function _arrowtypemeta(::Nothing, n, m)
return toidict(("ARROW:extension:name" => n, "ARROW:extension:metadata" => m))
end

function _arrowtypemeta(meta, n, m)
dict = Dict(meta)
dict["ARROW:extension:name"] = n
dict["ARROW:extension:metadata"] = m
return toidict(dict)
end

# now we check for ArrowType converions and dispatch on ArrowKind
function arrowvector(::Type{S}, x, i, nl, fi, de, ded, meta; kw...) where {S}
meta = _normalizemeta(meta)
# deprecated and will be removed
if ArrowTypes.istyperegistered(S)
meta = meta === nothing ? Dict{String, String}() : meta
arrowtype = ArrowTypes.getarrowtype!(meta, S)
arrowname, arrowtype = ArrowTypes.JULIA_TO_ARROW_TYPE_MAPPING[S]
meta = _arrowtypemeta(meta, arrowname, "")
if arrowtype === S
return arrowvector(ArrowKind(S), x, i, nl, fi, de, ded, meta; kw...)
else
@@ -87,12 +100,12 @@ end

struct NullVector{T} <: ArrowVector{T}
data::MissingVector
metadata::Union{Nothing, Dict{String, String}}
metadata::Union{Nothing, Base.ImmutableDict{String, String}}
end
Base.size(v::NullVector) = (length(v.data),)
Base.getindex(v::NullVector{T}, i::Int) where {T} = ArrowTypes.fromarrow(T, getindex(v.data, i))

arrowvector(::NullKind, x, i, nl, fi, de, ded, meta; kw...) = NullVector{eltype(x)}(MissingVector(length(x)), meta)
arrowvector(::NullKind, x, i, nl, fi, de, ded, meta; kw...) = NullVector{eltype(x)}(MissingVector(length(x)), isnothing(meta) ? nothing : toidict(meta))
compress(Z::Meta.CompressionType, comp, v::NullVector) =
Compressed{Z, NullVector}(v, CompressedBuffer[], length(v), length(v), Compressed[])

@@ -25,7 +25,7 @@ struct BoolVector{T} <: ArrowVector{T}
pos::Int
validity::ValidityBitmap
::Int64
metadata::Union{Nothing, Dict{String, String}}
metadata::Union{Nothing, Base.ImmutableDict{String, String}}
end

Base.size(p::BoolVector) = (p.ℓ,)
@@ -35,7 +35,7 @@ mutable struct DictEncoding{T, S, A} <: ArrowVector{T}
id::Int64
data::A
isOrdered::Bool
metadata::Union{Nothing, Dict{String, String}}
metadata::Union{Nothing, Base.ImmutableDict{String, String}}
end

indextype(::Type{DictEncoding{T, S, A}}) where {T, S, A} = S
@@ -91,7 +91,7 @@ struct DictEncoded{T, S, A} <: ArrowVector{T}
validity::ValidityBitmap
indices::Vector{S}
encoding::DictEncoding{T, S, A}
metadata::Union{Nothing, Dict{String, String}}
metadata::Union{Nothing, Base.ImmutableDict{String, String}}
end

DictEncoded(b::Vector{UInt8}, v::ValidityBitmap, inds::Vector{S}, encoding::DictEncoding{T, S, A}, meta) where {S, T, A} =
@@ -229,7 +229,7 @@ function arrowvector(::DictEncodedKind, x, i, nl, fi, de, ded, meta; dictencode:
end
end
if meta !== nothing && getmetadata(encoding) !== nothing
merge!(meta, getmetadata(encoding))
meta = toidict(merge!(Dict(meta), Dict(getmetadata(encoding))))
elseif getmetadata(encoding) !== nothing
meta = getmetadata(encoding)
end
@@ -24,7 +24,7 @@ struct FixedSizeList{T, A <: AbstractVector} <: ArrowVector{T}
validity::ValidityBitmap
data::A
::Int
metadata::Union{Nothing, Dict{String, String}}
metadata::Union{Nothing, Base.ImmutableDict{String,String}}
end

Base.size(l::FixedSizeList) = (l.ℓ,)
@@ -39,7 +39,7 @@ struct List{T, O, A} <: ArrowVector{T}
offsets::Offsets{O}
data::A
::Int
metadata::Union{Nothing, Dict{String, String}}
metadata::Union{Nothing, Base.ImmutableDict{String,String}}
end

Base.size(l::List) = (l.ℓ,)
@@ -24,7 +24,7 @@ struct Map{T, O, A} <: ArrowVector{T}
offsets::Offsets{O}
data::A
::Int
metadata::Union{Nothing, Dict{String, String}}
metadata::Union{Nothing, Base.ImmutableDict{String,String}}
end

Base.size(l::Map) = (l.ℓ,)
@@ -24,7 +24,7 @@ struct Primitive{T, A} <: ArrowVector{T}
validity::ValidityBitmap
data::A
::Int64
metadata::Union{Nothing, Dict{String, String}}
metadata::Union{Nothing, Base.ImmutableDict{String,String}}
end

Primitive(::Type{T}, b::Vector{UInt8}, v::ValidityBitmap, data::A, l, meta) where {T, A} =
@@ -23,7 +23,7 @@ struct Struct{T, S} <: ArrowVector{T}
validity::ValidityBitmap
data::S # Tuple of ArrowVector
::Int
metadata::Union{Nothing, Dict{String, String}}
metadata::Union{Nothing, Base.ImmutableDict{String,String}}
end

Base.size(s::Struct) = (s.ℓ,)
@@ -63,7 +63,7 @@ struct DenseUnion{T, U, S} <: ArrowVector{T}
typeIds::Vector{UInt8}
offsets::Vector{Int32}
data::S # Tuple of ArrowVector
metadata::Union{Nothing, Dict{String, String}}
metadata::Union{Nothing, Base.ImmutableDict{String,String}}
end

Base.size(s::DenseUnion) = size(s.typeIds)
@@ -185,7 +185,7 @@ struct SparseUnion{T, U, S} <: ArrowVector{T}
arrow::Vector{UInt8} # need to hold a reference to arrow memory blob
typeIds::Vector{UInt8}
data::S # Tuple of ArrowVector
metadata::Union{Nothing, Dict{String, String}}
metadata::Union{Nothing, Base.ImmutableDict{String,String}}
end

Base.size(s::SparseUnion) = size(s.typeIds)
@@ -39,7 +39,7 @@ function juliaeltype(f::Meta.Field, ::Nothing, convert::Bool)
return convert ? finaljuliatype(T) : T
end

function juliaeltype(f::Meta.Field, meta::Dict{String, String}, convert::Bool)
function juliaeltype(f::Meta.Field, meta::AbstractDict{String, String}, convert::Bool)
TT = juliaeltype(f, convert)
!convert && return TT
T = finaljuliatype(TT)
@@ -182,19 +182,37 @@ struct Table <: Tables.AbstractColumns
columns::Vector{AbstractVector}
lookup::Dict{Symbol, AbstractVector}
schema::Ref{Meta.Schema}
metadata::Ref{Dict{String, String}}
metadata::Ref{Union{Nothing,Base.ImmutableDict{String,String}}}
end

Table() = Table(Symbol[], Type[], AbstractVector[], Dict{Symbol, AbstractVector}(), Ref{Meta.Schema}(), Ref{Dict{String, String}}())
Table(names, types, columns, lookup, schema) = Table(names, types, columns, lookup, schema, Ref{Dict{String, String}}())
Table() = Table(Symbol[], Type[], AbstractVector[], Dict{Symbol, AbstractVector}(), Ref{Meta.Schema}(), Ref{Union{Nothing,Base.ImmutableDict{String,String}}}(nothing))

function Table(names, types, columns, lookup, schema)
m = isassigned(schema) ? buildmetadata(schema[]) : nothing
return Table(names, types, columns, lookup, schema, Ref{Union{Nothing,Base.ImmutableDict{String,String}}}(m))
end

names(t::Table) = getfield(t, :names)
types(t::Table) = getfield(t, :types)
columns(t::Table) = getfield(t, :columns)
lookup(t::Table) = getfield(t, :lookup)
schema(t::Table) = getfield(t, :schema)
getmetadata(t::Table) = isdefined(getfield(t, :metadata), :x) ? getfield(t, :metadata)[] : nothing
setmetadata!(t::Table, m::Dict{String, String}) = (setindex!(getfield(t, :metadata), m); nothing)

"""
Arrow.getmetadata(x)
If `x isa Arrow.Table` return a `Base.ImmutableDict{String,String}` representation of `x`'s
`Schema` `custom_metadata`, or `nothing` if no such metadata exists.
If `x isa Arrow.ArrowVector`, return a `Base.ImmutableDict{String,String}` representation of `x`'s
`Field` `custom_metadata`, or `nothing` if no such metadata exists.
Otherwise, return `nothing`.
See [the official Arrow documentation for more details on custom application metadata](https://arrow.apache.org/docs/format/Columnar.html#custom-application-metadata).
"""
getmetadata(t::Table) = getfield(t, :metadata)[]
getmetadata(::Any) = nothing

Tables.istable(::Table) = true
Tables.columnaccess(::Table) = true
@@ -306,10 +324,7 @@ function Table(bytes::Vector{UInt8}, off::Integer=1, tlen::Union{Integer, Nothin
lu[nm] = col
push!(ty, eltype(col))
end
meta = sch !== nothing ? sch.custom_metadata : nothing
if meta !== nothing
getfield(t, :metadata)[] = buildmetadata(meta)
end
getfield(t, :metadata)[] = buildmetadata(sch)
return t
end

@@ -366,10 +381,10 @@ struct VectorIterator
convert::Bool
end

buildmetadata(f::Meta.Field) = buildmetadata(f.custom_metadata)
buildmetadata(meta) = Dict(String(kv.key) => String(kv.value) for kv in meta)
buildmetadata(f::Union{Meta.Field,Meta.Schema}) = buildmetadata(f.custom_metadata)
buildmetadata(meta) = toidict(String(kv.key) => String(kv.value) for kv in meta)
buildmetadata(::Nothing) = nothing
buildmetadata(x::Dict{String, String}) = x
buildmetadata(x::AbstractDict) = x

function Base.iterate(x::VectorIterator, (columnidx, nodeidx, bufferidx)=(Int64(1), Int64(1), Int64(1)))
columnidx > length(x.schema.fields) && return nothing
@@ -207,3 +207,14 @@ function tobuffer(data; kwargs...)
seekstart(io)
return io
end

toidict(x::Base.ImmutableDict) = x

# ref https://github.com/JuliaData/Arrow.jl/pull/238#issuecomment-919415809
function toidict(pairs)
dict = Base.ImmutableDict(first(pairs))
for pair in Iterators.drop(pairs, 1)
dict = Base.ImmutableDict(dict, pair)
end
return dict
end

0 comments on commit 967e3c6

Please sign in to comment.