Skip to content
Permalink
Browse files
fix version mismatch by changing footer to V5 (#321)
  • Loading branch information
pcjentsch committed May 4, 2022
1 parent bd872f4 commit ed95f233e05e3059435d93e7b264573c3b8a1235
Showing 1 changed file with 9 additions and 9 deletions.
@@ -125,8 +125,8 @@ mutable struct Writer{T<:IO}
msgs::OrderedChannel{Message}
schema::Ref{Tables.Schema}
firstcols::Ref{Any}
dictencodings::Dict{Int64, Any}
blocks::NTuple{2, Vector{Block}}
dictencodings::Dict{Int64,Any}
blocks::NTuple{2,Vector{Block}}
task::Task
anyerror::Threads.Atomic{Bool}
errorref::Ref{Any}
@@ -141,7 +141,7 @@ function Base.open(::Type{Writer}, io::T, compress::Union{Nothing,LZ4FrameCompre
msgs = OrderedChannel{Message}(ntasks)
schema = Ref{Tables.Schema}()
firstcols = Ref{Any}()
dictencodings = Dict{Int64, Any}() # Lockable{DictEncoding}
dictencodings = Dict{Int64,Any}() # Lockable{DictEncoding}
blocks = (Block[], Block[])
# start message writing from channel
threaded = ntasks > 1
@@ -168,7 +168,7 @@ function Base.open(::Type{Writer}, io::IO, compress::Symbol, args...)
open(Writer, io, compressor, args...)
end

function Base.open(::Type{Writer}, io::IO; compress::Union{Nothing,Symbol,LZ4FrameCompressor,<:AbstractVector{LZ4FrameCompressor},ZstdCompressor,<:AbstractVector{ZstdCompressor}}=nothing, file::Bool=true, largelists::Bool=false, denseunions::Bool=true, dictencode::Bool=false, dictencodenested::Bool=false, alignment::Integer=8, maxdepth::Integer=DEFAULT_MAX_DEPTH, ntasks::Integer=typemax(Int32), metadata::Union{Nothing,Any}=nothing, colmetadata::Union{Nothing,Any}=nothing, closeio::Bool = false)
function Base.open(::Type{Writer}, io::IO; compress::Union{Nothing,Symbol,LZ4FrameCompressor,<:AbstractVector{LZ4FrameCompressor},ZstdCompressor,<:AbstractVector{ZstdCompressor}}=nothing, file::Bool=true, largelists::Bool=false, denseunions::Bool=true, dictencode::Bool=false, dictencodenested::Bool=false, alignment::Integer=8, maxdepth::Integer=DEFAULT_MAX_DEPTH, ntasks::Integer=typemax(Int32), metadata::Union{Nothing,Any}=nothing, colmetadata::Union{Nothing,Any}=nothing, closeio::Bool=false)
open(Writer, io, compress, file, largelists, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks, metadata, colmetadata, closeio)
end

@@ -177,7 +177,7 @@ Base.open(::Type{Writer}, file_path; kwargs...) = open(Writer, open(file_path, "
function check_errors(writer::Writer)
if writer.anyerror[]
errorref = writer.errorref[]
@error "error writing arrow data on partition = $(errorref[3])" exception=(errorref[1], errorref[2])
@error "error writing arrow data on partition = $(errorref[3])" exception = (errorref[1], errorref[2])
error("fatal error writing arrow data")
end
end
@@ -198,7 +198,7 @@ function write(writer::Writer, source)
writer.firstcols[] = cols
put!(writer.msgs, makeschemamsg(writer.schema[], cols), writer.partition_count)
if !isempty(writer.dictencodings)
des = sort!(collect(writer.dictencodings); by=x->x.first, rev=true)
des = sort!(collect(writer.dictencodings); by=x -> x.first, rev=true)
for (id, delock) in des
# assign dict encoding ids
de = delock.x
@@ -264,7 +264,7 @@ function Base.close(writer::Writer)
FlatBuffers.UOffsetT(0)
end
Meta.footerStart(b)
Meta.footerAddVersion(b, Meta.MetadataVersions.V4)
Meta.footerAddVersion(b, Meta.MetadataVersions.V5)
Meta.footerAddSchema(b, schfoot)
Meta.footerAddDictionaries(b, dicts)
Meta.footerAddRecordBatches(b, recordbatches)
@@ -500,13 +500,13 @@ struct Buffer
length::Int64
end

function makerecordbatchmsg(sch::Tables.Schema{names, types}, columns, alignment) where {names, types}
function makerecordbatchmsg(sch::Tables.Schema{names,types}, columns, alignment) where {names,types}
b = FlatBuffers.Builder(1024)
recordbatch, bodylen = makerecordbatch(b, sch, columns, alignment)
return makemessage(b, Meta.RecordBatch, recordbatch, columns, bodylen)
end

function makerecordbatch(b, sch::Tables.Schema{names, types}, columns, alignment) where {names, types}
function makerecordbatch(b, sch::Tables.Schema{names,types}, columns, alignment) where {names,types}
nrows = Tables.rowcount(columns)

compress = nothing

0 comments on commit ed95f23

Please sign in to comment.