Skip to content

Commit

Permalink
merged with master
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaodaigh committed May 22, 2020
2 parents c855cb1 + 29308e3 commit bcd9a5c
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 4 deletions.
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,32 @@ Schema:
}
```

Create cursor to iterate over batches of column values. Each iteration returns a named tuple of column names with batch of column values. One batch corresponds to one row group of the parquet file.

```julia
julia> cc = Parquet.BatchedColumnsCursor(par)
Batched Columns Cursor on customer.impala.parquet
rows: 1:150000
batches: 1
cols: c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment

julia> batchvals, state = iterate(cc);

julia> propertynames(batchvals)
(:c_custkey, :c_name, :c_address, :c_nationkey, :c_phone, :c_acctbal, :c_mktsegment, :c_comment)

julia> length(batchvals.c_name)
150000

julia> batchvals.c_name[1:5]
5-element Array{Union{Missing, String},1}:
"Customer#000000001"
"Customer#000000002"
"Customer#000000003"
"Customer#000000004"
"Customer#000000005"
```

Create cursor to iterate over records. In parallel mode, multiple remote cursors can be created and iterated on in parallel.

```julia
Expand Down
3 changes: 1 addition & 2 deletions src/Parquet.jl
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ import Thrift: isfilled
export is_par_file, ParFile, show, nrows, ncols, rowgroups, columns, pages, bytes, values, colname, colnames
export schema
export logical_timestamp, logical_string

export RecordCursor
export RecordCursor, BatchedColumnsCursor
export write_parquet

# package code goes here
Expand Down
61 changes: 61 additions & 0 deletions src/cursor.jl
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,67 @@ function Base.iterate(cursor::ColCursor)
return r
end

##

mutable struct BatchedColumnsCursor{T}
par::ParFile
colnames::Vector{Vector{String}}
colcursors::Vector{ColCursor}
colstates::Vector{Tuple{Int64,Int64}}
rowgroupid::Int
row::Int64
end

function BatchedColumnsCursor(par::ParFile)
sch = schema(par)

# supports only non nested columns as of now
if !all(num_children(schemaelem) == 0 for schemaelem in sch.schema[2:end])
error("nested schemas are not supported with BatchedColumnsCursor yet")
end

colcursors = [ColCursor(par, colname) for colname in colnames(par)]
rectype = ntcolstype(sch, sch.schema[1])
BatchedColumnsCursor{rectype}(par, colnames(par), colcursors, Array{Tuple{Int64,Int64}}(undef, length(colcursors)), 1, 1)
end

eltype(cursor::BatchedColumnsCursor{T}) where {T} = T
length(cursor::BatchedColumnsCursor) = length(rowgroups(cursor.par))

function colcursor_values(colcursor::ColCursor)
defn_levels = colcursor.defn_levels
vals = colcursor.vals

logical_converter_fn = colcursor.logical_converter_fn

if !isempty(defn_levels) && !all(x===Int32(1) for x in defn_levels)
[(defn_levels[idx] === Int32(1)) ? logical_converter_fn(vals[idx]) : missing for idx in 1:length(vals)]
else
(logical_converter_fn === identity) ? vals : map(logical_converter_fn, vals)
end
end

function Base.iterate(cursor::BatchedColumnsCursor{T}, rowgroupid) where {T}
(rowgroupid > length(cursor)) && (return nothing)

colcursors = cursor.colcursors
for colcursor in colcursors
setrow(colcursor, cursor.row)
end
colvals = [colcursor_values(colcursor) for colcursor in colcursors]

cursor.row += (rowgroups(cursor.par)[cursor.rowgroupid]).num_rows
cursor.rowgroupid += 1
T(colvals), cursor.rowgroupid
end

function Base.iterate(cursor::BatchedColumnsCursor{T}) where {T}
cursor.row = 1
cursor.colstates = [_start(colcursor) for colcursor in cursor.colcursors]
iterate(cursor, cursor.rowgroupid)
end


