Skip to content

Commit

Permalink
store ncols in Table and TablePartitions
Browse files Browse the repository at this point in the history
  • Loading branch information
tanmaykm committed Mar 3, 2021
1 parent e6566b4 commit 842d94d
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions src/simple_reader.jl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ df = DataFrame(read_parquet(path; copycols=false))
"""
struct Table <: Tables.AbstractColumns
path::String
ncols::Int
rows::Union{Nothing,UnitRange}
batchsize::Union{Nothing,Signed}
use_threads::Bool
Expand All @@ -33,8 +34,9 @@ struct Table <: Tables.AbstractColumns
use_threads::Bool=(nthreads() > 1))
parfile = Parquet.File(path)
sch = tables_schema(parfile)
ncols = length(sch.names)
lookup = Dict{Symbol, Int}(nm => i for (i, nm) in enumerate(sch.names))
new(path, rows, batchsize, use_threads, parfile, sch, lookup, AbstractVector[])
new(path, ncols, rows, batchsize, use_threads, parfile, sch, lookup, AbstractVector[])
end
end

Expand All @@ -47,19 +49,18 @@ end

# Iterable wrapper that walks a `Table` in batches via a `BatchedColumnsCursor`.
#
# Fields:
# - `table`:  the `Table` being partitioned.
# - `ncols`:  column count, copied from the table's `ncols` field at construction
#             so that iteration does not need to consult the schema again.
# - `cursor`: the cursor returned by `cursor(table)`.
struct TablePartitions
    table::Table
    ncols::Int
    cursor::BatchedColumnsCursor

    function TablePartitions(table::Table)
        # All three fields must be supplied in declaration order. The stale
        # two-argument `new(table, cursor(table))` call was dropped: it would
        # place the cursor in the `ncols::Int` slot.
        # `getfield` reads the raw struct field.
        # NOTE(review): presumably required because `Table` customises property
        # access as a `Tables.AbstractColumns` -- confirm.
        new(table, getfield(table, :ncols), cursor(table))
    end
end
# Length of a `TablePartitions` is the length of its underlying cursor.
function length(tp::TablePartitions)
    return length(tp.cursor)
end
# Translate one result of iterating the cursor into the value that
# `Base.iterate` on a `TablePartitions` should produce.
#
# Arguments:
# - `partitions`: the `TablePartitions` being iterated.
# - `iterresult`: `nothing` when the cursor is exhausted, otherwise a
#                 `(chunk, batchid)` pair.
#
# Returns `nothing` when `iterresult` is `nothing`; otherwise a tuple of a
# `TablePartition` (holding `chunk[1]` through `chunk[partitions.ncols]`) and
# `batchid`, which serves as the iteration state.
function iterated_partition(partitions::TablePartitions, iterresult)
    iterresult === nothing && return nothing
    chunk, batchid = iterresult
    # The cached column count replaces the per-batch schema lookup; the leftover
    # lookup and the discarded duplicate `TablePartition` were removed.
    columns = AbstractVector[chunk[colidx] for colidx in 1:partitions.ncols]
    return TablePartition(partitions.table, columns), batchid
end
# Begin iteration: advance the cursor from its start and wrap the result.
function Base.iterate(partitions::TablePartitions)
    first_result = iterate(partitions.cursor)
    return iterated_partition(partitions, first_result)
end

# Continue iteration: advance the cursor from state `batchid` and wrap the result.
function Base.iterate(partitions::TablePartitions, batchid)
    next_result = iterate(partitions.cursor, batchid)
    return iterated_partition(partitions, next_result)
end
Expand All @@ -75,8 +76,7 @@ loaded(table::Table) = !isempty(getfield(table, :columns))
# Convenience method: obtain a cursor for `table` and delegate to the
# two-argument `load(table, chunks)` method.
function load(table::Table)
    chunks = cursor(table)
    return load(table, chunks)
end
function load(table::Table, chunks::BatchedColumnsCursor)
chunks = [chunk for chunk in chunks]
sch = Tables.schema(table)
ncols = length(sch.names)
ncols = getfield(table, :ncols)
columns = getfield(table, :columns)

empty!(columns)
Expand Down

0 comments on commit 842d94d

Please sign in to comment.