Skip to content
This repository has been archived by the owner on May 5, 2019. It is now read-only.

Commit

Permalink
Merge 96766da into 892cc6a
Browse files Browse the repository at this point in the history
  • Loading branch information
quinnj committed May 20, 2017
2 parents 892cc6a + 96766da commit 8f25524
Show file tree
Hide file tree
Showing 30 changed files with 536 additions and 722 deletions.
1 change: 0 additions & 1 deletion .travis.yml
@@ -1,7 +1,6 @@

language: julia
julia:
- 0.5
- 0.6
os:
- linux
Expand Down
3 changes: 1 addition & 2 deletions REQUIRE
@@ -1,5 +1,4 @@
julia 0.5
NullableArrays 0.1.0
julia 0.6-
CategoricalArrays 0.1.2
StatsBase 0.11.0
SortingAlgorithms
Expand Down
2 changes: 0 additions & 2 deletions appveyor.yml
@@ -1,7 +1,5 @@
environment:
matrix:
- JULIA_URL: "https://julialang-s3.julialang.org/bin/winnt/x86/0.5/julia-0.5-latest-win32.exe"
- JULIA_URL: "https://julialang-s3.julialang.org/bin/winnt/x64/0.5/julia-0.5-latest-win64.exe"
- JULIA_URL: "https://julialang-s3.julialang.org/bin/winnt/x86/0.6/julia-0.6-latest-win32.exe"
- JULIA_URL: "https://julialang-s3.julialang.org/bin/winnt/x64/0.6/julia-0.6-latest-win64.exe"

Expand Down
10 changes: 3 additions & 7 deletions src/DataTables.jl
Expand Up @@ -8,14 +8,10 @@ module DataTables
##
##############################################################################

using Compat
import Compat.String
using Reexport
using StatsBase
import NullableArrays: dropnull, dropnull!
@reexport using NullableArrays
using Compat, Reexport
using StatsBase, SortingAlgorithms, Nulls
@reexport using CategoricalArrays
using SortingAlgorithms

using Base: Sort, Order
import Base: ==, |>

Expand Down
56 changes: 12 additions & 44 deletions src/abstractdatatable/abstractdatatable.jl
Expand Up @@ -227,7 +227,7 @@ Base.ndims(::AbstractDataTable) = 2
##############################################################################

Base.similar(dt::AbstractDataTable, dims::Int) =
DataTable(Any[similar(x, dims) for x in columns(dt)], copy(index(dt)))
DataTable(Any[similar_nullable(x, dims) for x in columns(dt)], copy(index(dt)))

##############################################################################
##
Expand Down Expand Up @@ -380,7 +380,7 @@ function StatsBase.describe(io, dt::AbstractDataTable)
end
StatsBase.describe(nv::AbstractArray) = describe(STDOUT, nv)
function StatsBase.describe{T<:Number}(io, nv::AbstractArray{T})
if all(_isnull, nv)
if all(isnull, nv)
println(io, " * All null * ")
return
end
Expand Down Expand Up @@ -416,13 +416,7 @@ end

function _nonnull!(res, col)
for (i, el) in enumerate(col)
res[i] &= !_isnull(el)
end
end

function _nonnull!(res, col::NullableArray)
for (i, el) in enumerate(col.isnull)
res[i] &= !el
res[i] &= !isnull(el)
end
end

Expand Down Expand Up @@ -531,7 +525,6 @@ function Base.convert(::Type{Array}, dt::AbstractDataTable)
end
function Base.convert(::Type{Matrix}, dt::AbstractDataTable)
T = reduce(promote_type, eltypes(dt))
T <: Nullable && (T = eltype(T))
convert(Matrix{T}, dt)
end
function Base.convert{T}(::Type{Array{T}}, dt::AbstractDataTable)
Expand All @@ -542,35 +535,13 @@ function Base.convert{T}(::Type{Matrix{T}}, dt::AbstractDataTable)
res = Matrix{T}(n, p)
idx = 1
for (name, col) in zip(names(dt), columns(dt))
any(isnull, col) && error("cannot convert a DataTable containing null values to array (found for column $name)")
!(T >: Null) && any(isnull, col) && error("cannot convert a DataTable containing null values to array (found for column $name)")
copy!(res, idx, convert(Vector{T}, col))
idx += n
end
return res
end

function Base.convert(::Type{NullableArray}, dt::AbstractDataTable)
convert(NullableMatrix, dt)
end
function Base.convert(::Type{NullableMatrix}, dt::AbstractDataTable)
T = reduce(promote_type, eltypes(dt))
T <: Nullable && (T = eltype(T))
convert(NullableMatrix{T}, dt)
end
function Base.convert{T}(::Type{NullableArray{T}}, dt::AbstractDataTable)
convert(NullableMatrix{T}, dt)
end
function Base.convert{T}(::Type{NullableMatrix{T}}, dt::AbstractDataTable)
n, p = size(dt)
res = NullableArray(T, n, p)
idx = 1
for col in columns(dt)
copy!(res, idx, col)
idx += n
end
return res
end

