From 67b241285ec69b4b1f956eb6e7dd76ed522679b9 Mon Sep 17 00:00:00 2001 From: Cameron Prybol Date: Thu, 16 Mar 2017 19:09:31 -0700 Subject: [PATCH 1/5] get DataStreams implementation working --- REQUIRE | 3 + src/DataTables.jl | 2 +- src/abstractdatatable/io.jl | 119 ++++++++++++++++++++++++++++++++++++ 3 files changed, 123 insertions(+), 1 deletion(-) diff --git a/REQUIRE b/REQUIRE index 7bb9ed3..cbc7a45 100644 --- a/REQUIRE +++ b/REQUIRE @@ -5,3 +5,6 @@ StatsBase 0.11.0 SortingAlgorithms Reexport Compat 0.19.0 +WeakRefStrings +CSV +DataStreams diff --git a/src/DataTables.jl b/src/DataTables.jl index e69a70b..00a0d3e 100644 --- a/src/DataTables.jl +++ b/src/DataTables.jl @@ -17,7 +17,7 @@ import NullableArrays: dropnull, dropnull! @reexport using CategoricalArrays using GZip using SortingAlgorithms - +using WeakRefStrings using Base: Sort, Order import Base: ==, |> diff --git a/src/abstractdatatable/io.jl b/src/abstractdatatable/io.jl index 1c32f5c..f83cd1b 100644 --- a/src/abstractdatatable/io.jl +++ b/src/abstractdatatable/io.jl @@ -193,3 +193,122 @@ end @compat function Base.show(io::IO, ::MIME"text/tab-separated-values", dt::AbstractDataTable) printtable(io, dt, true, '\t') end + +############################################################################## +# +# CSV IO +# +############################################################################## + +importall DataStreams + +# DataTables DataStreams implementation +function Data.schema(df::DataTable, ::Type{Data.Column}) + return Data.Schema(map(string, names(df)), + DataType[typeof(A) for A in df.columns], size(df, 1)) +end + +# DataTable as a Data.Source +function Data.isdone(source::DataTable, row, col) + rows, cols = size(source) + return row > rows || col > cols +end + +Data.streamtype(::Type{DataTable}, ::Type{Data.Column}) = true +Data.streamtype(::Type{DataTable}, ::Type{Data.Field}) = true + +Data.streamfrom{T <: AbstractVector}(source::DataTable, ::Type{Data.Column}, ::Type{T}, col) = (@inbounds A = source.columns[col]::T; return A) +Data.streamfrom{T}(source::DataTable, ::Type{Data.Column}, ::Type{T}, col) = (@inbounds A = source.columns[col]; return A) +Data.streamfrom{T}(source::DataTable, ::Type{Data.Field}, ::Type{T}, row, col) = (@inbounds A = Data.streamfrom(source, Data.Column, T, col); return A[row]::T) + +# DataTable as a Data.Sink +allocate{T}(::Type{T}, rows, ref) = Array{T}(rows) +allocate{T}(::Type{Vector{T}}, rows, ref) = Array{T}(rows) + +allocate{T}(::Type{Nullable{T}}, rows, ref) = NullableArray{T, 1}(Array{T}(rows), fill(true, rows), isempty(ref) ? UInt8[] : ref) +allocate{T}(::Type{NullableVector{T}}, rows, ref) = NullableArray{T, 1}(Array{T}(rows), fill(true, rows), isempty(ref) ? UInt8[] : ref) + +allocate{S,R}(::Type{CategoricalArrays.CategoricalValue{S,R}}, rows, ref) = CategoricalArray{S,1,R}(rows) +allocate{S,R}(::Type{CategoricalVector{S,R}}, rows, ref) = CategoricalArray{S,1,R}(rows) + +allocate{S,R}(::Type{Nullable{CategoricalArrays.CategoricalValue{S,R}}}, rows, ref) = NullableCategoricalArray{S,1,R}(rows) +allocate{S,R}(::Type{NullableCategoricalVector{S,R}}, rows, ref) = NullableCategoricalArray{S,1,R}(rows) + +if isdefined(Main, :DataArray) + allocate{T}(::Type{DataVector{T}}, rows, ref) = DataArray{T}(rows) +end + +function DataTable{T <: Data.StreamType}(sch::Data.Schema, ::Type{T}=Data.Field, append::Bool=false, ref::Vector{UInt8}=UInt8[], args...) + rows, cols = size(sch) + rows = max(0, T <: Data.Column ? 0 : rows) # don't pre-allocate for Column streaming + columns = Vector{Any}(cols) + types = Data.types(sch) + for i = 1:cols + columns[i] = allocate(types[i], rows, ref) + end + return DataTable(columns, map(Symbol, Data.header(sch))) +end + +# given an existing DataTable (`sink`), make any necessary changes for streaming source +# with Data.Schema `sch` to it, given we know if we'll be `appending` or not +function DataTable(sink, sch::Data.Schema, ::Type{Data.Field}, append::Bool, ref::Vector{UInt8}) + rows, cols = size(sch) + newsize = max(0, rows) + (append ? size(sink, 1) : 0) + # need to make sure we don't break a NullableVector{WeakRefString{UInt8}} when appending + if append + for (i, T) in enumerate(Data.types(sch)) + if T <: Nullable{WeakRefString{UInt8}} + sink.columns[i] = NullableArray(String[string(get(x, "")) for x in sink.columns[i]]) + sch.types[i] = Nullable{String} + end + end + end + newsize != size(sink, 1) && foreach(x->resize!(x, newsize), sink.columns) + sch.rows = newsize + return sink +end +function DataTable(sink, sch::Data.Schema, ::Type{Data.Column}, append::Bool, ref::Vector{UInt8}) + rows, cols = size(sch) + append ? (sch.rows += size(sink, 1)) : foreach(empty!, sink.columns) + return sink +end + +Data.streamtypes(::Type{DataTable}) = [Data.Column, Data.Field] + +Data.streamto!{T}(sink::DataTable, ::Type{Data.Field}, val::T, row, col, sch::Data.Schema{false}) = push!(sink.columns[col]::Vector{T}, val) +Data.streamto!{T}(sink::DataTable, ::Type{Data.Field}, val::Nullable{T}, row, col, sch::Data.Schema{false}) = push!(sink.columns[col]::NullableVector{T}, val) +Data.streamto!{T, R}(sink::DataTable, ::Type{Data.Field}, val::CategoricalValue{T, R}, row, col, sch::Data.Schema{false}) = push!(sink.columns[col]::CategoricalVector{T, R}, val) +Data.streamto!{T, R}(sink::DataTable, ::Type{Data.Field}, val::Nullable{CategoricalValue{T, R}}, row, col, sch::Data.Schema{false}) = push!(sink.columns[col]::NullableCategoricalVector{T, R}, val) + +Data.streamto!{T}(sink::DataTable, ::Type{Data.Field}, val::T, row, col, sch::Data.Schema{true}) = (sink.columns[col]::Vector{T})[row] = val +Data.streamto!{T}(sink::DataTable, ::Type{Data.Field}, val::Nullable{T}, row, col, sch::Data.Schema{true}) = (sink.columns[col]::NullableVector{T})[row] = val +Data.streamto!(sink::DataTable, ::Type{Data.Field}, val::Nullable{WeakRefString{UInt8}}, row, col, sch::Data.Schema{true}) = (sink.columns[col][row] = val) +Data.streamto!{T, R}(sink::DataTable, ::Type{Data.Field}, val::CategoricalValue{T, R}, row, col, sch::Data.Schema{true}) = (sink.columns[col]::CategoricalVector{T, R})[row] = val +Data.streamto!{T, R}(sink::DataTable, ::Type{Data.Field}, val::Nullable{CategoricalValue{T, R}}, row, col, sch::Data.Schema{true}) = (sink.columns[col]::NullableCategoricalVector{T, R})[row] = val + +function Data.streamto!{T}(sink::DataTable, ::Type{Data.Column}, column::T, row, col, sch::Data.Schema) + if row == 0 + sink.columns[col] = column + else + append!(sink.columns[col]::T, column) + end + return length(column) +end + +function Base.append!{T}(dest::NullableVector{WeakRefString{T}}, column::NullableVector{WeakRefString{T}}) + offset = length(dest.values) + parentoffset = length(dest.parent) + append!(dest.isnull, column.isnull) + append!(dest.parent, column.parent) + # appending new data to `dest` would invalid all existing WeakRefString pointers + resize!(dest.values, length(dest) + length(column)) + for i = 1:offset + old = dest.values[i] + dest.values[i] = WeakRefString{T}(pointer(dest.parent, old.ind), old.len, old.ind) + end + for i = 1:length(column) + old = column.values[i] + dest.values[offset + i] = WeakRefString{T}(pointer(dest.parent, parentoffset + old.ind), old.len, parentoffset + old.ind) + end + return length(dest) +end From 2f20ebb8c585fc18e63857f38cbfb18ac5d5e2da Mon Sep 17 00:00:00 2001 From: Cameron Prybol Date: Fri, 17 Mar 2017 13:24:35 -0700 Subject: [PATCH 2/5] melt long functions --- src/abstractdatatable/io.jl | 107 +++++++++++++++++++++++++++--------- 1 file changed, 81 insertions(+), 26 deletions(-) diff --git a/src/abstractdatatable/io.jl b/src/abstractdatatable/io.jl index f83cd1b..f629de5 100644 --- a/src/abstractdatatable/io.jl +++ b/src/abstractdatatable/io.jl @@ -196,7 +196,7 @@ end ############################################################################## # -# CSV IO +# CSV/DataStreams-based IO # ############################################################################## @@ -217,28 +217,36 @@ end Data.streamtype(::Type{DataTable}, ::Type{Data.Column}) = true Data.streamtype(::Type{DataTable}, ::Type{Data.Field}) = true -Data.streamfrom{T <: AbstractVector}(source::DataTable, ::Type{Data.Column}, ::Type{T}, col) = (@inbounds A = source.columns[col]::T; return A) -Data.streamfrom{T}(source::DataTable, ::Type{Data.Column}, ::Type{T}, col) = (@inbounds A = source.columns[col]; return A) -Data.streamfrom{T}(source::DataTable, ::Type{Data.Field}, ::Type{T}, row, col) = (@inbounds A = Data.streamfrom(source, Data.Column, T, col); return A[row]::T) +Data.streamfrom{T <: AbstractVector}(source::DataTable, ::Type{Data.Column}, ::Type{T}, col) = ( + @inbounds A = source.columns[col]::T; return A) +Data.streamfrom{T}(source::DataTable, ::Type{Data.Column}, ::Type{T}, col) = ( + @inbounds A = source.columns[col]; return A) +Data.streamfrom{T}(source::DataTable, ::Type{Data.Field}, ::Type{T}, row, col) = ( + @inbounds A = Data.streamfrom(source, Data.Column, T, col); return A[row]::T) # DataTable as a Data.Sink allocate{T}(::Type{T}, rows, ref) = Array{T}(rows) allocate{T}(::Type{Vector{T}}, rows, ref) = Array{T}(rows) -allocate{T}(::Type{Nullable{T}}, rows, ref) = NullableArray{T, 1}(Array{T}(rows), fill(true, rows), isempty(ref) ? UInt8[] : ref) -allocate{T}(::Type{NullableVector{T}}, rows, ref) = NullableArray{T, 1}(Array{T}(rows), fill(true, rows), isempty(ref) ? UInt8[] : ref) +allocate{T}(::Type{Nullable{T}}, rows, ref) = + NullableArray{T, 1}(Array{T}(rows), fill(true, rows), isempty(ref) ? UInt8[] : ref) +allocate{T}(::Type{NullableVector{T}}, rows, ref) = + NullableArray{T, 1}(Array{T}(rows), fill(true, rows), isempty(ref) ? UInt8[] : ref) -allocate{S,R}(::Type{CategoricalArrays.CategoricalValue{S,R}}, rows, ref) = CategoricalArray{S,1,R}(rows) -allocate{S,R}(::Type{CategoricalVector{S,R}}, rows, ref) = CategoricalArray{S,1,R}(rows) +allocate{S,R}(::Type{CategoricalArrays.CategoricalValue{S,R}}, rows, ref) = + CategoricalArray{S,1,R}(rows) +allocate{S,R}(::Type{CategoricalVector{S,R}}, rows, ref) = + CategoricalArray{S,1,R}(rows) -allocate{S,R}(::Type{Nullable{CategoricalArrays.CategoricalValue{S,R}}}, rows, ref) = NullableCategoricalArray{S,1,R}(rows) -allocate{S,R}(::Type{NullableCategoricalVector{S,R}}, rows, ref) = NullableCategoricalArray{S,1,R}(rows) +allocate{S,R}(::Type{Nullable{CategoricalArrays.CategoricalValue{S,R}}}, rows, ref) = + NullableCategoricalArray{S,1,R}(rows) +allocate{S,R}(::Type{NullableCategoricalVector{S,R}}, rows, ref) = + NullableCategoricalArray{S,1,R}(rows) -if isdefined(Main, :DataArray) - allocate{T}(::Type{DataVector{T}}, rows, ref) = DataArray{T}(rows) -end - -function DataTable{T <: Data.StreamType}(sch::Data.Schema, ::Type{T}=Data.Field, append::Bool=false, ref::Vector{UInt8}=UInt8[], args...) +function DataTable{T <: Data.StreamType}(sch::Data.Schema, + ::Type{T}=Data.Field, + append::Bool=false, + ref::Vector{UInt8}=UInt8[], args...) rows, cols = size(sch) rows = max(0, T <: Data.Column ? 0 : rows) # don't pre-allocate for Column streaming columns = Vector{Any}(cols) @@ -251,14 +259,16 @@ end # given an existing DataTable (`sink`), make any necessary changes for streaming source # with Data.Schema `sch` to it, given we know if we'll be `appending` or not -function DataTable(sink, sch::Data.Schema, ::Type{Data.Field}, append::Bool, ref::Vector{UInt8}) +function DataTable(sink, sch::Data.Schema, ::Type{Data.Field}, append::Bool, + ref::Vector{UInt8}) rows, cols = size(sch) newsize = max(0, rows) + (append ? size(sink, 1) : 0) # need to make sure we don't break a NullableVector{WeakRefString{UInt8}} when appending if append for (i, T) in enumerate(Data.types(sch)) if T <: Nullable{WeakRefString{UInt8}} - sink.columns[i] = NullableArray(String[string(get(x, "")) for x in sink.columns[i]]) + sink.columns[i] = NullableArray(String[string(get(x, "")) + for x in sink.columns[i]]) sch.types[i] = Nullable{String} end end @@ -275,16 +285,61 @@ end Data.streamtypes(::Type{DataTable}) = [Data.Column, Data.Field] -Data.streamto!{T}(sink::DataTable, ::Type{Data.Field}, val::T, row, col, sch::Data.Schema{false}) = push!(sink.columns[col]::Vector{T}, val) -Data.streamto!{T}(sink::DataTable, ::Type{Data.Field}, val::Nullable{T}, row, col, sch::Data.Schema{false}) = push!(sink.columns[col]::NullableVector{T}, val) -Data.streamto!{T, R}(sink::DataTable, ::Type{Data.Field}, val::CategoricalValue{T, R}, row, col, sch::Data.Schema{false}) = push!(sink.columns[col]::CategoricalVector{T, R}, val) -Data.streamto!{T, R}(sink::DataTable, ::Type{Data.Field}, val::Nullable{CategoricalValue{T, R}}, row, col, sch::Data.Schema{false}) = push!(sink.columns[col]::NullableCategoricalVector{T, R}, val) +Data.streamto!{T}(sink::DataTable, + ::Type{Data.Field}, + val::T, + row, + col, + sch::Data.Schema{false}) = push!(sink.columns[col]::Vector{T}, val) +Data.streamto!{T}(sink::DataTable, + ::Type{Data.Field}, + val::Nullable{T}, + row, + col, + sch::Data.Schema{false}) = push!(sink.columns[col]::NullableVector{T}, val) +Data.streamto!{T, R}(sink::DataTable, + ::Type{Data.Field}, + val::CategoricalValue{T, R}, + row, + col, + sch::Data.Schema{false}) = push!(sink.columns[col]::CategoricalVector{T, R}, val) +Data.streamto!{T, R}(sink::DataTable, + ::Type{Data.Field}, + val::Nullable{CategoricalValue{T, R}}, + row, + col, + sch::Data.Schema{false}) = push!(sink.columns[col]::NullableCategoricalVector{T, R}, val) -Data.streamto!{T}(sink::DataTable, ::Type{Data.Field}, val::T, row, col, sch::Data.Schema{true}) = (sink.columns[col]::Vector{T})[row] = val -Data.streamto!{T}(sink::DataTable, ::Type{Data.Field}, val::Nullable{T}, row, col, sch::Data.Schema{true}) = (sink.columns[col]::NullableVector{T})[row] = val -Data.streamto!(sink::DataTable, ::Type{Data.Field}, val::Nullable{WeakRefString{UInt8}}, row, col, sch::Data.Schema{true}) = (sink.columns[col][row] = val) -Data.streamto!{T, R}(sink::DataTable, ::Type{Data.Field}, val::CategoricalValue{T, R}, row, col, sch::Data.Schema{true}) = (sink.columns[col]::CategoricalVector{T, R})[row] = val -Data.streamto!{T, R}(sink::DataTable, ::Type{Data.Field}, val::Nullable{CategoricalValue{T, R}}, row, col, sch::Data.Schema{true}) = (sink.columns[col]::NullableCategoricalVector{T, R})[row] = val +Data.streamto!{T}(sink::DataTable, + ::Type{Data.Field}, + val::T, + row, + col, + sch::Data.Schema{true}) = (sink.columns[col]::Vector{T})[row] = val +Data.streamto!{T}(sink::DataTable, + ::Type{Data.Field}, + val::Nullable{T}, + row, + col, + sch::Data.Schema{true}) = (sink.columns[col]::NullableVector{T})[row] = val +Data.streamto!(sink::DataTable, + ::Type{Data.Field}, + val::Nullable{WeakRefString{UInt8}}, + row, + col, + sch::Data.Schema{true}) = sink.columns[col][row] = val +Data.streamto!{T, R}(sink::DataTable, + ::Type{Data.Field}, + val::CategoricalValue{T, R}, + row, + col, + sch::Data.Schema{true}) = (sink.columns[col]::CategoricalVector{T, R})[row] = val +Data.streamto!{T, R}(sink::DataTable, + ::Type{Data.Field}, + val::Nullable{CategoricalValue{T, R}}, + row, + col, + sch::Data.Schema{true}) = (sink.columns[col]::NullableCategoricalVector{T, R})[row] = val function Data.streamto!{T}(sink::DataTable, ::Type{Data.Column}, column::T, row, col, sch::Data.Schema) if row == 0 From 54e3c9e25a2b65c227040cc54f079613516acfe7 Mon Sep 17 00:00:00 2001 From: Cameron Prybol Date: Fri, 17 Mar 2017 13:35:28 -0700 Subject: [PATCH 3/5] spacing and change requirements and import location --- REQUIRE | 1 - src/DataTables.jl | 1 - src/abstractdatatable/io.jl | 3 ++- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/REQUIRE b/REQUIRE index cbc7a45..1404274 100644 --- a/REQUIRE +++ b/REQUIRE @@ -6,5 +6,4 @@ SortingAlgorithms Reexport Compat 0.19.0 WeakRefStrings -CSV DataStreams diff --git a/src/DataTables.jl b/src/DataTables.jl index 00a0d3e..3763f7d 100644 --- a/src/DataTables.jl +++ b/src/DataTables.jl @@ -17,7 +17,6 @@ import NullableArrays: dropnull, dropnull! @reexport using CategoricalArrays using GZip using SortingAlgorithms -using WeakRefStrings using Base: Sort, Order import Base: ==, |> diff --git a/src/abstractdatatable/io.jl b/src/abstractdatatable/io.jl index f629de5..e1fdcf7 100644 --- a/src/abstractdatatable/io.jl +++ b/src/abstractdatatable/io.jl @@ -201,11 +201,12 @@ end ############################################################################## importall DataStreams +using WeakRefStrings # DataTables DataStreams implementation function Data.schema(df::DataTable, ::Type{Data.Column}) return Data.Schema(map(string, names(df)), - DataType[typeof(A) for A in df.columns], size(df, 1)) + DataType[typeof(A) for A in df.columns], size(df, 1)) end # DataTable as a Data.Source From 807f23ef04467d846c2856ceed68c632fe1b2f74 Mon Sep 17 00:00:00 2001 From: Cameron Prybol Date: Fri, 17 Mar 2017 14:35:15 -0700 Subject: [PATCH 4/5] changes --- src/abstractdatatable/io.jl | 92 +++++++++++-------------------------- 1 file changed, 27 insertions(+), 65 deletions(-) diff --git a/src/abstractdatatable/io.jl b/src/abstractdatatable/io.jl index e1fdcf7..a8ef611 100644 --- a/src/abstractdatatable/io.jl +++ b/src/abstractdatatable/io.jl @@ -196,7 +196,7 @@ end ############################################################################## # -# CSV/DataStreams-based IO +# DataStreams-based IO # ############################################################################## @@ -206,7 +206,7 @@ using WeakRefStrings # DataTables DataStreams implementation function Data.schema(df::DataTable, ::Type{Data.Column}) return Data.Schema(map(string, names(df)), - DataType[typeof(A) for A in df.columns], size(df, 1)) + DataType[typeof(A) for A in df.columns], size(df, 1)) end # DataTable as a Data.Source @@ -218,12 +218,12 @@ end Data.streamtype(::Type{DataTable}, ::Type{Data.Column}) = true Data.streamtype(::Type{DataTable}, ::Type{Data.Field}) = true -Data.streamfrom{T <: AbstractVector}(source::DataTable, ::Type{Data.Column}, ::Type{T}, col) = ( - @inbounds A = source.columns[col]::T; return A) -Data.streamfrom{T}(source::DataTable, ::Type{Data.Column}, ::Type{T}, col) = ( - @inbounds A = source.columns[col]; return A) -Data.streamfrom{T}(source::DataTable, ::Type{Data.Field}, ::Type{T}, row, col) = ( - @inbounds A = Data.streamfrom(source, Data.Column, T, col); return A[row]::T) +Data.streamfrom{T <: AbstractVector}(source::DataTable, ::Type{Data.Column}, ::Type{T}, col) = + (@inbounds A = source.columns[col]::T; return A) +Data.streamfrom{T}(source::DataTable, ::Type{Data.Column}, ::Type{T}, col) = + (@inbounds A = source.columns[col]; return A) +Data.streamfrom{T}(source::DataTable, ::Type{Data.Field}, ::Type{T}, row, col) = + (@inbounds A = Data.streamfrom(source, Data.Column, T, col); return A[row]::T) # DataTable as a Data.Sink allocate{T}(::Type{T}, rows, ref) = Array{T}(rows) @@ -268,8 +268,7 @@ function DataTable(sink, sch::Data.Schema, ::Type{Data.Field}, append::Bool, if append for (i, T) in enumerate(Data.types(sch)) if T <: Nullable{WeakRefString{UInt8}} - sink.columns[i] = NullableArray(String[string(get(x, "")) - for x in sink.columns[i]]) + sink.columns[i] = NullableArray(String[string(get(x, "")) for x in sink.columns[i]]) sch.types[i] = Nullable{String} end end @@ -286,61 +285,24 @@ end Data.streamtypes(::Type{DataTable}) = [Data.Column, Data.Field] -Data.streamto!{T}(sink::DataTable, - ::Type{Data.Field}, - val::T, - row, - col, - sch::Data.Schema{false}) = push!(sink.columns[col]::Vector{T}, val) -Data.streamto!{T}(sink::DataTable, - ::Type{Data.Field}, - val::Nullable{T}, - row, - col, - sch::Data.Schema{false}) = push!(sink.columns[col]::NullableVector{T}, val) -Data.streamto!{T, R}(sink::DataTable, - ::Type{Data.Field}, - val::CategoricalValue{T, R}, - row, - col, - sch::Data.Schema{false}) = push!(sink.columns[col]::CategoricalVector{T, R}, val) -Data.streamto!{T, R}(sink::DataTable, - ::Type{Data.Field}, - val::Nullable{CategoricalValue{T, R}}, - row, - col, - sch::Data.Schema{false}) = push!(sink.columns[col]::NullableCategoricalVector{T, R}, val) - -Data.streamto!{T}(sink::DataTable, - ::Type{Data.Field}, - val::T, - row, - col, - sch::Data.Schema{true}) = (sink.columns[col]::Vector{T})[row] = val -Data.streamto!{T}(sink::DataTable, - ::Type{Data.Field}, - val::Nullable{T}, - row, - col, - sch::Data.Schema{true}) = (sink.columns[col]::NullableVector{T})[row] = val -Data.streamto!(sink::DataTable, - ::Type{Data.Field}, - val::Nullable{WeakRefString{UInt8}}, - row, - col, - sch::Data.Schema{true}) = sink.columns[col][row] = val -Data.streamto!{T, R}(sink::DataTable, - ::Type{Data.Field}, - val::CategoricalValue{T, R}, - row, - col, - sch::Data.Schema{true}) = (sink.columns[col]::CategoricalVector{T, R})[row] = val -Data.streamto!{T, R}(sink::DataTable, - ::Type{Data.Field}, - val::Nullable{CategoricalValue{T, R}}, - row, - col, - sch::Data.Schema{true}) = (sink.columns[col]::NullableCategoricalVector{T, R})[row] = val +Data.streamto!{T}(sink::DataTable, ::Type{Data.Field}, val::T, row, col, sch::Data.Schema{false}) = + push!(sink.columns[col]::Vector{T}, val) +Data.streamto!{T}(sink::DataTable, ::Type{Data.Field}, val::Nullable{T}, row, col, sch::Data.Schema{false}) = + push!(sink.columns[col]::NullableVector{T}, val) +Data.streamto!{T, R}(sink::DataTable, ::Type{Data.Field}, val::CategoricalValue{T, R}, row, col, sch::Data.Schema{false}) = + push!(sink.columns[col]::CategoricalVector{T, R}, val) +Data.streamto!{T, R}(sink::DataTable, ::Type{Data.Field}, val::Nullable{CategoricalValue{T, R}}, row, col, sch::Data.Schema{false}) = + push!(sink.columns[col]::NullableCategoricalVector{T, R}, val) +Data.streamto!{T}(sink::DataTable, ::Type{Data.Field}, val::T, row, col, sch::Data.Schema{true}) = + (sink.columns[col]::Vector{T})[row] = val +Data.streamto!{T}(sink::DataTable, ::Type{Data.Field}, val::Nullable{T}, row, col, sch::Data.Schema{true}) = + (sink.columns[col]::NullableVector{T})[row] = val +Data.streamto!(sink::DataTable, ::Type{Data.Field}, val::Nullable{WeakRefString{UInt8}}, row, col, sch::Data.Schema{true}) = + sink.columns[col][row] = val +Data.streamto!{T, R}(sink::DataTable, ::Type{Data.Field}, val::CategoricalValue{T, R}, row, col, sch::Data.Schema{true}) = + (sink.columns[col]::CategoricalVector{T, R})[row] = val +Data.streamto!{T, R}(sink::DataTable, ::Type{Data.Field}, val::Nullable{CategoricalValue{T, R}}, row, col, sch::Data.Schema{true}) = + (sink.columns[col]::NullableCategoricalVector{T, R})[row] = val function Data.streamto!{T}(sink::DataTable, ::Type{Data.Column}, column::T, row, col, sch::Data.Schema) if row == 0 From 91026fb0f0d7f3f4648cdb442a41fead44994639 Mon Sep 17 00:00:00 2001 From: quinnj Date: Thu, 6 Apr 2017 23:16:40 -0600 Subject: [PATCH 5/5] Cleanup REQUIRES and add a quick test --- REQUIRE | 4 ++-- test/REQUIRE | 3 +-- test/io.jl | 6 ++++++ 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/REQUIRE b/REQUIRE index 1404274..e7929e1 100644 --- a/REQUIRE +++ b/REQUIRE @@ -5,5 +5,5 @@ StatsBase 0.11.0 SortingAlgorithms Reexport Compat 0.19.0 -WeakRefStrings -DataStreams +WeakRefStrings 0.1.3 +DataStreams 0.1.0 diff --git a/test/REQUIRE b/test/REQUIRE index 644abb1..3f5122d 100644 --- a/test/REQUIRE +++ b/test/REQUIRE @@ -1,5 +1,4 @@ Compat 0.9.0 DataStructures -RData LaTeXStrings -Atom +CSV \ No newline at end of file diff --git a/test/io.jl b/test/io.jl index 336f881..6f042eb 100644 --- a/test/io.jl +++ b/test/io.jl @@ -51,4 +51,10 @@ module TestIO answer = Sys.WORD_SIZE == 64 ? 0x937e94e70d642cce : 0x2b8864d8 @test hash(sprint(printtable, dt)) == answer + + # DataStreams + using CSV + + dt = CSV.read(joinpath(dirname(@__FILE__), "data/iris.csv"), DataTable) + @test size(dt) == (150, 5) end