The DataStreams.jl package aims to define a generic and performant framework for the transfer of "table-like" data (i.e. data that can, in some sense, be described by rows and columns).
The framework achieves this by defining a system of Data.Source types and methods to describe how they "provide" data, as well as Data.Sink types and methods that describe how they "receive" data. This allows Data.Sources and Data.Sinks to implement their interfaces separately, without needing to be aware of each other. The end result is an ecosystem of packages that "automatically" talk with each other, where adding a new package requires no additional machinery in any existing package.
The Data.Source interface requires the following definitions, where MyPkg would represent a package wishing to implement the framework:
Data.schema(::MyPkg.Source) => Data.Schema; get the Data.Schema of a Data.Source. Typically the Source type will store the Data.Schema directly, but this isn't strictly required. See ?Data.Schema or the docs below for more information on Data.Schema.
Data.isdone(::MyPkg.Source, row, col) => Bool; indicates whether the Data.Source has no more data to provide at the given row and col (true means streaming should stop).
Optional definition:
Data.reference(::MyPkg.Source) => Vector{UInt8}; sometimes a Source needs the Sink to keep a reference to memory so that a data structure stays valid. A Source can implement this method to return a Vector{UInt8} that the Sink will need to handle appropriately.
A Data.Source also needs to "register" the type (or types) of streaming it supports. Currently defined streaming types in the DataStreams framework include:
Data.Field: a field is the intersection of a specific row and column; this type of streaming will traverse the "table" structure by row, accessing each column on each row
Data.Column: this type of streaming will provide entire columns at a time
A Data.Source "registers" to support field-based streaming by defining the following:
Data.streamtype(::Type{MyPkg.Source}, ::Type{Data.Field}) = true; declares that MyPkg.Source supports field-based streaming
Data.getfield{T}(::MyPkg.Source, ::Type{T}, row, col) => Nullable{T}; returns a value of type Nullable{T} given a specific row and col from MyPkg.Source
And for column-based streaming:
Data.streamtype(::Type{MyPkg.Source}, ::Type{Data.Column}) = true
Data.getcolumn{T}(::Data.Source, ::Type{T}, col) => AbstractVector{T}
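The Source-side definitions above can be sketched as follows. This is only an illustration under stated assumptions, not the package's own code: the small Data module is a stand-in written here so the example runs without DataStreams installed, the Schema fields (header, types, rows) are simplified guesses, VectorSource is a hypothetical type, methods use the current `where {T}` syntax rather than the `{T}` form shown above, and values are returned directly instead of being wrapped in Nullable{T}.

```julia
# Stand-in for the DataStreams `Data` module (hypothetical, for illustration only).
module Data
abstract type StreamType end
struct Field  <: StreamType end
struct Column <: StreamType end
abstract type Source end

# Simplified schema (assumed fields): column names, column types, row count.
struct Schema
    header::Vector{String}
    types::Vector{DataType}
    rows::Int
end

function schema end
function isdone end
function getfield end
function getcolumn end

# By default a source supports no streaming type.
streamtype(::Type{<:Source}, ::Type{<:StreamType}) = false
end # module Data

# Hypothetical source backed by in-memory columns.
struct VectorSource <: Data.Source
    schema::Data.Schema
    columns::Vector{AbstractVector}
end

# Required definitions.
Data.schema(s::VectorSource) = s.schema
Data.isdone(s::VectorSource, row, col) =
    row > s.schema.rows || col > length(s.columns)

# Register and implement field-based streaming.
Data.streamtype(::Type{VectorSource}, ::Type{Data.Field}) = true
Data.getfield(s::VectorSource, ::Type{T}, row, col) where {T} =
    s.columns[col][row]::T

# Register and implement column-based streaming.
Data.streamtype(::Type{VectorSource}, ::Type{Data.Column}) = true
Data.getcolumn(s::VectorSource, ::Type{T}, col) where {T} =
    s.columns[col]::AbstractVector{T}

# Usage: a two-column, three-row source.
src = VectorSource(Data.Schema(["id", "name"], [Int, String], 3),
                   [[1, 2, 3], ["a", "b", "c"]])
second_id = Data.getfield(src, Int, 2, 1)
names     = Data.getcolumn(src, String, 2)
```

Because isdone is derived from the stored schema, an empty source (rows == 0) reports isdone(src, 1, 1) == true without any special casing.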
Similar to a Data.Source, a Data.Sink needs to "register" the types of streaming it supports; it does so through the following definition:
Data.streamtypes(::Type{MyPkg.Sink}) = [Data.Field[, Data.Column]]; "registers" the streaming preferences for MyPkg.Sink. A Sink type should list the stream type or types it supports. If the Sink supports streaming of multiple types, it should list them in order of preference (i.e. the more natural or performant type first).
A Data.Sink should also implement specific forms of constructors that allow convenience in many higher-level streaming functions:
MyPkg.Sink{T <: Data.StreamType}(source, ::Type{T}, append::Bool, args...); given an instance of a Data.Source, the type of streaming T, whether the user desires to append source or not, and any necessary args..., construct an appropriate instance of MyPkg.Sink ready to receive data from source.
MyPkg.Sink{T <: Data.StreamType}(sink, source, ::Type{T}, append::Bool); similar to above, but instead of constructing a new Sink, an existing Sink is given as a first argument, which may be modified before being returned, ready to receive data from source.
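As a rough sketch of the Sink registration and the two constructor forms, assuming a hypothetical TableSink type and a stand-in Data module defined here so the example runs on its own (neither is taken from DataStreams itself): the `ncols` argument plays the role of `args...`, and discarding existing contents when `append` is false is one possible reading of "may be modified before being returned".

```julia
# Stand-in for the DataStreams `Data` module (hypothetical, for illustration only).
module Data
abstract type StreamType end
struct Field  <: StreamType end
struct Column <: StreamType end
function streamtypes end
end # module Data

# Hypothetical sink that stores received values column by column.
struct TableSink
    columns::Vector{Vector{Any}}
end

# Register the supported streaming types, most preferred first.
Data.streamtypes(::Type{TableSink}) = [Data.Column, Data.Field]

# Form 1: construct a new sink for `source`. `append` has no effect here
# because a freshly built sink starts out empty.
function TableSink(source, ::Type{T}, append::Bool, ncols::Integer) where {T <: Data.StreamType}
    return TableSink([Any[] for _ in 1:ncols])
end

# Form 2: reuse an existing sink, discarding its contents unless appending.
function TableSink(sink::TableSink, source, ::Type{T}, append::Bool) where {T <: Data.StreamType}
    append || foreach(empty!, sink.columns)
    return sink
end

# Usage: `nothing` stands in for a source, which these sketches never inspect.
sink = TableSink(nothing, Data.Field, false, 2)
push!(sink.columns[1], 1)
kept    = TableSink(sink, nothing, Data.Field, true)   # contents preserved
kept_n  = length(kept.columns[1])
cleared = TableSink(sink, nothing, Data.Field, false)  # contents discarded
```

The two forms do not collide at dispatch: the streaming type sits in the second positional slot of the first form and the third slot of the second.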
And finally, a Data.Sink needs to implement the meat of the framework, the actual streaming method. For a Sink supporting field-based streaming, the following method should be defined:
Data.stream!(source, ::Type{Data.Field}, sink::MyPkg.Sink, append::Bool); given a generic Data.Source, continue streaming data until Data.isdone(source, row, col) == true. The streaming method should usually check Data.isdone(source, 1, 1) && return sink before starting the actual streaming to account for a potentially empty Data.Source.
And for column-based streaming:
Data.stream!(source, ::Type{Data.Column}, sink::MyPkg.Sink, append::Bool)
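The two streaming methods might look roughly like the following. This is a minimal sketch under assumptions, not the framework's implementation: the Data module is a stand-in defined here so the example is self-contained, VectorSource and TableSink are hypothetical types, values are passed through unwrapped rather than as Nullable{T}, and clearing the sink when `append` is false is an assumed interpretation of that flag.

```julia
# Stand-in for the DataStreams `Data` module (hypothetical, for illustration only).
module Data
abstract type StreamType end
struct Field  <: StreamType end
struct Column <: StreamType end
function isdone end
function getfield end
function getcolumn end
function stream! end
end # module Data

# Hypothetical source: a fixed set of equal-length columns.
struct VectorSource
    columns::Vector{Vector{Any}}
end
nrows(s::VectorSource) = isempty(s.columns) ? 0 : length(s.columns[1])
Data.isdone(s::VectorSource, row, col) = row > nrows(s) || col > length(s.columns)
Data.getfield(s::VectorSource, ::Type{T}, row, col) where {T} = s.columns[col][row]
Data.getcolumn(s::VectorSource, ::Type{T}, col) where {T} = s.columns[col]

# Hypothetical sink: collects values column by column.
struct TableSink
    columns::Vector{Vector{Any}}
end

# Field-based streaming: walk the table row by row, column by column.
function Data.stream!(source, ::Type{Data.Field}, sink::TableSink, append::Bool)
    append || foreach(empty!, sink.columns)
    Data.isdone(source, 1, 1) && return sink   # empty source: nothing to stream
    row = 1
    while !Data.isdone(source, row, 1)
        col = 1
        while !Data.isdone(source, row, col)
            col > length(sink.columns) && push!(sink.columns, Any[])
            push!(sink.columns[col], Data.getfield(source, Any, row, col))
            col += 1
        end
        row += 1
    end
    return sink
end

# Column-based streaming: copy one whole column at a time.
function Data.stream!(source, ::Type{Data.Column}, sink::TableSink, append::Bool)
    append || foreach(empty!, sink.columns)
    Data.isdone(source, 1, 1) && return sink
    col = 1
    while !Data.isdone(source, 1, col)
        col > length(sink.columns) && push!(sink.columns, Any[])
        append!(sink.columns[col], Data.getcolumn(source, Any, col))
        col += 1
    end
    return sink
end

# Usage: stream by field into a fresh sink, then append the same data by column.
src = VectorSource([Any[1, 2, 3], Any["a", "b", "c"]])
s1 = Data.stream!(src, Data.Field, TableSink(Vector{Any}[]), false)
after_field = deepcopy(s1.columns)
s2 = Data.stream!(src, Data.Column, s1, true)
```

Both methods only touch the source through isdone, getfield and getcolumn, which is what lets a sink stay unaware of the concrete source type.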