Merge pull request #51 from JuliaData/jq/rows
Make DataStreams row happy
quinnj committed Nov 21, 2017
2 parents be1a9dd + 7201c3f commit c8a690c
Showing 3 changed files with 312 additions and 153 deletions.
188 changes: 38 additions & 150 deletions src/DataStreams.jl
Expand Up @@ -66,6 +66,7 @@ Base.size(sch::Schema) = (sch.rows, sch.cols)
Base.size(sch::Schema, i::Int) = ifelse(i == 1, sch.rows, ifelse(i == 2, sch.cols, 0))

setrows!(source, rows) = isdefined(source, :schema) ? (source.schema.rows = rows; nothing) : nothing
setrows!(source::Array, rows) = nothing
Base.getindex(sch::Schema, col::String) = sch.index[col]

function, schema::Schema)
Expand All @@ -90,7 +91,8 @@ transform(sch::Data.Schema, transforms::Dict{String, <:Function}, s) = transform

# Data.StreamTypes
abstract type StreamType end
struct Field <: StreamType end
struct Field <: StreamType end
struct Row <: StreamType end
struct Column <: StreamType end

# Data.Source Interface
Expand Down Expand Up @@ -188,6 +190,7 @@ function streamfrom end
Data.streamfrom(source, ::Type{Data.Column}, T, row, col) = Data.streamfrom(source, Data.Column, T, col)

# Generic fallbacks
Data.streamtype(source, ::Type{Data.Row}) = Data.streamtype(source, Data.Field)
Data.streamtype(source, ::Type{<:StreamType}) = false
Data.reset!(source) = nothing

