From 5c88505f80198a8cf513f32bd61c2cbde43a121a Mon Sep 17 00:00:00 2001 From: Jacob Quinn Date: Tue, 23 Apr 2019 22:46:42 -0600 Subject: [PATCH] Remove old DataStreams code + REQUIRE dependency, as well as WeakRefStrings since it was really just a transient dep thru DataStreams --- REQUIRE | 2 - src/abstractdataframe/io.jl | 117 ------------------------------------ test/io.jl | 51 ---------------- 3 files changed, 170 deletions(-) diff --git a/REQUIRE b/REQUIRE index 91bd697ba9..322196b254 100644 --- a/REQUIRE +++ b/REQUIRE @@ -4,8 +4,6 @@ CategoricalArrays 0.5.2 StatsBase 0.11.0 SortingAlgorithms Reexport -WeakRefStrings 0.4.0 -DataStreams 0.3.0 Compat 0.59.0 Tables 0.1.15 IteratorInterfaceExtensions 0.1.1 diff --git a/src/abstractdataframe/io.jl b/src/abstractdataframe/io.jl index b5ef8c943b..24ace37965 100644 --- a/src/abstractdataframe/io.jl +++ b/src/abstractdataframe/io.jl @@ -392,120 +392,3 @@ function Base.show(io::IO, mime::MIME"text/tab-separated-values", gd::GroupedDat end end -############################################################################## -# -# DataStreams-based IO -# -############################################################################## - -using DataStreams, WeakRefStrings - -struct DataFrameStream{T} - columns::T - header::Vector{String} -end -DataFrameStream(df::DataFrame) = DataFrameStream(Tuple(_columns(df)), string.(names(df))) - -# DataFrame Data.Source implementation -Data.schema(df::DataFrame) = - Data.Schema(Type[eltype(A) for A in _columns(df)], string.(names(df)), size(df, 1)) - -Data.isdone(source::DataFrame, row, col, rows, cols) = row > rows || col > cols -function Data.isdone(source::DataFrame, row, col) - cols = length(source) - return Data.isdone(source, row, col, cols == 0 ? 0 : length(source.columns[1]), cols) -end - -Data.streamtype(::Type{DataFrame}, ::Type{Data.Column}) = true -Data.streamtype(::Type{DataFrame}, ::Type{Data.Field}) = true - -Data.streamfrom(source::DataFrame, ::Type{Data.Column}, ::Type{T}, row, col) where {T} = - source[col] -Data.streamfrom(source::DataFrame, ::Type{Data.Field}, ::Type{T}, row, col) where {T} = - source[col][row] - -# DataFrame Data.Sink implementation -Data.streamtypes(::Type{DataFrame}) = [Data.Column, Data.Field] -Data.weakrefstrings(::Type{DataFrame}) = true - -allocate(::Type{T}, rows, ref) where {T} = Vector{T}(undef, rows) -allocate(::Type{CategoricalString{R}}, rows, ref) where {R} = - CategoricalArray{String, 1, R}(undef, rows) -allocate(::Type{Union{CategoricalString{R}, Missing}}, rows, ref) where {R} = - CategoricalArray{Union{String, Missing}, 1, R}(undef, rows) -allocate(::Type{CategoricalValue{T, R}}, rows, ref) where {T, R} = - CategoricalArray{T, 1, R}(undef, rows) -allocate(::Type{Union{Missing, CategoricalValue{T, R}}}, rows, ref) where {T, R} = - CategoricalArray{Union{Missing, T}, 1, R}(undef, rows) -allocate(::Type{WeakRefString{T}}, rows, ref) where {T} = - WeakRefStringArray(ref, WeakRefString{T}, rows) -allocate(::Type{Union{Missing, WeakRefString{T}}}, rows, ref) where {T} = - WeakRefStringArray(ref, Union{Missing, WeakRefString{T}}, rows) -allocate(::Type{Missing}, rows, ref) = missings(rows) - -# Construct or modify a DataFrame to be ready to stream data from a source with `sch` -function DataFrame(sch::Data.Schema{R}, ::Type{S}=Data.Field, - append::Bool=false, args...; - reference::Vector{UInt8}=UInt8[]) where {R, S <: Data.StreamType} - types = Data.types(sch) - if !isempty(args) && args[1] isa DataFrame && types == Data.types(Data.schema(args[1])) - # passing in an existing DataFrame Sink w/ same types as source - sink = args[1] - sinkrows = size(Data.schema(sink), 1) - # are we appending and either column-streaming or there are an unknown # of rows - if append && (S == Data.Column || !R) - sch.rows = sinkrows - # dont' need to do anything because: - # for Data.Column, we just append columns anyway (see Data.streamto! below) - # for Data.Field, unknown # of source rows, so we'll just push! in streamto! - else - # need to adjust the existing sink - # similar to above, for Data.Column or unknown # of rows for Data.Field, - # we'll append!/push! in streamto!, so we empty! the columns - # if appending, we want to grow our columns to be able to include every row - # in source (sinkrows + sch.rows) - # if not appending, we're just "re-using" a sink, so we just resize it - # to the # of rows in the source - newsize = ifelse(S == Data.Column || !R, 0, - ifelse(append, sinkrows + sch.rows, sch.rows)) - foreach(col->resize!(col, newsize), _columns(sink)) - sch.rows = newsize - end - # take care of a possible reference from source by addint to WeakRefStringArrays - if !isempty(reference) - foreach(col-> col isa WeakRefStringArray && push!(col.data, reference), - _columns(sink)) - end - return DataFrameStream(sink) - else - # allocating a fresh DataFrame Sink; append is irrelevant - # for Data.Column or unknown # of rows in Data.Field, we only ever append!, - # so just allocate empty columns - rows = ifelse(S == Data.Column, 0, ifelse(!R, 0, sch.rows)) - names = Data.header(sch) - sch.rows = rows - return DataFrameStream(Tuple(allocate(types[i], rows, reference) - for i = 1:length(types)), names) - end -end - -DataFrame(sink, sch::Data.Schema, ::Type{S}, append::Bool; - reference::Vector{UInt8}=UInt8[]) where {S} = - DataFrame(sch, S, append, sink; reference=reference) - -@inline Data.streamto!(sink::DataFrameStream, ::Type{Data.Field}, val, - row, col::Int) = - (A = sink.columns[col]; row > length(A) ? push!(A, val) : setindex!(A, val, row)) -@inline Data.streamto!(sink::DataFrameStream, ::Type{Data.Field}, val, - row, col::Int, ::Type{Val{false}}) = - push!(sink.columns[col], val) -@inline Data.streamto!(sink::DataFrameStream, ::Type{Data.Field}, val, - row, col::Int, ::Type{Val{true}}) = - sink.columns[col][row] = val -@inline function Data.streamto!(sink::DataFrameStream, ::Type{Data.Column}, column, - row, col::Int, knownrows) - append!(sink.columns[col], column) -end - -Data.close!(df::DataFrameStream) = - DataFrame(collect(AbstractVector, df.columns), Symbol.(df.header), makeunique=true) diff --git a/test/io.jl b/test/io.jl index cbc266dfcc..828274ef95 100644 --- a/test/io.jl +++ b/test/io.jl @@ -103,57 +103,6 @@ end """ end -@testset "DataStreams" begin - using DataStreams - I = DataFrame(id = Int64[1, 2, 3, 4, 5], - firstname = Union{String, Missing}["Benjamin", "Wayne", "Sean", "Charles", missing], - lastname = String["Chavez", "Burke", "Richards", "Long", "Rose"], - salary = Union{Float64, Missing}[missing, 46134.1, 45046.2, 30555.6, 88894.1], - rate = Float64[39.44, 33.8, 15.64, 17.67, 34.6], - hired = Union{Date, Missing}[Date("2011-07-07"), Date("2016-02-19"), missing, - Date("2002-01-05"), Date("2008-05-15")], - fired = DateTime[DateTime("2016-04-07T14:07:00"), DateTime("2015-03-19T15:01:00"), - DateTime("2006-11-18T05:07:00"), DateTime("2002-07-18T06:24:00"), - DateTime("2007-09-29T12:09:00")], - reserved = missings(5) - ) - sink = DataStreams.Data.close!(DataStreams.Data.stream!(I, deepcopy(I))) - sch = DataStreams.Data.schema(sink) - @test size(sch) == (5, 8) - @test DataStreams.Data.header(sch) == ["id", "firstname", "lastname", "salary", - "rate", "hired", "fired", "reserved"] - @test DataStreams.Data.types(sch) == (Int64, Union{String, Missing}, String, - Union{Float64, Missing}, Float64, - Union{Date, Missing}, DateTime, Missing) - @test sink[:id] == [1:5;] - - transforms = Dict(1=>x->x+1) - sink = DataStreams.Data.close!(DataStreams.Data.stream!(I, deepcopy(I); - append=true, transforms=transforms)) - sch = DataStreams.Data.schema(sink) - @test size(sch) == (10, 8) - @test DataStreams.Data.header(sch) == ["id", "firstname", "lastname", "salary", - "rate", "hired", "fired", "reserved"] - @test DataStreams.Data.types(sch) == (Int64, Union{String, Missing}, String, - Union{Float64, Missing}, Float64, - Union{Date, Missing}, DateTime, Missing) - @test sink[:id] == [1:5; 2:6] - - sink = DataStreams.Data.close!(Data.stream!(I, DataFrame, deepcopy(I))) - sch = DataStreams.Data.schema(sink) - @test size(sch) == (5, 8) - @test DataStreams.Data.header(sch) == ["id", "firstname", "lastname", "salary", - "rate", "hired", "fired", "reserved"] - @test DataStreams.Data.types(sch) == (Int64, Union{String, Missing}, String, - Union{Float64, Missing}, Float64, - Union{Date, Missing}, DateTime, Missing) - @test sink[:id] == [1:5;] - - # test DataFrameStream creation - dfs = DataFrame(sch) - DataStreams.Data.close!(dfs) -end - @testset "csv/tsv output" begin df = DataFrame(a = [1,2], b = [1.0, 2.0]) @test sprint(show, "text/csv", df) == """