Skip to content

Commit

Permalink
Added a batched columns iterator
Browse files Browse the repository at this point in the history
Adds a way to iterate over data in a columnar fashion. Each iteration yields a named tuple that maps column names to the corresponding vectors of data. It is currently restricted to reading non-nested schemas, but can be extended to support nested schemas later. The batch size is determined by the row group size in the parquet file: each iteration returns one row group's worth of data.

```julia
julia> cc = Parquet.BatchedColumnsCursor(par)
Batched Columns Cursor on customer.impala.parquet
    rows: 1:150000
    batches: 1
    cols: c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment

julia> batchvals, state = iterate(cc);

julia> propertynames(batchvals)
(:c_custkey, :c_name, :c_address, :c_nationkey, :c_phone, :c_acctbal, :c_mktsegment, :c_comment)

julia> length(batchvals.c_name)
150000

julia> batchvals.c_name[1:5]
5-element Array{Union{Missing, String},1}:
 "Customer#000000001"
 "Customer#000000002"
 "Customer#000000003"
 "Customer#000000004"
 "Customer#000000005"
```
  • Loading branch information
tanmaykm committed May 22, 2020
1 parent 802a945 commit 1b1c5cf
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 3 deletions.
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,32 @@ Schema:
}
```

Create a cursor to iterate over batches of column values. Each iteration returns a named tuple that maps each column name to a batch of that column's values. One batch corresponds to one row group of the parquet file.

```julia
julia> cc = Parquet.BatchedColumnsCursor(par)
Batched Columns Cursor on customer.impala.parquet
    rows: 1:150000
    batches: 1
    cols: c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment

julia> batchvals, state = iterate(cc);

julia> propertynames(batchvals)
(:c_custkey, :c_name, :c_address, :c_nationkey, :c_phone, :c_acctbal, :c_mktsegment, :c_comment)

julia> length(batchvals.c_name)
150000

julia> batchvals.c_name[1:5]
5-element Array{Union{Missing, String},1}:
 "Customer#000000001"
 "Customer#000000002"
 "Customer#000000003"
 "Customer#000000004"
 "Customer#000000005"
```

Create cursor to iterate over records. In parallel mode, multiple remote cursors can be created and iterated on in parallel.

```julia
Expand Down
2 changes: 1 addition & 1 deletion src/Parquet.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import Thrift: isfilled
export is_par_file, ParFile, show, nrows, ncols, rowgroups, columns, pages, bytes, values, colname, colnames
export schema
export logical_timestamp, logical_string
export RecordCursor
export RecordCursor, BatchedColumnsCursor

# package code goes here
include("PAR2/PAR2.jl")
Expand Down
61 changes: 61 additions & 0 deletions src/cursor.jl
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,67 @@ function Base.iterate(cursor::ColCursor)
return r
end

##

# Iterator over a parquet file that yields one batch of column values per row
# group. `T` is the named tuple type produced for each batch: it is reported as
# the iterator's `eltype` and is used to wrap the per-column vectors.
mutable struct BatchedColumnsCursor{T}
# parquet file being read
par::ParFile
# one name path per column, as returned by `colnames(par)`
colnames::Vector{Vector{String}}
# one column cursor per entry in `colnames`, in the same order
colcursors::Vector{ColCursor}
# per-column start states; left undefined by the constructor and filled from
# `_start` on each column cursor when iteration begins
colstates::Vector{Tuple{Int64,Int64}}
# index of the row group whose row count advances `row` on the next iteration
rowgroupid::Int
# row at which the next batch starts
row::Int64
end

"""
    BatchedColumnsCursor(par::ParFile)

Build a cursor over `par` that yields one named tuple of column vectors per row group.

One `ColCursor` is created per column of `par`, and the named tuple type of a batch is
derived from the root schema element. The cursor starts at row group 1, row 1.

# Throws
- `ErrorException` if any schema element after the root has children (nested schema).
"""
function BatchedColumnsCursor(par::ParFile)
    sch = schema(par)

    # only flat schemas are handled for now: every element after the root must be a leaf
    has_nested = any(num_children(elem) != 0 for elem in sch.schema[2:end])
    has_nested && error("nested schemas are not supported with BatchedColumnsCursor yet")

    cnames = colnames(par)
    cursors = [ColCursor(par, cname) for cname in cnames]
    # per-column states are allocated here but only filled when iteration starts
    states = Array{Tuple{Int64,Int64}}(undef, length(cursors))
    batchtype = ntcolstype(sch, sch.schema[1])
    return BatchedColumnsCursor{batchtype}(par, cnames, cursors, states, 1, 1)
end

