This repository has been archived by the owner on Jul 17, 2019. It is now read-only.

Commit: 0.5 release prep
quinnj committed Oct 3, 2016
1 parent a1640ba commit 53521cb
Showing 4 changed files with 25 additions and 25 deletions.
1 change: 0 additions & 1 deletion .travis.yml
```diff
@@ -5,7 +5,6 @@ os:
   - osx
 
 julia:
-  - 0.4
   - 0.5
   - nightly
```
2 changes: 1 addition & 1 deletion REQUIRE
```diff
@@ -1,4 +1,4 @@
-julia 0.4
+julia 0.5
 DataFrames
 NullableArrays 0.0.9
 CategoricalArrays 0.0.5
```
40 changes: 21 additions & 19 deletions docs/src/index.md
```diff
@@ -9,13 +9,13 @@
 `Data.Source` implementations:
 * [`CSV.Source`](https://github.com/JuliaData/CSV.jl/blob/master/src/Source.jl)
 * [`SQLite.Source`](https://github.com/JuliaDB/SQLite.jl/blob/master/src/Source.jl)
-* [`DataFrame`](https://github.com/JuliaData/DataStreams.jl/blob/master/src/DataStreams.jl#L164)
+* [`DataFrame`](https://github.com/JuliaData/DataStreams.jl/blob/master/src/DataStreams.jl#L251)
 * [`ODBC.Source`](https://github.com/JuliaDB/ODBC.jl/blob/master/src/Source.jl)
 
 `Data.Sink` implementations:
 * [`CSV.Sink`](https://github.com/JuliaData/CSV.jl/blob/master/src/Sink.jl)
 * [`SQLite.Sink`](https://github.com/JuliaDB/SQLite.jl/blob/master/src/Sink.jl)
-* [`DataFrame`](https://github.com/JuliaData/DataStreams.jl/blob/master/src/DataStreams.jl#L182)
+* [`DataFrame`](https://github.com/JuliaData/DataStreams.jl/blob/master/src/DataStreams.jl#L287)
 * [`ODBC.Sink`](https://github.com/JuliaDB/ODBC.jl/blob/master/src/Sink.jl)
```

## `Data.Source` Interface
```diff
@@ -28,6 +28,7 @@
 Optional definition:
 
 * `Data.reference(::MyPkg.Source) => Vector{UInt8}`; Sometimes, a `Source` needs the `Sink` to keep a reference to memory to keep a data structure valid. A `Source` can implement this method to return a `Vector{UInt8}` that the `Sink` will need to handle appropriately.
+* `Base.size(::MyPkg.Source[, i]) => Int`; not explicitly required to enable data streaming, but a `Source` should generally be able to describe its first two dimensions, i.e. the number of rows and columns.
```
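Taken together, the required and optional `Data.Source` definitions might look like the following sketch (Julia 0.5-era syntax; `MyMatrixSource` and its fields are hypothetical names for illustration, not part of DataStreams):

```julia
using DataStreams

# hypothetical in-memory source over a Float64 matrix
type MyMatrixSource
    schema::Data.Schema
    data::Matrix{Float64}
end

# required definitions: describe the data and report exhaustion
Data.schema(s::MyMatrixSource) = s.schema
Data.isdone(s::MyMatrixSource, row, col) = row > size(s.data, 1) || col > size(s.data, 2)

# optional definition: report rows and columns
Base.size(s::MyMatrixSource) = size(s.data)
Base.size(s::MyMatrixSource, i) = size(s.data, i)
```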

A `Data.Source` also needs to "register" the type (or types) of streaming it supports. Currently defined streaming types in the DataStreams framework include:

```diff
@@ -37,42 +38,43 @@
 A `Data.Source` formally supports **field-based** streaming by defining the following:
 
 * `Data.streamtype(::Type{MyPkg.Source}, ::Type{Data.Field}) = true`; declares that `MyPkg.Source` supports field-based streaming
-* `Data.getfield{T}(::MyPkg.Source, ::Type{Nullable{T}}, row, col) => Nullable{T}`; returns a value of type `Nullable{T}` given a specific `row` and `col` from `MyPkg.Source`
-* `Data.getfield{T}(::MyPkg.Source, ::Type{T}, row, col) => T`; returns a value of type `T` given a specific `row` and `col` from `MyPkg.Source`
+* `Data.streamfrom{T}(::MyPkg.Source, ::Type{Data.Field}, ::Type{Nullable{T}}, row, col) => Nullable{T}`; returns a value of type `Nullable{T}` given a specific `row` and `col` from `MyPkg.Source`
+* `Data.streamfrom{T}(::MyPkg.Source, ::Type{Data.Field}, ::Type{T}, row, col) => T`; returns a value of type `T` given a specific `row` and `col` from `MyPkg.Source`
```

```diff
 And for column-based streaming:
 
 * `Data.streamtype(::Type{MyPkg.Source}, ::Type{Data.Column}) = true`
-* `Data.getcolumn{T}(::Data.Source, ::Type{T}, col) => Vector{T}`; Given a type `T`, returns column # `col` of a `Data.Source` as a `Vector{T}`
-* `Data.getcolumn{T}(::Data.Source, ::Type{Nullable{T}}, col) => NullableVector{T}`; Given a type `Nullable{T}`, returns column # `col` of a `Data.Source` as a `NullableVector{T}`
+* `Data.streamfrom{T}(::Data.Source, ::Type{Data.Column}, ::Type{T}, col) => Vector{T}`; Given a type `T`, returns column # `col` of a `Data.Source` as a `Vector{T}`
+* `Data.streamfrom{T}(::Data.Source, ::Type{Data.Column}, ::Type{Nullable{T}}, col) => NullableVector{T}`; Given a type `Nullable{T}`, returns column # `col` of a `Data.Source` as a `NullableVector{T}`
```
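A source that supports both streaming types might implement the new `Data.streamfrom` methods like this sketch (Julia 0.5-era syntax; `MatrixSource` is a hypothetical name for illustration):

```julia
using DataStreams

# hypothetical matrix-backed source with no missing values
type MatrixSource
    data::Matrix{Float64}
end

# register both streaming types
Data.streamtype(::Type{MatrixSource}, ::Type{Data.Field})  = true
Data.streamtype(::Type{MatrixSource}, ::Type{Data.Column}) = true

# field-based: one value for a given `row` and `col`
Data.streamfrom(s::MatrixSource, ::Type{Data.Field}, ::Type{Float64}, row, col) =
    s.data[row, col]

# column-based: an entire column at once
Data.streamfrom(s::MatrixSource, ::Type{Data.Column}, ::Type{Float64}, col) =
    s.data[:, col]
```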

## `Data.Sink` Interface

Similar to a `Data.Source`, a `Data.Sink` needs to "register" the types of streaming it supports; it does so through the following definition:

* `Data.streamtypes(::Type{MyPkg.Sink}) = [Data.Field[, Data.Column]]`; "registers" the streaming preferences for `MyPkg.Sink`. A `Sink` type should list the stream type or types it supports. If the `Sink` supports streaming of multiple types, it should list them in order of preference (i.e. the more natural or performant type first).

```diff
-A `Data.Sink` should also implement specific forms of constructors that allow convenience in many higher-level streaming functions:
+A `Data.Sink` also needs to implement specific forms of constructors that ensure proper `Sink` state in many higher-level streaming functions:
 
-* `MyPkg.Sink{T <: Data.StreamType}(source, ::Type{T}, append::Bool, args...)`; given an instance of a `Data.Source`, the type of streaming `T`, whether the user desires to append `source` or not, and any necessary `args...`, construct an appropriate instance of `MyPkg.Sink` ready to receive data from `source`. The `append` argument allows an already existing sink file/source to "reset" itself if the user does not desire to append.
-* `MyPkg.Sink{T <: Data.StreamType}(sink, source, ::Type{T}, append::Bool)`; similar to above, but instead of constructing a new `Sink`, an existing `Sink` is given as a first argument, which may be modified before being returned, ready to receive data from `source`.
+* `MyPkg.Sink{T <: Data.StreamType}(schema::Data.Schema, ::Type{T}, append::Bool, ref::Vector{UInt8}, args...; kwargs...)`; given a `schema::Data.Schema` a source will be providing, the type of streaming `T` (`Field` or `Column`), whether the user desires to append the data or not, a possible memory reference `ref`, and any necessary `args...` and `kwargs...`, construct an appropriate instance of `MyPkg.Sink` ready to receive data. The `append` argument allows an already existing sink file/source to "reset" itself if the user does not desire to append.
+* `MyPkg.Sink{T <: Data.StreamType}(sink, schema::Data.Schema, ::Type{T}, append::Bool, ref::Vector{UInt8})`; similar to above, but instead of constructing a new `Sink`, an existing `Sink` is given as a first argument, which may be modified before being returned, ready to receive data according to the `Data.Source` `schema`.
```
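The two constructor forms might be sketched as follows (Julia 0.5-era syntax; `MemSink` and its fields are hypothetical, and the sketch assumes `Base.size` is defined for `Data.Schema`):

```julia
using DataStreams

# hypothetical in-memory sink: one Vector per column
type MemSink
    schema::Data.Schema
    columns::Vector{Any}
end

# form 1: construct a fresh sink, sized from the schema a source will provide
function MemSink{T <: Data.StreamType}(schema::Data.Schema, ::Type{T}, append::Bool,
                                       ref::Vector{UInt8}, args...; kwargs...)
    return MemSink(schema, Any[Any[] for i = 1:size(schema, 2)])
end

# form 2: re-use an existing sink; "reset" it unless the user asked to append
function MemSink{T <: Data.StreamType}(sink::MemSink, schema::Data.Schema, ::Type{T},
                                       append::Bool, ref::Vector{UInt8})
    append || foreach(empty!, sink.columns)
    sink.schema = schema
    return sink
end
```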

```diff
-Similar to `Data.Source`, a `Data.Sink` also needs to implement it's own `stream` method that indicates how it receives data.
+Similar to `Data.Source`, a `Data.Sink` also needs to implement its own `streamto!` method that indicates how it receives data.
 
-A `Data.Sink` supports **field-based** streaming by optionally defining:
+A `Data.Sink` supports **field-based** streaming by defining:
 
-* OPTIONAL: `Data.open!(sink::MyPkg.Sink, source)`: typically, any necessary `Data.Sink` setup should be accomplished in it's own constructors (`MyPkg.Sink()` methods), but there are also cases tied specifically to the streaming process where certain actions need to be taken right before streaming begins. If a `Data.Sink` needs to perform this kind of action, it can overload `Data.open!(sink::MyPkg.Sink, source)`
-* OPTIONAL: `Data.cleanup!(sink::MyPkg.Sink)`: certain `Data.Sink`, like databases, may need to protect against inconvenient or dangerous "states" if there happens to be an error while streaming. `Data.cleanup!` provides the sink a way to rollback a transaction or other kind of cleanup if an error occurs during streaming
-* OPTIONAL: `Data.flush!(sink::MyPkg.Sink)`: similar to `Data.open!`, a `Data.Sink` may wish to perform certain actions once the streaming of a single `Data.Source` has finished. Note however, that a `Data.Sink` should still be ready to receive more data after a call to `Data.stream!` has finished. Only once a call to `Data.close!(sink)` has been made should a sink fully commit/close resources for good.
-* OPTIONAL: `Data.close!(sink::MyPkg.Sink)`: as noted above, `Data.close!` is defined to allow a sink to fully commit all streaming results and close/destroy any necessary resources.
+* `Data.streamto!{T}(sink::MyPkg.Sink, ::Type{Data.Field}, val::T, row, col[, schema])`: Given a `row`, `col`, and `val::T`, a `Data.Sink` should store the value appropriately. The type of the value retrieved is given by `T`, which may be `Nullable{T}`. Optionally provided is the `schema` (the same `schema` that is passed in the `MyPkg.Sink(schema, ...)` constructors). This argument is passed for efficiency since it can be calculated once at the beginning of a `Data.stream!` and used quickly for many calls to `Data.streamto!`. This argument is optional, because a `Sink` can overload `Data.streamto!` with or without it.
```

```diff
-The only method that is absolutely required is:
+A `Data.Sink` supports **column-based** streaming by defining:
 
-* REQUIRED: `Data.streamfield!{T}(sink::MyPkg.Sink, source, ::Type{T}, row, col, cols[, sinkrows])`: Given a `row` and `col`, a `Data.Sink` should first call `Data.getfield(source, T, row, col)` to get the `Data.Source` value for that `row` and `col`, and then store the value appropriately. The type of the value retrieved is given by `T`, which may be `Nullable{T}`. Also provided are the total number of columns `cols` as well as the number of rows a sink began with as `sinkrows`. These arguments are passed for efficiency since they can be calculated once at the beginning of a `Data.stream!` and used quickly for many calls to `Data.streamfield!`.
+* `Data.streamto!{T}(sink::MyPkg.Sink, ::Type{Data.Column}, column::T, row, col[, schema])`: Given a column number `col` and column of data `column`, a `Data.Sink` should store it appropriately. The type of the column is given by `T`, which may be a `NullableVector{T}`. Optionally provided is the `schema` (the same `schema` that is passed in the `MyPkg.Sink(schema, ...)` constructors). This argument is passed for efficiency since it can be calculated once at the beginning of a `Data.stream!` and used quickly for many calls to `Data.streamto!`. This argument is optional, because a `Sink` can overload `Data.streamto!` with or without it.
```
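Both `Data.streamto!` forms might be sketched as follows (Julia 0.5-era syntax; `MemorySink` is a hypothetical in-memory sink holding one Vector per column):

```julia
using DataStreams

# hypothetical sink: columns[col] is the storage for column `col`
type MemorySink
    columns::Vector{Any}
end

# field-based: store a single value; push! assumes rows arrive in order
function Data.streamto!{T}(sink::MemorySink, ::Type{Data.Field}, val::T, row, col, schema)
    push!(sink.columns[col], val)
    return nothing
end

# column-based: append an entire column of values at once
function Data.streamto!{T}(sink::MemorySink, ::Type{Data.Column}, column::T, row, col, schema)
    append!(sink.columns[col], column)
    return nothing
end
```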

```diff
-A `Data.Sink` supports **column-based** streaming by defining:
+A `Data.Sink` can optionally define the following if needed:
 
-* `Data.streamcolumn!{T}(sink::MyPkg.Sink, source, ::Type{T}, col, row) => # of rows streamed`: Given a column number `col`, a `Data.Sink` should first call `Data.getcolumn(source, T, col)` to receive the column of data from the `Data.Source` before storing it appropriately. The type of the column is given by `T`. Also provided is the number of rows `row` that have been streamed so far. This method should return the # of rows that were present in the column streamed from `Data.Source` so that the total # of streamed rows can be tracked accurately.
+* `Data.cleanup!(sink::MyPkg.Sink)`: certain `Data.Sink`s, like databases, may need to protect against inconvenient or dangerous "states" if there happens to be an error while streaming. `Data.cleanup!` provides the sink a way to roll back a transaction or perform other cleanup if an error occurs during streaming.
+* `Data.close!(sink::MyPkg.Sink)`: during the `Data.stream!` workflow, a `Data.Sink` should remain "open" to receiving data until `Data.close!` is called explicitly. `Data.close!` allows a sink to fully commit all streaming results and close/destroy any necessary resources.
```
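Putting the pieces together, a typical user-level flow might look like this sketch (hedged: the exact `CSV.Source` arguments and the sink-as-type form of `Data.stream!` are assumptions based on the interfaces described above):

```julia
using DataStreams, DataFrames, CSV

# "data.csv" is an illustrative file name
source = CSV.Source("data.csv")       # a Data.Source
df = Data.stream!(source, DataFrame)  # construct a DataFrame sink and stream into it

# for resource-backed sinks (e.g. a database table), finish with:
# Data.close!(sink)
```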


## `Data.Schema`

Expand Down
7 changes: 3 additions & 4 deletions src/DataStreams.jl
```diff
@@ -87,8 +87,6 @@
 abstract Source
 
 # Required methods
-# size(source)
-# size(source, i)
 function schema end
 function isdone end
 """
```
```diff
@@ -217,7 +215,8 @@
 
 function streamto!{T, TT}(sink, ::Type{Data.Column}, source, ::Type{T}, ::Type{TT}, row, col, sch, f)
     column = f(Data.streamfrom(source, Data.Column, T, col)::T)::TT
-    return streamto!(sink, Data.Column, column, row, col, sch)
+    streamto!(sink, Data.Column, column, row, col, sch)
+    return length(column)
 end
```
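This internal wrapper now reports the number of rows streamed itself, so individual sinks' `streamto!` methods no longer need to; it also threads a per-column transform `f` through the stream. User-visible usage of such transforms might look like this sketch (the `transforms` keyword form and the column name are assumptions for illustration):

```julia
using DataStreams, DataFrames, CSV

# hypothetical: double a column named "price" while streaming "data.csv"
transforms = Dict("price" => col -> col .* 2)
df = Data.stream!(CSV.Source("data.csv"), DataFrame; transforms = transforms)
```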

function Data.stream!{T1, T2}(source::T1, ::Type{Data.Column}, sink::T2, source_schema, sink_schema, transforms)
Expand Down Expand Up @@ -328,7 +327,7 @@ function Data.streamto!{T}(sink::DataFrame, ::Type{Data.Column}, column::T, row,
else
append!(sink.columns[col]::T, column)
end
return length(column)
return nothing
end

function Base.append!{T}(dest::NullableVector{WeakRefString{T}}, column::NullableVector{WeakRefString{T}})
Expand Down
