Skip to content

Commit

Permalink
Merge pull request #1239 from alyst/allocate_nulls
Browse files Browse the repository at this point in the history
Add allocate(::Type{Null})
  • Loading branch information
quinnj committed Sep 21, 2017
2 parents 75ad2fe + 7307a89 commit f0422b3
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 10 deletions.
1 change: 1 addition & 0 deletions src/abstractdataframe/io.jl
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ allocate(::Type{Union{Null, CategoricalValue{T, R}}}, rows, ref) where {T, R} =
CategoricalArray{Union{Null, T}, 1, R}(rows)
allocate(::Type{T}, rows, ref) where {T <: Union{WeakRefString, Null}} =
WeakRefStringArray(ref, T, rows)
allocate(::Type{Null}, rows, ref) = nulls(rows)

# Construct or modify a DataFrame to be ready to stream data from a source with `sch`
function DataFrame(sch::Data.Schema{R}, ::Type{S}=Data.Field,
Expand Down
25 changes: 15 additions & 10 deletions test/io.jl
Original file line number Diff line number Diff line change
Expand Up @@ -66,27 +66,32 @@ module TestIO
salary = Union{Float64, Null}[null, 46134.1, 45046.2, 30555.6, 88894.1],
rate = Float64[39.44, 33.8, 15.64, 17.67, 34.6],
hired = Union{Date, Null}[Date("2011-07-07"), Date("2016-02-19"), null, Date("2002-01-05"), Date("2008-05-15")],
fired = DateTime[DateTime("2016-04-07T14:07:00"), DateTime("2015-03-19T15:01:00"), DateTime("2006-11-18T05:07:00"), DateTime("2002-07-18T06:24:00"), DateTime("2007-09-29T12:09:00")]
fired = DateTime[DateTime("2016-04-07T14:07:00"), DateTime("2015-03-19T15:01:00"), DateTime("2006-11-18T05:07:00"), DateTime("2002-07-18T06:24:00"), DateTime("2007-09-29T12:09:00")],
reserved = nulls(5)
)
sink = DataStreams.Data.close!(DataStreams.Data.stream!(I, deepcopy(I)))
sch = DataStreams.Data.schema(sink)
@test size(sch) == (5, 7)
@test DataStreams.Data.header(sch) == ["id","firstname","lastname","salary","rate","hired","fired"]
@test DataStreams.Data.types(sch) == (Int64, Union{String, Null}, String, Union{Float64, Null}, Float64, Union{Date, Null}, DateTime)
@test size(sch) == (5, 8)
@test DataStreams.Data.header(sch) == ["id","firstname","lastname","salary","rate","hired","fired","reserved"]
@test DataStreams.Data.types(sch) == (Int64, Union{String, Null}, String, Union{Float64, Null}, Float64, Union{Date, Null}, DateTime, Null)
@test sink[:id] == [1,2,3,4,5]

transforms = Dict(1=>x->x+1)
sink = DataStreams.Data.close!(DataStreams.Data.stream!(I, deepcopy(I); append=true, transforms=transforms))
sch = DataStreams.Data.schema(sink)
@test size(sch) == (10, 7)
@test DataStreams.Data.header(sch) == ["id","firstname","lastname","salary","rate","hired","fired"]
@test DataStreams.Data.types(sch) == (Int64, Union{String, Null}, String, Union{Float64, Null}, Float64, Union{Date, Null}, DateTime)
@test size(sch) == (10, 8)
@test DataStreams.Data.header(sch) == ["id","firstname","lastname","salary","rate","hired","fired","reserved"]
@test DataStreams.Data.types(sch) == (Int64, Union{String, Null}, String, Union{Float64, Null}, Float64, Union{Date, Null}, DateTime, Null)
@test sink[:id] == [1,2,3,4,5,2,3,4,5,6]

sink = DataStreams.Data.close!(Data.stream!(I, DataFrame, deepcopy(I)))
sch = DataStreams.Data.schema(sink)
@test size(sch) == (5, 7)
@test DataStreams.Data.header(sch) == ["id","firstname","lastname","salary","rate","hired","fired"]
@test DataStreams.Data.types(sch) == (Int64, Union{String, Null}, String, Union{Float64, Null}, Float64, Union{Date, Null}, DateTime)
@test size(sch) == (5, 8)
@test DataStreams.Data.header(sch) == ["id","firstname","lastname","salary","rate","hired","fired","reserved"]
@test DataStreams.Data.types(sch) == (Int64, Union{String, Null}, String, Union{Float64, Null}, Float64, Union{Date, Null}, DateTime, Null)
@test sink[:id] == [1,2,3,4,5]

# test DataFrameStream creation
dfs = DataFrame(sch)
DataStreams.Data.close!(dfs)
end

0 comments on commit f0422b3

Please sign in to comment.