Expand Down Expand Up @@ -298,6 +301,12 @@ If, on the other hand, my sink also supported `Data.Column` streaming, and `Data
Data.streamtypes(::Type{MyPkg.Sink}) = [Data.Column, Data.Field] # put Data.Column first to indicate preference
A third option is a sink that operates on entire rows at a time, in which case I could define:
Data.streamtypes(::Type{MyPkg.Sink}) = [Data.Row]
The subsequent `Data.streamto!` method would then require the signature `Data.streamto!(sink::MyPkg.Sink, ::Type{Data.Row}, vals::NamedTuple, row, col, knownrows`
function streamtypes end

Expand All @@ -306,10 +315,10 @@ function streamtypes end
`Data.streamto!(sink, S::Type{StreamType}, val, row, col, knownrows)`
Streams data to a sink. `S` is the type of streaming (`Data.Field` or `Data.Column`). `val` is the value (single field or column)
Streams data to a sink. `S` is the type of streaming (`Data.Field`, `Data.Row`, or `Data.Column`). `val` is the value or values (single field, row as a NamedTuple, or column, respectively)
to be streamed to the sink. `row` and `col` indicate where the data should be streamed/stored.
A sink may optionally define the method that also accepts the `knownrows` argument, which will be `true` or `false`,
A sink may optionally define the method that also accepts the `knownrows` argument, which will be `Val{true}` or `Val{false}`,
indicating whether the source streaming has a known # of rows or not. This can be useful for sinks that
may know how to pre-allocate space in the cases where the source can tell the # of rows, or in the case
of unknown # of rows, may need to stream the data in differently.
Expand Down Expand Up @@ -409,6 +418,11 @@ Once the `source` is constructed, the data is streamed via the call to `Data.str
And finally, to "finish" the streaming process, `Data.close!(sink)` is closed, which returns the finalized sink. Note that `!(source, sink)` could be called multiple times with different sources and the same sink,
most likely with `append=true` being passed, to enable the accumulation of several sources into a single sink. A single `Data.close!(sink)` method should be called to officially close or commit the final sink.
Two "builtin" Source/Sink types that are included with the DataStreams package are the `Data.Table` and `Data.RowTable` types. `Data.Table` is a NamedTuple of AbstractVectors, with column names as NamedTuple fieldnames.
This type supports both `Data.Field` and `Data.Column` streaming. `Data.RowTable` is just a Vector of NamedTuples, and as such, only supports `Data.Field` streaming.
In addition, any `Data.Source` can be iterated via the `Data.rows(source)` function, which returns a NamedTuple-iterator over the rows of a source.
function stream! end

Expand Down Expand Up @@ -438,7 +452,7 @@ function!(source::So, ::Type{Si}, args...;
sourcerows = size(source_schema, 1)
sinkrows = size(sink_schema, 1)
sinkrowoffset = ifelse(append, ifelse(ismissing(sourcerows), sinkrows, max(0, sinkrows - sourcerows)), 0)
return!(source, sinkstreamtype, sink, source_schema, sinkrowoffset, transforms2, filter, columns)
return!(source, sinkstreamtype, sink, source_schema, sinkrowoffset, transforms2, filter, columns, Ref{Tuple(map(Symbol, Data.header(source_schema)))})
throw(ArgumentError("`source` doesn't support the supported streaming types of `sink`: $sinkstreamtypes"))
Expand All @@ -464,27 +478,28 @@ function!(source::So, sink::Si;
sourcerows = size(source_schema, 1)
sinkrows = size(sink_schema, 1)
sinkrowoffset = ifelse(append, ifelse(ismissing(sourcerows), sinkrows, max(0, sinkrows - sourcerows)), 0)
return!(source, sinkstreamtype, sink, source_schema, sinkrowoffset, transforms2, filter, columns)
return!(source, sinkstreamtype, sink, source_schema, sinkrowoffset, transforms2, filter, columns, Ref{Tuple(map(Symbol, Data.header(source_schema)))})
throw(ArgumentError("`source` doesn't support the supported streaming types of `sink`: $sinkstreamtypes"))

# column filtering
# Data.transforms needs to produce sink schema w/ correct #/types of columns
# if RandomAccess
# only generate @nexprs for selected column #s, need to pass in column offset or something to map between source co
# else
# generate normal # of @nexprs, but only need to include Data.streamto! for included columns

# row filtering
# needs to update source schema to be unknown # of rows? or maybe that gets set earlier?
# where func: (tuple) -> Bool or (tuple, row) -> Bool
# probably do @nexprs for streamfroms first, then apply where func
# if where = true, execute @nexprs streamto!, else continue

function inner_loop(::Type{Val{N}}, ::Type{S}, ::Type{Val{homogeneous}}, ::Type{T}, knownrows::Type{Val{R}}) where {N, S <: StreamType, homogeneous, T, R}
if N < 500
function inner_loop(::Type{Val{N}}, ::Type{S}, ::Type{Val{homogeneous}}, ::Type{T}, knownrows::Type{Val{R}}, names, sourcetypes) where {N, S <: StreamType, homogeneous, T, R}
if S == Data.Row
vals = Tuple(:($(Symbol("val_$col"))) for col = 1:N)
out = @static if isdefined(Core, :NamedTuple)
:(vals = NamedTuple{$names, $(Tuple{sourcetypes...})}(($(vals...),)))
exprs = [:($nm::$typ) for (nm, typ) in zip(names, sourcetypes)]
nt = NamedTuples.make_tuple(exprs)
:(vals = $nt($(vals...)))
loop = quote
$((:($(Symbol("val_$col")) = Data.streamfrom(source, Data.Field, sourcetypes[$col], row, $col)) for col = 1:N)...)
Data.streamto!(sink, Data.Row, vals, sinkrowoffset + row, 0, $knownrows)
elseif N < 500
# println("generating inner_loop w/ @nexprs...")
incr = S == Data.Column ? :(cur_row = length($(Symbol(string("val_", N))))) : :(nothing)
loop = quote
Expand Down Expand Up @@ -560,7 +575,7 @@ end

@generated function!(source::So, ::Type{S}, sink::Si,
source_schema::Schema{R, T1}, sinkrowoffset,
transforms, filter, columns) where {So, S <: StreamType, Si, R, T1}
transforms, filter, columns, ::Type{Ref{names}}) where {So, S <: StreamType, Si, R, T1, names}
types = T1.parameters
sourcetypes = Tuple(types)
homogeneous = Val{all(i -> (types[1] === i), types)}
Expand All @@ -574,7 +589,7 @@ end
sourcetypes = $sourcetypes
N = $N
$(generate_loop(knownrows, S, inner_loop(N, S, homogeneous, T, knownrows)))
$(generate_loop(knownrows, S, inner_loop(N, S, homogeneous, T, knownrows, names, sourcetypes)))
catch e
Expand All @@ -585,134 +600,7 @@ end
return r

# Source/Sink with NamedTuples

if isdefined(Core, :NamedTuple)

# Basically, a NamedTuple with any # of AbstractVector elements, accessed by column name
const Table = NamedTuple{names, T} where {names, T <: NTuple{N, AbstractVector{S} where S}} where {N}

# NamedTuple Data.Source implementation
# compute Data.Schema on the fly
function Data.schema(df::NamedTuple{names, T}) where {names, T}
return Data.Schema(Type[eltype(A) for A in T.parameters],
collect(map(string, names)), length(df) == 0 ? 0 : length(getfield(df, 1)))

else # if isdefined(Core, :NamedTuple)

using NamedTuples

# Constraint relaxed for compatability; NamedTuples.NamedTuple does not have parameters
const Table = NamedTuple

# NamedTuple Data.Source implementation
# compute Data.Schema on the fly
function Data.schema(df::NamedTuple)
return Data.Schema(Type[eltype(A) for A in values(df)],
collect(map(string, keys(df))), length(df) == 0 ? 0 : length(getfield(df, 1)))

end # if isdefined(Core, :NamedTuple)

Data.isdone(source::Table, row, col, rows, cols) = row > rows || col > cols
function Data.isdone(source::Table, row, col)
cols = length(source)
return Data.isdone(source, row, col, cols == 0 ? 0 : length(getfield(source, 1)), cols)

# We support both kinds of streaming
Data.streamtype(::Type{<:NamedTuple}, ::Type{Data.Column}) = true
Data.streamtype(::Type{<:NamedTuple}, ::Type{Data.Field}) = true

# Data.streamfrom is pretty simple, just return the cell or column
@inline Data.streamfrom(source::Table, ::Type{Data.Column}, ::Type{T}, row, col) where {T} = source[col]
@inline Data.streamfrom(source::Table, ::Type{Data.Field}, ::Type{T}, row, col) where {T} = source[col][row]

# NamedTuple Data.Sink implementation
# we support both kinds of streaming to our type
Data.streamtypes(::Type{<:NamedTuple}) = [Data.Column, Data.Field]
# we support streaming WeakRefStrings
Data.weakrefstrings(::Type{<:NamedTuple}) = true

# convenience methods for "allocating" a single column for streaming
allocate(::Type{T}, rows, ref) where {T} = Vector{T}(rows)
# allocate(::Type{T}, rows, ref) where {T <: Union{CategoricalValue, Missing}} =
# CategoricalArray{CategoricalArrays.unwrap_catvalue_type(T)}(rows)
# special case for WeakRefStrings
allocate(::Type{T}, rows, ref) where {T <: Union{WeakRefString,Missing}} = WeakRefStringArray(ref, T, rows)

# NamedTuple doesn't allow duplicate names, so make sure there are no duplicates in our column names
function makeunique(names::Vector{String})
nms = [Symbol(nm) for nm in names]
seen = Set{Symbol}()
for (i, x) in enumerate(nms)
x in seen ? setindex!(nms, Symbol("$(x)_$i"), i) : push!(seen, x)
return (nms...)

# Construct or modify a NamedTuple to be ready to stream data from a source with a schema of `sch`
# We support WeakRefString streaming, so we include the `reference` keyword
function NamedTuple(sch::Data.Schema{R}, ::Type{S}=Data.Field,
append::Bool=false, args...; reference::Vector{UInt8}=UInt8[]) where {R, S <: StreamType}
types = Data.types(sch)
# check if we're dealing with an existing NamedTuple sink or not
if !isempty(args) && args[1] isa Table && types == Data.types(Data.schema(args[1]))
# passing in an existing NamedTuple Sink w/ same types as source (as indicated by `sch`)
sink = args[1]
sinkrows = size(Data.schema(sink), 1)
if append && (S == Data.Column || !R) # are we appending and either column-streaming or there are an unknown # of rows
sch.rows = sinkrows
# dont' need to do anything because:
# for Data.Column, we just append columns anyway (see Data.streamto! below)
# for Data.Field, the # of rows in the source are unknown (ismissing(rows)), so we'll just push! in streamto!
# need to adjust the existing sink
# similar to above, for Data.Column or unknown # of rows for Data.Field, we'll append!/push!, so we empty! the columns
# if appending, we want to grow our columns to be able to include every row in source (sinkrows + sch.rows)
# if not appending, we're just "re-using" a sink, so we just resize it to the # of rows in the source
newsize = ifelse(S == Data.Column || !R, 0, ifelse(append, sinkrows + sch.rows, sch.rows))
foreach(col->resize!(col, newsize), sink)
sch.rows = newsize
# take care of a possible reference from source by letting WeakRefStringArrays hold on to them
if !isempty(reference)
foreach(col-> col isa WeakRefStringArray && push!(, reference), sink)
# allocating a fresh NamedTuple Sink; append is irrelevant
# for Data.Column or unknown # of rows in Data.Field, we only ever append!, so just allocate empty columns
rows = ifelse(S == Data.Column, 0, ifelse(!R, 0, sch.rows))
names = makeunique(Data.header(sch))

sink = @static if isdefined(Core, :NamedTuple)
NamedTuple{names}(Tuple(allocate(types[i], rows, reference) for i = 1:length(types)))
NamedTuples.make_tuple(collect(names))((allocate(types[i], rows, reference) for i = 1:length(types))...)
sch.rows = rows
return sink

# Constructor that takes an existing NamedTuple sink, just pass it to our mega-constructor above
function NamedTuple(sink::Table, sch::Data.Schema, ::Type{S}, append::Bool; reference::Vector{UInt8}=UInt8[]) where {S}
return NamedTuple(sch, S, append, sink; reference=reference)

# Data.streamto! is easy-peasy, if there are known # of rows from source, we pre-allocated
# so we can just set the value; otherwise (didn't pre-allocate), we push!/append! the values
@inline Data.streamto!(sink::Table, ::Type{Data.Field}, val, row, col::Int) =
(A = getfield(sink, col); row > length(A) ? push!(A, val) : setindex!(A, val, row))
@inline Data.streamto!(sink::Table, ::Type{Data.Field}, val, row, col::Int, ::Type{Val{false}}) =
push!(getfield(sink, col), val)
@inline Data.streamto!(sink::Table, ::Type{Data.Field}, val, row, col::Int, ::Type{Val{true}}) =
getfield(sink, col)[row] = val
@inline Data.streamto!(sink::Table, ::Type{Data.Column}, column, row, col::Int, knownrows) =
append!(getfield(sink, col), column)


end # module Data