##

mutable struct RecordCursor{T}
Expand Down
14 changes: 14 additions & 0 deletions src/schema.jl
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,20 @@ function elemtype(schelem::SchemaElement)
jtype
end

ntcolstype(sch::Schema, schname::T) where {T <: AbstractVector{String}} = get!(sch.nttype_lookup, schname) do
ntcolstype(sch, sch.name_lookup[schname])
end
function ntcolstype(sch::Schema, schelem::SchemaElement)
@assert num_children(schelem) > 0
idx = findfirst(x->x===schelem, sch.schema)
children_range = (idx+1):(idx+schelem.num_children)
names = [Symbol(x.name) for x in sch.schema[children_range]]
types = [(num_children(x) > 0) ? ntelemtype(sch, path_in_schema(sch, x)) : elemtype(sch, path_in_schema(sch, x)) for x in sch.schema[children_range]]
optionals = [isoptional(x) for x in sch.schema[children_range]]
types = [Vector{opt ? Union{t,Missing} : t} for (t,opt) in zip(types, optionals)]
NamedTuple{(names...,),Tuple{types...}}
end

ntelemtype(sch::Schema, schname::T) where {T <: AbstractVector{String}} = get!(sch.nttype_lookup, schname) do
ntelemtype(sch, sch.name_lookup[schname])
end
Expand Down
11 changes: 11 additions & 0 deletions src/show.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@ function show(io::IO, cursor::RecordCursor)
println(io, " cols: $(join(colpaths, ", "))")
end

function show(io::IO, cursor::BatchedColumnsCursor)
par = cursor.par
rows = cursor.colcursors[1].row.rows
println(io, "Batched Columns Cursor on $(par.path)")
println(io, " rows: $rows")
println(io, " batches: $(length(cursor))")

colpaths = [join(colname, '.') for colname in cursor.colnames]
println(io, " cols: $(join(colpaths, ", "))")
end

function show(io::IO, schema::SchemaElement, indent::AbstractString="", nchildren::Vector{Int}=Int[])
print(io, indent)
lchildren = length(nchildren)
Expand Down
27 changes: 25 additions & 2 deletions test/test_cursors.jl
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,20 @@ function test_row_cursor(file::String)
@info("loaded", file, count=nr, last_record=rec, time_to_read=time()-t1)
end

function test_batchedcols_cursor(file::String)
p = ParFile(file)

t1 = time()
nr = nrows(p)
cnames = colnames(p)
cc = BatchedColumnsCursor(p)
batch = nothing
for i in cc
batch = i
end
@info("loaded", file, count=nr, ncols=length(propertynames(batch)), time_to_read=time()-t1)
end

function test_col_cursor_all_files()
for encformat in ("SNAPPY", "GZIP", "NONE")
for fname in ("nation", "customer")
Expand All @@ -45,13 +59,22 @@ function test_col_cursor_all_files()
end
end

function test_juliabuilder_row_cursor_all_files()
function test_row_cursor_all_files()
for encformat in ("SNAPPY", "GZIP", "NONE")
for fname in ("nation", "customer")
test_row_cursor(joinpath(@__DIR__, "parquet-compatibility", "parquet-testdata", "impala", "1.1.1-$encformat/$fname.impala.parquet"))
end
end
end

function test_batchedcols_cursor_all_files()
for encformat in ("SNAPPY", "GZIP", "NONE")
for fname in ("nation", "customer")
test_batchedcols_cursor(joinpath(@__DIR__, "parquet-compatibility", "parquet-testdata", "impala", "1.1.1-$encformat/$fname.impala.parquet"))
end
end
end

#test_col_cursor_all_files()
test_juliabuilder_row_cursor_all_files()
test_row_cursor_all_files()
test_batchedcols_cursor_all_files()

0 comments on commit bcd9a5c

Please sign in to comment.