Skip to content

Commit

Permalink
Use DataAPI.refpool for optimized grouping
Browse files Browse the repository at this point in the history
Generalize existing optimized `row_group_slots` method for `CategoricalArray`
and `PooledArray` so that it can be used for other array types
for which `DataAPI.refpool` returns an `AbstractVector`. This allows dropping
the dependency on CategoricalArrays in this part of the code.

Also refactor the method to be faster when not sorting. In that case, we do
not need to build a map between reference codes and groups (indexing into it
is slow when the number of groups is very large). `CategoricalArray` is no longer
special cased: when `sort=false`, levels are still sorted, but `missing` appears first.
  • Loading branch information
nalimilan committed Sep 19, 2020
1 parent 35eda08 commit bf44745
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 54 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ test = ["DataStructures", "DataValues", "Dates", "Logging", "Random", "Test"]

[compat]
julia = "1"
CategoricalArrays = "0.8"
CategoricalArrays = "0.8.3"
Compat = "2.2, 3"
DataAPI = "1.2"
InvertedIndices = "1"
Expand Down
138 changes: 97 additions & 41 deletions src/dataframerow/utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,20 @@ isequal_row(cols1::Tuple{Vararg{AbstractVector}}, r1::Int,
# 4) whether groups are already sorted
# Optional `groups` vector is set to the group indices of each row (starting at 1)
# With skipmissing=true, rows with missing values are attributed index 0.
# Entry point taking only the grouping columns: look up the reference pool of
# each column with `DataAPI.refpool` and forward all arguments to the method
# which takes `refpools` as its second argument, so that dispatch can pick
# an implementation depending on the pools that were found.
function row_group_slots(cols::Tuple{Vararg{AbstractVector}},
                         hash::Val = Val(true),
                         groups::Union{Vector{Int}, Nothing} = nothing,
                         skipmissing::Bool = false,
                         sort::Bool = false)::Tuple{Int, Vector{UInt}, Vector{Int}, Bool}
    refpools = map(DataAPI.refpool, cols)
    return row_group_slots(cols, refpools, hash, groups, skipmissing, sort)
end

# Generic fallback method based on open addressing hash table
function row_group_slots(cols::Tuple{Vararg{AbstractVector}},
refpools::Any,
hash::Val = Val(true),
groups::Union{Vector{Int}, Nothing} = nothing,
skipmissing::Bool = false)::Tuple{Int, Vector{UInt}, Vector{Int}, Bool}
skipmissing::Bool = false,
sort::Bool = false)::Tuple{Int, Vector{UInt}, Vector{Int}, Bool}
@assert groups === nothing || length(groups) == length(cols[1])
rhashes, missings = hashrows(cols, skipmissing)
# inspired by Dict code from base cf. https://github.com/JuliaData/DataTables.jl/pull/17#discussion_r102481481
Expand Down Expand Up @@ -140,21 +150,29 @@ function row_group_slots(cols::Tuple{Vararg{AbstractVector}},
return ngroups, rhashes, gslots, false
end

# Number of entries in the pool of a `PooledArray`
function nlevels(x::PooledArray)
    return length(x.pool)
end

# Fallback for any other input: number of values returned by `levels`
function nlevels(x)
    return length(levels(x))
end

function row_group_slots(cols::NTuple{N,<:Union{CategoricalVector,PooledVector}},
# Optimized method for arrays for which DataAPI.refpool is defined and returns an AbstractVector
function row_group_slots(cols::NTuple{N,<:AbstractVector},
refpools::NTuple{N,<:AbstractVector},
hash::Val{false},
groups::Union{Vector{Int}, Nothing} = nothing,
skipmissing::Bool = false)::Tuple{Int, Vector{UInt}, Vector{Int}, Bool} where N
skipmissing::Bool = false,
sort::Bool = false)::Tuple{Int, Vector{UInt}, Vector{Int}, Bool} where N
# Computing neither hashes nor groups isn't very useful,
# and this method needs to allocate a groups vector anyway
@assert groups !== nothing && all(col -> length(col) == length(groups), cols)

refpools = map(DataAPI.refpool, cols)
foreach(refpool -> @assert(allunique(refpool)), refpools)

# If skipmissing=true, rows with missings all go to group 0,
# which will be removed by functions down the stream
ngroupstup = map(cols) do c
nlevels(c) + (!skipmissing && eltype(c) >: Missing)
ngroupstup = map(refpools) do refpool
len = length(refpool)
if skipmissing && eltype(refpool) >: Missing && any(ismissing, refpool)
return len - 1
else
return len
end
end
ngroups = prod(ngroupstup)

Expand All @@ -167,43 +185,82 @@ function row_group_slots(cols::NTuple{N,<:Union{CategoricalVector,PooledVector}}
# but it needs to remain reasonable compared with the size of the data frame.
if prod(Int128.(ngroupstup)) > typemax(Int) || ngroups > 2 * length(groups)
return invoke(row_group_slots,
Tuple{Tuple{Vararg{AbstractVector}}, Val,
Union{Vector{Int}, Nothing}, Bool},
cols, hash, groups, skipmissing)
Tuple{Tuple{Vararg{AbstractVector}}, Any, Val,
Union{Vector{Int}, Nothing}, Bool, Bool},
cols, refpools, hash, groups, skipmissing, sort)
end

seen = fill(false, ngroups)
# Compute vector mapping missing to -1 if skipmissing=true
refmaps = map(cols) do col
nlevs = nlevels(col)
refmap = collect(-1:(nlevs-1))
# First value in refmap is only used by CategoricalArray
# (corresponds to ref 0, i.e. missing values)
refmap[1] = skipmissing ? -1 : nlevs
if col isa PooledArray{>: Missing} && skipmissing
missingind = get(col.invpool, missing, 0)
if missingind > 0
refmap[missingind+1] = -1
refmap[missingind+2:end] .-= 1
refs = map(DataAPI.refarray, cols)
strides = (cumprod(collect(reverse(ngroupstup)))[end-1:-1:1]..., 1)::NTuple{N,Int}
firstinds = map(firstindex, refpools)
missinginds = map(refpools) do refpool
something(findfirst(ismissing, refpool), lastindex(refpool)+1)
end
# TODO: when skipmissing=true, do not include missing values
# when checking whether pool is sorted
if sort && !all(issorted, refpools)
# Compute vector mapping missing to -1 if skipmissing=true
refmaps = map(cols, refpools, missinginds) do col, refpool, missingind
refmap = collect(0:length(refpool)-1)
if skipmissing
fi = firstindex(refpool)
if missingind !== nothing
mi = something(missingind)
refmap[mi-fi+1] = -1
refmap[mi-fi+2:end] .-= 1
end
if sort
nm = missingind === nothing ? eachindex(refpool) :
setdiff(eachindex(refpool), something(missingind))
perm = sortperm(view(refpool, nm))
invpermute!(view(refmap, nm .- fi .+ 1), perm)
end
elseif sort
# FIXME: collect is needed for CategoricalRefPool
perm = sortperm(collect(refpool))
invpermute!(refmap, perm)
end
refmap
end
refmap
end
strides = (cumprod(collect(reverse(ngroupstup)))[end-1:-1:1]..., 1)::NTuple{N,Int}
@inbounds for i in eachindex(groups)
local refs
let i=i # Workaround for julia#15276
refs = map(c -> c.refs[i], cols)
@inbounds for i in eachindex(groups)
local refs_i
let i=i # Workaround for julia#15276
refs_i = map(c -> c[i], refs)
end
vals = map((m, r, s, fi) -> m[r-fi+1] * s, refmaps, refs_i, strides, firstinds)
j = sum(vals) + 1
# x < 0 happens with -1 in refmap, which corresponds to missing
if skipmissing && any(x -> x < 0, vals)
j = 0
else
seen[j] = true
end
groups[i] = j
end
vals = map((m, r, s) -> m[r+1] * s, refmaps, refs, strides)
j = sum(vals) + 1
# x < 0 happens with -1 in refmap, which corresponds to missing
if skipmissing && any(x -> x < 0, vals)
j = 0
else
seen[j] = true
else
@inbounds for i in eachindex(groups)
local refs_i
let i=i # Workaround for julia#15276
refs_i = map(refs, missinginds) do ref, missingind
r = Int(ref[i])
if skipmissing
return r == missingind ? -1 : (r > missingind ? r-1 : r)
else
return r
end
end
end
vals = map((r, s, fi) -> (r-fi) * s, refs_i, strides, firstinds)
j = sum(vals) + 1
# x < 0 happens with -1, which corresponds to missing
if skipmissing && any(x -> x < 0, vals)
j = 0
else
seen[j] = true
end
groups[i] = j
end
groups[i] = j
end
if !all(seen) # Compress group indices to remove unused ones
oldngroups = ngroups
Expand All @@ -220,8 +277,7 @@ function row_group_slots(cols::NTuple{N,<:Union{CategoricalVector,PooledVector}}
# To catch potential bugs inducing unnecessary computations
@assert oldngroups != ngroups
end
sorted = all(col -> col isa CategoricalVector, cols)
return ngroups, UInt[], Int[], sorted
return ngroups, UInt[], Int[], sort
end


Expand Down Expand Up @@ -267,7 +323,7 @@ end
# Group the rows of `df` using all of its columns as keys and return the
# resulting `RowGroupDict`. `groups` is allocated with one slot per row and
# passed to `row_group_slots`, and `compute_indices` then derives `rperm`,
# `starts` and `stops` from `groups` and `ngroups`.
# NOTE(review): the two consecutive `row_group_slots(...)` lines below look like
# the removed and added sides of a diff hunk rendered without +/- markers;
# presumably only the second one (with the extra trailing `false` for `sort`)
# belongs in the actual file — confirm against the repository.
function group_rows(df::AbstractDataFrame)
groups = Vector{Int}(undef, nrow(df))
ngroups, rhashes, gslots, sorted =
row_group_slots(ntuple(i -> df[!, i], ncol(df)), Val(true), groups, false)
row_group_slots(ntuple(i -> df[!, i], ncol(df)), Val(true), groups, false, false)
rperm, starts, stops = compute_indices(groups, ngroups)
return RowGroupDict(df, rhashes, gslots, groups, rperm, starts, stops)
end
Expand Down
3 changes: 2 additions & 1 deletion src/groupeddataframe/splitapplycombine.jl
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ function groupby(df::AbstractDataFrame, cols;

groups = Vector{Int}(undef, nrow(df))
ngroups, rhashes, gslots, sorted =
row_group_slots(ntuple(i -> sdf[!, i], ncol(sdf)), Val(false), groups, skipmissing)
row_group_slots(ntuple(i -> sdf[!, i], ncol(sdf)), Val(false),
groups, skipmissing, sort)

gd = GroupedDataFrame(df, copy(_names(sdf)), groups, nothing, nothing, nothing, ngroups, nothing,
Threads.ReentrantLock())
Expand Down
11 changes: 0 additions & 11 deletions test/grouping.jl
Original file line number Diff line number Diff line change
Expand Up @@ -632,17 +632,6 @@ end
dfs = [groupby_checked(dfb, [:Key1, :Key2, :Key3], sort=true, skipmissing=true)...]
@test isequal_unordered(gd, dfs)
@test issorted(vcat(gd...), [:Key1, :Key2, :Key3])

# This is an implementation detail but it allows checking
# that the optimized method is used
if df.Key1 isa CategoricalVector &&
df.Key2 isa CategoricalVector &&
df.Key3 isa CategoricalVector
@test groupby_checked(df, [:Key1, :Key2, :Key3], sort=true)
groupby_checked(df, [:Key1, :Key2, :Key3], sort=false)
@test groupby_checked(df, [:Key1, :Key2, :Key3], sort=true, skipmissing=true)
groupby_checked(df, [:Key1, :Key2, :Key3], sort=false, skipmissing=true)
end
end
end

Expand Down

0 comments on commit bf44745

Please sign in to comment.