Skip to content

Commit

Permalink
Remove old DataStreams code + REQUIRE dependency, as well as WeakRefStrings since it was really just a transient dep through DataStreams
Browse files Browse the repository at this point in the history
  • Loading branch information
quinnj committed Apr 24, 2019
1 parent 395754c commit 5c88505
Show file tree
Hide file tree
Showing 3 changed files with 0 additions and 170 deletions.
2 changes: 0 additions & 2 deletions REQUIRE
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ CategoricalArrays 0.5.2
StatsBase 0.11.0
SortingAlgorithms
Reexport
WeakRefStrings 0.4.0
DataStreams 0.3.0
Compat 0.59.0
Tables 0.1.15
IteratorInterfaceExtensions 0.1.1
Expand Down
117 changes: 0 additions & 117 deletions src/abstractdataframe/io.jl
Original file line number Diff line number Diff line change
Expand Up @@ -392,120 +392,3 @@ function Base.show(io::IO, mime::MIME"text/tab-separated-values", gd::GroupedDat
end
end

##############################################################################
#
# DataStreams-based IO
#
##############################################################################

using DataStreams, WeakRefStrings

# Streaming sink wrapper used by the Data.Sink methods below.
# `columns` holds the destination column vectors (stored as a Tuple, so `T`
# captures each column's array type); `header` holds the column names as strings.
struct DataFrameStream{T}
    columns::T
    header::Vector{String}
end
DataFrameStream(df::DataFrame) = DataFrameStream(Tuple(_columns(df)), string.(names(df)))

# DataFrame Data.Source implementation
# Describe a DataFrame to the DataStreams framework: per-column element types,
# stringified column names, and the row count.
function Data.schema(df::DataFrame)
    coltypes = Type[eltype(col) for col in _columns(df)]
    return Data.Schema(coltypes, string.(names(df)), size(df, 1))
end

# A DataFrame source is exhausted once the requested cell lies past the last
# row or column.
Data.isdone(source::DataFrame, row, col, rows, cols) = row > rows || col > cols
function Data.isdone(source::DataFrame, row, col)
    cols = length(source)
    # Use the `_columns` accessor, as the rest of this file does (e.g. in
    # Data.schema above), instead of reaching into the `columns` field directly.
    rows = cols == 0 ? 0 : length(_columns(source)[1])
    return Data.isdone(source, row, col, rows, cols)
end

# A DataFrame source supports both whole-column and cell-at-a-time streaming.
Data.streamtype(::Type{DataFrame}, ::Type{Data.Column}) = true
Data.streamtype(::Type{DataFrame}, ::Type{Data.Field}) = true

# Column streaming: hand back the whole column; `row` is unused here.
function Data.streamfrom(source::DataFrame, ::Type{Data.Column}, ::Type{T}, row, col) where {T}
    return source[col]
end

# Field streaming: fetch a single cell.
function Data.streamfrom(source::DataFrame, ::Type{Data.Field}, ::Type{T}, row, col) where {T}
    return source[col][row]
end

# DataFrame Data.Sink implementation
# NOTE(review): `Data.Column` listed first — presumably the preferred stream
# type per the DataStreams protocol. A DataFrame sink can hold WeakRefString
# data directly (see the WeakRefStringArray allocators below).
Data.streamtypes(::Type{DataFrame}) = [Data.Column, Data.Field]
Data.weakrefstrings(::Type{DataFrame}) = true

# Column allocators: build an uninitialized column of `rows` elements whose
# array type matches the requested element type. `ref` is the source's backing
# byte buffer; only the WeakRefStringArray methods use it.

# Fallback: a plain Vector for any element type.
function allocate(::Type{T}, rows, ref) where {T}
    return Vector{T}(undef, rows)
end

function allocate(::Type{CategoricalString{R}}, rows, ref) where {R}
    return CategoricalArray{String, 1, R}(undef, rows)
end

function allocate(::Type{Union{CategoricalString{R}, Missing}}, rows, ref) where {R}
    return CategoricalArray{Union{String, Missing}, 1, R}(undef, rows)
end

function allocate(::Type{CategoricalValue{T, R}}, rows, ref) where {T, R}
    return CategoricalArray{T, 1, R}(undef, rows)
end

function allocate(::Type{Union{Missing, CategoricalValue{T, R}}}, rows, ref) where {T, R}
    return CategoricalArray{Union{Missing, T}, 1, R}(undef, rows)
end

# WeakRefString columns keep `ref` alive so the strings' memory stays valid.
function allocate(::Type{WeakRefString{T}}, rows, ref) where {T}
    return WeakRefStringArray(ref, WeakRefString{T}, rows)
end

function allocate(::Type{Union{Missing, WeakRefString{T}}}, rows, ref) where {T}
    return WeakRefStringArray(ref, Union{Missing, WeakRefString{T}}, rows)
end

# All-missing column.
allocate(::Type{Missing}, rows, ref) = missings(rows)

# Construct or modify a DataFrame to be ready to stream data from a source with `sch`.
#
# `S` is the streaming mode (`Data.Field` or `Data.Column`); `R` indicates
# whether the source knows its row count up front. When `args[1]` is an
# existing DataFrame whose column types match the source schema, it is reused
# as the sink (resized or appended to as needed); otherwise fresh columns are
# allocated via `allocate`. `reference` carries a source-owned byte buffer
# that WeakRefString columns must keep alive.
# NOTE(review): mutates `sch.rows` to reflect the sink's final row count.
function DataFrame(sch::Data.Schema{R}, ::Type{S}=Data.Field,
                   append::Bool=false, args...;
                   reference::Vector{UInt8}=UInt8[]) where {R, S <: Data.StreamType}
    types = Data.types(sch)
    if !isempty(args) && args[1] isa DataFrame && types == Data.types(Data.schema(args[1]))
        # passing in an existing DataFrame Sink w/ same types as source
        sink = args[1]
        sinkrows = size(Data.schema(sink), 1)
        # are we appending and either column-streaming or there are an unknown # of rows
        if append && (S == Data.Column || !R)
            sch.rows = sinkrows
            # don't need to do anything because:
            # for Data.Column, we just append columns anyway (see Data.streamto! below)
            # for Data.Field, unknown # of source rows, so we'll just push! in streamto!
        else
            # need to adjust the existing sink
            # similar to above, for Data.Column or unknown # of rows for Data.Field,
            # we'll append!/push! in streamto!, so we empty! the columns
            # if appending, we want to grow our columns to be able to include every row
            # in source (sinkrows + sch.rows)
            # if not appending, we're just "re-using" a sink, so we just resize it
            # to the # of rows in the source
            newsize = ifelse(S == Data.Column || !R, 0,
                             ifelse(append, sinkrows + sch.rows, sch.rows))
            foreach(col->resize!(col, newsize), _columns(sink))
            sch.rows = newsize
        end
        # take care of a possible reference from source by adding to WeakRefStringArrays
        if !isempty(reference)
            foreach(col-> col isa WeakRefStringArray && push!(col.data, reference),
                    _columns(sink))
        end
        return DataFrameStream(sink)
    else
        # allocating a fresh DataFrame Sink; append is irrelevant
        # for Data.Column or unknown # of rows in Data.Field, we only ever append!,
        # so just allocate empty columns
        rows = ifelse(S == Data.Column, 0, ifelse(!R, 0, sch.rows))
        names = Data.header(sch)
        sch.rows = rows
        return DataFrameStream(Tuple(allocate(types[i], rows, reference)
                               for i = 1:length(types)), names)
    end
end

# Sink-first constructor variant required by the DataStreams interface:
# forward the existing `sink` as a positional argument to the main
# `DataFrame(sch, ...)` constructor.
function DataFrame(sink, sch::Data.Schema, ::Type{S}, append::Bool;
                   reference::Vector{UInt8}=UInt8[]) where {S}
    return DataFrame(sch, S, append, sink; reference=reference)
end

# Cell streaming, row count possibly unknown: grow the column with push! when
# writing past the end, otherwise overwrite in place.
@inline function Data.streamto!(sink::DataFrameStream, ::Type{Data.Field}, val,
                                row, col::Int)
    column = sink.columns[col]
    return row > length(column) ? push!(column, val) : setindex!(column, val, row)
end

# Row count not known up front (`Val{false}`): always append.
@inline function Data.streamto!(sink::DataFrameStream, ::Type{Data.Field}, val,
                                row, col::Int, ::Type{Val{false}})
    return push!(sink.columns[col], val)
end

# Row count known up front (`Val{true}`): columns are pre-sized, write in place.
@inline function Data.streamto!(sink::DataFrameStream, ::Type{Data.Field}, val,
                                row, col::Int, ::Type{Val{true}})
    return sink.columns[col][row] = val
end

# Whole-column streaming: splice the incoming column onto the sink column.
@inline Data.streamto!(sink::DataFrameStream, ::Type{Data.Column}, column,
                       row, col::Int, knownrows) =
    append!(sink.columns[col], column)

# Finalize streaming: materialize the accumulated columns as a DataFrame,
# de-duplicating column names if necessary.
function Data.close!(df::DataFrameStream)
    return DataFrame(collect(AbstractVector, df.columns), Symbol.(df.header),
                     makeunique=true)
end
51 changes: 0 additions & 51 deletions test/io.jl
Original file line number Diff line number Diff line change
Expand Up @@ -103,57 +103,6 @@ end
"""
end

@testset "DataStreams" begin
    using DataStreams
    # Fixture covering one column per supported element-type flavor:
    # plain, Union with Missing, and an all-missing column.
    I = DataFrame(id = Int64[1, 2, 3, 4, 5],
                  firstname = Union{String, Missing}["Benjamin", "Wayne", "Sean", "Charles", missing],
                  lastname = String["Chavez", "Burke", "Richards", "Long", "Rose"],
                  salary = Union{Float64, Missing}[missing, 46134.1, 45046.2, 30555.6, 88894.1],
                  rate = Float64[39.44, 33.8, 15.64, 17.67, 34.6],
                  hired = Union{Date, Missing}[Date("2011-07-07"), Date("2016-02-19"), missing,
                                               Date("2002-01-05"), Date("2008-05-15")],
                  fired = DateTime[DateTime("2016-04-07T14:07:00"), DateTime("2015-03-19T15:01:00"),
                                   DateTime("2006-11-18T05:07:00"), DateTime("2002-07-18T06:24:00"),
                                   DateTime("2007-09-29T12:09:00")],
                  reserved = missings(5)
                  )
    expectedheader = ["id", "firstname", "lastname", "salary",
                      "rate", "hired", "fired", "reserved"]
    expectedtypes = (Int64, Union{String, Missing}, String,
                     Union{Float64, Missing}, Float64,
                     Union{Date, Missing}, DateTime, Missing)

    # Stream DataFrame -> pre-existing DataFrame sink.
    sink = DataStreams.Data.close!(DataStreams.Data.stream!(I, deepcopy(I)))
    sch = DataStreams.Data.schema(sink)
    @test size(sch) == (5, 8)
    @test DataStreams.Data.header(sch) == expectedheader
    @test DataStreams.Data.types(sch) == expectedtypes
    @test sink[:id] == [1:5;]

    # Append to an existing sink with a transform on the first column.
    transforms = Dict(1=>x->x+1)
    sink = DataStreams.Data.close!(DataStreams.Data.stream!(I, deepcopy(I);
                                                            append=true, transforms=transforms))
    sch = DataStreams.Data.schema(sink)
    @test size(sch) == (10, 8)
    @test DataStreams.Data.header(sch) == expectedheader
    @test DataStreams.Data.types(sch) == expectedtypes
    @test sink[:id] == [1:5; 2:6]

    # Stream to a freshly-allocated DataFrame sink.
    sink = DataStreams.Data.close!(Data.stream!(I, DataFrame, deepcopy(I)))
    sch = DataStreams.Data.schema(sink)
    @test size(sch) == (5, 8)
    @test DataStreams.Data.header(sch) == expectedheader
    @test DataStreams.Data.types(sch) == expectedtypes
    @test sink[:id] == [1:5;]

    # test DataFrameStream creation
    dfs = DataFrame(sch)
    DataStreams.Data.close!(dfs)
end

@testset "csv/tsv output" begin
df = DataFrame(a = [1,2], b = [1.0, 2.0])
@test sprint(show, "text/csv", df) == """
Expand Down

0 comments on commit 5c88505

Please sign in to comment.