# Element type of the iterator: the named tuple type `T` produced for each batch.
# The iteration interface expects `eltype` to be defined on the iterator *type*;
# Base's iterator wrappers (e.g. `enumerate`, `zip`, `Iterators.take`) derive their
# own element type from it, and without this method they fall back to `Any`.
# NOTE(review): assumes `eltype`/`length` extend the Base functions in this module
# (i.e. they are imported from Base elsewhere in the package) - confirm.
eltype(::Type{BatchedColumnsCursor{T}}) where {T} = T
# Instance method kept so existing callers of `eltype(cursor)` behave as before.
eltype(cursor::BatchedColumnsCursor{T}) where {T} = T

# Number of batches the cursor yields: one per row group of the underlying file.
length(cursor::BatchedColumnsCursor) = length(rowgroups(cursor.par))

"""
    colcursor_values(colcursor::ColCursor)

Return the values currently loaded in `colcursor` as a vector, after applying the
cursor's logical converter.

If definition levels are present and at least one of them is not `Int32(1)`, the result
holds `missing` at every position whose level is not `Int32(1)` and the converted value
elsewhere. Otherwise the values are returned as they are when the converter is
`identity`, or mapped through the converter when it is not.
"""
function colcursor_values(colcursor::ColCursor)
    levels = colcursor.defn_levels
    rawvals = colcursor.vals
    convert_fn = colcursor.logical_converter_fn

    has_missing = !isempty(levels) && any(lvl -> lvl !== Int32(1), levels)
    if has_missing
        # NOTE(review): `levels` is indexed with positions of `rawvals`, so this assumes
        # both are index-aligned and equally long - confirm for columns containing nulls.
        return map(1:length(rawvals)) do i
            (levels[i] === Int32(1)) ? convert_fn(rawvals[i]) : missing
        end
    end

    # no missing entries: skip the mapping pass entirely when there is nothing to convert
    return (convert_fn === identity) ? rawvals : map(convert_fn, rawvals)
end

"""
    Base.iterate(cursor::BatchedColumnsCursor{T}, rowgroupid)

Produce the next batch of column values, or `nothing` once `rowgroupid` exceeds the
number of row groups.

Every column cursor is positioned at `cursor.row`, its values are collected, and the
cursor is then advanced by the row count of the current row group. Returns the batch as
a `T` together with the next row group index.
"""
function Base.iterate(cursor::BatchedColumnsCursor{T}, rowgroupid) where {T}
    if rowgroupid > length(cursor)
        return nothing
    end

    cursors = cursor.colcursors
    # position every column first, then read them all
    foreach(cc -> setrow(cc, cursor.row), cursors)
    batchvals = map(colcursor_values, cursors)

    # NOTE(review): the row group is looked up through `cursor.rowgroupid`, not the
    # `rowgroupid` argument; the two agree when iteration is started via `iterate(cursor)`.
    rowgroup = rowgroups(cursor.par)[cursor.rowgroupid]
    cursor.row += rowgroup.num_rows
    cursor.rowgroupid += 1
    return T(batchvals), cursor.rowgroupid
end

"""
    Base.iterate(cursor::BatchedColumnsCursor{T})

Start (or restart) iteration and return the first batch, or `nothing` if the file has
no row groups.

All position state held by the cursor is rewound before the first batch is read, so the
same cursor can be iterated more than once.
"""
function Base.iterate(cursor::BatchedColumnsCursor{T}) where {T}
    cursor.row = 1
    # rewind the row group index as well; otherwise a second pass would start from
    # wherever the previous pass stopped and yield nothing after a complete pass
    cursor.rowgroupid = 1
    cursor.colstates = [_start(colcursor) for colcursor in cursor.colcursors]
    return iterate(cursor, cursor.rowgroupid)
end


##

mutable struct RecordCursor{T}
Expand Down
14 changes: 14 additions & 0 deletions src/schema.jl
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,20 @@ function elemtype(schelem::SchemaElement)
jtype
end

"""
    ntcolstype(sch::Schema, schname::AbstractVector{String})

Return the named tuple type of column vectors for the schema element at path `schname`.

The result is deliberately not memoized in `sch.nttype_lookup`: `ntelemtype` memoizes
into that same dictionary under the same path key, so a path looked up through both
functions would get back whichever type happened to be cached first.
"""
function ntcolstype(sch::Schema, schname::T) where {T <: AbstractVector{String}}
    return ntcolstype(sch, sch.name_lookup[schname])
