diff --git a/REQUIRE b/REQUIRE
index 7bb9ed3..e7929e1 100644
--- a/REQUIRE
+++ b/REQUIRE
@@ -5,3 +5,5 @@ StatsBase 0.11.0
 SortingAlgorithms
 Reexport
 Compat 0.19.0
+WeakRefStrings 0.1.3
+DataStreams 0.1.0
diff --git a/src/DataTables.jl b/src/DataTables.jl
index e69a70b..3763f7d 100644
--- a/src/DataTables.jl
+++ b/src/DataTables.jl
@@ -17,7 +17,6 @@ import NullableArrays: dropnull, dropnull!
 @reexport using CategoricalArrays
 using GZip
 using SortingAlgorithms
-
 using Base: Sort, Order
 
 import Base: ==, |>
diff --git a/src/abstractdatatable/io.jl b/src/abstractdatatable/io.jl
index 3c6ff81..9c6aa07 100644
--- a/src/abstractdatatable/io.jl
+++ b/src/abstractdatatable/io.jl
@@ -194,3 +194,140 @@ end
 @compat function Base.show(io::IO, ::MIME"text/tab-separated-values", dt::AbstractDataTable)
     printtable(io, dt, true, '\t')
 end
+
+##############################################################################
+#
+# DataStreams-based IO
+#
+##############################################################################
+
+importall DataStreams
+using WeakRefStrings
+
+# DataTables DataStreams implementation
+function Data.schema(df::DataTable, ::Type{Data.Column})
+    return Data.Schema(map(string, names(df)),
+                       DataType[typeof(A) for A in df.columns], size(df, 1))
+end
+
+# DataTable as a Data.Source
+function Data.isdone(source::DataTable, row, col)
+    rows, cols = size(source)
+    return row > rows || col > cols
+end
+
+Data.streamtype(::Type{DataTable}, ::Type{Data.Column}) = true
+Data.streamtype(::Type{DataTable}, ::Type{Data.Field}) = true
+
+Data.streamfrom{T <: AbstractVector}(source::DataTable, ::Type{Data.Column}, ::Type{T}, col) =
+    (@inbounds A = source.columns[col]::T; return A)
+Data.streamfrom{T}(source::DataTable, ::Type{Data.Column}, ::Type{T}, col) =
+    (@inbounds A = source.columns[col]; return A)
+Data.streamfrom{T}(source::DataTable, ::Type{Data.Field}, ::Type{T}, row, col) =
+    (@inbounds A = Data.streamfrom(source, Data.Column, T, col); return A[row]::T)
+
+# DataTable as a Data.Sink
+allocate{T}(::Type{T}, rows, ref) = Array{T}(rows)
+allocate{T}(::Type{Vector{T}}, rows, ref) = Array{T}(rows)
+
+allocate{T}(::Type{Nullable{T}}, rows, ref) =
+    NullableArray{T, 1}(Array{T}(rows), fill(true, rows), isempty(ref) ? UInt8[] : ref)
+allocate{T}(::Type{NullableVector{T}}, rows, ref) =
+    NullableArray{T, 1}(Array{T}(rows), fill(true, rows), isempty(ref) ? UInt8[] : ref)
+
+allocate{S,R}(::Type{CategoricalArrays.CategoricalValue{S,R}}, rows, ref) =
+    CategoricalArray{S,1,R}(rows)
+allocate{S,R}(::Type{CategoricalVector{S,R}}, rows, ref) =
+    CategoricalArray{S,1,R}(rows)
+
+allocate{S,R}(::Type{Nullable{CategoricalArrays.CategoricalValue{S,R}}}, rows, ref) =
+    NullableCategoricalArray{S,1,R}(rows)
+allocate{S,R}(::Type{NullableCategoricalVector{S,R}}, rows, ref) =
+    NullableCategoricalArray{S,1,R}(rows)
+
+function DataTable{T <: Data.StreamType}(sch::Data.Schema,
+                                         ::Type{T}=Data.Field,
+                                         append::Bool=false,
+                                         ref::Vector{UInt8}=UInt8[], args...)
+    rows, cols = size(sch)
+    rows = max(0, T <: Data.Column ? 0 : rows) # don't pre-allocate for Column streaming
+    columns = Vector{Any}(cols)
+    types = Data.types(sch)
+    for i = 1:cols
+        columns[i] = allocate(types[i], rows, ref)
+    end
+    return DataTable(columns, map(Symbol, Data.header(sch)))
+end
+
+# given an existing DataTable (`sink`), make any necessary changes for streaming source
+# with Data.Schema `sch` to it, given we know whether we'll be `append`ing or not
+function DataTable(sink, sch::Data.Schema, ::Type{Data.Field}, append::Bool,
+                   ref::Vector{UInt8})
+    rows, cols = size(sch)
+    newsize = max(0, rows) + (append ? size(sink, 1) : 0)
+    # need to make sure we don't break a NullableVector{WeakRefString{UInt8}} when appending
+    if append
+        for (i, T) in enumerate(Data.types(sch))
+            if T <: Nullable{WeakRefString{UInt8}}
+                sink.columns[i] = NullableArray(String[string(get(x, "")) for x in sink.columns[i]])
+                sch.types[i] = Nullable{String}
+            end
+        end
+    end
+    newsize != size(sink, 1) && foreach(x->resize!(x, newsize), sink.columns)
+    sch.rows = newsize
+    return sink
+end
+function DataTable(sink, sch::Data.Schema, ::Type{Data.Column}, append::Bool, ref::Vector{UInt8})
+    rows, cols = size(sch)
+    append ? (sch.rows += size(sink, 1)) : foreach(empty!, sink.columns)
+    return sink
+end
+
+Data.streamtypes(::Type{DataTable}) = [Data.Column, Data.Field]
+
+Data.streamto!{T}(sink::DataTable, ::Type{Data.Field}, val::T, row, col, sch::Data.Schema{false}) =
+    push!(sink.columns[col]::Vector{T}, val)
+Data.streamto!{T}(sink::DataTable, ::Type{Data.Field}, val::Nullable{T}, row, col, sch::Data.Schema{false}) =
+    push!(sink.columns[col]::NullableVector{T}, val)
+Data.streamto!{T, R}(sink::DataTable, ::Type{Data.Field}, val::CategoricalValue{T, R}, row, col, sch::Data.Schema{false}) =
+    push!(sink.columns[col]::CategoricalVector{T, R}, val)
+Data.streamto!{T, R}(sink::DataTable, ::Type{Data.Field}, val::Nullable{CategoricalValue{T, R}}, row, col, sch::Data.Schema{false}) =
+    push!(sink.columns[col]::NullableCategoricalVector{T, R}, val)
+Data.streamto!{T}(sink::DataTable, ::Type{Data.Field}, val::T, row, col, sch::Data.Schema{true}) =
+    (sink.columns[col]::Vector{T})[row] = val
+Data.streamto!{T}(sink::DataTable, ::Type{Data.Field}, val::Nullable{T}, row, col, sch::Data.Schema{true}) =
+    (sink.columns[col]::NullableVector{T})[row] = val
+Data.streamto!(sink::DataTable, ::Type{Data.Field}, val::Nullable{WeakRefString{UInt8}}, row, col, sch::Data.Schema{true}) =
+    sink.columns[col][row] = val
+Data.streamto!{T, R}(sink::DataTable, ::Type{Data.Field}, val::CategoricalValue{T, R}, row, col, sch::Data.Schema{true}) =
+    (sink.columns[col]::CategoricalVector{T, R})[row] = val
+Data.streamto!{T, R}(sink::DataTable, ::Type{Data.Field}, val::Nullable{CategoricalValue{T, R}}, row, col, sch::Data.Schema{true}) =
+    (sink.columns[col]::NullableCategoricalVector{T, R})[row] = val
+
+function Data.streamto!{T}(sink::DataTable, ::Type{Data.Column}, column::T, row, col, sch::Data.Schema)
+    if row == 0
+        sink.columns[col] = column
+    else
+        append!(sink.columns[col]::T, column)
+    end
+    return length(column)
+end
+
+function Base.append!{T}(dest::NullableVector{WeakRefString{T}}, column::NullableVector{WeakRefString{T}})
+    offset = length(dest.values)
+    parentoffset = length(dest.parent)
+    append!(dest.isnull, column.isnull)
+    append!(dest.parent, column.parent)
+    # appending new data to `dest` would invalidate all existing WeakRefString pointers
+    resize!(dest.values, length(dest) + length(column))
+    for i = 1:offset
+        old = dest.values[i]
+        dest.values[i] = WeakRefString{T}(pointer(dest.parent, old.ind), old.len, old.ind)
+    end
+    for i = 1:length(column)
+        old = column.values[i]
+        dest.values[offset + i] = WeakRefString{T}(pointer(dest.parent, parentoffset + old.ind), old.len, parentoffset + old.ind)
+    end
+    return length(dest)
+end
diff --git a/test/REQUIRE b/test/REQUIRE
index 644abb1..3f5122d 100644
--- a/test/REQUIRE
+++ b/test/REQUIRE
@@ -1,5 +1,4 @@
 Compat 0.9.0
 DataStructures
-RData
 LaTeXStrings
-Atom
+CSV
\ No newline at end of file
diff --git a/test/io.jl b/test/io.jl
index 336f881..6f042eb 100644
--- a/test/io.jl
+++ b/test/io.jl
@@ -51,4 +51,10 @@ module TestIO
 
     answer = Sys.WORD_SIZE == 64 ? 0x937e94e70d642cce : 0x2b8864d8
     @test hash(sprint(printtable, dt)) == answer
+
+    # DataStreams
+    using CSV
+
+    dt = CSV.read(joinpath(dirname(@__FILE__), "data/iris.csv"), DataTable)
+    @test size(dt) == (150, 5)
 end
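
Usage sketch: with this patch a DataTable participates in the DataStreams framework as both a `Data.Source` and a `Data.Sink`, so any DataStreams-aware package can stream tables into or out of it. A minimal example, assuming CSV.jl's DataStreams-based `CSV.read`/`CSV.write` and a hypothetical local file `iris.csv`; `CSV.read(file, DataTable)` mirrors the test above, while the `CSV.write` call is an assumption about CSV.jl's sink API:

    using DataTables, CSV

    # CSV source -> DataTable sink; the sink's preferred stream types are
    # declared by `Data.streamtypes(DataTable)` above (Column, then Field)
    dt = CSV.read("iris.csv", DataTable)

    # DataTable source -> CSV sink: stream the same table back out to disk
    CSV.write("iris_out.csv", dt)

Declaring `Data.streamtypes(::Type{DataTable}) = [Data.Column, Data.Field]` lets a source hand over whole column vectors when it can (cheap, one allocation per column) and fall back to cell-by-cell `Data.Field` streaming otherwise.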