fix row_group_slots_threading #2736

Merged
merged 27 commits into from May 1, 2021

Commits (27)
c42acc4  fix row_group_slots_threading (bkamins, Apr 26, 2021)
38d047e  add a condition (bkamins, Apr 26, 2021)
2fee1d5  fix typo (bkamins, Apr 26, 2021)
ab81085  rewrite the implementation of row_group_slots (bkamins, Apr 27, 2021)
2d4b750  parallelize reductions (bkamins, Apr 28, 2021)
62c13d8  Update src/groupeddataframe/utils.jl (bkamins, Apr 28, 2021)
c904526  fix return location (bkamins, Apr 28, 2021)
4778f85  switch syntax (bkamins, Apr 28, 2021)
c0cf994  add correctness tests (bkamins, Apr 28, 2021)
3f90f1d  add documentation message (bkamins, Apr 28, 2021)
220c618  improve cut-off formula (bkamins, Apr 28, 2021)
fbfb02d  Apply suggestions from code review (bkamins, Apr 29, 2021)
b228ad2  switch to Threads.@threads (bkamins, Apr 29, 2021)
eca7731  update tests (bkamins, Apr 29, 2021)
17d2265  handle nested parallelism case (bkamins, Apr 29, 2021)
58c0e57  revert @spawn_for_chunks (bkamins, Apr 29, 2021)
c913797  increase @spawn_for_threads threshold (bkamins, Apr 29, 2021)
cf473b6  Update test/grouping.jl (bkamins, Apr 29, 2021)
27c02bf  Update src/groupeddataframe/groupeddataframe.jl (bkamins, Apr 29, 2021)
5d7b973  Update groupeddataframe.jl (bkamins, Apr 29, 2021)
97b4b4a  move @split_for_chunks back to other/utils.jl (bkamins, Apr 29, 2021)
6bf000d  Update src/other/utils.jl (bkamins, Apr 29, 2021)
ddf1f4c  use @spawn instead of Threads.@threads (bkamins, Apr 30, 2021)
1c99358  fix typo (bkamins, Apr 30, 2021)
9150328  avoid code duplication (bkamins, Apr 30, 2021)
90ff84b  clean up chunk splitting code (bkamins, Apr 30, 2021)
5f5802f  revert to 1_000_000 in hashing (bkamins, May 1, 2021)
Changes from all commits
2 changes: 1 addition & 1 deletion Project.toml
@@ -1,6 +1,6 @@
name = "DataFrames"
uuid = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
version = "1.0.1"
version = "1.0.2"

[deps]
Compat = "34da2185-b29b-5c13-b0c7-acf172513d20"
5 changes: 3 additions & 2 deletions src/groupeddataframe/groupeddataframe.jl
@@ -51,7 +51,8 @@ into row groups.
`DataAPI.refpool` in which case the order of groups follows the order of
values returned by `DataAPI.refpool`. As a particular application of this rule
if all `cols` are `CategoricalVector`s then groups are always sorted
irrespective of the value of `sort`.
Integer columns with a narrow range also use this optimization, so
the order of groups when grouping on integer columns is undefined.
- `skipmissing` : whether to skip groups with `missing` values in one of the
grouping columns `cols`

@@ -65,7 +66,7 @@ In particular if it is an empty vector then a single-group `GroupedDataFrame`
is created.

A `GroupedDataFrame` also supports
indexing by groups, `map` (which applies a function to each group)
indexing by groups, `select`, `transform`,
and `combine` (which applies a function to each group
and combines the result into a data frame).
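To make the documented behavior concrete, a minimal usage sketch (the data and column names are illustrative):

```julia
using DataFrames

df = DataFrame(g=[3, 1, 3, 1], x=1:4)
gdf = groupby(df, :g)             # group order is undefined for integer columns
gdf = groupby(df, :g, sort=true)  # sort=true guarantees sorted groups
combine(gdf, :x => sum => :s)     # applies sum to each group, one row per group
```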

135 changes: 92 additions & 43 deletions src/groupeddataframe/utils.jl
@@ -5,11 +5,13 @@ function hashrows_col!(h::Vector{UInt},
v::AbstractVector{T},
rp::Nothing,
firstcol::Bool) where T
@inbounds @spawn_for_chunks 1_000_000 for i in eachindex(h)
el = v[i]
h[i] = hash(el, h[i])
if length(n) > 0
n[i] |= ismissing(el)
@spawn_for_chunks 1_000_000 for i in eachindex(h)
@inbounds begin
el = v[i]
h[i] = hash(el, h[i])
if length(n) > 0
n[i] |= ismissing(el)
end
end
end
h
@@ -31,19 +33,19 @@ function hashrows_col!(h::Vector{UInt},
fira = firstindex(ra)

hashes = Vector{UInt}(undef, length(rp))
@inbounds @spawn_for_chunks 1_000_000 for i in eachindex(hashes)
hashes[i] = hash(rp[i+firp-1])
@spawn_for_chunks 1_000_000 for i in eachindex(hashes)
@inbounds hashes[i] = hash(rp[i+firp-1])
end

# here we rely on the fact that `DataAPI.refpool` has a continuous
# block of indices
@inbounds @spawn_for_chunks 1_000_000 for i in eachindex(h)
ref = ra[i+fira-1]
h[i] = hashes[ref+1-firp]
@spawn_for_chunks 1_000_000 for i in eachindex(h)
@inbounds ref = ra[i+fira-1]
@inbounds h[i] = hashes[ref+1-firp]
end
else
@inbounds @spawn_for_chunks 1_000_000 for i in eachindex(h, v)
h[i] = hash(v[i], h[i])
@spawn_for_chunks 1_000_000 for i in eachindex(h, v)
@inbounds h[i] = hash(v[i], h[i])
end
end
# Doing this step separately is faster, as it would disable SIMD above
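The recurring change in this file moves `@inbounds` from outside `@spawn_for_chunks` into the loop body. The likely reason: the macro rewrites the loop into closures that run on spawned tasks, and `@inbounds` does not propagate into closures, so the annotation has to sit where the indexing actually happens. A minimal sketch of the pattern, independent of the package's macro (`hash_halves!` is a hypothetical name):

```julia
using Base.Threads: @spawn

# Hash the two halves of `v` into `h` on two tasks; `@inbounds` is placed
# inside each loop body so it covers the indexing within the spawned closure.
function hash_halves!(h::Vector{UInt}, v::AbstractVector)
    mid = length(h) ÷ 2
    t = @spawn for i in 1:mid
        @inbounds h[i] = hash(v[i], h[i])
    end
    for i in mid+1:length(h)
        @inbounds h[i] = hash(v[i], h[i])
    end
    wait(t)
    return h
end
```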
@@ -286,7 +288,6 @@ function row_group_slots(cols::NTuple{N, AbstractVector},
newcols, refpools, refarrays, hash, groups, skipmissing, sort)
end

seen = fill(false, ngroups)
strides = (cumprod(collect(reverse(ngroupstup)))[end-1:-1:1]..., 1)::NTuple{N, Int}
firstinds = map(firstindex, refpools)
if sort
@@ -303,6 +304,21 @@
else
sorted = false
end

lg = length(groups)
nt = Threads.nthreads()
# disable threading if we are processing a small data frame or the number of groups is large
if lg < 1_000_000 || ngroups > lg * (0.5 - 1 / (2 * nt)) / (2 * nt)
nt = 1
end
seen = fill(false, ngroups)
seen_vec = Vector{Vector{Bool}}(undef, nt)
seen_vec[1] = seen
for i in 2:nt
seen_vec[i] = fill(false, ngroups)
end
range_chunks = split_to_chunks(lg, nt)

if sort && !sorted
# Compute vector mapping missing to -1 if skipmissing=true
refmaps = map(cols, refpools, missinginds, nminds) do col, refpool, missingind, nmind
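For intuition about the cut-off a few lines above: threading is kept only for large inputs whose number of groups is small relative to the chunk size. A worked instance (the numbers are illustrative):

```julia
lg = 10_000_000                  # rows to group
nt = 4                           # available threads
lg * (0.5 - 1 / (2nt)) / (2nt)   # 468_750.0: the code falls back to a single
                                 # thread once ngroups exceeds this bound
```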
@@ -323,45 +339,78 @@
end
refmap
end
@inbounds @spawn_for_chunks 1_000_000 for i in eachindex(groups)
local refs_i
let i=i # Workaround for julia#15276
refs_i = map(c -> c[i], refarrays)
end
vals = map((m, r, s, fi) -> m[r-fi+1] * s, refmaps, refs_i, strides, firstinds)
j = sum(vals) + 1
# x < 0 happens with -1 in refmap, which corresponds to missing
if skipmissing && any(x -> x < 0, vals)
j = 0
else
seen[j] = true
@sync for (seeni, range_chunk) in zip(seen_vec, range_chunks)
@spawn for i in range_chunk
@inbounds begin
local refs_i
let i=i # Workaround for julia#15276
refs_i = map(refarrays) do c
return @inbounds c[i]
end
end
vals = map(refmaps, refs_i, strides, firstinds) do m, r, s, fi
return @inbounds m[r-fi+1] * s
end
j = sum(vals) + 1
# x < 0 happens with -1 in refmap, which corresponds to missing
if skipmissing && any(x -> x < 0, vals)
j = 0
else
seeni[j] = true
end
groups[i] = j
end
end
groups[i] = j
end
else
@inbounds @spawn_for_chunks 1_000_000 for i in eachindex(groups)
local refs_i
let i=i # Workaround for julia#15276
refs_i = map(refarrays, missinginds) do ref, missingind
r = Int(ref[i])
if skipmissing
return r == missingind ? -1 : (r > missingind ? r-1 : r)
@sync for (seeni, range_chunk) in zip(seen_vec, range_chunks)
@spawn for i in range_chunk
@inbounds begin
local refs_i
let i=i # Workaround for julia#15276
refs_i = map(refarrays, missinginds) do ref, missingind
r = @inbounds Int(ref[i])
if skipmissing
return r == missingind ? -1 : (r > missingind ? r-1 : r)
else
return r
end
end
end
vals = map((r, s, fi) -> (r-fi) * s, refs_i, strides, firstinds)
j = sum(vals) + 1
# x < 0 happens with -1, which corresponds to missing
if skipmissing && any(x -> x < 0, vals)
j = 0
else
return r
seeni[j] = true
end
groups[i] = j
end
end
vals = map((r, s, fi) -> (r-fi) * s, refs_i, strides, firstinds)
j = sum(vals) + 1
# x < 0 happens with -1, which corresponds to missing
if skipmissing && any(x -> x < 0, vals)
j = 0
else
seen[j] = true
end
groups[i] = j
end
end
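The slot `j` computed in both branches above is a mixed-radix index over the Cartesian product of the columns' reference pools. A small worked example (all values are illustrative):

```julia
# Two grouping columns with 3 and 4 pool levels: ngroupstup = (3, 4) yields
# strides = (4, 1), so every combination of refs maps to a distinct slot.
strides, firstinds = (4, 1), (1, 1)
refs_i = (2, 3)    # this row holds level 2 of column 1 and level 3 of column 2
vals = map((r, s, fi) -> (r - fi) * s, refs_i, strides, firstinds)
j = sum(vals) + 1  # == 7, one of the 3 * 4 = 12 possible slots
```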

function reduce_or!(x::AbstractVector{Vector{Bool}})
len = length(x)
if len < 2
return
elseif len == 2
x[1] .|= x[2]
else
xl = view(x, 1:len ÷ 2)
xr = view(x, len ÷ 2 + 1:len)
t1 = @spawn reduce_or!(xl)
t2 = @spawn reduce_or!(xr)
fetch(t1)
fetch(t2)
xl[1] .|= xr[1]
end
return
end

reduce_or!(seen_vec)
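A usage sketch of the reduction above, assuming `reduce_or!` is in scope: each task records the groups it encountered in its own `Bool` mask, and the masks are OR-ed pairwise on spawned tasks, leaving their union in the first element.

```julia
seen_vec = [[true, false, false],
            [false, true, false],
            [false, false, false],
            [false, false, true]]
reduce_or!(seen_vec)
seen_vec[1]  # == [true, true, true], the union of all per-task masks
```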

# If some groups are unused, compress group indices to drop them
# sum(seen) is faster than all(seen) when not short-circuiting,
# and short-circuit would only happen in the slower case anyway
2 changes: 1 addition & 1 deletion src/other/precompile.jl
@@ -1586,7 +1586,7 @@ function precompile(all=false)
Base.precompile(Tuple{Reduce{typeof(max), Nothing, Nothing},Vector{Int},GroupedDataFrame{DataFrame}})
Base.precompile(Tuple{Type{OnCol},Vector{String},Vararg{AbstractVector{T} where T, N} where N})

for v in ([1, 2], [2, 1], [2, 2, 1]),
for v in ([1, 2], [2, 1], [2, 2, 1], Int32[1, 2], Int32[2, 1], Int32[2, 2, 1]),
op in (identity, x -> string.(x), x -> PooledArrays.PooledArray(string.(x))),
on in (:v1, [:v1, :v2])
df = DataFrame(v1=op(v), v2=v)
12 changes: 11 additions & 1 deletion src/other/utils.jl
@@ -88,7 +88,17 @@ funname(c::ComposedFunction) = Symbol(funname(c.outer), :_, funname(c.inner))
# This method ensures balanced sizes by avoiding a small last chunk
function split_indices(len::Integer, basesize::Integer)
len′ = Int64(len) # Avoid overflow on 32-bit machines
np = max(1, div(len, basesize))
@assert len′ > 0
@assert basesize > 0
np = Int64(max(1, len ÷ basesize))
return split_to_chunks(len′, np)
end

function split_to_chunks(len::Integer, np::Integer)
len′ = Int64(len) # Avoid overflow on 32-bit machines
np′ = Int64(np)
@assert len′ > 0
@assert 0 < np′ <= len′
return (Int(1 + ((i - 1) * len′) ÷ np):Int((i * len′) ÷ np) for i in 1:np)
end
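The chunking arithmetic produces contiguous ranges of nearly equal length (differing by at most one) that exactly cover `1:len`, for example:

```julia
collect(DataFrames.split_to_chunks(10, 3))  # [1:3, 4:6, 7:10]
collect(DataFrames.split_to_chunks(7, 3))   # [1:2, 3:4, 5:7]
```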

51 changes: 50 additions & 1 deletion test/grouping.jl
@@ -3888,9 +3888,58 @@ end

@testset "extra tests of wrapper corner cases" begin
df = DataFrame(a=1:2)
gdf = groupby(df, :a)
gdf = groupby_checked(df, :a)
@test_throws ArgumentError combine(gdf, x -> x.a[1] == 1 ? 1 : x[1, :])
@test_throws ArgumentError combine(gdf, x -> x.a[1] == 1 ? (a=1, b=2) : Ref(1))
end

@testset "grouping correctness with threading" begin
function cmp_gdf(gdf1::GroupedDataFrame, gdf2::GroupedDataFrame)
@test gdf1.ngroups == gdf2.ngroups
@test gdf1.groups == gdf2.groups
@test gdf1.starts == gdf2.starts
@test gdf1.ends == gdf2.ends
@test gdf1.idx == gdf2.idx
end

Random.seed!(1234)
for levs in (100, 99_000), sz in (100_000, 1_100_000)
df = DataFrame(x_int=rand(1:levs, sz))
df.x_str = string.(df.x_int, pad=5)
df.x_pool = PooledArray(df.x_str)
g_str = groupby_checked(df, :x_str)
g_pool = groupby_checked(df, :x_pool)
cmp_gdf(g_str, g_pool)
g_int = groupby_checked(df, :x_int, sort=true)
g_str = groupby_checked(df, :x_str, sort=true)
g_pool = groupby_checked(df, :x_pool, sort=true)
cmp_gdf(g_int, g_pool)
cmp_gdf(g_str, g_pool)

df = df[reverse(1:nrow(df)), :]
g_str = groupby_checked(df, :x_str, sort=true)
g_pool = groupby_checked(df, :x_pool, sort=true)
cmp_gdf(g_str, g_pool)

df = DataFrame(x_int=[1:levs; rand(1:levs, sz)])
df.x_str = string.(df.x_int, pad=5)
df.x_pool = PooledArray(df.x_str)
allowmissing!(df)
df[rand(levs+1:sz, 10_000), :] .= missing
g_str = groupby_checked(df, :x_str)
g_pool = groupby_checked(df, :x_pool)
cmp_gdf(g_str, g_pool)
for sm in (false, true)
g_str = groupby_checked(df, :x_str, skipmissing=sm)
g_pool = groupby_checked(df, :x_pool, skipmissing=sm)
cmp_gdf(g_str, g_pool)
g_int = groupby_checked(df, :x_int, sort=true, skipmissing=sm)
g_str = groupby_checked(df, :x_str, sort=true, skipmissing=sm)
g_pool = groupby_checked(df, :x_pool, sort=true, skipmissing=sm)
cmp_gdf(g_int, g_pool)
cmp_gdf(g_str, g_pool)
end
end
end

end # module
30 changes: 27 additions & 3 deletions test/utils.jl
@@ -102,17 +102,19 @@ end
end

@testset "split_indices" begin
for len in 0:12
basesize = 10
for len in 1:100, basesize in 1:10
x = DataFrames.split_indices(len, basesize)

@test length(x) == max(1, div(len, basesize))
@test reduce(vcat, x) === 1:len
@test reduce(vcat, x) == 1:len
vmin, vmax = extrema(length(v) for v in x)
@test vmin + 1 == vmax || vmin == vmax
@test len < basesize || vmin >= basesize
end

@test_throws AssertionError DataFrames.split_indices(0, 10)
@test_throws AssertionError DataFrames.split_indices(10, 0)

# Check overflow on 32-bit
len = typemax(Int32)
basesize = 100_000_000
@@ -125,4 +127,26 @@
@test len < basesize || vmin >= basesize
end

@testset "split_to_chunks" begin
for lg in 1:100, nt in 1:11
if lg < nt
@test_throws AssertionError DataFrames.split_to_chunks(lg, nt)
continue
end
x = collect(DataFrames.split_to_chunks(lg, nt))
@test reduce(vcat, x) == 1:lg
@test sum(length, x) == lg
@test first(x[1]) == 1
@test last(x[end]) == lg
@test length(x) == nt
for i in 1:nt-1
@test first(x[i+1])-last(x[i]) == 1
end
end

@test_throws AssertionError DataFrames.split_to_chunks(0, 10)
@test_throws AssertionError DataFrames.split_to_chunks(10, 0)
@test_throws AssertionError DataFrames.split_to_chunks(10, 11)
end

end # module