"""
Indexes of duplicate rows (a row that is a duplicate of a prior row)
Expand Down Expand Up @@ -696,24 +667,21 @@ Base.hcat(dt::AbstractDataTable, x, y...) = hcat!(hcat(dt, x), y...)
Base.hcat(dt1::AbstractDataTable, dt2::AbstractDataTable, dtn::AbstractDataTable...) = hcat!(hcat(dt1, dt2), dtn...)

@generated function promote_col_type(cols::AbstractVector...)
elty = Base.promote_eltype(cols...)
if elty <: Nullable
elty = eltype(elty)
end
if elty <: CategoricalValue
elty = elty.parameters[1]
T = promote_type(map(x-> eltype(x) >: Null ? Nulls.T(eltype(x)) : eltype(x), cols)...)
if T <: CategoricalValue
T = T.parameters[1]
end
if any(col -> eltype(col) <: Nullable, cols)
if any(col -> Null <: eltype(col), cols)
if any(col -> col <: Union{AbstractCategoricalArray, AbstractNullableCategoricalArray}, cols)
return :(NullableCategoricalVector{$elty})
return :(NullableCategoricalVector{$T})
else
return :(NullableVector{$elty})
return :(Vector{$T})
end
else
if any(col -> col <: Union{AbstractCategoricalArray, AbstractNullableCategoricalArray}, cols)
return :(CategoricalVector{$elty})
return :(CategoricalVector{$T})
else
return :(Vector{$elty})
return :(Vector{$T})
end
end
end
Expand Down
108 changes: 25 additions & 83 deletions src/abstractdatatable/io.jl
Expand Up @@ -205,9 +205,9 @@ importall DataStreams
using WeakRefStrings

# DataTables DataStreams implementation
function Data.schema(df::DataTable, ::Type{Data.Column})
function Data.schema(df::DataTable, ::Type{Data.Batch})
return Data.Schema(map(string, names(df)),
DataType[typeof(A) for A in df.columns], size(df, 1))
Type[typeof(A) for A in df.columns], size(df, 1))
end

# DataTable as a Data.Source
Expand All @@ -216,118 +216,60 @@ function Data.isdone(source::DataTable, row, col)
return row > rows || col > cols
end

Data.streamtype(::Type{DataTable}, ::Type{Data.Column}) = true
Data.streamtype(::Type{DataTable}, ::Type{Data.Field}) = true
Data.streamtype(::Type{DataTable}, ::Type{Data.Batch}) = true
Data.streamtype(::Type{DataTable}, ::Type{Data.Row}) = true

Data.streamfrom{T <: AbstractVector}(source::DataTable, ::Type{Data.Column}, ::Type{T}, col) =
(@inbounds A = source.columns[col]::T; return A)
Data.streamfrom{T}(source::DataTable, ::Type{Data.Column}, ::Type{T}, col) =
(@inbounds A = source.columns[col]; return A)
Data.streamfrom{T}(source::DataTable, ::Type{Data.Field}, ::Type{T}, row, col) =
(@inbounds A = Data.streamfrom(source, Data.Column, T, col); return A[row]::T)
# Data.streamfrom{T <: AbstractVector}(source::DataTable, ::Type{Data.Batch}, ::Type{T}, col) =
# (A = source.columns[col]::T; return A)
Data.streamfrom{T}(source::DataTable, ::Type{Data.Batch}, ::Type{T}, col) =
(A = source.columns[col]::AbstractVector{T}; return A)
Data.streamfrom{T}(source::DataTable, ::Type{Data.Row}, ::Type{T}, row, col) =
(A = source.columns[col]::AbstractVector{T}; return A[row]::T)

# DataTable as a Data.Sink
allocate{T}(::Type{T}, rows, ref) = Array{T}(rows)
allocate{T}(::Type{Vector{T}}, rows, ref) = Array{T}(rows)

allocate{T}(::Type{Nullable{T}}, rows, ref) =
NullableArray{T, 1}(Array{T}(rows), fill(true, rows), isempty(ref) ? UInt8[] : ref)
allocate{T}(::Type{NullableVector{T}}, rows, ref) =
NullableArray{T, 1}(Array{T}(rows), fill(true, rows), isempty(ref) ? UInt8[] : ref)

allocate{S,R}(::Type{CategoricalArrays.CategoricalValue{S,R}}, rows, ref) =
CategoricalArray{S,1,R}(rows)
allocate{S,R}(::Type{CategoricalVector{S,R}}, rows, ref) =
CategoricalArray{S,1,R}(rows)

allocate{S,R}(::Type{Nullable{CategoricalArrays.CategoricalValue{S,R}}}, rows, ref) =
NullableCategoricalArray{S,1,R}(rows)
allocate{S,R}(::Type{NullableCategoricalVector{S,R}}, rows, ref) =
NullableCategoricalArray{S,1,R}(rows)
allocate{T}(::Type{T}, rows) = Vector{T}(rows)
allocate{T}(::Type{Vector{T}}, rows) = Vector{T}(rows)

function DataTable{T <: Data.StreamType}(sch::Data.Schema,
::Type{T}=Data.Field,
append::Bool=false,
ref::Vector{UInt8}=UInt8[], args...)
::Type{T}=Data.Row,
append::Bool=false)
rows, cols = size(sch)
rows = max(0, T <: Data.Column ? 0 : rows) # don't pre-allocate for Column streaming
rows = max(0, T <: Data.Batch ? 0 : rows) # don't pre-allocate for Batch streaming
columns = Vector{Any}(cols)
types = Data.types(sch)
for i = 1:cols
columns[i] = allocate(types[i], rows, ref)
columns[i] = allocate(types[i], rows)
end
return DataTable(columns, map(Symbol, Data.header(sch)))
end

# given an existing DataTable (`sink`), make any necessary changes for streaming source
# with Data.Schema `sch` to it, given we know if we'll be `appending` or not
function DataTable(sink, sch::Data.Schema, ::Type{Data.Field}, append::Bool,
ref::Vector{UInt8})
function DataTable(sink, sch::Data.Schema, ::Type{Data.Row}, append::Bool)
rows, cols = size(sch)
newsize = max(0, rows) + (append ? size(sink, 1) : 0)
# need to make sure we don't break a NullableVector{WeakRefString{UInt8}} when appending
if append
for (i, T) in enumerate(Data.types(sch))
if T <: Nullable{WeakRefString{UInt8}}
sink.columns[i] = NullableArray(String[string(get(x, "")) for x in sink.columns[i]])
sch.types[i] = Nullable{String}
end
end
end
newsize != size(sink, 1) && foreach(x->resize!(x, newsize), sink.columns)
sch.rows = newsize
return sink
end
function DataTable(sink, sch::Data.Schema, ::Type{Data.Column}, append::Bool, ref::Vector{UInt8})
function DataTable(sink, sch::Data.Schema, ::Type{Data.Batch}, append::Bool)
rows, cols = size(sch)
append ? (sch.rows += size(sink, 1)) : foreach(empty!, sink.columns)
return sink
end

Data.streamtypes(::Type{DataTable}) = [Data.Column, Data.Field]
Data.streamtypes(::Type{DataTable}) = [Data.Batch, Data.Row]

Data.streamto!{T}(sink::DataTable, ::Type{Data.Field}, val::T, row, col, sch::Data.Schema{false}) =
push!(sink.columns[col]::Vector{T}, val)
Data.streamto!{T}(sink::DataTable, ::Type{Data.Field}, val::Nullable{T}, row, col, sch::Data.Schema{false}) =
push!(sink.columns[col]::NullableVector{T}, val)
Data.streamto!{T, R}(sink::DataTable, ::Type{Data.Field}, val::CategoricalValue{T, R}, row, col, sch::Data.Schema{false}) =
push!(sink.columns[col]::CategoricalVector{T, R}, val)
Data.streamto!{T, R}(sink::DataTable, ::Type{Data.Field}, val::Nullable{CategoricalValue{T, R}}, row, col, sch::Data.Schema{false}) =
push!(sink.columns[col]::NullableCategoricalVector{T, R}, val)
Data.streamto!{T}(sink::DataTable, ::Type{Data.Field}, val::T, row, col, sch::Data.Schema{true}) =
(sink.columns[col]::Vector{T})[row] = val
Data.streamto!{T}(sink::DataTable, ::Type{Data.Field}, val::Nullable{T}, row, col, sch::Data.Schema{true}) =
(sink.columns[col]::NullableVector{T})[row] = val
Data.streamto!(sink::DataTable, ::Type{Data.Field}, val::Nullable{WeakRefString{UInt8}}, row, col, sch::Data.Schema{true}) =
sink.columns[col][row] = val
Data.streamto!{T, R}(sink::DataTable, ::Type{Data.Field}, val::CategoricalValue{T, R}, row, col, sch::Data.Schema{true}) =
(sink.columns[col]::CategoricalVector{T, R})[row] = val
Data.streamto!{T, R}(sink::DataTable, ::Type{Data.Field}, val::Nullable{CategoricalValue{T, R}}, row, col, sch::Data.Schema{true}) =
(sink.columns[col]::NullableCategoricalVector{T, R})[row] = val
Data.streamto!{T}(sink::DataTable, ::Type{Data.Row}, val::T, row, col, sch::Data.Schema{false}) =
push!(sink.columns[col], val)
Data.streamto!{T}(sink::DataTable, ::Type{Data.Row}, val::T, row, col, sch::Data.Schema{true}) =
(sink.columns[col])[row] = val

function Data.streamto!{T}(sink::DataTable, ::Type{Data.Column}, column::T, row, col, sch::Data.Schema)
function Data.streamto!{T}(sink::DataTable, ::Type{Data.Batch}, column::T, row, col, sch::Data.Schema)
if row == 0
sink.columns[col] = column
else
append!(sink.columns[col]::T, column)
end
return length(column)
end

function Base.append!{T}(dest::NullableVector{WeakRefString{T}}, column::NullableVector{WeakRefString{T}})
offset = length(dest.values)
parentoffset = length(dest.parent)
append!(dest.isnull, column.isnull)
append!(dest.parent, column.parent)
# appending new data to `dest` would invalid all existing WeakRefString pointers
resize!(dest.values, length(dest) + length(column))
for i = 1:offset
old = dest.values[i]
dest.values[i] = WeakRefString{T}(pointer(dest.parent, old.ind), old.len, old.ind)
end
for i = 1:length(column)
old = column.values[i]
dest.values[offset + i] = WeakRefString{T}(pointer(dest.parent, parentoffset + old.ind), old.len, parentoffset + old.ind)
end
return length(dest)
end
end
12 changes: 6 additions & 6 deletions src/abstractdatatable/join.jl
Expand Up @@ -2,12 +2,12 @@
## Join / merge
##

# Like similar, but returns a nullable array
# Like similar, but returns an array that can have nulls
similar_nullable{T}(dv::AbstractArray{T}, dims::Union{Int, Tuple{Vararg{Int}}}) =
NullableArray{T}(dims)
(v = Vector{?T}(dims); fill!(v, null); return v)

similar_nullable{T<:Nullable}(dv::AbstractArray{T}, dims::Union{Int, Tuple{Vararg{Int}}}) =
NullableArray{eltype(T)}(dims)
similar_nullable{T <: Union}(dv::AbstractArray{T}, dims::Union{Int, Tuple{Vararg{Int}}}) =
(v = Vector{?Nulls.T(T)}(dims); fill!(v, null); return v)

similar_nullable{T}(dv::CategoricalArray{T}, dims::Union{Int, Tuple{Vararg{Int}}}) =
NullableCategoricalArray{T}(dims)
Expand All @@ -23,9 +23,9 @@ immutable DataTableJoiner{DT1<:AbstractDataTable, DT2<:AbstractDataTable}
dtr_on::DT2
on_cols::Vector{Symbol}

function DataTableJoiner(dtl::DT1, dtr::DT2, on::Union{Symbol,Vector{Symbol}})
function DataTableJoiner{DT1, DT2}(dtl::DT1, dtr::DT2, on::Union{Symbol,Vector{Symbol}})
on_cols = isa(on, Symbol) ? [on] : on
new(dtl, dtr, dtl[on_cols], dtr[on_cols], on_cols)
new{DT1, DT2}(dtl, dtr, dtl[on_cols], dtr[on_cols], on_cols)
end
end

Expand Down
2 changes: 1 addition & 1 deletion src/abstractdatatable/reshape.jl
Expand Up @@ -108,7 +108,7 @@ function stack(dt::AbstractDataTable, measure_vars, id_vars;
end
# no vars specified, by default select only numeric columns
numeric_vars(dt::AbstractDataTable) =
[T <: AbstractFloat || (T <: Nullable && eltype(T) <: AbstractFloat)
[T <: AbstractFloat || (T >: Null && Nulls.T(T) <: AbstractFloat)
for T in eltypes(dt)]

function stack(dt::AbstractDataTable, measure_vars = numeric_vars(dt);
Expand Down
2 changes: 1 addition & 1 deletion src/abstractdatatable/show.jl
Expand Up @@ -64,7 +64,7 @@ end
ourshowcompact(io::IO, x::Any) = showcompact(io, x) # -> Void
ourshowcompact(io::IO, x::AbstractString) = print(io, x) # -> Void
ourshowcompact(io::IO, x::Symbol) = print(io, x) # -> Void
ourshowcompact(io::IO, x::Nullable{String}) = isnull(x) ? showcompact(io, x) : print(io, get(x)) # -> Void
ourshowcompact(io::IO, x::(?String)) = isnull(x) ? showcompact(io, x) : print(io, x) # -> Void

#' @description
#'
Expand Down

0 comments on commit 8f25524

Please sign in to comment.