end
"""
    ntcolstype(sch::Schema, schelem::SchemaElement)

Build the named tuple type describing one batch of columns under `schelem`.

Each direct child of `schelem` becomes a field named after the child. The field type is
a `Vector` of the child's element type, widened with `Missing` when the child is
optional. `schelem` must have at least one child.
"""
function ntcolstype(sch::Schema, schelem::SchemaElement)
    @assert num_children(schelem) > 0
    pos = findfirst(x -> x === schelem, sch.schema)
    # the children are the `num_children` schema entries that follow `schelem`
    children = sch.schema[(pos+1):(pos+schelem.num_children)]

    fieldsyms = map(child -> Symbol(child.name), children)
    fieldtypes = map(children) do child
        path = path_in_schema(sch, child)
        t = (num_children(child) > 0) ? ntelemtype(sch, path) : elemtype(sch, path)
        Vector{isoptional(child) ? Union{t,Missing} : t}
    end
    return NamedTuple{(fieldsyms...,),Tuple{fieldtypes...}}
end

"""
    ntelemtype(sch::Schema, schname::AbstractVector{String})

Return the named tuple element type for the schema element at path `schname`.

The type is computed on first request from the element registered under `schname` in
`sch.name_lookup`, and memoized in `sch.nttype_lookup` for later calls.
"""
function ntelemtype(sch::Schema, schname::T) where {T <: AbstractVector{String}}
    return get!(sch.nttype_lookup, schname) do
        ntelemtype(sch, sch.name_lookup[schname])
    end
end
Expand Down
11 changes: 11 additions & 0 deletions src/show.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@ function show(io::IO, cursor::RecordCursor)
println(io, " cols: $(join(colpaths, ", "))")
end

"""
    show(io::IO, cursor::BatchedColumnsCursor)

Print a short summary of `cursor` to `io`: the path of the file it reads, the row range
taken from its first column cursor, the number of batches, and the column names (each
name path joined with `.`).
"""
function show(io::IO, cursor::BatchedColumnsCursor)
    # read the row range up front so nothing is printed if the lookup fails
    rows = cursor.colcursors[1].row.rows
    println(io, "Batched Columns Cursor on $(cursor.par.path)")
    println(io, " rows: $rows")
    println(io, " batches: $(length(cursor))")

    colpaths = map(namepath -> join(namepath, '.'), cursor.colnames)
    println(io, " cols: $(join(colpaths, ", "))")
end

function show(io::IO, schema::SchemaElement, indent::AbstractString="", nchildren::Vector{Int}=Int[])
print(io, indent)
lchildren = length(nchildren)
Expand Down
27 changes: 25 additions & 2 deletions test/test_cursors.jl
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,20 @@ function test_row_cursor(file::String)
@info("loaded", file, count=nr, last_record=rec, time_to_read=time()-t1)
end

# Read every batch of `file` through a `BatchedColumnsCursor`, check that the batches
# together account for every row of the file, and log a short summary.
# Raises an error when the number of rows read differs from `nrows` of the file.
function test_batchedcols_cursor(file::String)
    p = ParFile(file)

    t1 = time()
    nr = nrows(p)
    cc = BatchedColumnsCursor(p)
    batch = nothing
    nread = 0
    for b in cc
        batch = b
        # all columns of a batch cover the same rows, so the first column's length
        # is taken as the row count of the batch
        nread += length(first(b))
    end
    nread == nr || error("BatchedColumnsCursor read $nread rows from $file, expected $nr")
    @info("loaded", file, count=nr, ncols=length(propertynames(batch)), time_to_read=time()-t1)
end

function test_col_cursor_all_files()
for encformat in ("SNAPPY", "GZIP", "NONE")
for fname in ("nation", "customer")
Expand All @@ -45,13 +59,22 @@ function test_col_cursor_all_files()
end
end

function test_juliabuilder_row_cursor_all_files()
# Run `test_row_cursor` on the nation and customer impala test files for each of the
# SNAPPY, GZIP and NONE encodings.
function test_row_cursor_all_files()
    datadir = joinpath(@__DIR__, "parquet-compatibility", "parquet-testdata", "impala")
    for encformat in ("SNAPPY", "GZIP", "NONE"), fname in ("nation", "customer")
        test_row_cursor(joinpath(datadir, "1.1.1-$encformat/$fname.impala.parquet"))
    end
end

# Run `test_batchedcols_cursor` on the nation and customer impala test files for each of
# the SNAPPY, GZIP and NONE encodings.
function test_batchedcols_cursor_all_files()
    datadir = joinpath(@__DIR__, "parquet-compatibility", "parquet-testdata", "impala")
    for encformat in ("SNAPPY", "GZIP", "NONE"), fname in ("nation", "customer")
        test_batchedcols_cursor(joinpath(datadir, "1.1.1-$encformat/$fname.impala.parquet"))
    end
end

# Run the cursor test suites when this file is included; the column cursor suite
# is currently commented out.
#test_col_cursor_all_files()
test_juliabuilder_row_cursor_all_files()
test_row_cursor_all_files()
test_batchedcols_cursor_all_files()

0 comments on commit 1b1c5cf

Please sign in to comment.