Skip to content
Permalink
Browse files
Support custom metadata for schema and columns (#20)
Closes #13. This PR adds two new functions: `Arrow.getmetadata(x)` and
`Arrow.setmetadata!(x, ::Dict{String, String})`, which allows, rather
obviously, setting metadata for an arbitrary object and then retrieving
that metadata. By utilizing these functions, users can get/set custom
metadata that is serialized in the arrow format at the schema level and
field (column) level. More specifically, to set arrow schema custom
metadata, a user would call `Arrow.setmetadata!(tbl, meta)` on their
table object `tbl`. To retrive arrow schema custom metadata, one can
call `tbl = Arrow.Table(...); meta = Arrow.getmetadata(tbl)`. Similarly
for column/field-level metadata, one can call `Arrow.setmetadata!(col,
colmeta)` to cause custom metadata to be serialized in the arrow
message, and call `Arrow.getmetadata(tbl.colX)` to retrive custom
metadata for a specific column in an `Arrow.Table`.

Note that technically the arrow `Message` and `Footer` objects also
allow setting custom metadata, but those are not addressed at all in
this PR since they seem to be less useful/urgent.
  • Loading branch information
quinnj committed Oct 3, 2020
1 parent 6828761 commit 035ac292fd21e5949e1b2e380bc8c3c85dd9f89f
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 4 deletions.
@@ -145,6 +145,10 @@ function Table(bytes::Vector{UInt8}, off::Integer=1, tlen::Union{Integer, Nothin
end
lu[k] = col
end
meta = sch.custom_metadata
if meta !== nothing
setmetadata!(t, Dict(String(kv.key) => String(kv.value) for kv in meta))
end
return t
end

@@ -212,6 +216,10 @@ function Base.iterate(x::VectorIterator{debug}, (columnidx, nodeidx, bufferidx)=
debug && println("parsing column=$columnidx, T=$(x.types[columnidx]), len=$(x.batch.msg.header.nodes[nodeidx].length)")
A, nodeidx, bufferidx = build(x.types[columnidx], field, x.batch, x.batch.msg.header, nodeidx, bufferidx, debug)
end
meta = field.custom_metadata
if meta !== nothing
setmetadata!(A, Dict(String(kv.key) => String(kv.value) for kv in meta))
end
return A, (columnidx + 1, nodeidx, bufferidx)
end

@@ -1,3 +1,12 @@
const OBJ_METADATA = IdDict{Any, Dict{String, String}}()

function setmetadata!(x, meta::Dict{String, String})
OBJ_METADATA[x] = meta
return
end

getmetadata(x, default=nothing) = get(OBJ_METADATA, x, default)

"""
Arrow.write(io::IO, tbl)
Arrow.write(file::String, tbl)
@@ -11,7 +20,7 @@ Multiple record batches will be written based on the number of
`Tables.partitions(tbl)` that are provided; by default, this is just
one for a given table, but some table sources support automatic
partitioning. Note you can turn multiple table objects into partitions
by doing `Tables.partitioner([tbl1, tbl2, ...])`, but do remember that
by doing `Tables.partitioner([tbl1, tbl2, ...])`, but note that
each table must have the exact same `Tables.Schema`.
"""
function write end
@@ -243,18 +252,24 @@ end
struct ToArrowTable
sch::Tables.Schema
cols::Vector{Any}
metadata::Union{Nothing, Dict{String, String}}
fieldmetadata::Dict{Int, Dict{String, String}}
end

function toarrowtable(x)
cols = Tables.columns(x)
meta = getmetadata(cols)
sch = Tables.schema(cols)
types = collect(sch.types)
N = length(types)
newcols = Vector{Any}(undef, N)
newtypes = Vector{Type}(undef, N)
fieldmetadata = Dict{Int, Dict{String, String}}()
Tables.eachcolumn(sch, cols) do col, i, nm
colmeta = getmetadata(col)
if colmeta !== nothing
fieldmetadata[i] = colmeta
end
dictencode = false
if col isa AbstractArray && DataAPI.refarray(col) !== col
dictencode = true
@@ -264,7 +279,7 @@ function toarrowtable(x)
newtypes[i] = T
newcols[i] = dictencode ? DictEncode(newcol) : newcol
end
return ToArrowTable(Tables.Schema(sch.names, newtypes), newcols, fieldmetadata)
return ToArrowTable(Tables.Schema(sch.names, newtypes), newcols, meta, fieldmetadata)
end

toarrow(::Type{T}, i, col, fm) where {T} = T, col
@@ -381,11 +396,30 @@ function makeschema(b, sch::Tables.Schema{names, types}, columns, dictencodings)
FlatBuffers.prependoffset!(b, off)
end
fields = FlatBuffers.endvector!(b, N)
if columns.metadata !== nothing
kvs = columns.metadata
kvoffs = Vector{FlatBuffers.UOffsetT}(undef, length(kvs))
for (i, (k, v)) in enumerate(kvs)
koff = FlatBuffers.createstring!(b, String(k))
voff = FlatBuffers.createstring!(b, String(v))
Meta.keyValueStart(b)
Meta.keyValueAddKey(b, koff)
Meta.keyValueAddValue(b, voff)
kvoffs[i] = Meta.keyValueEnd(b)
end
Meta.schemaStartCustomMetadataVector(b, length(kvs))
for off in Iterators.reverse(kvoffs)
FlatBuffers.prependoffset!(b, off)
end
meta = FlatBuffers.endvector!(b, length(kvs))
else
meta = FlatBuffers.UOffsetT(0)
end
# write schema object
Meta.schemaStart(b)
Meta.schemaAddEndianness(b, Meta.Endianness.Little)
Meta.schemaAddFields(b, fields)
# Meta.schemaAddCustomMetadata(b, meta)
Meta.schemaAddCustomMetadata(b, meta)
return Meta.schemaEnd(b)
end

@@ -557,7 +557,6 @@ function Base.isequal(df::DataFile, tbl::Arrow.Table)
i = 1
for (col1, col2) in zip(Tables.Columns(df), Tables.Columns(tbl))
if isequal(col1, col2)
@show i
return false
end
i += 1
@@ -204,5 +204,19 @@ tt = Arrow.Table(io)
@test length(tt) == length(t)
@test all(isequal.(values(t), values(tt)))

t = (col1=Int64[1,2,3,4,5,6,7,8,9,10],)
meta = Dict("key1" => "value1", "key2" => "value2")
Arrow.setmetadata!(t, meta)
meta2 = Dict("colkey1" => "colvalue1", "colkey2" => "colvalue2")
Arrow.setmetadata!(t.col1, meta2)
io = IOBuffer()
Arrow.write(io, t)
seekstart(io)
tt = Arrow.Table(io)
@test length(tt) == length(t)
@test tt.col1 == t.col1
@test eltype(tt.col1) === Int64
@test Arrow.getmetadata(tt) == meta
@test Arrow.getmetadata(tt.col1) == meta2

end

0 comments on commit 035ac29

Please sign in to comment.