From da4647d9756046e7f480c845f4243dadc294841e Mon Sep 17 00:00:00 2001
From: Milan Bouchet-Valat
Date: Sat, 19 Sep 2020 00:05:48 +0200
Subject: [PATCH 1/3] Use DataAPI.refpool for optimized grouping

Generalize the existing optimized `row_group_slots` method for
`CategoricalArray` and `PooledArray` so that it can be used for other array
types for which `DataAPI.refpool` returns an `AbstractVector`. This allows
dropping the dependency on CategoricalArrays in this part of the code.

Also refactor the method to be faster when not sorting. In that case, we do
not need to build a map between reference codes and groups (indexing into it
is slow when the number of groups is very large). `CategoricalArray` is no
longer special-cased: when `sort=false`, levels are still sorted, but
`missing` appears first.

Add more tests to cover unusual combinations of key column types.
---
 Project.toml                              |   2 +-
 src/dataframerow/utils.jl                 | 139 +++++++++++++++-------
 src/groupeddataframe/splitapplycombine.jl |   3 +-
 test/grouping.jl                          | 111 +++++++++++++++--
 4 files changed, 201 insertions(+), 54 deletions(-)

diff --git a/Project.toml b/Project.toml
index 80cb062e3c..e1e0864e35 100644
--- a/Project.toml
+++ b/Project.toml
@@ -35,7 +35,7 @@ test = ["DataStructures", "DataValues", "Dates", "Logging", "Random", "Test"]
 
 [compat]
 julia = "1"
-CategoricalArrays = "0.8"
+CategoricalArrays = "0.8.3"
 Compat = "2.2, 3"
 DataAPI = "1.2"
 InvertedIndices = "1"
diff --git a/src/dataframerow/utils.jl b/src/dataframerow/utils.jl
index 00b98dae90..0f6b645a4c 100644
--- a/src/dataframerow/utils.jl
+++ b/src/dataframerow/utils.jl
@@ -94,10 +94,20 @@ isequal_row(cols1::Tuple{Vararg{AbstractVector}}, r1::Int,
 # 4) whether groups are already sorted
 # Optional `groups` vector is set to the group indices of each row (starting at 1)
 # With skipmissing=true, rows with missing values are attributed index 0.
+row_group_slots(cols::Tuple{Vararg{AbstractVector}},
+                hash::Val = Val(true),
+                groups::Union{Vector{Int}, Nothing} = nothing,
+                skipmissing::Bool = false,
+                sort::Bool = false)::Tuple{Int, Vector{UInt}, Vector{Int}, Bool} =
+    row_group_slots(cols, DataAPI.refpool.(cols), hash, groups, skipmissing, sort)
+
+# Generic fallback method based on open addressing hash table
 function row_group_slots(cols::Tuple{Vararg{AbstractVector}},
+                         refpools::Any,
                          hash::Val = Val(true),
                          groups::Union{Vector{Int}, Nothing} = nothing,
-                         skipmissing::Bool = false)::Tuple{Int, Vector{UInt}, Vector{Int}, Bool}
+                         skipmissing::Bool = false,
+                         sort::Bool = false)::Tuple{Int, Vector{UInt}, Vector{Int}, Bool}
     @assert groups === nothing || length(groups) == length(cols[1])
     rhashes, missings = hashrows(cols, skipmissing)
     # inspired by Dict code from base cf. https://github.com/JuliaData/DataTables.jl/pull/17#discussion_r102481481
@@ -140,21 +150,29 @@ function row_group_slots(cols::Tuple{Vararg{AbstractVector}},
     return ngroups, rhashes, gslots, false
 end
 
-nlevels(x::PooledArray) = length(x.pool)
-nlevels(x) = length(levels(x))
-
-function row_group_slots(cols::NTuple{N,<:Union{CategoricalVector,PooledVector}},
+# Optimized method for arrays for which DataAPI.refpool is defined and returns an AbstractVector
+function row_group_slots(cols::NTuple{N,<:AbstractVector},
+                         refpools::NTuple{N,<:AbstractVector},
                          hash::Val{false},
                          groups::Union{Vector{Int}, Nothing} = nothing,
-                         skipmissing::Bool = false)::Tuple{Int, Vector{UInt}, Vector{Int}, Bool} where N
+                         skipmissing::Bool = false,
+                         sort::Bool = false)::Tuple{Int, Vector{UInt}, Vector{Int}, Bool} where N
     # Computing neither hashes nor groups isn't very useful,
     # and this method needs to allocate a groups vector anyway
     @assert groups !== nothing && all(col -> length(col) == length(groups), cols)
 
+    refpools = map(DataAPI.refpool, cols)
+    foreach(refpool -> @assert(allunique(refpool)), refpools)
+
     # If skipmissing=true, rows with missings all go to group 0,
     # which will be removed by functions down the stream
-    ngroupstup = map(cols) do c
-        nlevels(c) + (!skipmissing && eltype(c) >: Missing)
+    ngroupstup = map(refpools) do refpool
+        len = length(refpool)
+        if skipmissing && eltype(refpool) >: Missing && any(ismissing, refpool)
+            return len - 1
+        else
+            return len
+        end
     end
     ngroups = prod(ngroupstup)
@@ -167,43 +185,83 @@ function row_group_slots(cols::NTuple{N,<:Union{CategoricalVector,PooledVector}}
     # but it needs to remain reasonable compared with the size of the data frame.
     if prod(Int128.(ngroupstup)) > typemax(Int) || ngroups > 2 * length(groups)
         return invoke(row_group_slots,
-                      Tuple{Tuple{Vararg{AbstractVector}}, Val,
-                            Union{Vector{Int}, Nothing}, Bool},
-                      cols, hash, groups, skipmissing)
+                      Tuple{Tuple{Vararg{AbstractVector}}, Any, Val,
+                            Union{Vector{Int}, Nothing}, Bool, Bool},
+                      cols, refpools, hash, groups, skipmissing, sort)
     end
 
     seen = fill(false, ngroups)
-    # Compute vector mapping missing to -1 if skipmissing=true
-    refmaps = map(cols) do col
-        nlevs = nlevels(col)
-        refmap = collect(-1:(nlevs-1))
-        # First value in refmap is only used by CategoricalArray
-        # (corresponds to ref 0, i.e. missing values)
-        refmap[1] = skipmissing ? -1 : nlevs
-        if col isa PooledArray{>: Missing} && skipmissing
-            missingind = get(col.invpool, missing, 0)
-            if missingind > 0
-                refmap[missingind+1] = -1
-                refmap[missingind+2:end] .-= 1
+    refs = map(DataAPI.refarray, cols)
+    strides = (cumprod(collect(reverse(ngroupstup)))[end-1:-1:1]..., 1)::NTuple{N,Int}
+    firstinds = map(firstindex, refpools)
+    # TODO: when skipmissing=true, do not include missing values
+    # when checking whether pool is sorted
+    if sort && !all(issorted, refpools)
+        # Compute vector mapping missing to -1 if skipmissing=true
+        refmaps = map(cols, refpools) do col, refpool
+            refmap = collect(0:length(refpool)-1)
+            if skipmissing
+                fi = firstindex(refpool)
+                missingind = findfirst(ismissing, refpool)
+                if missingind !== nothing
+                    mi = something(missingind)
+                    refmap[mi-fi+1] = -1
+                    refmap[mi-fi+2:end] .-= 1
+                end
+                if sort
+                    nm = missingind === nothing ? eachindex(refpool) :
+                        setdiff(eachindex(refpool), missingind)
+                    perm = sortperm(view(refpool, nm))
+                    invpermute!(view(refmap, nm .- fi .+ 1), perm)
+                end
+            elseif sort
+                # FIXME: collect is needed for CategoricalRefPool
+                perm = sortperm(collect(refpool))
+                invpermute!(refmap, perm)
             end
+            refmap
         end
-        refmap
-    end
-    strides = (cumprod(collect(reverse(ngroupstup)))[end-1:-1:1]..., 1)::NTuple{N,Int}
-    @inbounds for i in eachindex(groups)
-        local refs
-        let i=i # Workaround for julia#15276
-            refs = map(c -> c.refs[i], cols)
+        @inbounds for i in eachindex(groups)
+            local refs_i
+            let i=i # Workaround for julia#15276
+                refs_i = map(c -> c[i], refs)
+            end
+            vals = map((m, r, s, fi) -> m[r-fi+1] * s, refmaps, refs_i, strides, firstinds)
+            j = sum(vals) + 1
+            # x < 0 happens with -1 in refmap, which corresponds to missing
+            if skipmissing && any(x -> x < 0, vals)
+                j = 0
+            else
+                seen[j] = true
+            end
+            groups[i] = j
         end
-        vals = map((m, r, s) -> m[r+1] * s, refmaps, refs, strides)
-        j = sum(vals) + 1
-        # x < 0 happens with -1 in refmap, which corresponds to missing
-        if skipmissing && any(x -> x < 0, vals)
-            j = 0
-        else
-            seen[j] = true
+    else
+        missinginds = map(refpools) do refpool
+            something(findfirst(ismissing, refpool), lastindex(refpool)+1)
+        end
+        @inbounds for i in eachindex(groups)
+            local refs_i
+            let i=i # Workaround for julia#15276
+                refs_i = map(refs, missinginds) do ref, missingind
+                    r = Int(ref[i])
+                    if skipmissing
+                        return r == missingind ? -1 : (r > missingind ? r-1 : r)
+                    else
+                        return r
+                    end
+                end
+            end
+            vals = map((r, s, fi) -> (r-fi) * s, refs_i, strides, firstinds)
+            j = sum(vals) + 1
+            # x < 0 happens with -1, which corresponds to missing
+            if skipmissing && any(x -> x < 0, vals)
+                j = 0
+            else
+                seen[j] = true
+            end
+            groups[i] = j
         end
-        groups[i] = j
     end
     if !all(seen) # Compress group indices to remove unused ones
         oldngroups = ngroups
@@ -220,8 +278,7 @@ function row_group_slots(cols::NTuple{N,<:Union{CategoricalVector,PooledVector}}
         # To catch potential bugs inducing unnecessary computations
         @assert oldngroups != ngroups
     end
-    sorted = all(col -> col isa CategoricalVector, cols)
-    return ngroups, UInt[], Int[], sorted
+    return ngroups, UInt[], Int[], sort
 end
@@ -267,7 +324,7 @@ end
 function group_rows(df::AbstractDataFrame)
     groups = Vector{Int}(undef, nrow(df))
     ngroups, rhashes, gslots, sorted =
-        row_group_slots(ntuple(i -> df[!, i], ncol(df)), Val(true), groups, false)
+        row_group_slots(ntuple(i -> df[!, i], ncol(df)), Val(true), groups, false, false)
     rperm, starts, stops = compute_indices(groups, ngroups)
     return RowGroupDict(df, rhashes, gslots, groups, rperm, starts, stops)
 end
diff --git a/src/groupeddataframe/splitapplycombine.jl b/src/groupeddataframe/splitapplycombine.jl
index d7b1c23d86..3e8034640f 100644
--- a/src/groupeddataframe/splitapplycombine.jl
+++ b/src/groupeddataframe/splitapplycombine.jl
@@ -160,7 +160,8 @@ function groupby(df::AbstractDataFrame, cols;
     groups = Vector{Int}(undef, nrow(df))
     ngroups, rhashes, gslots, sorted =
-        row_group_slots(ntuple(i -> sdf[!, i], ncol(sdf)), Val(false), groups, skipmissing)
+        row_group_slots(ntuple(i -> sdf[!, i], ncol(sdf)), Val(false),
+                        groups, skipmissing, sort)
 
     gd = GroupedDataFrame(df, copy(_names(sdf)), groups, nothing, nothing, nothing,
                           ngroups, nothing, Threads.ReentrantLock())
diff --git a/test/grouping.jl b/test/grouping.jl
index c8839c9fa9..ce434e6511 100644
--- a/test/grouping.jl
+++ b/test/grouping.jl
@@ -594,6 +594,106 @@ end
     end
 end
 
+@testset "grouping arrays that allow missing without missings" begin
+    xv = ["A", "B", "B", "B", "A", "B", "A", "A"]
+    yv = ["B", "A", "A", "B", "A", "B", "A", "A"]
+    xvars = (xv,
+             categorical(xv),
+             levels!(categorical(xv), ["A", "B", "X"]),
+             levels!(categorical(xv), ["X", "B", "A"]),
+             _levels!(PooledArray(xv), ["A", "B"]),
+             _levels!(PooledArray(xv), ["B", "A", "X"]),
+             _levels!(PooledArray(xv), ["X", "A", "B"]))
+    yvars = (yv,
+             categorical(yv),
+             levels!(categorical(yv), ["A", "B", "X"]),
+             levels!(categorical(yv), ["B", "X", "A"]),
+             _levels!(PooledArray(yv), ["A", "B"]),
+             _levels!(PooledArray(yv), ["A", "B", "X"]),
+             _levels!(PooledArray(yv), ["B", "A", "X"]))
+    for x in xvars, y in yvars,
+        fx in (identity, allowmissing),
+        fy in (identity, allowmissing)
+        df = DataFrame(Key1 = fx(x), Key2 = fy(y), Value = 1:8)
+
+        @testset "sort=false, skipmissing=false" begin
+            gd = groupby_checked(df, :Key1)
+            @test length(gd) == 2
+            @test isequal_unordered(gd, [
+                DataFrame(Key1="A", Key2=["B", "A", "A", "A"], Value=[1, 5, 7, 8]),
+                DataFrame(Key1="B", Key2=["A", "A", "B", "B"], Value=[2, 3, 4, 6]),
+            ])
+
+            gd = groupby_checked(df, [:Key1, :Key2])
+            @test length(gd) == 4
+            @test isequal_unordered(gd, [
+                DataFrame(Key1="A", Key2="A", Value=[5, 7, 8]),
+                DataFrame(Key1="A", Key2="B", Value=1),
+                DataFrame(Key1="B", Key2="A", Value=[2, 3]),
+                DataFrame(Key1="B", Key2="B", Value=[4, 6])
+            ])
+        end
+
+        @testset "sort=false, skipmissing=true" begin
+            gd = groupby_checked(df, :Key1, skipmissing=true)
+            @test length(gd) == 2
+            @test isequal_unordered(gd, [
+                DataFrame(Key1="A", Key2=["B", "A", "A", "A"], Value=[1, 5, 7, 8]),
+                DataFrame(Key1="B", Key2=["A", "A", "B", "B"], Value=[2, 3, 4, 6])
+            ])
+
+            gd = groupby_checked(df, [:Key1, :Key2], skipmissing=true)
+            @test length(gd) == 4
+            @test isequal_unordered(gd, [
+                DataFrame(Key1="A", Key2="A", Value=[5, 7, 8]),
+                DataFrame(Key1="A", Key2="B", Value=1),
+                DataFrame(Key1="B", Key2="A", Value=[2, 3]),
+                DataFrame(Key1="B", Key2="B", Value=[4, 6])
+            ])
+        end
+
+        @testset "sort=true, skipmissing=false" begin
+            gd = groupby_checked(df, :Key1, sort=true)
+            @test length(gd) == 2
+            @test isequal_unordered(gd, [
+                DataFrame(Key1="A", Key2=["B", "A", "A", "A"], Value=[1, 5, 7, 8]),
+                DataFrame(Key1="B", Key2=["A", "A", "B", "B"], Value=[2, 3, 4, 6]),
+            ])
+            @test issorted(vcat(gd...), :Key1)
+
+            gd = groupby_checked(df, [:Key1, :Key2], sort=true)
+            @test length(gd) == 4
+            @test isequal_unordered(gd, [
+                DataFrame(Key1="A", Key2="A", Value=[5, 7, 8]),
+                DataFrame(Key1="A", Key2="B", Value=1),
+                DataFrame(Key1="B", Key2="A", Value=[2, 3]),
+                DataFrame(Key1="B", Key2="B", Value=[4, 6]),
+            ])
+            @test issorted(vcat(gd...), [:Key1, :Key2])
+        end
+
+        @testset "sort=true, skipmissing=true" begin
+            gd = groupby_checked(df, :Key1, sort=true, skipmissing=true)
+            @test length(gd) == 2
+            @test isequal_unordered(gd, [
+                DataFrame(Key1="A", Key2=["B", "A", "A", "A"], Value=[1, 5, 7, 8]),
+                DataFrame(Key1="B", Key2=["A", "A", "B", "B"], Value=[2, 3, 4, 6])
+            ])
+            @test issorted(vcat(gd...), :Key1)
+
+            gd = groupby_checked(df, [:Key1, :Key2], sort=true, skipmissing=true)
+            @test length(gd) == 4
+            @test isequal_unordered(gd, [
+                DataFrame(Key1="A", Key2="A", Value=[5, 7, 8]),
+                DataFrame(Key1="A", Key2="B", Value=1),
+                DataFrame(Key1="B", Key2="A", Value=[2, 3]),
+                DataFrame(Key1="B", Key2="B", Value=[4, 6])
+            ])
+            @test issorted(vcat(gd...), [:Key1, :Key2])
+        end
+    end
+end
+
 @testset "grouping with three keys" begin
     # We need many rows so that optimized CategoricalArray method is used
     xv = rand(["A", "B", missing], 100)
@@ -632,17 +732,6 @@ end
         dfs = [groupby_checked(dfb, [:Key1, :Key2, :Key3], sort=true, skipmissing=true)...]
         @test isequal_unordered(gd, dfs)
         @test issorted(vcat(gd...), [:Key1, :Key2, :Key3])
-
-        # This is an implementation detail but it allows checking
-        # that the optimized method is used
-        if df.Key1 isa CategoricalVector &&
-           df.Key2 isa CategoricalVector &&
-           df.Key3 isa CategoricalVector
-            @test groupby_checked(df, [:Key1, :Key2, :Key3], sort=true) ≅
-                  groupby_checked(df, [:Key1, :Key2, :Key3], sort=false)
-            @test groupby_checked(df, [:Key1, :Key2, :Key3], sort=true, skipmissing=true) ≅
-                  groupby_checked(df, [:Key1, :Key2, :Key3], sort=false, skipmissing=true)
-        end
     end
 end

From b46ed0006fd68b4066e819d47f8a54a77c84199f Mon Sep 17 00:00:00 2001
From: Milan Bouchet-Valat
Date: Sun, 20 Sep 2020 19:06:43 +0200
Subject: [PATCH 2/3] Allow duplicates, hash refs, and take skipmissing into
 account when checking sortedness

---
 src/dataframerow/utils.jl | 67 ++++++++++++++++++++++++---------------
 test/grouping.jl          | 16 ++++++++++
 2 files changed, 57 insertions(+), 26 deletions(-)

diff --git a/src/dataframerow/utils.jl b/src/dataframerow/utils.jl
index 0f6b645a4c..6b892b62bf 100644
--- a/src/dataframerow/utils.jl
+++ b/src/dataframerow/utils.jl
@@ -162,13 +162,17 @@ function row_group_slots(cols::NTuple{N,<:AbstractVector},
     @assert groups !== nothing && all(col -> length(col) == length(groups), cols)
 
     refpools = map(DataAPI.refpool, cols)
-    foreach(refpool -> @assert(allunique(refpool)), refpools)
+    refs = map(DataAPI.refarray, cols)
+    missinginds = map(refpools) do refpool
+        eltype(refpool) >: Missing ?
+            something(findfirst(ismissing, refpool), lastindex(refpool)+1) : lastindex(refpool)+1
+    end
 
     # If skipmissing=true, rows with missings all go to group 0,
     # which will be removed by functions down the stream
-    ngroupstup = map(refpools) do refpool
+    ngroupstup = map(refpools, missinginds) do refpool, missingind
         len = length(refpool)
-        if skipmissing && eltype(refpool) >: Missing && any(ismissing, refpool)
+        if skipmissing && missingind <= lastindex(refpool)
             return len - 1
         else
             return len
@@ -176,48 +180,62 @@ function row_group_slots(cols::NTuple{N,<:AbstractVector},
     end
     ngroups = prod(ngroupstup)
 
-    # Fall back to hashing if there would be too many empty combinations.
+    # Fall back to hashing if there would be too many empty combinations
+    # or if the pool does not contain only unique values
     # The first check ensures the computation of ngroups did not overflow.
     # The rationale for the 2 threshold is that while the fallback method is always slower,
     # it allocates a hash table of size length(groups) instead of the remap vector
     # of size ngroups (i.e. the number of possible combinations) in this method:
     # so it makes sense to allocate more memory for better performance,
     # but it needs to remain reasonable compared with the size of the data frame.
-    if prod(Int128.(ngroupstup)) > typemax(Int) || ngroups > 2 * length(groups)
+    anydups = !all(allunique, refpools)
+    if prod(Int128.(ngroupstup)) > typemax(Int) ||
+        ngroups > 2 * length(groups) ||
+        anydups
+        # In the simplest case, we can work directly with the reference codes
+        newcols = (skipmissing && any(refpool -> eltype(refpool) >: Missing, refpools)) ||
+                  sort ||
+                  anydups ? cols : refs
         return invoke(row_group_slots,
                       Tuple{Tuple{Vararg{AbstractVector}}, Any, Val,
                             Union{Vector{Int}, Nothing}, Bool, Bool},
-                      cols, refpools, hash, groups, skipmissing, sort)
+                      newcols, refpools, hash, groups, skipmissing, sort)
     end
 
     seen = fill(false, ngroups)
-    refs = map(DataAPI.refarray, cols)
     strides = (cumprod(collect(reverse(ngroupstup)))[end-1:-1:1]..., 1)::NTuple{N,Int}
     firstinds = map(firstindex, refpools)
-    # TODO: when skipmissing=true, do not include missing values
-    # when checking whether pool is sorted
-    if sort && !all(issorted, refpools)
+    if sort
+        nminds = map(refpools, missinginds) do refpool, missingind
+            missingind > lastindex(refpool) ?
+                eachindex(refpool) : setdiff(eachindex(refpool), missingind)
+        end
+        if skipmissing
+            sorted = all(issorted(view(refpool, nmind))
+                         for (refpool, nmind) in zip(refpools, nminds))
+        else
+            sorted = all(issorted, refpools)
+        end
+    else
+        sorted = false
+    end
+    if sort && !sorted
         # Compute vector mapping missing to -1 if skipmissing=true
-        refmaps = map(cols, refpools) do col, refpool
+        refmaps = map(cols, refpools, missinginds, nminds) do col, refpool, missingind, nmind
             refmap = collect(0:length(refpool)-1)
             if skipmissing
                 fi = firstindex(refpool)
-                missingind = findfirst(ismissing, refpool)
-                if missingind !== nothing
-                    mi = something(missingind)
-                    refmap[mi-fi+1] = -1
-                    refmap[mi-fi+2:end] .-= 1
+                if missingind <= lastindex(refpool)
+                    refmap[missingind-fi+1] = -1
+                    refmap[missingind-fi+2:end] .-= 1
                 end
                 if sort
-                    nm = missingind === nothing ? eachindex(refpool) :
-                        setdiff(eachindex(refpool), missingind)
-                    perm = sortperm(view(refpool, nm))
-                    invpermute!(view(refmap, nm .- fi .+ 1), perm)
+                    perm = sortperm(view(refpool, nmind))
+                    invpermute!(view(refmap, nmind .- fi .+ 1), perm)
                 end
             elseif sort
-                # FIXME: collect is needed for CategoricalRefPool
-                perm = sortperm(collect(refpool))
-                invpermute!(refmap, perm)
+                # collect is needed for CategoricalRefPool
+                invpermute!(refmap, sortperm(collect(refpool)))
             end
             refmap
         end
@@ -237,9 +255,6 @@ function row_group_slots(cols::NTuple{N,<:AbstractVector},
             groups[i] = j
         end
     else
-        missinginds = map(refpools) do refpool
-            something(findfirst(ismissing, refpool), lastindex(refpool)+1)
-        end
         @inbounds for i in eachindex(groups)
             local refs_i
             let i=i # Workaround for julia#15276
diff --git a/test/grouping.jl b/test/grouping.jl
index ce434e6511..b764c305ff 100644
--- a/test/grouping.jl
+++ b/test/grouping.jl
@@ -694,6 +694,22 @@ end
     end
 end
 
+@testset "grouping refarray with fallback" begin
+    # The high number of categories compared to the number of rows triggers the use
+    # of the fallback grouping method
+    for x in ([3, 1, 2], [3, 1, missing])
+        df = DataFrame(x=categorical(x, levels=10000:-1:1),
+                       x2=categorical(x, levels=3:-1:1),
+                       y=[1, 2, 3])
+        for skipmissing in (true, false)
+            @test groupby(df, :x, sort=true, skipmissing=skipmissing) ≅
+                groupby(df, :x, sort=true, skipmissing=skipmissing)
+            @test isequal_unordered(groupby(df, :x, skipmissing=skipmissing),
+                collect(AbstractDataFrame, groupby(df, :x, skipmissing=skipmissing)))
+        end
+    end
+end
+
 @testset "grouping with three keys" begin
     # We need many rows so that optimized CategoricalArray method is used
     xv = rand(["A", "B", missing], 100)

From 9d0596591ab2cbdc3f2b20ffc06a16a3c6f4d3e2 Mon Sep 17 00:00:00 2001
From: Milan Bouchet-Valat
Date: Mon, 21 Sep 2020 21:49:49 +0200
Subject: [PATCH 3/3] Apply suggestions from code review

---
 src/dataframerow/utils.jl | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/dataframerow/utils.jl b/src/dataframerow/utils.jl
index 6b892b62bf..3c503c9048 100644
--- a/src/dataframerow/utils.jl
+++ b/src/dataframerow/utils.jl
@@ -161,7 +161,6 @@ function row_group_slots(cols::NTuple{N,<:AbstractVector},
     # and this method needs to allocate a groups vector anyway
     @assert groups !== nothing && all(col -> length(col) == length(groups), cols)
 
-    refpools = map(DataAPI.refpool, cols)
     refs = map(DataAPI.refarray, cols)
     missinginds = map(refpools) do refpool
         eltype(refpool) >: Missing ?
@@ -189,7 +188,7 @@ function row_group_slots(cols::NTuple{N,<:AbstractVector},
     # so it makes sense to allocate more memory for better performance,
     # but it needs to remain reasonable compared with the size of the data frame.
     anydups = !all(allunique, refpools)
-    if prod(Int128.(ngroupstup)) > typemax(Int) ||
+    if prod(big.(ngroupstup)) > typemax(Int) ||
         ngroups > 2 * length(groups) ||
         anydups
         # In the simplest case, we can work directly with the reference codes
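
Note (editorial illustration, not part of the patch series): the fast path added
above relies only on the generic `DataAPI.refpool`/`DataAPI.refarray` accessors.
The sketch below shows the core idea of the optimized `row_group_slots` method,
namely mapping each row's tuple of reference codes to a group index through
mixed-radix strides, in a deliberately simplified form: no `missing` handling,
no sorting, and no fallback to hashing when the number of level combinations is
too large. It assumes a PooledArrays version recent enough to implement
`DataAPI.refpool`; variable names are local to the example, not DataFrames.jl
internals.

    # Simplified sketch of refpool/refarray-based grouping (illustration only)
    using DataAPI, PooledArrays

    x = PooledArray(["A", "B", "B", "A"])
    y = PooledArray(["x", "x", "y", "y"])
    cols = (x, y)

    refpools = map(DataAPI.refpool, cols)  # vector of levels for each column
    refs = map(DataAPI.refarray, cols)     # integer codes indexing into each pool

    # Mixed-radix strides: each combination of codes maps to a distinct index
    # in 1:prod(length.(refpools)), so no hashing of row values is needed
    ngroupstup = map(length, refpools)
    gstrides = (cumprod(collect(reverse(ngroupstup)))[end-1:-1:1]..., 1)
    firstinds = map(firstindex, refpools)

    groups = Vector{Int}(undef, length(x))
    for i in eachindex(groups)
        refs_i = map(r -> Int(r[i]), refs)
        groups[i] = sum(map((r, s, fi) -> (r - fi) * s, refs_i, gstrides, firstinds)) + 1
    end

    groups  # one group index per row; rows with equal keys share an index

Indexing into small per-column pools replaces hashing whole rows, which is why
the optimized method is only used when `refpool` returns an `AbstractVector` and
the number of possible level combinations stays comparable to the number of rows
(the `ngroups > 2 * length(groups)` check above).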