From 2d57734356d8a45d6b615241645bc542feda8934 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Tue, 12 May 2020 22:50:58 +0200 Subject: [PATCH 01/10] Support multithreading in groupreduce Keep the default to a single thread until we find a reliable way of predicting a reasonably optimal number of threads. --- .github/workflows/ci.yml | 1 + NEWS.md | 9 + src/DataFrames.jl | 2 +- src/abstractdataframe/selection.jl | 26 ++- src/groupeddataframe/fastaggregates.jl | 137 +++++++++++---- src/groupeddataframe/splitapplycombine.jl | 70 +++++--- test/grouping.jl | 201 +++++++++++++++------- 7 files changed, 316 insertions(+), 130 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6de797902f..0dbdb484e7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,6 +35,7 @@ jobs: - uses: actions/cache@v1 env: cache-name: cache-artifacts + JULIA_NUM_THREADS: 2 with: path: ~/.julia/artifacts key: ${{ runner.os }}-test-${{ env.cache-name }}-${{ hashFiles('**/Project.toml') }} diff --git a/NEWS.md b/NEWS.md index a5cdf5a2fd..8580fcea0a 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,12 @@ +# DataFrames v1.0 Release Notes + +## New functionalities + +* `combine`, `select` and `transform` with `GroupedDataFrame` now accept + a `nthreads` argument which enables multithreading for some optimized + grouped reductions ([#2491](https://github.com/JuliaData/DataFrames.jl/pull/2491)). + + # DataFrames v0.22 Release Notes ## Breaking changes diff --git a/src/DataFrames.jl b/src/DataFrames.jl index c3e7ac701b..e596b4aa03 100644 --- a/src/DataFrames.jl +++ b/src/DataFrames.jl @@ -3,7 +3,7 @@ module DataFrames using Statistics, Printf, REPL using Reexport, SortingAlgorithms, Compat, Unicode, PooledArrays, CategoricalArrays @reexport using Missings, InvertedIndices -using Base.Sort, Base.Order, Base.Iterators +using Base.Sort, Base.Order, Base.Iterators, Base.Threads using TableTraits, IteratorInterfaceExtensions import LinearAlgebra: norm using Markdown diff --git a/src/abstractdataframe/selection.jl b/src/abstractdataframe/selection.jl index df58ac2cea..41cb38bc84 100644 --- a/src/abstractdataframe/selection.jl +++ b/src/abstractdataframe/selection.jl @@ -644,9 +644,10 @@ end select(df::AbstractDataFrame, args...; copycols::Bool=true, renamecols::Bool=true) select(args::Callable, df::DataFrame; renamecols::Bool=true) select(gd::GroupedDataFrame, args...; copycols::Bool=true, keepkeys::Bool=true, - ungroup::Bool=true, renamecols::Bool=true) + ungroup::Bool=true, renamecols::Bool=true, nthreads::Integer=1) select(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true, - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, + renamecols::Bool=true, nthreads::Integer=1) Create a new data frame that contains columns from `df` or `gd` specified by `args` and return it. The result is guaranteed to have the same number of rows @@ -664,6 +665,9 @@ $TRANSFORMATION_COMMON_RULES data frame. - `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data frame or a `GroupedDataFrame`. +- `nthreads::Integer=1` : the number of CPU threads to use. Passing a value higher than 1 + currently has an effect only for some optimized grouped reductions. Values higher than + `Threads.nthreads()` will be replaced with that value. # Examples ```jldoctest @@ -858,9 +862,11 @@ end transform(df::AbstractDataFrame, args...; copycols::Bool=true, renamecols::Bool=true) transform(f::Callable, df::DataFrame; renamecols::Bool=true) transform(gd::GroupedDataFrame, args...; copycols::Bool=true, - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, + renamecols::Bool=true, nthreads::Integer=1) transform(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true, - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, + renamecols::Bool=true, nthreads::Integer=1) Create a new data frame that contains columns from `df` or `gd` plus columns specified by `args` and return it. The result is guaranteed to have the same @@ -877,6 +883,9 @@ $TRANSFORMATION_COMMON_RULES data frame. - `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data frame or a `GroupedDataFrame`. +- `nthreads::Integer=1` : the number of CPU threads to use. Passing a value higher than 1 + currently has an effect only for some optimized grouped reductions. Values higher than + `Threads.nthreads()` will be replaced with that value. Note that when the first argument is a `GroupedDataFrame`, `keepkeys=false` is needed to be able to return a different value for the grouping column: @@ -924,9 +933,11 @@ end combine(df::AbstractDataFrame, args...; renamecols::Bool=true) combine(f::Callable, df::AbstractDataFrame; renamecols::Bool=true) combine(gd::GroupedDataFrame, args...; - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, + renamecols::Bool=true, nthreads::Integer=1) combine(f::Base.Callable, gd::GroupedDataFrame; - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, + renamecols::Bool=true, nthreads::Integer=1) Create a new data frame that contains columns from `df` or `gd` specified by `args` and return it. The result can have any number of rows that is determined @@ -941,6 +952,9 @@ $TRANSFORMATION_COMMON_RULES data frame. - `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data frame or a `GroupedDataFrame`. +- `nthreads::Integer=1` : the number of CPU threads to use. Passing a value higher than 1 + currently has an effect only for some optimized grouped reductions. Values higher than + `Threads.nthreads()` will be replaced with that value. # Examples ```jldoctest diff --git a/src/groupeddataframe/fastaggregates.jl b/src/groupeddataframe/fastaggregates.jl index 4bce0e6c5f..6939f60787 100644 --- a/src/groupeddataframe/fastaggregates.jl +++ b/src/groupeddataframe/fastaggregates.jl @@ -157,24 +157,84 @@ function copyto_widen!(res::AbstractVector{T}, x::AbstractVector) where T end function groupreduce!(res::AbstractVector, f, op, condf, adjust, checkempty::Bool, - incol::AbstractVector, gd::GroupedDataFrame) + incol::AbstractVector, gd::GroupedDataFrame, nthreads::Integer) n = length(gd) + groups = gd.groups if adjust !== nothing || checkempty counts = zeros(Int, n) end - groups = gd.groups - @inbounds for i in eachindex(incol, groups) - gix = groups[i] - x = incol[i] - if gix > 0 && (condf === nothing || condf(x)) - # this check should be optimized out if U is not Any - if eltype(res) === Any && !isassigned(res, gix) - res[gix] = f(x, gix) - else - res[gix] = op(res[gix], f(x, gix)) + nt = min(nthreads, Threads.nthreads()) + if nt <= 1 || axes(incol) != axes(groups) + @inbounds for i in eachindex(incol, groups) + gix = groups[i] + x = incol[i] + if gix > 0 && (condf === nothing || condf(x)) + # this check should be optimized out if eltype is not Any + if eltype(res) === Any && !isassigned(res, gix) + res[gix] = f(x, gix) + else + res[gix] = op(res[gix], f(x, gix)) + end + if adjust !== nothing || checkempty + counts[gix] += 1 + end + end + end + else + res_vec = Vector{typeof(res)}(undef, nt) + # needs to be always allocated to fix type instability with @threads + counts_vec = Vector{Vector{Int}}(undef, nt) + res_vec[1] = res + if adjust !== nothing || checkempty + counts_vec[1] = counts + end + for i in 2:nt + res_vec[i] = copy(res) + if adjust !== nothing || checkempty + counts_vec[i] = zeros(Int, n) end + end + Threads.@threads for tid in 1:nt + res′ = res_vec[tid] if adjust !== nothing || checkempty - counts[gix] += 1 + counts′ = counts_vec[tid] + end + start = 1 + ((tid - 1) * length(groups)) ÷ nt + stop = (tid * length(groups)) ÷ nt + @inbounds for i in start:stop + gix = groups[i] + x = incol[i] + if gix > 0 && (condf === nothing || condf(x)) + # this check should be optimized out if eltype is not Any + if eltype(res′) === Any && !isassigned(res′, gix) + res′[gix] = f(x, gix) + else + res′[gix] = op(res′[gix], f(x, gix)) + end + if adjust !== nothing || checkempty + counts′[gix] += 1 + end + end + end + end + for i in 2:length(res_vec) + resi = res_vec[i] + @inbounds @simd for j in eachindex(res) + # this check should be optimized out if eltype is not Any + if eltype(res) === Any + if isassigned(resi, j) && isassigned(res, j) + res[j] = op(res[j], resi[j]) + elseif isassigned(resi, j) + res[j] = resi[j] + end + else + res[j] = op(res[j], resi[j]) + end + end + end + if adjust !== nothing || checkempty + for i in 2:length(counts_vec) + counts .+= counts_vec[i] end end end @@ -218,17 +278,20 @@ end # function barrier works around type instability of groupreduce_init due to applicable groupreduce(f, op, condf, adjust, checkempty::Bool, - incol::AbstractVector, gd::GroupedDataFrame) = + incol::AbstractVector, gd::GroupedDataFrame, + nthreads::Integer) = groupreduce!(groupreduce_init(op, condf, adjust, incol, gd), - f, op, condf, adjust, checkempty, incol, gd) + f, op, condf, adjust, checkempty, incol, gd, nthreads) # Avoids the overhead due to Missing when computing reduction groupreduce(f, op, condf::typeof(!ismissing), adjust, checkempty::Bool, - incol::AbstractVector, gd::GroupedDataFrame) = + incol::AbstractVector, gd::GroupedDataFrame, + nthreads::Integer) = groupreduce!(disallowmissing(groupreduce_init(op, condf, adjust, incol, gd)), - f, op, condf, adjust, checkempty, incol, gd) + f, op, condf, adjust, checkempty, incol, gd, nthreads) -(r::Reduce)(incol::AbstractVector, gd::GroupedDataFrame) = - groupreduce((x, i) -> x, r.op, r.condf, r.adjust, r.checkempty, incol, gd) +(r::Reduce)(incol::AbstractVector, gd::GroupedDataFrame; + nthreads::Integer=1) = + groupreduce((x, i) -> x, r.op, r.condf, r.adjust, r.checkempty, incol, gd, nthreads) # this definition is missing in Julia 1.0 LTS and is required by aggregation for var # TODO: remove this when we drop 1.0 support @@ -236,8 +299,10 @@ if VERSION < v"1.1" Base.zero(::Type{Missing}) = missing end -function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFrame) - means = groupreduce((x, i) -> x, Base.add_sum, agg.condf, /, false, incol, gd) +function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFrame; + nthreads::Integer=1) + means = groupreduce((x, i) -> x, Base.add_sum, agg.condf, /, false, + incol, gd, nthreads) # !ismissing check is purely an optimization to avoid a copy later if eltype(means) >: Missing && agg.condf !== !ismissing T = Union{Missing, real(eltype(means))} @@ -247,11 +312,12 @@ function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFra res = zeros(T, length(gd)) return groupreduce!(res, (x, i) -> @inbounds(abs2(x - means[i])), +, agg.condf, (x, l) -> l <= 1 ? oftype(x / (l-1), NaN) : x / (l-1), - false, incol, gd) + false, incol, gd, nthreads) end -function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFrame) - outcol = Aggregate(var, agg.condf)(incol, gd) +function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFrame; + nthreads::Integer=1) + outcol = Aggregate(var, agg.condf)(incol, gd; nthreads=nthreads) if eltype(outcol) <: Union{Missing, Rational} return sqrt.(outcol) else @@ -259,20 +325,25 @@ function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFra end end -for f in (first, last) - function (agg::Aggregate{typeof(f)})(incol::AbstractVector, gd::GroupedDataFrame) - n = length(gd) - outcol = similar(incol, n) - fillfirst!(agg.condf, outcol, incol, gd, rev=agg.f === last) - if isconcretetype(eltype(outcol)) - return outcol - else - return copyto_widen!(Tables.allocatecolumn(typeof(first(outcol)), n), outcol) +for f in (:first, :last) + # Without using @eval the presence of a keyword argument triggers a Julia bug + @eval begin + function (agg::Aggregate{typeof($f)})(incol::AbstractVector, gd::GroupedDataFrame; + nthreads::Integer=1) + n = length(gd) + outcol = similar(incol, n) + fillfirst!(agg.condf, outcol, incol, gd, rev=agg.f === last) + if isconcretetype(eltype(outcol)) + return outcol + else + return copyto_widen!(Tables.allocatecolumn(typeof(first(outcol)), n), outcol) + end end end end -function (agg::Aggregate{typeof(length)})(incol::AbstractVector, gd::GroupedDataFrame) +function (agg::Aggregate{typeof(length)})(incol::AbstractVector, gd::GroupedDataFrame; + nthreads::Integer=1) if getfield(gd, :idx) === nothing lens = zeros(Int, length(gd)) @inbounds for gix in gd.groups diff --git a/src/groupeddataframe/splitapplycombine.jl b/src/groupeddataframe/splitapplycombine.jl index cc59e79153..41eb912f39 100644 --- a/src/groupeddataframe/splitapplycombine.jl +++ b/src/groupeddataframe/splitapplycombine.jl @@ -22,7 +22,7 @@ function _combine_prepare(gd::GroupedDataFrame, @nospecialize(cs::Union{Pair, Base.Callable, ColumnIndex, MultiColumnIndex}...); keepkeys::Bool, ungroup::Bool, copycols::Bool, - keeprows::Bool, renamecols::Bool) + keeprows::Bool, renamecols::Bool, nthreads::Integer) if !ungroup && !keepkeys throw(ArgumentError("keepkeys=false when ungroup=false is not allowed")) end @@ -63,7 +63,8 @@ function _combine_prepare(gd::GroupedDataFrame, # if optional_transform[i] is true then the transformation will be skipped # if earlier column with a column with the same name was created - idx, valscat = _combine(gd, cs_norm, optional_transform, copycols, keeprows, renamecols) + idx, valscat = _combine(gd, cs_norm, optional_transform, + copycols, keeprows, renamecols, nthreads) !keepkeys && ungroup && return valscat @@ -194,13 +195,14 @@ function _combine_process_agg(@nospecialize(cs_i::Pair{Int, <:Pair{<:Function, S gd::GroupedDataFrame, seen_cols::Dict{Symbol, Tuple{Bool, Int}}, trans_res::Vector{TransformationResult}, - idx_agg::Union{Nothing, AbstractVector{Int}}) + idx_agg::Union{Nothing, AbstractVector{Int}}, + nthreads::Integer) @assert isagg(cs_i, gd) @assert !optional_i out_col_name = last(last(cs_i)) incol = parentdf[!, first(cs_i)] agg = check_aggregate(first(last(cs_i)), incol) - outcol = agg(incol, gd) + outcol = agg(incol, gd; nthreads=nthreads) if haskey(seen_cols, out_col_name) optional, loc = seen_cols[out_col_name] @@ -485,7 +487,7 @@ end function _combine(gd::GroupedDataFrame, @nospecialize(cs_norm::Vector{Any}), optional_transform::Vector{Bool}, - copycols::Bool, keeprows::Bool, renamecols::Bool) + copycols::Bool, keeprows::Bool, renamecols::Bool, nthreads::Integer) if isempty(cs_norm) if keeprows && nrow(parent(gd)) > 0 && minimum(gd.groups) == 0 throw(ArgumentError("select and transform do not support " * @@ -529,7 +531,8 @@ function _combine(gd::GroupedDataFrame, optional_i = optional_transform[i] if length(gd) > 0 && isagg(cs_i, gd) - _combine_process_agg(cs_i, optional_i, parentdf, gd, seen_cols, trans_res, idx_agg) + _combine_process_agg(cs_i, optional_i, parentdf, gd, + seen_cols, trans_res, idx_agg, nthreads) elseif keeprows && cs_i isa Pair && first(last(cs_i)) === identity && !(first(cs_i) isa AsTable) && (last(last(cs_i)) isa Symbol) # this is a fast path used when we pass a column or rename a column in select or transform @@ -620,82 +623,95 @@ function _combine(gd::GroupedDataFrame, end function combine(f::Base.Callable, gd::GroupedDataFrame; - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, + nthreads::Integer=1) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a GroupedDataFrame")) end - return combine(gd, f, keepkeys=keepkeys, ungroup=ungroup, renamecols=renamecols) + return combine(gd, f, keepkeys=keepkeys, ungroup=ungroup, renamecols=renamecols, + nthreads=nthreads) end combine(f::Pair, gd::GroupedDataFrame; - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) = + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, + nthreads::Integer=1) = throw(ArgumentError("First argument must be a transformation if the second argument is a GroupedDataFrame. " * "You can pass a `Pair` as the second argument of the transformation. If you want the return " * "value to be processed as having multiple columns add `=> AsTable` suffix to the pair.")) combine(gd::GroupedDataFrame, cs::Union{Pair, Base.Callable, ColumnIndex, MultiColumnIndex}...; - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) = + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, + nthreads::Integer=1) = _combine_prepare(gd, cs..., keepkeys=keepkeys, ungroup=ungroup, - copycols=true, keeprows=false, renamecols=renamecols) + copycols=true, keeprows=false, renamecols=renamecols, + nthreads=nthreads) function select(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true, - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, + nthreads::Integer=1) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame")) end - return select(gd, f, copycols=copycols, keepkeys=keepkeys, ungroup=ungroup) + return select(gd, f, copycols=copycols, keepkeys=keepkeys, ungroup=ungroup, + nthreads=nthreads) end select(gd::GroupedDataFrame, args...; copycols::Bool=true, keepkeys::Bool=true, - ungroup::Bool=true, renamecols::Bool=true) = + ungroup::Bool=true, renamecols::Bool=true, nthreads::Integer=1) = _combine_prepare(gd, args..., copycols=copycols, keepkeys=keepkeys, - ungroup=ungroup, keeprows=true, renamecols=renamecols) + ungroup=ungroup, keeprows=true, renamecols=renamecols, + nthreads=nthreads) function transform(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true, - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, + nthreads::Integer=1) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame")) end - return transform(gd, f, copycols=copycols, keepkeys=keepkeys, ungroup=ungroup) + return transform(gd, f, copycols=copycols, keepkeys=keepkeys, ungroup=ungroup, + nthreads=nthreads) end function transform(gd::GroupedDataFrame, args...; copycols::Bool=true, - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, + nthreads::Integer=1) res = select(gd, :, args..., copycols=copycols, keepkeys=keepkeys, - ungroup=ungroup, renamecols=renamecols) + ungroup=ungroup, renamecols=renamecols, nthreads=nthreads) # res can be a GroupedDataFrame based on DataFrame or a DataFrame, # so parent always gives a data frame select!(parent(res), propertynames(parent(gd)), :) return res end -function select!(f::Base.Callable, gd::GroupedDataFrame; ungroup::Bool=true, renamecols::Bool=true) +function select!(f::Base.Callable, gd::GroupedDataFrame; + ungroup::Bool=true, renamecols::Bool=true, nthreads::Integer=1) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame")) end - return select!(gd, f, ungroup=ungroup) + return select!(gd, f, ungroup=ungroup, nthreads=nthreads) end function select!(gd::GroupedDataFrame{DataFrame}, args...; - ungroup::Bool=true, renamecols::Bool=true) - newdf = select(gd, args..., copycols=false, renamecols=renamecols) + ungroup::Bool=true, renamecols::Bool=true, nthreads::Integer=1) + newdf = select(gd, args..., copycols=false, renamecols=renamecols, nthreads=nthreads) df = parent(gd) _replace_columns!(df, newdf) return ungroup ? df : gd end -function transform!(f::Base.Callable, gd::GroupedDataFrame; ungroup::Bool=true, renamecols::Bool=true) +function transform!(f::Base.Callable, gd::GroupedDataFrame; + ungroup::Bool=true, renamecols::Bool=true, nthreads::Integer=1) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame")) end - return transform!(gd, f, ungroup=ungroup) + return transform!(gd, f, ungroup=ungroup, nthreads=nthreads) end function transform!(gd::GroupedDataFrame{DataFrame}, args...; - ungroup::Bool=true, renamecols::Bool=true) - newdf = select(gd, :, args..., copycols=false, renamecols=renamecols) + ungroup::Bool=true, renamecols::Bool=true, nthreads::Integer=1) + newdf = select(gd, :, args..., copycols=false, renamecols=renamecols, nthreads=nthreads) df = parent(gd) select!(newdf, propertynames(df), :) _replace_columns!(df, newdf) diff --git a/test/grouping.jl b/test/grouping.jl index 27d05e7f49..423dd3a6e2 100644 --- a/test/grouping.jl +++ b/test/grouping.jl @@ -98,6 +98,25 @@ function groupby_checked(df::AbstractDataFrame, keys, args...; kwargs...) return ogd end +function combine_checked(gd::GroupedDataFrame, args...; kwargs...) + res1 = combine(gd, args...; nthreads=1, kwargs...) + res2 = combine(gd, args...; nthreads=2, kwargs...) + @test names(res1) == names(res2) + for (c1, c2) in zip(eachcol(res1), eachcol(res2)) + @test typeof(c1) === typeof(c2) + if eltype(c1) <: Union{AbstractFloat, Missing} + @test ismissing.(c1) == ismissing.(c2) + if !all(ismissing, c1) + @test isapprox(collect(skipmissing(c1)), collect(skipmissing(c2)), + nans=true) + end + else + @test c1 ≅ c2 + end + end + res1 +end + @testset "parent" begin df = DataFrame(a = [1, 1, 2, 2], b = [5, 6, 7, 8]) gd = groupby_checked(df, :a) @@ -911,7 +930,7 @@ Base.isless(::TestType, ::TestType) = false gd = groupby_checked(df, :a, skipmissing=skip, sort=sort) indices && @test gd.idx !== nothing # Trigger computation of indices - res = combine(gd, :x1 => f => :y) + res = combine_checked(gd, :x1 => f => :y) expected = combine(gd, :x1 => (x -> f(x)) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) @@ -921,7 +940,7 @@ Base.isless(::TestType, ::TestType) = false df.x3 = Vector{T}(df.x1) gd = groupby_checked(df, :a, skipmissing=skip, sort=sort) indices && @test gd.idx !== nothing # Trigger computation of indices - res = combine(gd, :x3 => f => :y) + res = combine_checked(gd, :x3 => f => :y) expected = combine(gd, :x3 => (x -> f(x)) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) @@ -933,11 +952,11 @@ Base.isless(::TestType, ::TestType) = false df.x3[1] = missing gd = groupby_checked(df, :a, skipmissing=skip, sort=sort) indices && @test gd.idx !== nothing # Trigger computation of indices - res = combine(gd, :x3 => f => :y) + res = combine_checked(gd, :x3 => f => :y) expected = combine(gd, :x3 => (x -> f(x)) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) - res = combine(gd, :x3 => f∘skipmissing => :y) + res = combine_checked(gd, :x3 => f∘skipmissing => :y) expected = combine(gd, :x3 => (x -> f(collect(skipmissing(x)))) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) @@ -949,7 +968,7 @@ Base.isless(::TestType, ::TestType) = false if f in (maximum, minimum, first, last) @test_throws ArgumentError combine(gd, :x3 => f∘skipmissing => :y) else - res = combine(gd, :x3 => f∘skipmissing => :y) + res = combine_checked(gd, :x3 => f∘skipmissing => :y) expected = combine(gd, :x3 => (x -> f(collect(skipmissing(x)))) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) @@ -960,7 +979,7 @@ Base.isless(::TestType, ::TestType) = false gd = groupby_checked(df, :a, skipmissing=skip, sort=sort) indices && @test gd.idx !== nothing # Trigger computation of indices - res = combine(gd, :x2 => f => :y) + res = combine_checked(gd, :x2 => f => :y) expected = combine(gd, :x2 => (x -> f(x)) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) @@ -973,14 +992,14 @@ Base.isless(::TestType, ::TestType) = false m && (df.x3[1] = missing) gd = groupby_checked(df, :a, skipmissing=skip, sort=sort) indices && @test gd.idx !== nothing # Trigger computation of indices - res = combine(gd, :x3 => f => :y) + res = combine_checked(gd, :x3 => f => :y) expected = combine(gd, :x3 => (x -> f(x)) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) f === length && continue - res = combine(gd, :x3 => f∘skipmissing => :y) + res = combine_checked(gd, :x3 => f∘skipmissing => :y) expected = combine(gd, :x3 => (x -> f(collect(skipmissing(x)))) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) @@ -989,7 +1008,7 @@ Base.isless(::TestType, ::TestType) = false @test_throws ArgumentError combine(gd, :x3 => f∘skipmissing => :y) end end - @test combine(gd, :x1 => maximum => :y, :x2 => sum => :z) ≅ + @test combine_checked(gd, :x1 => maximum => :y, :x2 => sum => :z) ≅ combine(gd, :x1 => (x -> maximum(x)) => :y, :x2 => (x -> sum(x)) => :z) # Test floating point corner cases @@ -1000,7 +1019,7 @@ Base.isless(::TestType, ::TestType) = false gd = groupby_checked(df, :a, skipmissing=skip, sort=sort) indices && @test gd.idx !== nothing # Trigger computation of indices - res = combine(gd, :x1 => f => :y) + res = combine_checked(gd, :x1 => f => :y) expected = combine(gd, :x1 => (x -> f(x)) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) @@ -1011,18 +1030,18 @@ Base.isless(::TestType, ::TestType) = false df.x3[1] = missing gd = groupby_checked(df, :a, skipmissing=skip, sort=sort) indices && @test gd.idx !== nothing # Trigger computation of indices - res = combine(gd, :x3 => f => :y) + res = combine_checked(gd, :x3 => f => :y) expected = combine(gd, :x3 => (x -> f(x)) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) - res = combine(gd, :x3 => f∘skipmissing => :y) + res = combine_checked(gd, :x3 => f∘skipmissing => :y) expected = combine(gd, :x3 => (x -> f(collect(skipmissing(x)))) => :y) @test res ≅ expected @test typeof(res.y) == typeof(expected.y) end df = DataFrame(x = [1, 1, 2, 2], y = Any[1, 2.0, 3.0, 4.0]) - res = combine(groupby_checked(df, :x), :y => maximum => :z) + res = combine_checked(groupby_checked(df, :x), :y => maximum => :z) @test res.z isa Vector{Float64} @test res.z == combine(groupby_checked(df, :x), :y => (x -> maximum(x)) => :z).z @@ -1031,7 +1050,7 @@ Base.isless(::TestType, ::TestType) = false gd = groupby_checked(df, :x, skipmissing=skip, sort=sort) indices && @test gd.idx !== nothing # Trigger computation of indices for f in (maximum, minimum) - res = combine(gd, :y => maximum => :z) + res = combine_checked(gd, :y => maximum => :z) @test res.z isa Vector{Any} @test res.z == combine(gd, :y => (x -> maximum(x)) => :z).z end @@ -2328,6 +2347,13 @@ end @test parent(gdf2).y ≅ df.y @test parent(gdf2).g === df.g + # Test that nthreads argument is accepted + # Correctness tests are run by combine_checked + @test select(gdf, :x => sum, nthreads=2) ≅ + select(gdf, :x => sum) + @test transform(gdf, :x => sum, nthreads=2) ≅ + transform(gdf, :x => sum) + gdf = groupby_checked(df, :g, sort=dosort, skipmissing=true) @test_throws ArgumentError select(gdf, :x => sum) @test_throws ArgumentError select(gdf, :x => sum, ungroup=false) @@ -2405,6 +2431,15 @@ end @test dfc.x_first == [1, 2, 2, 4] @test propertynames(dfc) == [:g, :x, :y, :x_first] + # Test that nthreads argument is accepted + # Correctness tests are run by combine_checked + dfc = copy(df) + gdf = groupby_checked(dfc, :g, sort=dosort, skipmissing=false) + @test select(gdf, :x => sum) ≅ select!(gdf, :x => sum, nthreads=2) + dfc = copy(df) + gdf = groupby_checked(dfc, :g, sort=dosort, skipmissing=false) + @test transform(gdf, :x => sum) ≅ transform!(gdf, :x => sum, nthreads=2) + dfc = copy(df) gdf = groupby_checked(dfc, :g, sort=dosort, skipmissing=true) @test_throws ArgumentError select!(gdf, :x => sum) @@ -2564,43 +2599,67 @@ end @testset "corner cases of group_reduce" begin df = DataFrame(g=[1, 1, 1, 2, 2, 2], x=Any[1, 1, 1, 1.5, 1.5, 1.5]) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum) == DataFrame(g=1:2, x_sum=[3.0, 4.5]) - - @test combine(gdf, :x => sum∘skipmissing) == DataFrame(g=1:2, x_sum_skipmissing=[3.0, 4.5]) - @test combine(gdf, :x => mean∘skipmissing) == DataFrame(g=1:2, x_mean_skipmissing=[1.0, 1.5]) - @test combine(gdf, :x => var∘skipmissing) == DataFrame(g=1:2, x_var_skipmissing=[0.0, 0.0]) - @test combine(gdf, :x => mean) == DataFrame(g=1:2, x_mean=[1.0, 1.5]) - @test combine(gdf, :x => var) == DataFrame(g=1:2, x_var=[0.0, 0.0]) + @test combine_checked(gdf, :x => sum) == DataFrame(g=1:2, x_sum=[3.0, 4.5]) + + @test combine_checked(gdf, :x => sum∘skipmissing) == + DataFrame(g=1:2, x_sum_skipmissing=[3.0, 4.5]) + @test combine_checked(gdf, :x => mean∘skipmissing) == + DataFrame(g=1:2, x_mean_skipmissing=[1.0, 1.5]) + @test combine_checked(gdf, :x => var∘skipmissing) == + DataFrame(g=1:2, x_var_skipmissing=[0.0, 0.0]) + @test combine_checked(gdf, :x => mean) == + DataFrame(g=1:2, x_mean=[1.0, 1.5]) + @test combine_checked(gdf, :x => var) == + DataFrame(g=1:2, x_var=[0.0, 0.0]) df = DataFrame(g=[1, 1, 1, 2, 2, 2], x=Any[1, 1, 1, 1, 1, missing]) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum∘skipmissing) == DataFrame(g=1:2, x_sum_skipmissing=[3, 2]) - @test combine(gdf, :x => mean∘skipmissing) == DataFrame(g=1:2, x_mean_skipmissing=[1.0, 1.0]) - @test combine(gdf, :x => var∘skipmissing) == DataFrame(g=1:2, x_var_skipmissing=[0.0, 0.0]) - @test combine(gdf, :x => sum) ≅ DataFrame(g=1:2, x_sum=[3, missing]) - @test combine(gdf, :x => mean) ≅ DataFrame(g=1:2, x_mean=[1.0, missing]) - @test combine(gdf, :x => var) ≅ DataFrame(g=1:2, x_var=[0.0, missing]) + @test combine_checked(gdf, :x => sum∘skipmissing) == + DataFrame(g=1:2, x_sum_skipmissing=[3, 2]) + @test combine_checked(gdf, :x => mean∘skipmissing) == + DataFrame(g=1:2, x_mean_skipmissing=[1.0, 1.0]) + @test combine_checked(gdf, :x => var∘skipmissing) == + DataFrame(g=1:2, x_var_skipmissing=[0.0, 0.0]) + @test combine_checked(gdf, :x => sum) ≅ + DataFrame(g=1:2, x_sum=[3, missing]) + @test combine_checked(gdf, :x => mean) ≅ + DataFrame(g=1:2, x_mean=[1.0, missing]) + @test combine_checked(gdf, :x => var) ≅ + DataFrame(g=1:2, x_var=[0.0, missing]) df = DataFrame(g=[1, 1, 1, 2, 2, 2], x=Union{Real, Missing}[1, 1, 1, 1, 1, missing]) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum∘skipmissing) == DataFrame(g=1:2, x_sum_skipmissing=[3, 2]) - @test combine(gdf, :x => mean∘skipmissing) == DataFrame(g=1:2, x_mean_skipmissing=[1.0, 1.0]) - @test combine(gdf, :x => var∘skipmissing) == DataFrame(g=1:2, x_var_skipmissing=[0.0, 0.0]) - @test combine(gdf, :x => sum) ≅ DataFrame(g=1:2, x_sum=[3, missing]) - @test combine(gdf, :x => mean) ≅ DataFrame(g=1:2, x_mean=[1.0, missing]) - @test combine(gdf, :x => var) ≅ DataFrame(g=1:2, x_var=[0.0, missing]) + @test combine_checked(gdf, :x => sum∘skipmissing) == + DataFrame(g=1:2, x_sum_skipmissing=[3, 2]) + @test combine_checked(gdf, :x => mean∘skipmissing) == + DataFrame(g=1:2, x_mean_skipmissing=[1.0, 1.0]) + @test combine_checked(gdf, :x => var∘skipmissing) == + DataFrame(g=1:2, x_var_skipmissing=[0.0, 0.0]) + @test combine_checked(gdf, :x => sum) ≅ + DataFrame(g=1:2, x_sum=[3, missing]) + @test combine_checked(gdf, :x => mean) ≅ + DataFrame(g=1:2, x_mean=[1.0, missing]) + @test combine_checked(gdf, :x => var) ≅ + DataFrame(g=1:2, x_var=[0.0, missing]) Random.seed!(1) df = DataFrame(g = rand(1:2, 1000), x1 = rand(Int, 1000)) df.x2 = big.(df.x1) gdf = groupby_checked(df, :g) - res = combine(gdf, :x1 => sum, :x2 => sum, :x1 => x -> sum(x), :x2 => x -> sum(x)) + res = combine_checked(gdf, + :x1 => sum, :x2 => sum, + :x1 => x -> sum(x), + :x2 => x -> sum(x)) @test res.x1_sum == res.x1_function @test res.x2_sum == res.x2_function @test res.x1_sum != res.x2_sum # we are large enough to be sure we differ - res = combine(gdf, :x1 => mean, :x2 => mean, :x1 => x -> mean(x), :x2 => x -> mean(x)) + res = combine_checked(gdf, + :x1 => mean, + :x2 => mean, + :x1 => x -> mean(x), + :x2 => x -> mean(x)) if VERSION >= v"1.5" @test res.x1_mean ≈ res.x1_function else @@ -2612,71 +2671,87 @@ end # make sure we do correct promotions in corner case similar to Base df = DataFrame(g=[1, 1, 1, 1, 1, 1], x=Real[1, 1, big(typemax(Int)), 1, 1, 1]) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum)[1, 2] == sum(df.x) - @test eltype(combine(gdf, :x => sum)[!, 2]) === BigInt + @test combine_checked(gdf, :x => sum)[1, 2] == sum(df.x) + @test eltype(combine_checked(gdf, :x => sum)[!, 2]) === BigInt df = DataFrame(g=[1, 1, 1, 1, 1, 1], x=Real[1, 1, typemax(Int), 1, 1, 1]) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum)[1, 2] == sum(df.x) - @test eltype(combine(gdf, :x => sum)[!, 2]) === Int + @test combine_checked(gdf, :x => sum)[1, 2] == sum(df.x) + @test eltype(combine_checked(gdf, :x => sum)[!, 2]) === Int df = DataFrame(g=[1, 1, 1, 1, 1, 1], x=fill(missing, 6)) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum)[1, 2] isa Missing - @test eltype(combine(gdf, :x => sum)[!, 2]) === Missing + res = combine_checked(gdf, :x => sum) + @test res[1, 2] isa Missing + @test eltype(res[!, 2]) === Missing @test_throws MethodError combine(gdf, :x => sum∘skipmissing) df = DataFrame(g=[1, 1, 1, 1, 1, 1], x=convert(Vector{Union{Real, Missing}}, fill(missing, 6))) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum)[1, 2] isa Missing - @test eltype(combine(gdf, :x => sum)[!, 2]) === Missing - @test combine(gdf, :x => sum∘skipmissing) == DataFrame(g=1, x_sum_skipmissing=0) - @test eltype(combine(gdf, :x => sum∘skipmissing)[!, 2]) === Int + @test combine_checked(gdf, :x => sum)[1, 2] isa Missing + @test eltype(combine_checked(gdf, :x => sum)[!, 2]) === Missing + @test combine_checked(gdf, :x => sum∘skipmissing) == DataFrame(g=1, x_sum_skipmissing=0) + @test eltype(combine_checked(gdf, :x => sum∘skipmissing)[!, 2]) === Int df = DataFrame(g=[1, 1, 1, 1, 1, 1], x=convert(Vector{Union{Int, Missing}}, fill(missing, 6))) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum)[1, 2] isa Missing - @test eltype(combine(gdf, :x => sum)[!, 2]) === Missing - @test combine(gdf, :x => sum∘skipmissing)[1, 2] == 0 - @test eltype(combine(gdf, :x => sum∘skipmissing)[!, 2]) === Int + @test combine_checked(gdf, :x => sum)[1, 2] isa Missing + @test eltype(combine_checked(gdf, :x => sum)[!, 2]) === Missing + @test combine_checked(gdf, :x => sum∘skipmissing)[1, 2] == 0 + @test eltype(combine_checked(gdf, :x => sum∘skipmissing)[!, 2]) === Int df = DataFrame(g=[1, 1, 1, 1, 1, 1], x=convert(Vector{Any}, fill(missing, 6))) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum)[1, 2] isa Missing - @test eltype(combine(gdf, :x => sum)[!, 2]) === Missing + res = combine_checked(gdf, :x => sum) + @test res[1, 2] isa Missing + @test eltype(res[!, 2]) === Missing @test_throws MethodError combine(gdf, :x => sum∘skipmissing) # these questions can go to a final exam in "mastering combine" class df = DataFrame(g=[1, 2, 3], x=["a", "b", "c"]) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum => :a, :x => prod => :b) == - combine(gdf, :x => (x -> sum(x)) => :a, :x => (x -> prod(x)) => :b) + @test combine_checked(gdf, :x => sum => :a, :x => prod => :b) == + combine_checked(gdf, :x => (x -> sum(x)) => :a, :x => (x -> prod(x)) => :b) + df = DataFrame(g=[1, 2, 3], x=Any["a", "b", "c"]) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum => :a, :x => prod => :b) == - combine(gdf, :x => (x -> sum(x)) => :a, :x => (x -> prod(x)) => :b) + @test combine_checked(gdf, :x => sum => :a, :x => prod => :b) == + combine_checked(gdf, :x => (x -> sum(x)) => :a, :x => (x -> prod(x)) => :b) + df = DataFrame(g=[1, 1], x=[missing, "a"]) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum∘skipmissing => :a, :x => prod∘skipmissing => :b) == - combine(gdf, :x => (x -> sum(skipmissing(x))) => :a, :x => (x -> prod(skipmissing(x))) => :b) + @test combine_checked(gdf, + :x => sum∘skipmissing => :a, + :x => prod∘skipmissing => :b) == + combine_checked(gdf, + :x => (x -> sum(skipmissing(x))) => :a, + :x => (x -> prod(skipmissing(x))) => :b) + df = DataFrame(g=[1, 1], x=Any[missing, "a"]) gdf = groupby_checked(df, :g) - @test combine(gdf, :x => sum∘skipmissing => :a, :x => prod∘skipmissing => :b) == - combine(gdf, :x => (x -> sum(skipmissing(x))) => :a, :x => (x -> prod(skipmissing(x))) => :b) + @test combine_checked(gdf, + :x => sum∘skipmissing => :a, + :x => prod∘skipmissing => :b) == + combine_checked(gdf, + :x => (x -> sum(skipmissing(x))) => :a, + :x => (x -> prod(skipmissing(x))) => :b) df = DataFrame(g=[1, 2], x=Any[nothing, "a"]) gdf = groupby_checked(df, :g) - df2 = combine(gdf, :x => sum => :a, :x => prod => :b) + df2 = combine_checked(gdf, :x => sum => :a, :x => prod => :b) @test df2 == DataFrame(g=[1, 2], a=[nothing, "a"], b=[nothing, "a"]) @test eltype(df2.a) === eltype(df2.b) === Union{Nothing, String} + df = DataFrame(g=[1, 2], x=Any[1, 1.0]) gdf = groupby_checked(df, :g) - df2 = combine(gdf, :x => sum => :a, :x => prod => :b) + df2 = combine_checked(gdf, :x => sum => :a, :x => prod => :b) @test df2 == DataFrame(g=[1, 2], a=ones(2), b=ones(2)) @test eltype(df2.a) === eltype(df2.b) === Float64 + df = DataFrame(g=[1, 2], x=[1, "1"]) gdf = groupby_checked(df, :g) - df2 = combine(gdf, :x => sum => :a, :x => prod => :b) + df2 = combine_checked(gdf, :x => sum => :a, :x => prod => :b) @test df2 == DataFrame(g=[1, 2], a=[1, "1"], b=[1, "1"]) @test eltype(df2.a) === eltype(df2.b) === Any + df = DataFrame(g=[1, 1, 2], x=[UInt8(1), UInt8(1), missing]) gdf = groupby_checked(df, :g) - df2 = combine(gdf, :x => sum => :a, :x => prod => :b) + df2 = combine_checked(gdf, :x => sum => :a, :x => prod => :b) @test df2 ≅ DataFrame(g=[1, 2], a=[2, missing], b=[1, missing]) @test eltype(df2.a) === eltype(df2.b) === Union{UInt, Missing} end From 713d5b874b0274a136d51f0f826e18696dfdea92 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Wed, 25 Nov 2020 11:26:06 +0100 Subject: [PATCH 02/10] Fix setting number of threads in CI --- .github/workflows/ci.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0dbdb484e7..6ca522c869 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,7 +35,6 @@ jobs: - uses: actions/cache@v1 env: cache-name: cache-artifacts - JULIA_NUM_THREADS: 2 with: path: ~/.julia/artifacts key: ${{ runner.os }}-test-${{ env.cache-name }}-${{ hashFiles('**/Project.toml') }} @@ -45,6 +44,8 @@ jobs: ${{ runner.os }}- - uses: julia-actions/julia-buildpkg@v1 - uses: julia-actions/julia-runtest@v1 + env: + JULIA_NUM_THREADS: 2 - uses: julia-actions/julia-processcoverage@v1 - uses: codecov/codecov-action@v1 with: From 3b5addb0059d28797a5d2ae9940fb183fb7595e0 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Wed, 25 Nov 2020 17:51:03 +0100 Subject: [PATCH 03/10] Use Int --- src/abstractdataframe/selection.jl | 18 +++++++-------- src/groupeddataframe/fastaggregates.jl | 16 ++++++------- src/groupeddataframe/splitapplycombine.jl | 28 +++++++++++------------ src/other/utils.jl | 2 +- 4 files changed, 32 insertions(+), 32 deletions(-) diff --git a/src/abstractdataframe/selection.jl b/src/abstractdataframe/selection.jl index 41cb38bc84..3cefa8c3b3 100644 --- a/src/abstractdataframe/selection.jl +++ b/src/abstractdataframe/selection.jl @@ -644,10 +644,10 @@ end select(df::AbstractDataFrame, args...; copycols::Bool=true, renamecols::Bool=true) select(args::Callable, df::DataFrame; renamecols::Bool=true) select(gd::GroupedDataFrame, args...; copycols::Bool=true, keepkeys::Bool=true, - ungroup::Bool=true, renamecols::Bool=true, nthreads::Integer=1) + ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=1) select(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true, - renamecols::Bool=true, nthreads::Integer=1) + renamecols::Bool=true, nthreads::Int=1) Create a new data frame that contains columns from `df` or `gd` specified by `args` and return it. The result is guaranteed to have the same number of rows @@ -665,7 +665,7 @@ $TRANSFORMATION_COMMON_RULES data frame. - `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data frame or a `GroupedDataFrame`. -- `nthreads::Integer=1` : the number of CPU threads to use. Passing a value higher than 1 +- `nthreads::Int=1` : the number of CPU threads to use. Passing a value higher than 1 currently has an effect only for some optimized grouped reductions. Values higher than `Threads.nthreads()` will be replaced with that value. @@ -863,10 +863,10 @@ end transform(f::Callable, df::DataFrame; renamecols::Bool=true) transform(gd::GroupedDataFrame, args...; copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true, - renamecols::Bool=true, nthreads::Integer=1) + renamecols::Bool=true, nthreads::Int=1) transform(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true, - renamecols::Bool=true, nthreads::Integer=1) + renamecols::Bool=true, nthreads::Int=1) Create a new data frame that contains columns from `df` or `gd` plus columns specified by `args` and return it. The result is guaranteed to have the same @@ -883,7 +883,7 @@ $TRANSFORMATION_COMMON_RULES data frame. - `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data frame or a `GroupedDataFrame`. -- `nthreads::Integer=1` : the number of CPU threads to use. Passing a value higher than 1 +- `nthreads::Int=1` : the number of CPU threads to use. Passing a value higher than 1 currently has an effect only for some optimized grouped reductions. Values higher than `Threads.nthreads()` will be replaced with that value. @@ -934,10 +934,10 @@ end combine(f::Callable, df::AbstractDataFrame; renamecols::Bool=true) combine(gd::GroupedDataFrame, args...; keepkeys::Bool=true, ungroup::Bool=true, - renamecols::Bool=true, nthreads::Integer=1) + renamecols::Bool=true, nthreads::Int=1) combine(f::Base.Callable, gd::GroupedDataFrame; keepkeys::Bool=true, ungroup::Bool=true, - renamecols::Bool=true, nthreads::Integer=1) + renamecols::Bool=true, nthreads::Int=1) Create a new data frame that contains columns from `df` or `gd` specified by `args` and return it. The result can have any number of rows that is determined @@ -952,7 +952,7 @@ $TRANSFORMATION_COMMON_RULES data frame. - `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data frame or a `GroupedDataFrame`. -- `nthreads::Integer=1` : the number of CPU threads to use. Passing a value higher than 1 +- `nthreads::Int=1` : the number of CPU threads to use. Passing a value higher than 1 currently has an effect only for some optimized grouped reductions. Values higher than `Threads.nthreads()` will be replaced with that value. diff --git a/src/groupeddataframe/fastaggregates.jl b/src/groupeddataframe/fastaggregates.jl index 6939f60787..e02aa78ab0 100644 --- a/src/groupeddataframe/fastaggregates.jl +++ b/src/groupeddataframe/fastaggregates.jl @@ -157,7 +157,7 @@ function copyto_widen!(res::AbstractVector{T}, x::AbstractVector) where T end function groupreduce!(res::AbstractVector, f, op, condf, adjust, checkempty::Bool, - incol::AbstractVector, gd::GroupedDataFrame, nthreads::Integer) + incol::AbstractVector, gd::GroupedDataFrame, nthreads::Int) n = length(gd) groups = gd.groups if adjust !== nothing || checkempty @@ -279,18 +279,18 @@ end # function barrier works around type instability of groupreduce_init due to applicable groupreduce(f, op, condf, adjust, checkempty::Bool, incol::AbstractVector, gd::GroupedDataFrame, - nthreads::Integer) = + nthreads::Int) = groupreduce!(groupreduce_init(op, condf, adjust, incol, gd), f, op, condf, adjust, checkempty, incol, gd, nthreads) # Avoids the overhead due to Missing when computing reduction groupreduce(f, op, condf::typeof(!ismissing), adjust, checkempty::Bool, incol::AbstractVector, gd::GroupedDataFrame, - nthreads::Integer) = + nthreads::Int) = groupreduce!(disallowmissing(groupreduce_init(op, condf, adjust, incol, gd)), f, op, condf, adjust, checkempty, incol, gd, nthreads) (r::Reduce)(incol::AbstractVector, gd::GroupedDataFrame; - nthreads::Integer=1) = + nthreads::Int=1) = groupreduce((x, i) -> x, r.op, r.condf, r.adjust, r.checkempty, incol, gd, nthreads) # this definition is missing in Julia 1.0 LTS and is required by aggregation for var @@ -300,7 +300,7 @@ if VERSION < v"1.1" end function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFrame; - nthreads::Integer=1) + nthreads::Int=1) means = groupreduce((x, i) -> x, Base.add_sum, agg.condf, /, false, incol, gd, nthreads) # !ismissing check is purely an optimization to avoid a copy later @@ -316,7 +316,7 @@ function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFra end function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFrame; - nthreads::Integer=1) + nthreads::Int=1) outcol = Aggregate(var, agg.condf)(incol, gd; nthreads=nthreads) if eltype(outcol) <: Union{Missing, Rational} return sqrt.(outcol) @@ -329,7 +329,7 @@ for f in (:first, :last) # Without using @eval the presence of a keyword argument triggers a Julia bug @eval begin function (agg::Aggregate{typeof($f)})(incol::AbstractVector, gd::GroupedDataFrame; - nthreads::Integer=1) + nthreads::Int=1) n = length(gd) outcol = similar(incol, n) fillfirst!(agg.condf, outcol, incol, gd, rev=agg.f === last) @@ -343,7 +343,7 @@ for f in (:first, :last) end function (agg::Aggregate{typeof(length)})(incol::AbstractVector, gd::GroupedDataFrame; - nthreads::Integer=1) + nthreads::Int=1) if getfield(gd, :idx) === nothing lens = zeros(Int, length(gd)) @inbounds for gix in gd.groups diff --git a/src/groupeddataframe/splitapplycombine.jl b/src/groupeddataframe/splitapplycombine.jl index 41eb912f39..8e1d45f91d 100644 --- a/src/groupeddataframe/splitapplycombine.jl +++ b/src/groupeddataframe/splitapplycombine.jl @@ -22,7 +22,7 @@ function _combine_prepare(gd::GroupedDataFrame, @nospecialize(cs::Union{Pair, Base.Callable, ColumnIndex, MultiColumnIndex}...); keepkeys::Bool, ungroup::Bool, copycols::Bool, - keeprows::Bool, renamecols::Bool, nthreads::Integer) + keeprows::Bool, renamecols::Bool, nthreads::Int) if !ungroup && !keepkeys throw(ArgumentError("keepkeys=false when ungroup=false is not allowed")) end @@ -196,7 +196,7 @@ function _combine_process_agg(@nospecialize(cs_i::Pair{Int, <:Pair{<:Function, S seen_cols::Dict{Symbol, Tuple{Bool, Int}}, trans_res::Vector{TransformationResult}, idx_agg::Union{Nothing, AbstractVector{Int}}, - nthreads::Integer) + nthreads::Int) @assert isagg(cs_i, gd) @assert !optional_i out_col_name = last(last(cs_i)) @@ -487,7 +487,7 @@ end function _combine(gd::GroupedDataFrame, @nospecialize(cs_norm::Vector{Any}), optional_transform::Vector{Bool}, - copycols::Bool, keeprows::Bool, renamecols::Bool, nthreads::Integer) + copycols::Bool, keeprows::Bool, renamecols::Bool, nthreads::Int) if isempty(cs_norm) if keeprows && nrow(parent(gd)) > 0 && minimum(gd.groups) == 0 throw(ArgumentError("select and transform do not support " * @@ -624,7 +624,7 @@ end function combine(f::Base.Callable, gd::GroupedDataFrame; keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, - nthreads::Integer=1) + nthreads::Int=1) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a GroupedDataFrame")) end @@ -634,7 +634,7 @@ end combine(f::Pair, gd::GroupedDataFrame; keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, - nthreads::Integer=1) = + nthreads::Int=1) = throw(ArgumentError("First argument must be a transformation if the second argument is a GroupedDataFrame. " * "You can pass a `Pair` as the second argument of the transformation. If you want the return " * "value to be processed as having multiple columns add `=> AsTable` suffix to the pair.")) @@ -642,14 +642,14 @@ combine(f::Pair, gd::GroupedDataFrame; combine(gd::GroupedDataFrame, cs::Union{Pair, Base.Callable, ColumnIndex, MultiColumnIndex}...; keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, - nthreads::Integer=1) = + nthreads::Int=1) = _combine_prepare(gd, cs..., keepkeys=keepkeys, ungroup=ungroup, copycols=true, keeprows=false, renamecols=renamecols, nthreads=nthreads) function select(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, - nthreads::Integer=1) + nthreads::Int=1) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame")) end @@ -659,14 +659,14 @@ end select(gd::GroupedDataFrame, args...; copycols::Bool=true, keepkeys::Bool=true, - ungroup::Bool=true, renamecols::Bool=true, nthreads::Integer=1) = + ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=1) = _combine_prepare(gd, args..., copycols=copycols, keepkeys=keepkeys, ungroup=ungroup, keeprows=true, renamecols=renamecols, nthreads=nthreads) function transform(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, - nthreads::Integer=1) + nthreads::Int=1) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame")) end @@ -676,7 +676,7 @@ end function transform(gd::GroupedDataFrame, args...; copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, - nthreads::Integer=1) + nthreads::Int=1) res = select(gd, :, args..., copycols=copycols, keepkeys=keepkeys, ungroup=ungroup, renamecols=renamecols, nthreads=nthreads) # res can be a GroupedDataFrame based on DataFrame or a DataFrame, @@ -686,7 +686,7 @@ function transform(gd::GroupedDataFrame, args...; copycols::Bool=true, end function select!(f::Base.Callable, gd::GroupedDataFrame; - ungroup::Bool=true, renamecols::Bool=true, nthreads::Integer=1) + ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=1) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame")) end @@ -694,7 +694,7 @@ function select!(f::Base.Callable, gd::GroupedDataFrame; end function select!(gd::GroupedDataFrame{DataFrame}, args...; - ungroup::Bool=true, renamecols::Bool=true, nthreads::Integer=1) + ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=1) newdf = select(gd, args..., copycols=false, renamecols=renamecols, nthreads=nthreads) df = parent(gd) _replace_columns!(df, newdf) @@ -702,7 +702,7 @@ function select!(gd::GroupedDataFrame{DataFrame}, args...; end function transform!(f::Base.Callable, gd::GroupedDataFrame; - ungroup::Bool=true, renamecols::Bool=true, nthreads::Integer=1) + ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=1) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame")) end @@ -710,7 +710,7 @@ function transform!(f::Base.Callable, gd::GroupedDataFrame; end function transform!(gd::GroupedDataFrame{DataFrame}, args...; - ungroup::Bool=true, renamecols::Bool=true, nthreads::Integer=1) + ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=1) newdf = select(gd, :, args..., copycols=false, renamecols=renamecols, nthreads=nthreads) df = parent(gd) select!(newdf, propertynames(df), :) diff --git a/src/other/utils.jl b/src/other/utils.jl index e13627748c..7bc772254d 100644 --- a/src/other/utils.jl +++ b/src/other/utils.jl @@ -82,4 +82,4 @@ else using Compat: ComposedFunction end -funname(c::ComposedFunction) = Symbol(funname(c.outer), :_, funname(c.inner)) +funname(c::ComposedFunction) = Symbol(funname(c.outer), :_, funname(c.inner)) \ No newline at end of file From ab76ff80c420f19809a81046e47657e708488eee Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Wed, 25 Nov 2020 21:10:29 +0100 Subject: [PATCH 04/10] Disallow negative values for nthreads --- src/groupeddataframe/splitapplycombine.jl | 3 +++ test/grouping.jl | 8 ++++++++ 2 files changed, 11 insertions(+) diff --git a/src/groupeddataframe/splitapplycombine.jl b/src/groupeddataframe/splitapplycombine.jl index 8e1d45f91d..ee693ce143 100644 --- a/src/groupeddataframe/splitapplycombine.jl +++ b/src/groupeddataframe/splitapplycombine.jl @@ -26,6 +26,9 @@ function _combine_prepare(gd::GroupedDataFrame, if !ungroup && !keepkeys throw(ArgumentError("keepkeys=false when ungroup=false is not allowed")) end + if nthreads <= 0 + throw(ArgumentError("nthreads must be equal to or greater than 1 (got $nthreads)")) + end cs_vec = [] for p in cs diff --git a/test/grouping.jl b/test/grouping.jl index 423dd3a6e2..07229c8ba6 100644 --- a/test/grouping.jl +++ b/test/grouping.jl @@ -3299,4 +3299,12 @@ end end end +@testset "invalid nthreads" begin + gdf = groupby(DataFrame(x=1:10, y=1:10), :y) + @test_throws ArgumentError select(gdf, :x => sum, nthreads=0) + @test_throws ArgumentError transform(gdf, :x => sum, nthreads=0) + @test_throws ArgumentError select!(gdf, :x => sum, nthreads=0) + @test_throws ArgumentError transform!(gdf, :x => sum, nthreads=0) +end + end # module From 3e225ad5fb4d226fa8018737a313c5dffc177f8a Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Wed, 25 Nov 2020 21:28:42 +0100 Subject: [PATCH 05/10] Add global default --- docs/src/lib/functions.md | 6 +++ src/abstractdataframe/selection.jl | 62 ++++++++++++++++------- src/groupeddataframe/fastaggregates.jl | 10 ++-- src/groupeddataframe/splitapplycombine.jl | 22 ++++---- src/other/utils.jl | 27 +++++++++- test/utils.jl | 10 ++++ 6 files changed, 101 insertions(+), 36 deletions(-) diff --git a/docs/src/lib/functions.md b/docs/src/lib/functions.md index 128f0ac9e7..432af56cfb 100644 --- a/docs/src/lib/functions.md +++ b/docs/src/lib/functions.md @@ -129,3 +129,9 @@ pairs ```@docs isapprox ``` + +## Multithreading +```@docs +DataFrames.nthreads +DataFrames.nthreads! +``` \ No newline at end of file diff --git a/src/abstractdataframe/selection.jl b/src/abstractdataframe/selection.jl index 3cefa8c3b3..58349096cf 100644 --- a/src/abstractdataframe/selection.jl +++ b/src/abstractdataframe/selection.jl @@ -576,8 +576,12 @@ end """ select!(df::DataFrame, args...; renamecols::Bool=true) select!(args::Base.Callable, df::DataFrame; renamecols::Bool=true) - select!(gd::GroupedDataFrame{DataFrame}, args...; ungroup::Bool=true, renamecols::Bool=true) - select!(f::Base.Callable, gd::GroupedDataFrame; ungroup::Bool=true, renamecols::Bool=true) + select!(gd::GroupedDataFrame{DataFrame}, args...; + ungroup::Bool=true, renamecols::Bool=true, + nthreads::Int=DataFrames.nthreads()) + select!(f::Base.Callable, gd::GroupedDataFrame; + ungroup::Bool=true, renamecols::Bool=true, + nthreads::Int=DataFrames.nthreads()) Mutate `df` or `gd` in place to retain only columns or transformations specified by `args...` and return it. The result is guaranteed to have the same number of rows as `df` or @@ -595,6 +599,11 @@ $TRANSFORMATION_COMMON_RULES column names should include the name of transformation functions or not. - `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data frame or a `GroupedDataFrame`. +- `nthreads::Int=DataFrames.nthreads()` : the number of CPU threads to use. + Defaults to `1` unless [`DataFrames.nthreads!`](@ref) has been called with + a different value. Passing a value higher than 1 currently has an effect only + for some optimized grouped reductions. Values higher than `Threads.nthreads()` + will be replaced with that value. See [`select`](@ref) for examples. ``` @@ -613,8 +622,12 @@ end """ transform!(df::DataFrame, args...; renamecols::Bool=true) transform!(args::Callable, df::DataFrame; renamecols::Bool=true) - transform!(gd::GroupedDataFrame{DataFrame}, args...; ungroup::Bool=true, renamecols::Bool=true) - transform!(f::Base.Callable, gd::GroupedDataFrame; ungroup::Bool=true, renamecols::Bool=true) + transform!(gd::GroupedDataFrame{DataFrame}, args...; + ungroup::Bool=true, renamecols::Bool=true, + nthreads::Int=DataFrames.nthreads()) + transform!(f::Base.Callable, gd::GroupedDataFrame; + ungroup::Bool=true, renamecols::Bool=true, + nthreads::Int=DataFrames.nthreads()) Mutate `df` or `gd` in place to add columns specified by `args...` and return it. The result is guaranteed to have the same number of rows as `df`. @@ -627,6 +640,11 @@ $TRANSFORMATION_COMMON_RULES column names should include the name of transformation functions or not. - `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data frame or a `GroupedDataFrame`. +- `nthreads::Int=DataFrames.nthreads()` : the number of CPU threads to use. + Defaults to `1` unless [`DataFrames.nthreads!`](@ref) has been called with + a different value. Passing a value higher than 1 currently has an effect only + for some optimized grouped reductions. Values higher than `Threads.nthreads()` + will be replaced with that value. See [`select`](@ref) for examples. """ @@ -644,10 +662,10 @@ end select(df::AbstractDataFrame, args...; copycols::Bool=true, renamecols::Bool=true) select(args::Callable, df::DataFrame; renamecols::Bool=true) select(gd::GroupedDataFrame, args...; copycols::Bool=true, keepkeys::Bool=true, - ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=1) + ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=DataFrames.nthreads()) select(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true, - renamecols::Bool=true, nthreads::Int=1) + renamecols::Bool=true, nthreads::Int=DataFrames.nthreads()) Create a new data frame that contains columns from `df` or `gd` specified by `args` and return it. The result is guaranteed to have the same number of rows @@ -665,9 +683,11 @@ $TRANSFORMATION_COMMON_RULES data frame. - `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data frame or a `GroupedDataFrame`. -- `nthreads::Int=1` : the number of CPU threads to use. Passing a value higher than 1 - currently has an effect only for some optimized grouped reductions. Values higher than - `Threads.nthreads()` will be replaced with that value. +- `nthreads::Int=DataFrames.nthreads()` : the number of CPU threads to use. + Defaults to `1` unless [`DataFrames.nthreads!`](@ref) has been called with + a different value. Passing a value higher than 1 currently has an effect only + for some optimized grouped reductions. Values higher than `Threads.nthreads()` + will be replaced with that value. # Examples ```jldoctest @@ -863,10 +883,10 @@ end transform(f::Callable, df::DataFrame; renamecols::Bool=true) transform(gd::GroupedDataFrame, args...; copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true, - renamecols::Bool=true, nthreads::Int=1) + renamecols::Bool=true, nthreads::Int=DataFrames.nthreads()) transform(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true, - renamecols::Bool=true, nthreads::Int=1) + renamecols::Bool=true, nthreads::Int=DataFrames.nthreads()) Create a new data frame that contains columns from `df` or `gd` plus columns specified by `args` and return it. The result is guaranteed to have the same @@ -883,9 +903,11 @@ $TRANSFORMATION_COMMON_RULES data frame. - `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data frame or a `GroupedDataFrame`. -- `nthreads::Int=1` : the number of CPU threads to use. Passing a value higher than 1 - currently has an effect only for some optimized grouped reductions. Values higher than - `Threads.nthreads()` will be replaced with that value. +- `nthreads::Int=DataFrames.nthreads()` : the number of CPU threads to use. + Defaults to `1` unless [`DataFrames.nthreads!`](@ref) has been called with + a different value. Passing a value higher than 1 currently has an effect only + for some optimized grouped reductions. Values higher than `Threads.nthreads()` + will be replaced with that value. Note that when the first argument is a `GroupedDataFrame`, `keepkeys=false` is needed to be able to return a different value for the grouping column: @@ -934,10 +956,10 @@ end combine(f::Callable, df::AbstractDataFrame; renamecols::Bool=true) combine(gd::GroupedDataFrame, args...; keepkeys::Bool=true, ungroup::Bool=true, - renamecols::Bool=true, nthreads::Int=1) + renamecols::Bool=true, nthreads::Int=DataFrames.nthreads()) combine(f::Base.Callable, gd::GroupedDataFrame; keepkeys::Bool=true, ungroup::Bool=true, - renamecols::Bool=true, nthreads::Int=1) + renamecols::Bool=true, nthreads::Int=DataFrames.nthreads()) Create a new data frame that contains columns from `df` or `gd` specified by `args` and return it. The result can have any number of rows that is determined @@ -952,9 +974,11 @@ $TRANSFORMATION_COMMON_RULES data frame. - `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data frame or a `GroupedDataFrame`. -- `nthreads::Int=1` : the number of CPU threads to use. Passing a value higher than 1 - currently has an effect only for some optimized grouped reductions. Values higher than - `Threads.nthreads()` will be replaced with that value. +- `nthreads::Int=DataFrames.nthreads()` : the number of CPU threads to use. + Defaults to `1` unless [`DataFrames.nthreads!`](@ref) has been called with + a different value. Passing a value higher than 1 currently has an effect only + for some optimized grouped reductions. Values higher than `Threads.nthreads()` + will be replaced with that value. # Examples ```jldoctest diff --git a/src/groupeddataframe/fastaggregates.jl b/src/groupeddataframe/fastaggregates.jl index e02aa78ab0..3ef186f5dc 100644 --- a/src/groupeddataframe/fastaggregates.jl +++ b/src/groupeddataframe/fastaggregates.jl @@ -290,7 +290,7 @@ groupreduce(f, op, condf::typeof(!ismissing), adjust, checkempty::Bool, f, op, condf, adjust, checkempty, incol, gd, nthreads) (r::Reduce)(incol::AbstractVector, gd::GroupedDataFrame; - nthreads::Int=1) = + nthreads::Int=nthreads()) = groupreduce((x, i) -> x, r.op, r.condf, r.adjust, r.checkempty, incol, gd, nthreads) # this definition is missing in Julia 1.0 LTS and is required by aggregation for var @@ -300,7 +300,7 @@ if VERSION < v"1.1" end function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFrame; - nthreads::Int=1) + nthreads::Int=nthreads()) means = groupreduce((x, i) -> x, Base.add_sum, agg.condf, /, false, incol, gd, nthreads) # !ismissing check is purely an optimization to avoid a copy later @@ -316,7 +316,7 @@ function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFra end function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFrame; - nthreads::Int=1) + nthreads::Int=nthreads()) outcol = Aggregate(var, agg.condf)(incol, gd; nthreads=nthreads) if eltype(outcol) <: Union{Missing, Rational} return sqrt.(outcol) @@ -329,7 +329,7 @@ for f in (:first, :last) # Without using @eval the presence of a keyword argument triggers a Julia bug @eval begin function (agg::Aggregate{typeof($f)})(incol::AbstractVector, gd::GroupedDataFrame; - nthreads::Int=1) + nthreads::Int=nthreads()) n = length(gd) outcol = similar(incol, n) fillfirst!(agg.condf, outcol, incol, gd, rev=agg.f === last) @@ -343,7 +343,7 @@ for f in (:first, :last) end function (agg::Aggregate{typeof(length)})(incol::AbstractVector, gd::GroupedDataFrame; - nthreads::Int=1) + nthreads::Int=nthreads()) if getfield(gd, :idx) === nothing lens = zeros(Int, length(gd)) @inbounds for gix in gd.groups diff --git a/src/groupeddataframe/splitapplycombine.jl b/src/groupeddataframe/splitapplycombine.jl index ee693ce143..5fee26ab87 100644 --- a/src/groupeddataframe/splitapplycombine.jl +++ b/src/groupeddataframe/splitapplycombine.jl @@ -627,7 +627,7 @@ end function combine(f::Base.Callable, gd::GroupedDataFrame; keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, - nthreads::Int=1) + nthreads::Int=nthreads()) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a GroupedDataFrame")) end @@ -637,7 +637,7 @@ end combine(f::Pair, gd::GroupedDataFrame; keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, - nthreads::Int=1) = + nthreads::Int=nthreads()) = throw(ArgumentError("First argument must be a transformation if the second argument is a GroupedDataFrame. " * "You can pass a `Pair` as the second argument of the transformation. If you want the return " * "value to be processed as having multiple columns add `=> AsTable` suffix to the pair.")) @@ -645,14 +645,14 @@ combine(f::Pair, gd::GroupedDataFrame; combine(gd::GroupedDataFrame, cs::Union{Pair, Base.Callable, ColumnIndex, MultiColumnIndex}...; keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, - nthreads::Int=1) = + nthreads::Int=nthreads()) = _combine_prepare(gd, cs..., keepkeys=keepkeys, ungroup=ungroup, copycols=true, keeprows=false, renamecols=renamecols, nthreads=nthreads) function select(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, - nthreads::Int=1) + nthreads::Int=nthreads()) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame")) end @@ -662,14 +662,14 @@ end select(gd::GroupedDataFrame, args...; copycols::Bool=true, keepkeys::Bool=true, - ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=1) = + ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=nthreads()) = _combine_prepare(gd, args..., copycols=copycols, keepkeys=keepkeys, ungroup=ungroup, keeprows=true, renamecols=renamecols, nthreads=nthreads) function transform(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, - nthreads::Int=1) + nthreads::Int=nthreads()) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame")) end @@ -679,7 +679,7 @@ end function transform(gd::GroupedDataFrame, args...; copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, - nthreads::Int=1) + nthreads::Int=nthreads()) res = select(gd, :, args..., copycols=copycols, keepkeys=keepkeys, ungroup=ungroup, renamecols=renamecols, nthreads=nthreads) # res can be a GroupedDataFrame based on DataFrame or a DataFrame, @@ -689,7 +689,7 @@ function transform(gd::GroupedDataFrame, args...; copycols::Bool=true, end function select!(f::Base.Callable, gd::GroupedDataFrame; - ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=1) + ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=nthreads()) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame")) end @@ -697,7 +697,7 @@ function select!(f::Base.Callable, gd::GroupedDataFrame; end function select!(gd::GroupedDataFrame{DataFrame}, args...; - ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=1) + ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=nthreads()) newdf = select(gd, args..., copycols=false, renamecols=renamecols, nthreads=nthreads) df = parent(gd) _replace_columns!(df, newdf) @@ -705,7 +705,7 @@ function select!(gd::GroupedDataFrame{DataFrame}, args...; end function transform!(f::Base.Callable, gd::GroupedDataFrame; - ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=1) + ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=nthreads()) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame")) end @@ -713,7 +713,7 @@ function transform!(f::Base.Callable, gd::GroupedDataFrame; end function transform!(gd::GroupedDataFrame{DataFrame}, args...; - ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=1) + ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=nthreads()) newdf = select(gd, :, args..., copycols=false, renamecols=renamecols, nthreads=nthreads) df = parent(gd) select!(newdf, propertynames(df), :) diff --git a/src/other/utils.jl b/src/other/utils.jl index 7bc772254d..e2148a4a4b 100644 --- a/src/other/utils.jl +++ b/src/other/utils.jl @@ -82,4 +82,29 @@ else using Compat: ComposedFunction end -funname(c::ComposedFunction) = Symbol(funname(c.outer), :_, funname(c.inner)) \ No newline at end of file +funname(c::ComposedFunction) = Symbol(funname(c.outer), :_, funname(c.inner)) + +const NTHREADS = Ref(1) + +""" + DataFrames.nthreads() + +Return the default value for the `nthreads` argument, which determines the number +of CPU threads used by functions that support it when not specified explicitly. + +Defaults to 1. Call [`DataFrames.nthreads!`](@ref) to adjust the value. +""" +nthreads() = DataFrames.NTHREADS[] + +""" + DataFrames.nthreads!(n::Int) + +Set to `n` the default value for the `nthreads` argument, which determines the number +of CPU threads used by functions that support it when not specified explicitly. + +Use [`DataFrames.nthreads`](@ref) to access the value. +""" +function nthreads!(n::Int) + n > 0 || throw(ArgumentError("n must be equal to or greater than 1 (got $n)")) + DataFrames.NTHREADS[] = n +end \ No newline at end of file diff --git a/test/utils.jl b/test/utils.jl index ba1f507bef..8b30bf591b 100644 --- a/test/utils.jl +++ b/test/utils.jl @@ -94,4 +94,14 @@ end :sum_skipmissing_div12 end +@test "nthreads and nthreads!" begin + @test DataFrames.nthreads() == 1 + @test DataFrames.nthreads!(2) == 2 + @test DataFrames.nthreads() == 2 + @test DataFrames.nthreads!(1) == 1 # reset to default + @test_throws ArgumentError DataFrames.nthreads!(0) + @test DataFrames.nthreads() == 1 +end +end + end # module From 9595755e6fcf6a6d12b3d39ec88a36f068654512 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Wed, 25 Nov 2020 22:40:59 +0100 Subject: [PATCH 06/10] Fix --- test/utils.jl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/utils.jl b/test/utils.jl index 8b30bf591b..23baa122ca 100644 --- a/test/utils.jl +++ b/test/utils.jl @@ -94,7 +94,7 @@ end :sum_skipmissing_div12 end -@test "nthreads and nthreads!" begin +@testset "nthreads and nthreads!" begin @test DataFrames.nthreads() == 1 @test DataFrames.nthreads!(2) == 2 @test DataFrames.nthreads() == 2 @@ -102,6 +102,5 @@ end @test_throws ArgumentError DataFrames.nthreads!(0) @test DataFrames.nthreads() == 1 end -end end # module From 8000e2efc4d810dd3c8d807a24d90f31c6dd789e Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Sat, 28 Nov 2020 10:56:40 +0100 Subject: [PATCH 07/10] Keep API minimal --- NEWS.md | 8 ++- docs/src/lib/functions.md | 6 -- src/DataFrames.jl | 2 + src/abstractdataframe/selection.jl | 58 +++--------------- src/groupeddataframe/fastaggregates.jl | 59 ++++++++---------- src/groupeddataframe/splitapplycombine.jl | 75 +++++++++-------------- src/other/utils.jl | 25 -------- test/grouping.jl | 16 +++-- test/utils.jl | 9 --- 9 files changed, 79 insertions(+), 179 deletions(-) diff --git a/NEWS.md b/NEWS.md index 8580fcea0a..cfa4727476 100644 --- a/NEWS.md +++ b/NEWS.md @@ -2,9 +2,11 @@ ## New functionalities -* `combine`, `select` and `transform` with `GroupedDataFrame` now accept - a `nthreads` argument which enables multithreading for some optimized - grouped reductions ([#2491](https://github.com/JuliaData/DataFrames.jl/pull/2491)). +* `combine`, `select` and `transform` with `GroupedDataFrame` now have + support multithreading for some optimized grouped reductions. + This can be enabled using an experimental global option via + `DataFrames.NTHREADS[] = n` (with n > 1) + ([#2491](https://github.com/JuliaData/DataFrames.jl/pull/2491)). # DataFrames v0.22 Release Notes diff --git a/docs/src/lib/functions.md b/docs/src/lib/functions.md index 432af56cfb..128f0ac9e7 100644 --- a/docs/src/lib/functions.md +++ b/docs/src/lib/functions.md @@ -129,9 +129,3 @@ pairs ```@docs isapprox ``` - -## Multithreading -```@docs -DataFrames.nthreads -DataFrames.nthreads! -``` \ No newline at end of file diff --git a/src/DataFrames.jl b/src/DataFrames.jl index e596b4aa03..685f9a4530 100644 --- a/src/DataFrames.jl +++ b/src/DataFrames.jl @@ -91,6 +91,8 @@ else export only end +const NTHREADS = Ref(1) + include("other/utils.jl") include("other/index.jl") diff --git a/src/abstractdataframe/selection.jl b/src/abstractdataframe/selection.jl index 58349096cf..df58ac2cea 100644 --- a/src/abstractdataframe/selection.jl +++ b/src/abstractdataframe/selection.jl @@ -576,12 +576,8 @@ end """ select!(df::DataFrame, args...; renamecols::Bool=true) select!(args::Base.Callable, df::DataFrame; renamecols::Bool=true) - select!(gd::GroupedDataFrame{DataFrame}, args...; - ungroup::Bool=true, renamecols::Bool=true, - nthreads::Int=DataFrames.nthreads()) - select!(f::Base.Callable, gd::GroupedDataFrame; - ungroup::Bool=true, renamecols::Bool=true, - nthreads::Int=DataFrames.nthreads()) + select!(gd::GroupedDataFrame{DataFrame}, args...; ungroup::Bool=true, renamecols::Bool=true) + select!(f::Base.Callable, gd::GroupedDataFrame; ungroup::Bool=true, renamecols::Bool=true) Mutate `df` or `gd` in place to retain only columns or transformations specified by `args...` and return it. The result is guaranteed to have the same number of rows as `df` or @@ -599,11 +595,6 @@ $TRANSFORMATION_COMMON_RULES column names should include the name of transformation functions or not. - `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data frame or a `GroupedDataFrame`. -- `nthreads::Int=DataFrames.nthreads()` : the number of CPU threads to use. - Defaults to `1` unless [`DataFrames.nthreads!`](@ref) has been called with - a different value. Passing a value higher than 1 currently has an effect only - for some optimized grouped reductions. Values higher than `Threads.nthreads()` - will be replaced with that value. See [`select`](@ref) for examples. ``` @@ -622,12 +613,8 @@ end """ transform!(df::DataFrame, args...; renamecols::Bool=true) transform!(args::Callable, df::DataFrame; renamecols::Bool=true) - transform!(gd::GroupedDataFrame{DataFrame}, args...; - ungroup::Bool=true, renamecols::Bool=true, - nthreads::Int=DataFrames.nthreads()) - transform!(f::Base.Callable, gd::GroupedDataFrame; - ungroup::Bool=true, renamecols::Bool=true, - nthreads::Int=DataFrames.nthreads()) + transform!(gd::GroupedDataFrame{DataFrame}, args...; ungroup::Bool=true, renamecols::Bool=true) + transform!(f::Base.Callable, gd::GroupedDataFrame; ungroup::Bool=true, renamecols::Bool=true) Mutate `df` or `gd` in place to add columns specified by `args...` and return it. The result is guaranteed to have the same number of rows as `df`. @@ -640,11 +627,6 @@ $TRANSFORMATION_COMMON_RULES column names should include the name of transformation functions or not. - `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data frame or a `GroupedDataFrame`. -- `nthreads::Int=DataFrames.nthreads()` : the number of CPU threads to use. - Defaults to `1` unless [`DataFrames.nthreads!`](@ref) has been called with - a different value. Passing a value higher than 1 currently has an effect only - for some optimized grouped reductions. Values higher than `Threads.nthreads()` - will be replaced with that value. See [`select`](@ref) for examples. """ @@ -662,10 +644,9 @@ end select(df::AbstractDataFrame, args...; copycols::Bool=true, renamecols::Bool=true) select(args::Callable, df::DataFrame; renamecols::Bool=true) select(gd::GroupedDataFrame, args...; copycols::Bool=true, keepkeys::Bool=true, - ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=DataFrames.nthreads()) + ungroup::Bool=true, renamecols::Bool=true) select(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true, - keepkeys::Bool=true, ungroup::Bool=true, - renamecols::Bool=true, nthreads::Int=DataFrames.nthreads()) + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) Create a new data frame that contains columns from `df` or `gd` specified by `args` and return it. The result is guaranteed to have the same number of rows @@ -683,11 +664,6 @@ $TRANSFORMATION_COMMON_RULES data frame. - `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data frame or a `GroupedDataFrame`. -- `nthreads::Int=DataFrames.nthreads()` : the number of CPU threads to use. - Defaults to `1` unless [`DataFrames.nthreads!`](@ref) has been called with - a different value. Passing a value higher than 1 currently has an effect only - for some optimized grouped reductions. Values higher than `Threads.nthreads()` - will be replaced with that value. # Examples ```jldoctest @@ -882,11 +858,9 @@ end transform(df::AbstractDataFrame, args...; copycols::Bool=true, renamecols::Bool=true) transform(f::Callable, df::DataFrame; renamecols::Bool=true) transform(gd::GroupedDataFrame, args...; copycols::Bool=true, - keepkeys::Bool=true, ungroup::Bool=true, - renamecols::Bool=true, nthreads::Int=DataFrames.nthreads()) + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) transform(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true, - keepkeys::Bool=true, ungroup::Bool=true, - renamecols::Bool=true, nthreads::Int=DataFrames.nthreads()) + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) Create a new data frame that contains columns from `df` or `gd` plus columns specified by `args` and return it. The result is guaranteed to have the same @@ -903,11 +877,6 @@ $TRANSFORMATION_COMMON_RULES data frame. - `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data frame or a `GroupedDataFrame`. -- `nthreads::Int=DataFrames.nthreads()` : the number of CPU threads to use. - Defaults to `1` unless [`DataFrames.nthreads!`](@ref) has been called with - a different value. Passing a value higher than 1 currently has an effect only - for some optimized grouped reductions. Values higher than `Threads.nthreads()` - will be replaced with that value. Note that when the first argument is a `GroupedDataFrame`, `keepkeys=false` is needed to be able to return a different value for the grouping column: @@ -955,11 +924,9 @@ end combine(df::AbstractDataFrame, args...; renamecols::Bool=true) combine(f::Callable, df::AbstractDataFrame; renamecols::Bool=true) combine(gd::GroupedDataFrame, args...; - keepkeys::Bool=true, ungroup::Bool=true, - renamecols::Bool=true, nthreads::Int=DataFrames.nthreads()) + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) combine(f::Base.Callable, gd::GroupedDataFrame; - keepkeys::Bool=true, ungroup::Bool=true, - renamecols::Bool=true, nthreads::Int=DataFrames.nthreads()) + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) Create a new data frame that contains columns from `df` or `gd` specified by `args` and return it. The result can have any number of rows that is determined @@ -974,11 +941,6 @@ $TRANSFORMATION_COMMON_RULES data frame. - `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data frame or a `GroupedDataFrame`. -- `nthreads::Int=DataFrames.nthreads()` : the number of CPU threads to use. - Defaults to `1` unless [`DataFrames.nthreads!`](@ref) has been called with - a different value. Passing a value higher than 1 currently has an effect only - for some optimized grouped reductions. Values higher than `Threads.nthreads()` - will be replaced with that value. # Examples ```jldoctest diff --git a/src/groupeddataframe/fastaggregates.jl b/src/groupeddataframe/fastaggregates.jl index 3ef186f5dc..14fa500fe8 100644 --- a/src/groupeddataframe/fastaggregates.jl +++ b/src/groupeddataframe/fastaggregates.jl @@ -157,13 +157,13 @@ function copyto_widen!(res::AbstractVector{T}, x::AbstractVector) where T end function groupreduce!(res::AbstractVector, f, op, condf, adjust, checkempty::Bool, - incol::AbstractVector, gd::GroupedDataFrame, nthreads::Int) + incol::AbstractVector, gd::GroupedDataFrame) n = length(gd) - groups = gd.groups if adjust !== nothing || checkempty counts = zeros(Int, n) end - nt = min(nthreads, Threads.nthreads()) + groups = gd.groups + nt = min(NTHREADS[], Threads.nthreads()) if nt <= 1 || axes(incol) != axes(groups) @inbounds for i in eachindex(incol, groups) gix = groups[i] @@ -278,20 +278,17 @@ end # function barrier works around type instability of groupreduce_init due to applicable groupreduce(f, op, condf, adjust, checkempty::Bool, - incol::AbstractVector, gd::GroupedDataFrame, - nthreads::Int) = + incol::AbstractVector, gd::GroupedDataFrame) = groupreduce!(groupreduce_init(op, condf, adjust, incol, gd), - f, op, condf, adjust, checkempty, incol, gd, nthreads) + f, op, condf, adjust, checkempty, incol, gd) # Avoids the overhead due to Missing when computing reduction groupreduce(f, op, condf::typeof(!ismissing), adjust, checkempty::Bool, - incol::AbstractVector, gd::GroupedDataFrame, - nthreads::Int) = + incol::AbstractVector, gd::GroupedDataFrame) = groupreduce!(disallowmissing(groupreduce_init(op, condf, adjust, incol, gd)), - f, op, condf, adjust, checkempty, incol, gd, nthreads) + f, op, condf, adjust, checkempty, incol, gd) -(r::Reduce)(incol::AbstractVector, gd::GroupedDataFrame; - nthreads::Int=nthreads()) = - groupreduce((x, i) -> x, r.op, r.condf, r.adjust, r.checkempty, incol, gd, nthreads) +(r::Reduce)(incol::AbstractVector, gd::GroupedDataFrame) = + groupreduce((x, i) -> x, r.op, r.condf, r.adjust, r.checkempty, incol, gd) # this definition is missing in Julia 1.0 LTS and is required by aggregation for var # TODO: remove this when we drop 1.0 support @@ -299,10 +296,8 @@ if VERSION < v"1.1" Base.zero(::Type{Missing}) = missing end -function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFrame; - nthreads::Int=nthreads()) - means = groupreduce((x, i) -> x, Base.add_sum, agg.condf, /, false, - incol, gd, nthreads) +function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFrame) + means = groupreduce((x, i) -> x, Base.add_sum, agg.condf, /, false, incol, gd) # !ismissing check is purely an optimization to avoid a copy later if eltype(means) >: Missing && agg.condf !== !ismissing T = Union{Missing, real(eltype(means))} @@ -312,12 +307,11 @@ function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFra res = zeros(T, length(gd)) return groupreduce!(res, (x, i) -> @inbounds(abs2(x - means[i])), +, agg.condf, (x, l) -> l <= 1 ? oftype(x / (l-1), NaN) : x / (l-1), - false, incol, gd, nthreads) + false, incol, gd) end -function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFrame; - nthreads::Int=nthreads()) - outcol = Aggregate(var, agg.condf)(incol, gd; nthreads=nthreads) +function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFrame) + outcol = Aggregate(var, agg.condf)(incol, gd) if eltype(outcol) <: Union{Missing, Rational} return sqrt.(outcol) else @@ -325,25 +319,20 @@ function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFra end end -for f in (:first, :last) - # Without using @eval the presence of a keyword argument triggers a Julia bug - @eval begin - function (agg::Aggregate{typeof($f)})(incol::AbstractVector, gd::GroupedDataFrame; - nthreads::Int=nthreads()) - n = length(gd) - outcol = similar(incol, n) - fillfirst!(agg.condf, outcol, incol, gd, rev=agg.f === last) - if isconcretetype(eltype(outcol)) - return outcol - else - return copyto_widen!(Tables.allocatecolumn(typeof(first(outcol)), n), outcol) - end +for f in (first, last) + function (agg::Aggregate{typeof(f)})(incol::AbstractVector, gd::GroupedDataFrame) + n = length(gd) + outcol = similar(incol, n) + fillfirst!(agg.condf, outcol, incol, gd, rev=agg.f === last) + if isconcretetype(eltype(outcol)) + return outcol + else + return copyto_widen!(Tables.allocatecolumn(typeof(first(outcol)), n), outcol) end end end -function (agg::Aggregate{typeof(length)})(incol::AbstractVector, gd::GroupedDataFrame; - nthreads::Int=nthreads()) +function (agg::Aggregate{typeof(length)})(incol::AbstractVector, gd::GroupedDataFrame) if getfield(gd, :idx) === nothing lens = zeros(Int, length(gd)) @inbounds for gix in gd.groups diff --git a/src/groupeddataframe/splitapplycombine.jl b/src/groupeddataframe/splitapplycombine.jl index 5fee26ab87..5bbe77496a 100644 --- a/src/groupeddataframe/splitapplycombine.jl +++ b/src/groupeddataframe/splitapplycombine.jl @@ -22,13 +22,10 @@ function _combine_prepare(gd::GroupedDataFrame, @nospecialize(cs::Union{Pair, Base.Callable, ColumnIndex, MultiColumnIndex}...); keepkeys::Bool, ungroup::Bool, copycols::Bool, - keeprows::Bool, renamecols::Bool, nthreads::Int) + keeprows::Bool, renamecols::Bool) if !ungroup && !keepkeys throw(ArgumentError("keepkeys=false when ungroup=false is not allowed")) end - if nthreads <= 0 - throw(ArgumentError("nthreads must be equal to or greater than 1 (got $nthreads)")) - end cs_vec = [] for p in cs @@ -66,8 +63,7 @@ function _combine_prepare(gd::GroupedDataFrame, # if optional_transform[i] is true then the transformation will be skipped # if earlier column with a column with the same name was created - idx, valscat = _combine(gd, cs_norm, optional_transform, - copycols, keeprows, renamecols, nthreads) + idx, valscat = _combine(gd, cs_norm, optional_transform, copycols, keeprows, renamecols) !keepkeys && ungroup && return valscat @@ -198,14 +194,13 @@ function _combine_process_agg(@nospecialize(cs_i::Pair{Int, <:Pair{<:Function, S gd::GroupedDataFrame, seen_cols::Dict{Symbol, Tuple{Bool, Int}}, trans_res::Vector{TransformationResult}, - idx_agg::Union{Nothing, AbstractVector{Int}}, - nthreads::Int) + idx_agg::Union{Nothing, AbstractVector{Int}}) @assert isagg(cs_i, gd) @assert !optional_i out_col_name = last(last(cs_i)) incol = parentdf[!, first(cs_i)] agg = check_aggregate(first(last(cs_i)), incol) - outcol = agg(incol, gd; nthreads=nthreads) + outcol = agg(incol, gd) if haskey(seen_cols, out_col_name) optional, loc = seen_cols[out_col_name] @@ -489,8 +484,8 @@ function prepare_idx_keeprows(idx::AbstractVector{<:Integer}, end function _combine(gd::GroupedDataFrame, - @nospecialize(cs_norm::Vector{Any}), optional_transform::Vector{Bool}, - copycols::Bool, keeprows::Bool, renamecols::Bool, nthreads::Int) + cs_norm::Vector{Any}, optional_transform::Vector{Bool}, + copycols::Bool, keeprows::Bool, renamecols::Bool) if isempty(cs_norm) if keeprows && nrow(parent(gd)) > 0 && minimum(gd.groups) == 0 throw(ArgumentError("select and transform do not support " * @@ -534,8 +529,7 @@ function _combine(gd::GroupedDataFrame, optional_i = optional_transform[i] if length(gd) > 0 && isagg(cs_i, gd) - _combine_process_agg(cs_i, optional_i, parentdf, gd, - seen_cols, trans_res, idx_agg, nthreads) + _combine_process_agg(cs_i, optional_i, parentdf, gd, seen_cols, trans_res, idx_agg) elseif keeprows && cs_i isa Pair && first(last(cs_i)) === identity && !(first(cs_i) isa AsTable) && (last(last(cs_i)) isa Symbol) # this is a fast path used when we pass a column or rename a column in select or transform @@ -626,95 +620,82 @@ function _combine(gd::GroupedDataFrame, end function combine(f::Base.Callable, gd::GroupedDataFrame; - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, - nthreads::Int=nthreads()) + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a GroupedDataFrame")) end - return combine(gd, f, keepkeys=keepkeys, ungroup=ungroup, renamecols=renamecols, - nthreads=nthreads) + return combine(gd, f, keepkeys=keepkeys, ungroup=ungroup, renamecols=renamecols) end combine(f::Pair, gd::GroupedDataFrame; - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, - nthreads::Int=nthreads()) = + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) = throw(ArgumentError("First argument must be a transformation if the second argument is a GroupedDataFrame. " * "You can pass a `Pair` as the second argument of the transformation. If you want the return " * "value to be processed as having multiple columns add `=> AsTable` suffix to the pair.")) combine(gd::GroupedDataFrame, cs::Union{Pair, Base.Callable, ColumnIndex, MultiColumnIndex}...; - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, - nthreads::Int=nthreads()) = + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) = _combine_prepare(gd, cs..., keepkeys=keepkeys, ungroup=ungroup, - copycols=true, keeprows=false, renamecols=renamecols, - nthreads=nthreads) + copycols=true, keeprows=false, renamecols=renamecols) function select(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true, - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, - nthreads::Int=nthreads()) + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame")) end - return select(gd, f, copycols=copycols, keepkeys=keepkeys, ungroup=ungroup, - nthreads=nthreads) + return select(gd, f, copycols=copycols, keepkeys=keepkeys, ungroup=ungroup) end select(gd::GroupedDataFrame, args...; copycols::Bool=true, keepkeys::Bool=true, - ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=nthreads()) = + ungroup::Bool=true, renamecols::Bool=true) = _combine_prepare(gd, args..., copycols=copycols, keepkeys=keepkeys, - ungroup=ungroup, keeprows=true, renamecols=renamecols, - nthreads=nthreads) + ungroup=ungroup, keeprows=true, renamecols=renamecols) function transform(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true, - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, - nthreads::Int=nthreads()) + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame")) end - return transform(gd, f, copycols=copycols, keepkeys=keepkeys, ungroup=ungroup, - nthreads=nthreads) + return transform(gd, f, copycols=copycols, keepkeys=keepkeys, ungroup=ungroup) end function transform(gd::GroupedDataFrame, args...; copycols::Bool=true, - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, - nthreads::Int=nthreads()) + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) res = select(gd, :, args..., copycols=copycols, keepkeys=keepkeys, - ungroup=ungroup, renamecols=renamecols, nthreads=nthreads) + ungroup=ungroup, renamecols=renamecols) # res can be a GroupedDataFrame based on DataFrame or a DataFrame, # so parent always gives a data frame select!(parent(res), propertynames(parent(gd)), :) return res end -function select!(f::Base.Callable, gd::GroupedDataFrame; - ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=nthreads()) +function select!(f::Base.Callable, gd::GroupedDataFrame; ungroup::Bool=true, renamecols::Bool=true) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame")) end - return select!(gd, f, ungroup=ungroup, nthreads=nthreads) + return select!(gd, f, ungroup=ungroup) end function select!(gd::GroupedDataFrame{DataFrame}, args...; - ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=nthreads()) - newdf = select(gd, args..., copycols=false, renamecols=renamecols, nthreads=nthreads) + ungroup::Bool=true, renamecols::Bool=true) + newdf = select(gd, args..., copycols=false, renamecols=renamecols) df = parent(gd) _replace_columns!(df, newdf) return ungroup ? df : gd end -function transform!(f::Base.Callable, gd::GroupedDataFrame; - ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=nthreads()) +function transform!(f::Base.Callable, gd::GroupedDataFrame; ungroup::Bool=true, renamecols::Bool=true) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame")) end - return transform!(gd, f, ungroup=ungroup, nthreads=nthreads) + return transform!(gd, f, ungroup=ungroup) end function transform!(gd::GroupedDataFrame{DataFrame}, args...; - ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=nthreads()) - newdf = select(gd, :, args..., copycols=false, renamecols=renamecols, nthreads=nthreads) + ungroup::Bool=true, renamecols::Bool=true) + newdf = select(gd, :, args..., copycols=false, renamecols=renamecols) df = parent(gd) select!(newdf, propertynames(df), :) _replace_columns!(df, newdf) diff --git a/src/other/utils.jl b/src/other/utils.jl index e2148a4a4b..e13627748c 100644 --- a/src/other/utils.jl +++ b/src/other/utils.jl @@ -83,28 +83,3 @@ else end funname(c::ComposedFunction) = Symbol(funname(c.outer), :_, funname(c.inner)) - -const NTHREADS = Ref(1) - -""" - DataFrames.nthreads() - -Return the default value for the `nthreads` argument, which determines the number -of CPU threads used by functions that support it when not specified explicitly. - -Defaults to 1. Call [`DataFrames.nthreads!`](@ref) to adjust the value. -""" -nthreads() = DataFrames.NTHREADS[] - -""" - DataFrames.nthreads!(n::Int) - -Set to `n` the default value for the `nthreads` argument, which determines the number -of CPU threads used by functions that support it when not specified explicitly. - -Use [`DataFrames.nthreads`](@ref) to access the value. -""" -function nthreads!(n::Int) - n > 0 || throw(ArgumentError("n must be equal to or greater than 1 (got $n)")) - DataFrames.NTHREADS[] = n -end \ No newline at end of file diff --git a/test/grouping.jl b/test/grouping.jl index 07229c8ba6..03d29ebe14 100644 --- a/test/grouping.jl +++ b/test/grouping.jl @@ -99,8 +99,11 @@ function groupby_checked(df::AbstractDataFrame, keys, args...; kwargs...) end function combine_checked(gd::GroupedDataFrame, args...; kwargs...) - res1 = combine(gd, args...; nthreads=1, kwargs...) - res2 = combine(gd, args...; nthreads=2, kwargs...) + @assert DataFrames.NTHREADS[] == 1 + res1 = combine(gd, args...; kwargs...) + DataFrames.NTHREADS[] = 2 + res2 = combine(gd, args...; kwargs...) + DataFrames.NTHREADS[] = 1 @test names(res1) == names(res2) for (c1, c2) in zip(eachcol(res1), eachcol(res2)) @test typeof(c1) === typeof(c2) @@ -3301,10 +3304,11 @@ end @testset "invalid nthreads" begin gdf = groupby(DataFrame(x=1:10, y=1:10), :y) - @test_throws ArgumentError select(gdf, :x => sum, nthreads=0) - @test_throws ArgumentError transform(gdf, :x => sum, nthreads=0) - @test_throws ArgumentError select!(gdf, :x => sum, nthreads=0) - @test_throws ArgumentError transform!(gdf, :x => sum, nthreads=0) + DataFrames.NTHREADS[] = 0 + @test_throws ArgumentError select(gdf, :x => sum) + @test_throws ArgumentError transform(gdf, :x => sum) + @test_throws ArgumentError select!(gdf, :x => sum) + @test_throws ArgumentError transform!(gdf, :x => sum) end end # module diff --git a/test/utils.jl b/test/utils.jl index 23baa122ca..ba1f507bef 100644 --- a/test/utils.jl +++ b/test/utils.jl @@ -94,13 +94,4 @@ end :sum_skipmissing_div12 end -@testset "nthreads and nthreads!" begin - @test DataFrames.nthreads() == 1 - @test DataFrames.nthreads!(2) == 2 - @test DataFrames.nthreads() == 2 - @test DataFrames.nthreads!(1) == 1 # reset to default - @test_throws ArgumentError DataFrames.nthreads!(0) - @test DataFrames.nthreads() == 1 -end - end # module From cc8d2d44c7960a4a7ca0bc87d6e2a6f5d43e0559 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Sat, 28 Nov 2020 11:24:21 +0100 Subject: [PATCH 08/10] Print warning when not testing multithreading --- test/grouping.jl | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/test/grouping.jl b/test/grouping.jl index eb727ef632..cb138672db 100644 --- a/test/grouping.jl +++ b/test/grouping.jl @@ -101,9 +101,13 @@ end function combine_checked(gd::GroupedDataFrame, args...; kwargs...) @assert DataFrames.NTHREADS[] == 1 res1 = combine(gd, args...; kwargs...) - DataFrames.NTHREADS[] = 2 - res2 = combine(gd, args...; kwargs...) - DataFrames.NTHREADS[] = 1 + if Base.Threads.nthreads() > 1 + DataFrames.NTHREADS[] = 2 + res2 = combine(gd, args...; kwargs...) + DataFrames.NTHREADS[] = 1 + else + @warn "Only one thread available: multithreading not tested" + end @test names(res1) == names(res2) for (c1, c2) in zip(eachcol(res1), eachcol(res2)) @test typeof(c1) === typeof(c2) From 08aa0d9cf0a767495666f85c88ac71527350376a Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Sat, 28 Nov 2020 11:25:13 +0100 Subject: [PATCH 09/10] Use `@spawn` --- src/groupeddataframe/fastaggregates.jl | 40 ++++++++++++++------------ 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/src/groupeddataframe/fastaggregates.jl b/src/groupeddataframe/fastaggregates.jl index 14fa500fe8..bfcd66dc0c 100644 --- a/src/groupeddataframe/fastaggregates.jl +++ b/src/groupeddataframe/fastaggregates.jl @@ -194,25 +194,27 @@ function groupreduce!(res::AbstractVector, f, op, condf, adjust, checkempty::Boo counts_vec[i] = zeros(Int, n) end end - Threads.@threads for tid in 1:nt - res′ = res_vec[tid] - if adjust !== nothing || checkempty - counts′ = counts_vec[tid] - end - start = 1 + ((tid - 1) * length(groups)) ÷ nt - stop = (tid * length(groups)) ÷ nt - @inbounds for i in start:stop - gix = groups[i] - x = incol[i] - if gix > 0 && (condf === nothing || condf(x)) - # this check should be optimized out if eltype is not Any - if eltype(res′) === Any && !isassigned(res′, gix) - res′[gix] = f(x, gix) - else - res′[gix] = op(res′[gix], f(x, gix)) - end - if adjust !== nothing || checkempty - counts′[gix] += 1 + @sync for tid in 1:nt + Threads.@spawn begin + res′ = res_vec[tid] + if adjust !== nothing || checkempty + counts′ = counts_vec[tid] + end + start = 1 + ((tid - 1) * length(groups)) ÷ nt + stop = (tid * length(groups)) ÷ nt + @inbounds for i in start:stop + gix = groups[i] + x = incol[i] + if gix > 0 && (condf === nothing || condf(x)) + # this check should be optimized out if eltype is not Any + if eltype(res′) === Any && !isassigned(res′, gix) + res′[gix] = f(x, gix) + else + res′[gix] = op(res′[gix], f(x, gix)) + end + if adjust !== nothing || checkempty + counts′[gix] += 1 + end end end end From 2678ccb7f6718497dfa493e15cf82d992e45fcad Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Sat, 28 Nov 2020 12:19:33 +0100 Subject: [PATCH 10/10] Fix tests --- test/grouping.jl | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/test/grouping.jl b/test/grouping.jl index cb138672db..c8868092ef 100644 --- a/test/grouping.jl +++ b/test/grouping.jl @@ -2360,13 +2360,6 @@ end @test parent(gdf2).y ≅ df.y @test parent(gdf2).g === df.g - # Test that nthreads argument is accepted - # Correctness tests are run by combine_checked - @test select(gdf, :x => sum, nthreads=2) ≅ - select(gdf, :x => sum) - @test transform(gdf, :x => sum, nthreads=2) ≅ - transform(gdf, :x => sum) - gdf = groupby_checked(df, :g, sort=dosort, skipmissing=true) @test_throws ArgumentError select(gdf, :x => sum) @test_throws ArgumentError select(gdf, :x => sum, ungroup=false) @@ -2444,15 +2437,6 @@ end @test dfc.x_first == [1, 2, 2, 4] @test propertynames(dfc) == [:g, :x, :y, :x_first] - # Test that nthreads argument is accepted - # Correctness tests are run by combine_checked - dfc = copy(df) - gdf = groupby_checked(dfc, :g, sort=dosort, skipmissing=false) - @test select(gdf, :x => sum) ≅ select!(gdf, :x => sum, nthreads=2) - dfc = copy(df) - gdf = groupby_checked(dfc, :g, sort=dosort, skipmissing=false) - @test transform(gdf, :x => sum) ≅ transform!(gdf, :x => sum, nthreads=2) - dfc = copy(df) gdf = groupby_checked(dfc, :g, sort=dosort, skipmissing=true) @test_throws ArgumentError select!(gdf, :x => sum) @@ -3354,13 +3338,4 @@ end @test df == df2 end -@testset "invalid nthreads" begin - gdf = groupby(DataFrame(x=1:10, y=1:10), :y) - DataFrames.NTHREADS[] = 0 - @test_throws ArgumentError select(gdf, :x => sum) - @test_throws ArgumentError transform(gdf, :x => sum) - @test_throws ArgumentError select!(gdf, :x => sum) - @test_throws ArgumentError transform!(gdf, :x => sum) -end - end # module