This repository has been archived by the owner on Jul 17, 2019. It is now read-only.

Commit: Project 'Full Nullability' update
quinnj committed Aug 22, 2016
1 parent 19f2c94 commit 50a20a4
Showing 2 changed files with 69 additions and 86 deletions.
22 changes: 15 additions & 7 deletions docs/src/index.md
@@ -2,7 +2,7 @@

The `DataStreams.jl` package aims to define a generic and performant framework for the transfer of "table-like" data, i.e. data that can, in some sense, be described by rows and columns.

The framework achieves this by defining a system of `Data.Source` types and methods to describe how they "provide" data; as well as `Data.Sink` types and methods around how they "receive" data. This allows `Data.Source`s and `Data.Sink`s to implement their interfaces separately, without needing to be aware of each other. The end result is an ecosystem of packages that "automatically" talk with each other, with adding an additional package requiring no additional machinery in an existing packages.
The framework achieves this by defining a system of `Data.Source` types and methods to describe how they "provide" data; as well as `Data.Sink` types and methods around how they "receive" data. This allows `Data.Source`s and `Data.Sink`s to implement their interfaces separately, without needing to be aware of each other. The end result is an ecosystem of packages that "automatically" talk with each other, with adding an additional package requiring no additional machinery in existing packages.

## `Data.Source` Interface

@@ -20,15 +20,17 @@ A `Data.Source` also needs to "register" the type (or types) of streaming it sup
* `Data.Field`: a field is the intersection of a specific row and column; this type of streaming will traverse the "table" structure by row, accessing each column on each row
* `Data.Column`: this type of streaming will provide entire columns at a time

A `Data.Source` "registers" to support field-based streaming by defining the following:
A `Data.Source` formally supports field-based streaming by defining the following:

* `Data.streamtype(::Type{MyPkg.Source}, ::Type{Data.Field}) = true`; declares that `MyPkg.Source` supports field-based streaming
* `Data.getfield{T}(::MyPkg.Source, ::Type{T}, row, col) => Nullable{T}`; returns a value of type `Nullable{T}` given a specific `row` and `col` from `MyPkg.Source`
* `Data.getfield{T}(::MyPkg.Source, ::Type{Nullable{T}}, row, col) => Nullable{T}`; returns a value of type `Nullable{T}` given a specific `row` and `col` from `MyPkg.Source`
* `Data.getfield{T}(::MyPkg.Source, ::Type{T}, row, col) => T`; returns a value of type `T` given a specific `row` and `col` from `MyPkg.Source`
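
A minimal sketch of what these definitions might look like for a hypothetical in-memory source, written in the 0.5-era syntax used throughout this package (the `MySource` type and its `schema`/`data` fields are assumptions for illustration, not part of the DataStreams API):

```julia
using DataStreams, NullableArrays

# hypothetical in-memory source; the type name and fields are illustrative only
type MySource
    schema::Data.Schema
    data::Vector{Any}    # one Vector{T} or NullableVector{T} per column
end

Data.schema(s::MySource) = s.schema
Data.isdone(s::MySource, row, col) = row > size(s.schema, 1)

# declare support for field-based streaming
Data.streamtype(::Type{MySource}, ::Type{Data.Field}) = true

# nullable and non-nullable field accessors
Data.getfield{T}(s::MySource, ::Type{Nullable{T}}, row, col) = s.data[col][row]::Nullable{T}
Data.getfield{T}(s::MySource, ::Type{T}, row, col) = s.data[col][row]::T
```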

And for column-based streaming:

* `Data.streamtype(::Type{MyPkg.Source}, ::Type{Data.Column}) = true`
* `Data.getcolumn{T}(::Data.Source, ::Type{T}, col) => AbstractVector{T}`
* `Data.getcolumn{T}(::Data.Source, ::Type{T}, col) => Vector{T}`; Given a type `T`, returns column # `col` of a `Data.Source` as a `Vector{T}`
* `Data.getcolumn{T}(::Data.Source, ::Type{Nullable{T}}, col) => NullableVector{T}`; Given a type `Nullable{T}`, returns column # `col` of a `Data.Source` as a `NullableVector{T}`
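
Column-based registration for a hypothetical in-memory source might look like the following sketch (the `MyColumnSource` type and its `columns` field are illustrative assumptions):

```julia
using DataStreams, NullableArrays

# hypothetical source storing whole columns; names are illustrative only
type MyColumnSource
    schema::Data.Schema
    columns::Vector{Any}   # Vector{T} or NullableVector{T} per column
end

# declare support for column-based streaming
Data.streamtype(::Type{MyColumnSource}, ::Type{Data.Column}) = true

Data.getcolumn{T}(s::MyColumnSource, ::Type{T}, col) = s.columns[col]::Vector{T}
Data.getcolumn{T}(s::MyColumnSource, ::Type{Nullable{T}}, col) = s.columns[col]::NullableVector{T}
```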

## `Data.Sink` Interface

@@ -38,13 +40,19 @@ Similar to a `Data.Source`, a `Data.Sink` needs to "register" the types of strea

A `Data.Sink` should also implement specific forms of constructors that provide convenience for many higher-level streaming functions:

* `MyPkg.Sink{T <: Data.StreamType}(source, ::Type{T}, append::Bool, args...)`; given an instance of a `Data.Source`, the type of streaming `T`, whether the user desires to append `source` or not, and any necessary `args...`, construct an appropriate instance of `MyPkg.Sink` ready to receive data from `source`.
* `MyPkg.Sink{T <: Data.StreamType}(source, ::Type{T}, append::Bool, args...)`; given an instance of a `Data.Source`, the type of streaming `T`, whether the user desires to append `source` or not, and any necessary `args...`, construct an appropriate instance of `MyPkg.Sink` ready to receive data from `source`. The `append` argument allows an already existing sink file/source to "reset" itself if the user does not desire to append.
* `MyPkg.Sink{T <: Data.StreamType}(sink, source, ::Type{T}, append::Bool)`; similar to above, but instead of constructing a new `Sink`, an existing `Sink` is given as a first argument, which may be modified before being returned, ready to receive data from `source`.
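
A sketch of how the two constructor forms might be implemented for a hypothetical file-backed sink (the `MySink` type, its fields, and the trailing `file` argument are assumptions for illustration):

```julia
using DataStreams

# hypothetical file-backed sink; names and fields are illustrative only
type MySink
    schema::Data.Schema
    io::IO
end

# "new sink" form: open the file fresh, or in append mode if requested
function MySink{T <: Data.StreamType}(source, ::Type{T}, append::Bool, file::AbstractString)
    io = open(file, append ? "a" : "w")
    return MySink(Data.schema(source), io)
end

# "existing sink" form: "reset" the sink if the user does not want to append
function MySink{T <: Data.StreamType}(sink::MySink, source, ::Type{T}, append::Bool)
    append || truncate(sink.io, 0)
    sink.schema = Data.schema(source)
    return sink
end
```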

And finally, a `Data.Sink` needs to implement the meat of the framework, the actual streaming method. For a `Sink` supporting field-based streaming, the following method should be defined:

* `Data.stream!(source, ::Type{Data.Field}, sink::MyPkg.Sink, append::Bool)`; given a generic `Data.Source`, continue streaming data until `Data.isdone(source, row, col) == true`. The streaming method should usually check `Data.isdone(source, 1, 1) && return sink` before starting the actual streaming to account for a potentially empty `Data.Source`.
* `Data.stream!(source, ::Type{Data.Field}, sink::MyPkg.Sink, append::Bool)`; given a generic `Data.Source` (`source`), continue streaming data until `Data.isdone(source, row, col) == true`. The streaming method should usually check `Data.isdone(source, 1, 1) && return sink` before starting the actual streaming to account for a potentially empty `Data.Source`. A `Data.stream!` method has access to `Data.schema(source)` (including column types through `Data.types(schema)`), and should call `Data.getfield` to receive data for each column in a row. Currently, most `Data.Source` types have an internal notion of state, so `Data.getfield` doesn't guarantee random access and should be called assuming the start of row 1, column 1, proceeding sequentially to column N, then to row 2, and so on. Care should also be taken to appropriately handle potential "missing" fields when the `Data.Schema` includes `Nullable{T}` types and the result of `Data.getfield` is `Nullable{T}`.
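
The field-streaming loop described above might be sketched as follows (the `MyPkg.Sink` type and its `writefield!` helper are hypothetical placeholders for whatever the sink actually does with each value):

```julia
function Data.stream!(source, ::Type{Data.Field}, sink::MyPkg.Sink, append::Bool)
    Data.isdone(source, 1, 1) && return sink      # handle an empty source
    types = Data.types(Data.schema(source))
    cols = length(types)
    row = 1
    while true
        for col = 1:cols
            # returns T or Nullable{T}, depending on the schema's column type
            val = Data.getfield(source, types[col], row, col)
            writefield!(sink, val, row, col)      # hypothetical sink helper
        end
        Data.isdone(source, row + 1, 1) && break
        row += 1
    end
    return sink
end
```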

And for column-based streaming:

* `Data.stream!(source, ::Type{Data.Column}, sink::MyPkg.Sink, append::Bool)`
* `Data.stream!(source, ::Type{Data.Column}, sink::MyPkg.Sink, append::Bool)`; similar in many ways to field-streaming, column-streaming just means 1+ rows are received on each call to `Data.getcolumn`. It's best practice to not assume a fixed # of rows when streaming, but rather rely on `Data.isdone` to ensure all data has been exhausted from the `source`. As with `Data.getfield`, care should be taken to account for `Data.getcolumn` to return either a `Vector{T}` or `NullableVector{T}`, depending on the types in `Data.schema(source)`.
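
Column streaming might look like the following sketch (again, `MyPkg.Sink` and the `writecolumn!` helper are hypothetical), relying on `Data.isdone` rather than a fixed row count:

```julia
function Data.stream!(source, ::Type{Data.Column}, sink::MyPkg.Sink, append::Bool)
    types = Data.types(Data.schema(source))
    cols = length(types)
    row = 0
    while !Data.isdone(source, row + 1, 1)
        for col = 1:cols
            # Vector{T} or NullableVector{T}, depending on the schema
            column = Data.getcolumn(source, types[col], col)
            writecolumn!(sink, column, col)       # hypothetical sink helper
            col == cols && (row += length(column))
        end
    end
    return sink
end
```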

## `Data.Schema`

```@docs
Data.Schema
```
133 changes: 54 additions & 79 deletions src/DataStreams.jl
@@ -9,43 +9,6 @@ if !isdefined(Core, :String)
typealias String UTF8String
end

"""
A `Data.Source` type represents data that can be read/queried/parsed/viewed/streamed; i.e. a "true data source"
To clarify, there are two distinct types of "source":
1) the "true data source", which would be the file, database, API, structure, etc; i.e. the actual data
2) the `Source` julia object that wraps the "true source" and implements the `Data.Source` interface
`Source` types have two different types of constructors:
1) "independent constructors" that wrap "true data sources"
2) "sink constructors" where a `Data.Sink` object that has received data is turned into a new `Source` (useful for chaining data processing tasks)
`Source`s also have a, currently implicit, notion of state:
* `BEGINNING`: a `Source` is in this state immediately after being constructed and is ready to be used; i.e. ready to read/parse/query/stream data from it
* `READING`: the ingestion of data from this `Source` has started and has not finished yet
* `DONE`: the ingestion process has exhausted all data expected from this `Source`
The `Data.Source` interface requires the following definitions:
* `Data.schema(::Data.Source) => Data.Schema`; typically the `Source` type will store the `Data.Schema` directly, but this isn't strictly required
* `Data.isdone(::Data.Source, row, col) => Bool`;
* `Data.streamtype(::Type{Data.Source}, ::Type{Data.StreamType}) => Bool`;
* `Data.getfield{T}(::Data.Source, ::Type{T}, row, col) => T`;
* `Data.getcolumn{T}(::Data.Source, ::Type{T}, col) => AbstractVector{T}`;
Data.streamtype(::Type{DataFrame}, ::Type{Data.Field}) = true
Data.getfield{T}(source::DataFrame, ::Type{T}, row, col) = (@inbounds v = source[row, col]::T; return v)
Data.streamtype(::Type{DataFrame}, ::Type{Data.Column}) = true
Data.getcolumn{T}(source::DataFrame, ::Type{T}, col) = (@inbounds c = source.columns[col]; return c)
* `Data.schema(::Data.Source) => Data.Schema`; typically the `Source` type will store the `Data.Schema` directly, but this isn't strictly required
* `Data.reset!(::Data.Source)`; used to reset a `Source` type from `READING` or `DONE` to the `BEGINNING` state, ready to be read from again
* `isdone(::Data.Source, row, col)`; indicates whether the `Source` type is in the `DONE` state; i.e. all data has been exhausted from this source
"""
abstract Source

function reset! end
@@ -76,42 +39,22 @@ function streamtypes end
function getfield end
function getcolumn end

"""
A `Data.Sink` type represents a data destination; i.e. a "true data source" such as a database, file, API endpoint, etc.
There are two broad types of `Sink`s:
1) "new sinks": an independent `Sink` constructor creates a *new* "true data source" that can be streamed to
2) "existing sinks": the `Sink` wraps an already existing "true data source" (or a `Source` object that wraps a "true data source").
Upon construction of these `Sink`s, there is no new creation of "true data source"s; the "ultimate data source" is simply wrapped to replace or append to
`Sink`s also have notions of state:
* `BEGINNING`: the `Sink` is freshly constructed and ready to stream data to; this includes initial metadata like column headers
* `WRITING`: data has been streamed to the `Sink`, but is still open to receive more data
* `DONE`: the `Sink` has been closed and can no longer receive data
The `Data.Sink` interface includes the following:
* `Data.schema(::Data.Sink) => Data.Schema`; typically the `Sink` type will store the `Data.Schema` directly, but this isn't strictly required
"""
abstract Sink

"""
`Data.stream!(::Data.Source, ::Data.Sink)` starts transferring data from a newly constructed `Source` type to a newly constructed `Sink` type.
Data transfer typically continues until `Data.isdone(source, row, col) == true`, i.e. the `Source` is exhausted, at which point the `Sink` is closed and may
no longer receive data. See individual `Data.stream!` methods for more details on specific `Source`/`Sink` combinations.
"""
function stream!#(::Source, ::Sink)
function stream!
end

"""
A `Data.Schema` describes a tabular dataset (i.e. a set of optionally named, typed columns with records as rows)
Access to `Data.Schema` fields includes:
A `Data.Schema` allows a `Data.Source` and `Data.Sink` to talk to each other and prepare to provide/receive data through streaming.
`Data.Schema` fields include:
* `Data.header(schema)` to return the header/column names in a `Data.Schema`
* `Data.types(schema)` to return the column types in a `Data.Schema`
* `Data.types(schema)` to return the column types in a `Data.Schema`; `Nullable{T}` indicates columns that may contain missing data (null values)
* `Data.size(schema)` to return the (# of rows, # of columns) in a `Data.Schema`
`Data.Source` and `Data.Sink` interfaces both require that `Data.schema(source_or_sink)` be defined to ensure
that other `Data.Source`/`Data.Sink` can work appropriately.
"""
type Schema
header::Vector{String} # column names
@@ -148,15 +91,15 @@ function Base.show(io::IO, schema::Schema)
end
end

"Returns the `Data.Schema` for `io`"
schema(io) = io.schema # by default, we assume the `Source`/`Sink` stores the schema directly
"Returns the `Data.Schema` for `source_or_sink`"
schema(source_or_sink) = source_or_sink.schema # by default, we assume the `Source`/`Sink` stores the schema directly
"Returns the header/column names (if any) associated with a specific `Source` or `Sink`"
header(io) = header(schema(io))
header(source_or_sink) = header(schema(source_or_sink))
"Returns the column types associated with a specific `Source` or `Sink`"
types(io) = types(schema(io))
types(source_or_sink) = types(schema(source_or_sink))
"Returns the (# of rows,# of columns) associated with a specific `Source` or `Sink`"
Base.size(io::Source) = size(schema(io))
Base.size(io::Source, i) = size(schema(io),i)
Base.size(source_or_sink::Source) = size(schema(source_or_sink))
Base.size(source_or_sink::Source, i) = size(schema(source_or_sink),i)
setrows!(source, rows) = isdefined(source, :schema) ? (source.schema.rows = rows; nothing) : nothing
setcols!(source, cols) = isdefined(source, :schema) ? (source.schema.cols = cols; nothing) : nothing

@@ -191,7 +134,7 @@ using DataFrames, NullableArrays, WeakRefStrings

function Data.schema(df::DataFrame)
return Data.Schema(map(string, names(df)),
DataType[eltype(A) <: Nullable ? eltype(eltype(A)) : eltype(A) for A in df.columns], size(df, 1))
DataType[eltype(A) for A in df.columns], size(df, 1))
end

# DataFrame as a Data.Source
@@ -201,22 +144,41 @@ function Data.isdone(source::DataFrame, row, col)
end

Data.streamtype(::Type{DataFrame}, ::Type{Data.Field}) = true
Data.getfield{T}(source::DataFrame, ::Type{T}, row, col) = (@inbounds v = source[row, col]; return v)

Data.getfield{T}(A::Vector{T}, ::Type{T}, row) = (@inbounds v = A[row]::T; return v)
Data.getfield{T}(A::Vector{T}, ::Type{Nullable{T}}, row) = (@inbounds v = A[row]::T; return Nullable{T}(v, false))

Data.getfield{T}(A::NullableVector{T}, ::Type{T}, row) = (@inbounds v = A[row]::Nullable{T}; return get(v))
Data.getfield{T}(A::NullableVector{T}, ::Type{Nullable{T}}, row) = (@inbounds v = A[row]::Nullable{T}; return v)

Data.getfield{T}(source::DataFrame, ::Type{T}, row, col) = (@inbounds A = source.columns[col]; return Data.getfield(A, T, row))

Data.streamtype(::Type{DataFrame}, ::Type{Data.Column}) = true
Data.getcolumn{T}(source::DataFrame, ::Type{T}, col) = (@inbounds c = source.columns[col]; return c)

Data.getcolumn{T}(A::Vector{T}, ::Type{T}) = A
Data.getcolumn{T}(A::Vector{T}, ::Type{Nullable{T}}) = NullableArray(A, falses(length(A)))

Data.getcolumn{T}(A::NullableVector{T}, ::Type{T}) = sum(A.isnull) == 0 ? A.values : throw(NullException())
Data.getcolumn{T}(A::NullableVector{T}, ::Type{Nullable{T}}) = A

Data.getcolumn{T}(source::DataFrame, ::Type{T}, col) = (@inbounds A = source.columns[col]; return Data.getcolumn(A, T))

# DataFrame as a Data.Sink
DataFrame{T<:Data.StreamType}(so, ::Type{T}, append, args...) = DataFrame(Data.schema(so), T, Data.reference(so))

allocate{T}(::Type{T}, rows, ref) = Array{T}(rows)
function allocate{T}(::Type{Nullable{T}}, rows, ref)
A = Array{T}(rows)
return NullableArray{T, 1}(A, fill(true, rows), isempty(ref) ? UInt8[] : ref)
end

function DataFrame{T<:Data.StreamType}(sch::Schema, ::Type{T}=Data.Field, ref=UInt8[])
rows, cols = size(sch)
rows = T === Data.Column || rows < 0 ? 0 : rows # don't pre-allocate for Column streaming
columns = Vector{Any}(cols)
types = Data.types(sch)
for i = 1:cols
typ = types[i]
A = Array{typ}(rows)
columns[i] = NullableArray{typ,1}(A, fill(true, rows), isempty(ref) ? UInt8[] : ref)
columns[i] = allocate(types[i], rows, ref)
end
return DataFrame(columns, map(Symbol, Data.header(sch)))
end
@@ -247,12 +209,20 @@ end

Data.streamtypes(::Type{DataFrame}) = [Data.Column, Data.Field]

function pushfield!{T}(source, dest::NullableVector{T}, row, col)
function pushfield!{T}(source, dest::Vector{T}, row, col)
push!(dest, Data.getfield(source, T, row, col))
return
end
function pushfield!{T}(source, dest::NullableVector{T}, row, col)
push!(dest, Data.getfield(source, Nullable{T}, row, col))
return
end

function getfield!{T}(source, dest::NullableVector{T}, row, col, sinkrow)
@inbounds dest[sinkrow] = Data.getfield(source, Nullable{T}, row, col)
return
end
function getfield!{T}(source, dest::Vector{T}, row, col, sinkrow)
@inbounds dest[sinkrow] = Data.getfield(source, T, row, col)
return
end
@@ -284,15 +254,20 @@ function Data.stream!{T}(source::T, ::Type{Data.Field}, sink::DataFrame, append:
return sink
end

function pushcolumn!{T}(source, dest::NullableVector{T}, col)
function pushcolumn!{T}(source, dest::Vector{T}, col)
column = Data.getcolumn(source, T, col)
append!(dest, column)
return length(dest)
end
function pushcolumn!{T}(source, dest::NullableVector{T}, col)
column = Data.getcolumn(source, Nullable{T}, col)
append!(dest.values, column.values)
append!(dest.isnull, column.isnull)
return length(dest)
end

function pushcolumn!{T}(source, dest::NullableVector{WeakRefString{T}}, col)
column = Data.getcolumn(source, WeakRefString{T}, col)
column = Data.getcolumn(source, Nullable{WeakRefString{T}}, col)
offset = length(dest.values)
parentoffset = length(dest.parent)
append!(dest.isnull, column.isnull)
