This repository has been archived by the owner on May 5, 2019. It is now read-only.

Commit: Merge branch 'cjprybol-cjp/CSV'
quinnj committed Apr 7, 2017
2 parents 35e8c95 + 91026fb commit 42ebbf5
Showing 5 changed files with 146 additions and 3 deletions.
2 changes: 2 additions & 0 deletions REQUIRE
@@ -5,3 +5,5 @@ StatsBase 0.11.0
SortingAlgorithms
Reexport
Compat 0.19.0
WeakRefStrings 0.1.3
DataStreams 0.1.0
1 change: 0 additions & 1 deletion src/DataTables.jl
@@ -17,7 +17,6 @@ import NullableArrays: dropnull, dropnull!
@reexport using CategoricalArrays
using GZip
using SortingAlgorithms

using Base: Sort, Order
import Base: ==, |>

137 changes: 137 additions & 0 deletions src/abstractdatatable/io.jl
@@ -194,3 +194,140 @@ end
@compat function Base.show(io::IO, ::MIME"text/tab-separated-values", dt::AbstractDataTable)
    printtable(io, dt, true, '\t')
end

##############################################################################
#
# DataStreams-based IO
#
##############################################################################

importall DataStreams
using WeakRefStrings

# DataTables DataStreams implementation
function Data.schema(df::DataTable, ::Type{Data.Column})
    return Data.Schema(map(string, names(df)),
                       DataType[typeof(A) for A in df.columns], size(df, 1))
end
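
# Usage sketch, not part of the original diff: `dt0` is a hypothetical
# two-column table used to show what `Data.schema` reports.
dt0 = DataTable(a = [1, 2, 3], b = [1.0, 2.0, 3.0])
sch0 = Data.schema(dt0, Data.Column)
Data.header(sch0)   # => ["a", "b"]
size(sch0)          # => (3, 2) -- (rows, cols), matching size(dt0)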

# DataTable as a Data.Source
function Data.isdone(source::DataTable, row, col)
    rows, cols = size(source)
    return row > rows || col > cols
end

Data.streamtype(::Type{DataTable}, ::Type{Data.Column}) = true
Data.streamtype(::Type{DataTable}, ::Type{Data.Field}) = true

Data.streamfrom{T <: AbstractVector}(source::DataTable, ::Type{Data.Column}, ::Type{T}, col) =
    (@inbounds A = source.columns[col]::T; return A)
Data.streamfrom{T}(source::DataTable, ::Type{Data.Column}, ::Type{T}, col) =
    (@inbounds A = source.columns[col]; return A)
Data.streamfrom{T}(source::DataTable, ::Type{Data.Field}, ::Type{T}, row, col) =
    (@inbounds A = Data.streamfrom(source, Data.Column, T, col); return A[row]::T)
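
# Sketch continuing the `dt0` example above (assuming its first column is a
# plain Vector{Int}): reading values back out through the Data.Source interface.
Data.streamfrom(dt0, Data.Column, Vector{Int}, 1)   # the whole first column
Data.streamfrom(dt0, Data.Field, Float64, 2, 2)     # row 2 of column 2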

# DataTable as a Data.Sink
allocate{T}(::Type{T}, rows, ref) = Array{T}(rows)
allocate{T}(::Type{Vector{T}}, rows, ref) = Array{T}(rows)

allocate{T}(::Type{Nullable{T}}, rows, ref) =
    NullableArray{T, 1}(Array{T}(rows), fill(true, rows), isempty(ref) ? UInt8[] : ref)
allocate{T}(::Type{NullableVector{T}}, rows, ref) =
    NullableArray{T, 1}(Array{T}(rows), fill(true, rows), isempty(ref) ? UInt8[] : ref)

allocate{S,R}(::Type{CategoricalArrays.CategoricalValue{S,R}}, rows, ref) =
    CategoricalArray{S,1,R}(rows)
allocate{S,R}(::Type{CategoricalVector{S,R}}, rows, ref) =
    CategoricalArray{S,1,R}(rows)

allocate{S,R}(::Type{Nullable{CategoricalArrays.CategoricalValue{S,R}}}, rows, ref) =
    NullableCategoricalArray{S,1,R}(rows)
allocate{S,R}(::Type{NullableCategoricalVector{S,R}}, rows, ref) =
    NullableCategoricalArray{S,1,R}(rows)
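
# Sketch of what `allocate` hands back for each element type (hypothetical
# calls; `ref` is only consulted on the nullable paths):
allocate(Int, 3, UInt8[])               # Vector{Int} of length 3 (uninitialized)
allocate(Nullable{Float64}, 3, UInt8[]) # NullableVector{Float64}, every entry null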

function DataTable{T <: Data.StreamType}(sch::Data.Schema,
                                         ::Type{T}=Data.Field,
                                         append::Bool=false,
                                         ref::Vector{UInt8}=UInt8[], args...)
    rows, cols = size(sch)
    rows = max(0, T <: Data.Column ? 0 : rows) # don't pre-allocate for Column streaming
    columns = Vector{Any}(cols)
    types = Data.types(sch)
    for i = 1:cols
        columns[i] = allocate(types[i], rows, ref)
    end
    return DataTable(columns, map(Symbol, Data.header(sch)))
end
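
# Sketch: building an empty sink by hand from a schema (names and types here
# are made up), using the Data.Schema(header, types, rows) form seen above.
sch1 = Data.Schema(["x", "y"], DataType[Int, Nullable{Float64}], 2)
sink = DataTable(sch1, Data.Field)   # 2 preallocated rows: Vector{Int}, NullableVector{Float64}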

# given an existing DataTable (`sink`), make any necessary changes for streaming source
# with Data.Schema `sch` to it, given we know if we'll be `appending` or not
function DataTable(sink, sch::Data.Schema, ::Type{Data.Field}, append::Bool,
                   ref::Vector{UInt8})
    rows, cols = size(sch)
    newsize = max(0, rows) + (append ? size(sink, 1) : 0)
    # need to make sure we don't break a NullableVector{WeakRefString{UInt8}} when appending
    if append
        for (i, T) in enumerate(Data.types(sch))
            if T <: Nullable{WeakRefString{UInt8}}
                sink.columns[i] = NullableArray(String[string(get(x, "")) for x in sink.columns[i]])
                sch.types[i] = Nullable{String}
            end
        end
    end
    newsize != size(sink, 1) && foreach(x->resize!(x, newsize), sink.columns)
    sch.rows = newsize
    return sink
end
function DataTable(sink, sch::Data.Schema, ::Type{Data.Column}, append::Bool, ref::Vector{UInt8})
    rows, cols = size(sch)
    append ? (sch.rows += size(sink, 1)) : foreach(empty!, sink.columns)
    return sink
end

Data.streamtypes(::Type{DataTable}) = [Data.Column, Data.Field]

Data.streamto!{T}(sink::DataTable, ::Type{Data.Field}, val::T, row, col, sch::Data.Schema{false}) =
    push!(sink.columns[col]::Vector{T}, val)
Data.streamto!{T}(sink::DataTable, ::Type{Data.Field}, val::Nullable{T}, row, col, sch::Data.Schema{false}) =
    push!(sink.columns[col]::NullableVector{T}, val)
Data.streamto!{T, R}(sink::DataTable, ::Type{Data.Field}, val::CategoricalValue{T, R}, row, col, sch::Data.Schema{false}) =
    push!(sink.columns[col]::CategoricalVector{T, R}, val)
Data.streamto!{T, R}(sink::DataTable, ::Type{Data.Field}, val::Nullable{CategoricalValue{T, R}}, row, col, sch::Data.Schema{false}) =
    push!(sink.columns[col]::NullableCategoricalVector{T, R}, val)
Data.streamto!{T}(sink::DataTable, ::Type{Data.Field}, val::T, row, col, sch::Data.Schema{true}) =
    (sink.columns[col]::Vector{T})[row] = val
Data.streamto!{T}(sink::DataTable, ::Type{Data.Field}, val::Nullable{T}, row, col, sch::Data.Schema{true}) =
    (sink.columns[col]::NullableVector{T})[row] = val
Data.streamto!(sink::DataTable, ::Type{Data.Field}, val::Nullable{WeakRefString{UInt8}}, row, col, sch::Data.Schema{true}) =
    sink.columns[col][row] = val
Data.streamto!{T, R}(sink::DataTable, ::Type{Data.Field}, val::CategoricalValue{T, R}, row, col, sch::Data.Schema{true}) =
    (sink.columns[col]::CategoricalVector{T, R})[row] = val
Data.streamto!{T, R}(sink::DataTable, ::Type{Data.Field}, val::Nullable{CategoricalValue{T, R}}, row, col, sch::Data.Schema{true}) =
    (sink.columns[col]::NullableCategoricalVector{T, R})[row] = val
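
# Sketch: Data.stream! normally drives these methods; called directly they
# look like the lines below, assuming `sink` and `sch1` from the sketch above
# and that a schema built with a known row count is a Data.Schema{true}.
Data.streamto!(sink, Data.Field, 42, 1, 1, sch1)            # write row 1, col 1 in place
Data.streamto!(sink, Data.Field, Nullable(3.5), 1, 2, sch1) # nullable column, same row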

function Data.streamto!{T}(sink::DataTable, ::Type{Data.Column}, column::T, row, col, sch::Data.Schema)
    if row == 0
        sink.columns[col] = column
    else
        append!(sink.columns[col]::T, column)
    end
    return length(column)
end

function Base.append!{T}(dest::NullableVector{WeakRefString{T}}, column::NullableVector{WeakRefString{T}})
    offset = length(dest.values)
    parentoffset = length(dest.parent)
    append!(dest.isnull, column.isnull)
    append!(dest.parent, column.parent)
    # growing `dest.parent` invalidates all existing WeakRefString pointers,
    # so every entry must be recomputed against the new parent buffer
    resize!(dest.values, length(dest) + length(column))
    for i = 1:offset
        old = dest.values[i]
        dest.values[i] = WeakRefString{T}(pointer(dest.parent, old.ind), old.len, old.ind)
    end
    for i = 1:length(column)
        old = column.values[i]
        dest.values[offset + i] = WeakRefString{T}(pointer(dest.parent, parentoffset + old.ind), old.len, parentoffset + old.ind)
    end
    return length(dest)
end
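
Taken together, these definitions let a DataTable act as both a DataStreams source and sink, so any DataStreams-aware package can exchange data with it. A minimal round-trip sketch through CSV.jl (file paths hypothetical; CSV.write is assumed here to accept any Data.Source):

using CSV, DataTables
dt = CSV.read("input.csv", DataTable)  # CSV source streamed into a DataTable sink
CSV.write("output.csv", dt)            # DataTable source streamed back out to CSV
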
3 changes: 1 addition & 2 deletions test/REQUIRE
@@ -1,5 +1,4 @@
Compat 0.9.0
DataStructures
RData
LaTeXStrings
Atom
CSV
6 changes: 6 additions & 0 deletions test/io.jl
@@ -51,4 +51,10 @@ module TestIO

answer = Sys.WORD_SIZE == 64 ? 0x937e94e70d642cce : 0x2b8864d8
@test hash(sprint(printtable, dt)) == answer

# DataStreams
using CSV

dt = CSV.read(joinpath(dirname(@__FILE__), "data/iris.csv"), DataTable)
@test size(dt) == (150, 5)
end
