From d4816e0d487e1ff491c15b0aed048eb63ae41428 Mon Sep 17 00:00:00 2001 From: Matthieu Gomez Date: Mon, 20 Oct 2025 14:29:59 -0400 Subject: [PATCH 1/2] update code --- .github/workflows/ci.yml | 44 +++++++------ Project.toml | 4 +- src/GroupedArrays.jl | 13 ++-- src/spawn.jl | 71 ++++++-------------- src/utils.jl | 135 ++++++++++++++++++++++++++++----------- 5 files changed, 150 insertions(+), 117 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2de2c54..5b35e53 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,12 +1,9 @@ name: CI on: pull_request: - branches: - - main push: - branches: - - main - tags: '*' + branches: [main] + tags: ['*'] jobs: test: name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }} @@ -15,31 +12,36 @@ jobs: fail-fast: false matrix: version: - - '1' # Replace this with the minimum Julia version that your package supports. E.g. if your package requires Julia 1.5 or higher, change this to '1.5'. - - '1' # Leave this line unchanged. '1' will automatically expand to the latest stable 1.x release of Julia. + - '1.4' + - '1' # automatically expands to the latest stable 1.x release of Julia os: - ubuntu-latest arch: - x64 + include: + - os: windows-latest + version: '1' + arch: x86 + - os: macos-latest + version: '1' + arch: aarch64 + - os: ubuntu-latest + version: 'nightly' + arch: x64 + allow_failure: true steps: - - uses: actions/checkout@v2 - - uses: julia-actions/setup-julia@v1 + - uses: actions/checkout@v5 + - uses: julia-actions/setup-julia@v2 with: version: ${{ matrix.version }} arch: ${{ matrix.arch }} - - uses: actions/cache@v1 - env: - cache-name: cache-artifacts - with: - path: ~/.julia/artifacts - key: ${{ runner.os }}-test-${{ env.cache-name }}-${{ hashFiles('**/Project.toml') }} - restore-keys: | - ${{ runner.os }}-test-${{ env.cache-name }}- - ${{ runner.os }}-test- - ${{ runner.os }}- + - uses: julia-actions/cache@v2 - uses: julia-actions/julia-buildpkg@v1 - uses: julia-actions/julia-runtest@v1 + env: + JULIA_NUM_THREADS: 4,1 - uses: julia-actions/julia-processcoverage@v1 - - uses: codecov/codecov-action@v1 + - uses: codecov/codecov-action@v5 with: - file: lcov.info + files: lcov.info + token: ${{ secrets.CODECOV_TOKEN }} \ No newline at end of file diff --git a/Project.toml b/Project.toml index 5514099..3fe4bd6 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "GroupedArrays" uuid = "6407cd72-fade-4a84-8a1e-56e431fc1533" authors = ["matthieugomez "] -version = "0.3.3" +version = "0.3.4" [deps] DataAPI = "9a962f9c-6df0-11e9-0e5d-c546b8b5ee8a" @@ -10,7 +10,7 @@ Missings = "e1d29d7a-bbdc-5cf2-9ac0-f12de2c33e28" [compat] DataAPI = "1" Missings = "1" -julia = "1" +julia = "1.4" [extras] CategoricalArrays = "324d7699-5711-5eae-9e2f-1d82baa6b597" diff --git a/src/GroupedArrays.jl b/src/GroupedArrays.jl index 7daaa9f..2b0776c 100644 --- a/src/GroupedArrays.jl +++ b/src/GroupedArrays.jl @@ -88,26 +88,29 @@ function GroupedArray(args...; coalesce = false, sort = true) s = size(first(args)) all(size(x) == s for x in args) || throw(DimensionMismatch("cannot match array sizes")) groups = Vector{Int}(undef, prod(s)) - ngroups, rhashes, gslots, sorted = row_group_slots(vec.(args), Val(false), groups, !coalesce, sort) + ngroups, rhashes, gslots, sorted = row_group_slots!(vec.(args), Val(false), groups, !coalesce, sort, true) # sort groups if row_group_slots hasn't already done that if sort === true && !sorted idx = 
find_index(GroupedVector{Int}(groups, ngroups)) group_invperm = invperm(sortperm(collect(zip(map(x -> view(x, idx), args)...)))) - @inbounds for (i, gix) in enumerate(groups) - groups[i] = gix > 0 ? group_invperm[gix] : 0 - end + @inbounds for i in eachindex(groups) + gix = groups[i] + groups[i] = gix == 0 ? 0 : group_invperm[gix] + end end T = !coalesce && any(eltype(x) >: Missing for x in args) ? Union{Int, Missing} : Int GroupedArray{T, length(s)}(reshape(groups, s), ngroups) end # Find index of representative row for each group +# now in fillfirst! function find_index(g::GroupedArray) groups, ngroups = g.groups, g.ngroups idx = Vector{Int}(undef, ngroups) filled = fill(false, ngroups) nfilled = 0 - @inbounds for (i, gix) in enumerate(groups) + @inbounds for i in 1:length(groups) + gix = groups[i] if gix > 0 && !filled[gix] filled[gix] = true idx[gix] = i diff --git a/src/spawn.jl b/src/spawn.jl index 8a27e10..f6a11df 100644 --- a/src/spawn.jl +++ b/src/spawn.jl @@ -1,22 +1,6 @@ # This code is taken from DataFrames.jl/src/other/utils.jl -if VERSION >= v"1.3" - using Base.Threads: @spawn -else - # This is the definition of @async in Base - macro spawn(expr) - thunk = esc(:(()->($expr))) - var = esc(Base.sync_varname) - quote - local task = Task($thunk) - if $(Expr(:isdefined, var)) - push!($var, task) - end - schedule(task) - end - end -end - +using Base.Threads: @spawn # Compute chunks of indices, each with at least `basesize` entries # This method ensures balanced sizes by avoiding a small last chunk @@ -36,51 +20,34 @@ function split_to_chunks(len::Integer, np::Integer) return (Int(1 + ((i - 1) * len′) ÷ np):Int((i * len′) ÷ np) for i in 1:np) end -if VERSION >= v"1.4" - function _spawn_for_chunks_helper(iter, lbody, basesize) - lidx = iter.args[1] - range = iter.args[2] - quote - let x = $(esc(range)), basesize = $(esc(basesize)) - @assert firstindex(x) == 1 +function _spawn_for_chunks_helper(iter, lbody, basesize) + lidx = iter.args[1] + range = iter.args[2] + quote + let x = $(esc(range)), basesize = $(esc(basesize)) + @assert firstindex(x) == 1 - nt = Threads.nthreads() - len = length(x) - if nt > 1 && len > basesize - tasks = [Threads.@spawn begin - for i in p - local $(esc(lidx)) = @inbounds x[i] - $(esc(lbody)) - end + nt = Threads.nthreads() + len = length(x) + if nt > 1 && len > basesize + tasks = [@spawn begin + for i in p + local $(esc(lidx)) = @inbounds x[i] + $(esc(lbody)) end - for p in split_indices(len, basesize)] - foreach(wait, tasks) - else - for i in eachindex(x) - local $(esc(lidx)) = @inbounds x[i] - $(esc(lbody)) - end - end - end - nothing - end - end -else - function _spawn_for_chunks_helper(iter, lbody, basesize) - lidx = iter.args[1] - range = iter.args[2] - quote - let x = $(esc(range)) + end + for p in split_indices(len, basesize)] + foreach(wait, tasks) + else for i in eachindex(x) local $(esc(lidx)) = @inbounds x[i] $(esc(lbody)) end end - nothing end + nothing end end - """ @spawn_for_chunks basesize for i in range ... 
end Parallelize a `for` loop by spawning separate tasks diff --git a/src/utils.jl b/src/utils.jl index c553ef9..074207e 100644 --- a/src/utils.jl +++ b/src/utils.jl @@ -8,7 +8,7 @@ function hashrows_col!(h::Vector{UInt}, v::AbstractVector{T}, rp::Nothing, firstcol::Bool) where T - @spawn_for_chunks 1_000_000 for i in eachindex(h) + @spawn_for_chunks 100_000 for i in eachindex(h) @inbounds begin el = v[i] h[i] = hash(el, h[i]) @@ -36,18 +36,18 @@ function hashrows_col!(h::Vector{UInt}, fira = firstindex(ra) hashes = Vector{UInt}(undef, length(rp)) - @spawn_for_chunks 1_000_000 for i in eachindex(hashes) + @spawn_for_chunks 100_000 for i in eachindex(hashes) @inbounds hashes[i] = hash(rp[i+firp-1]) end # here we rely on the fact that `DataAPI.refpool` has a continuous # block of indices - @spawn_for_chunks 1_000_000 for i in eachindex(h) + @spawn_for_chunks 100_000 for i in eachindex(h) @inbounds ref = ra[i+fira-1] @inbounds h[i] = hashes[ref+1-firp] end else - @spawn_for_chunks 1_000_000 for i in eachindex(h, v) + @spawn_for_chunks 100_000 for i in eachindex(h, v) @inbounds h[i] = hash(v[i], h[i]) end end @@ -61,7 +61,8 @@ end # Calculate the vector of `df` rows hash values. function hashrows(cols::Tuple{Vararg{AbstractVector}}, skipmissing::Bool) len = length(cols[1]) - rhashes = zeros(UInt, len) + ref_val = @static Base.VERSION >= v"1.13.0-DEV" ? Base.HASH_SEED : UInt(0) + rhashes = fill(ref_val, len) missings = fill(false, skipmissing ? len : 0) for (i, col) in enumerate(cols) rp = DataAPI.refpool(col) @@ -159,7 +160,7 @@ function refpool_and_array(x::AbstractArray) else minval, maxval = extrema(x) end - ngroups = big(maxval) - big(minval) + 1 + ngroups = BigInt(maxval) - BigInt(minval) + 1 # Threshold chosen with the same rationale as the row_group_slots refpool method: # refpool approach is faster but we should not allocate too much memory either # We also have to avoid overflow, including with ngroups + 1 for missing values @@ -181,14 +182,22 @@ end # 2) vector of row hashes (may be empty if hash=Val(false)) # 3) slot array for a hash map, non-zero values are # the indices of the first row in a group +# (returned only if hashes are generated) # 4) whether groups are already sorted # Optional `groups` vector is set to the group indices of each row (starting at 1) # With skipmissing=true, rows with missing values are attributed index 0. -function row_group_slots(cols::Tuple{Vararg{AbstractVector}}, - hash::Val, - groups::Union{Vector{Int}, Nothing}, - skipmissing::Bool, - sort::Union{Bool, Nothing})::Tuple{Int, Vector{UInt}, Vector{Int}, Bool} +# +# Also the last argument is `compress`. If it is `false` then groups are not +# compressed to form a continuous sequence. Normally `true` should be passed +# as this ensures that returned `ngroups` indeed indicates the number of groups +# but e.g. 
in `nonunique` we do not use this information so compressing +# can be skipped by passing `compress=false` +function row_group_slots!(cols::Tuple{Vararg{AbstractVector}}, + hash::Val, + groups::Union{Vector{Int}, Nothing}, + skipmissing::Bool, + sort::Union{Bool, Nothing}, + compress::Bool)::Tuple{Int, Vector{UInt}, Vector{Int}, Bool} rpa = refpool_and_array.(cols) if sort === false refpools = nothing @@ -197,17 +206,19 @@ function row_group_slots(cols::Tuple{Vararg{AbstractVector}}, refpools = first.(rpa) refarrays = last.(rpa) end - row_group_slots(cols, refpools, refarrays, hash, groups, skipmissing, sort === true) + row_group_slots!(cols, refpools, refarrays, hash, groups, skipmissing, + sort === true, compress) end -# Generic fallback method based on open adressing hash table -function row_group_slots(cols::Tuple{Vararg{AbstractVector}}, - refpools::Any, # Ignored - refarrays::Any, # Ignored - hash::Val, - groups::Union{Vector{Int}, Nothing}, - skipmissing::Bool, - sort::Bool)::Tuple{Int, Vector{UInt}, Vector{Int}, Bool} +# Generic fallback method based on open addressing hash table +function row_group_slots!(cols::Tuple{Vararg{AbstractVector}}, + refpools::Any, # Ignored + refarrays::Any, # Ignored + hash::Val, + groups::Union{Vector{Int}, Nothing}, + skipmissing::Bool, + sort::Bool, + compress::Bool)::Tuple{Int, Vector{UInt}, Vector{Int}, Bool} @assert groups === nothing || length(groups) == length(cols[1]) rhashes, missings = hashrows(cols, skipmissing) # inspired by Dict code from base cf. https://github.com/JuliaData/DataTables.jl/pull/17#discussion_r102481481 @@ -254,19 +265,21 @@ function row_group_slots(cols::Tuple{Vararg{AbstractVector}}, end # Optimized method for arrays for which DataAPI.refpool is defined and returns an AbstractVector -function row_group_slots(cols::NTuple{N, AbstractVector}, - refpools::NTuple{N, AbstractVector}, - refarrays::NTuple{N, - Union{AbstractVector{<:Real}, - Missings.EachReplaceMissing{ - <:AbstractVector{<:Union{Real, Missing}}}}}, - hash::Val{false}, - groups::Vector{Int}, - skipmissing::Bool, - sort::Bool)::Tuple{Int, Vector{UInt}, Vector{Int}, Bool} where N +function row_group_slots!(cols::NTuple{N, AbstractVector}, + refpools::NTuple{N, AbstractVector}, + refarrays::NTuple{N, + Union{AbstractVector{<:Real}, + Missings.EachReplaceMissing{ + <:AbstractVector{<:Union{Real, Missing}}}}}, + hash::Val{false}, + groups::Vector{Int}, + skipmissing::Bool, + sort::Bool, + compress::Bool)::Tuple{Int, Vector{UInt}, Vector{Int}, Bool} where N # Computing neither hashes nor groups isn't very useful, # and this method needs to allocate a groups vector anyway @assert all(col -> length(col) == length(groups), cols) + missinginds = map(refpools) do refpool eltype(refpool) >: Missing ? something(findfirst(ismissing, refpool), lastindex(refpool)+1) : lastindex(refpool)+1 @@ -298,10 +311,10 @@ function row_group_slots(cols::NTuple{N, AbstractVector}, newcols = (skipmissing && any(refpool -> eltype(refpool) >: Missing, refpools)) || !(refarrays isa NTuple{<:Any, AbstractVector}) || sort ? 
cols : refarrays - return invoke(row_group_slots, + return invoke(row_group_slots!, Tuple{Tuple{Vararg{AbstractVector}}, Any, Any, Val, - Union{Vector{Int}, Nothing}, Bool, Bool}, - newcols, refpools, refarrays, hash, groups, skipmissing, sort) + Union{Vector{Int}, Nothing}, Bool, Bool, Bool}, + newcols, refpools, refarrays, hash, groups, skipmissing, sort, compress) end strides = (cumprod(collect(reverse(ngroupstup)))[end-1:-1:1]..., 1)::NTuple{N, Int} @@ -323,10 +336,17 @@ function row_group_slots(cols::NTuple{N, AbstractVector}, lg = length(groups) nt = Threads.nthreads() - # disable threading if we are processing a small data frame or number of groups is large - if lg < 1_000_000 || ngroups > lg * (0.5 - 1 / (2 * nt)) / (2 * nt) - nt = 1 + # make sure we are processing at least 100_000 rows per task if we do threading + if lg < 100_000 * nt + nt = max(1, lg ÷ 100_000) + end + # if there are few rows per group limit the number of threads used + if ngroups == 0 + nt = 1 + else + nt = clamp(round(Int, (lg / 4) / ngroups - 2), 1, nt) end + seen = fill(false, ngroups) seen_vec = Vector{Vector{Bool}}(undef, nt) seen_vec[1] = seen @@ -430,7 +450,9 @@ function row_group_slots(cols::NTuple{N, AbstractVector}, # If some groups are unused, compress group indices to drop them # sum(seen) is faster than all(seen) when not short-circuiting, # and short-circuit would only happen in the slower case anyway - if sum(seen) < length(seen) + # + # This process is not needed if row_group_slots! is called with compress=false + if compress && sum(seen) < length(seen) oldngroups = ngroups remap = zeros(Int, ngroups) ngroups = 0 @@ -447,3 +469,42 @@ function row_group_slots(cols::NTuple{N, AbstractVector}, end return ngroups, UInt[], Int[], sort end + + +# Return a 3-tuple of a permutation that sorts rows into groups, +# and the positions of the first and last rows in each group in that permutation +# `groups` must contain group indices in 0:ngroups +# Rows with group index 0 are skipped (used when skipmissing=true) +# Partly uses the code of Wes McKinney's groupsort_indexer in pandas (file: src/groupby.pyx). 
+function compute_indices(groups::AbstractVector{<:Integer}, ngroups::Integer) + # count elements in each group + stops = zeros(Int, ngroups+1) + @inbounds for gix in groups + stops[gix+1] += 1 + end + + # group start positions in a sorted table + starts = Vector{Int}(undef, ngroups+1) + if length(starts) > 0 + starts[1] = 1 + @inbounds for i in 1:ngroups + starts[i+1] = starts[i] + stops[i] + end + end + + # define row permutation that sorts them into groups + rperm = Vector{Int}(undef, length(groups)) + copyto!(stops, starts) + @inbounds for (i, gix) in enumerate(groups) + rperm[stops[gix+1]] = i + stops[gix+1] += 1 + end + stops .-= 1 + + # When skipmissing=true was used, group 0 corresponds to missings to drop + # Otherwise it's empty + popfirst!(starts) + popfirst!(stops) + + return rperm, starts, stops +end From d8ef897bc13d28f47de9a8e32fb1d0057f9afcfe Mon Sep 17 00:00:00 2001 From: Matthieu Gomez Date: Mon, 20 Oct 2025 14:32:25 -0400 Subject: [PATCH 2/2] remove compute_indices --- src/utils.jl | 39 --------------------------------------- 1 file changed, 39 deletions(-) diff --git a/src/utils.jl b/src/utils.jl index 074207e..7cc78b4 100644 --- a/src/utils.jl +++ b/src/utils.jl @@ -469,42 +469,3 @@ function row_group_slots!(cols::NTuple{N, AbstractVector}, end return ngroups, UInt[], Int[], sort end - - -# Return a 3-tuple of a permutation that sorts rows into groups, -# and the positions of the first and last rows in each group in that permutation -# `groups` must contain group indices in 0:ngroups -# Rows with group index 0 are skipped (used when skipmissing=true) -# Partly uses the code of Wes McKinney's groupsort_indexer in pandas (file: src/groupby.pyx). -function compute_indices(groups::AbstractVector{<:Integer}, ngroups::Integer) - # count elements in each group - stops = zeros(Int, ngroups+1) - @inbounds for gix in groups - stops[gix+1] += 1 - end - - # group start positions in a sorted table - starts = Vector{Int}(undef, ngroups+1) - if length(starts) > 0 - starts[1] = 1 - @inbounds for i in 1:ngroups - starts[i+1] = starts[i] + stops[i] - end - end - - # define row permutation that sorts them into groups - rperm = Vector{Int}(undef, length(groups)) - copyto!(stops, starts) - @inbounds for (i, gix) in enumerate(groups) - rperm[stops[gix+1]] = i - stops[gix+1] += 1 - end - stops .-= 1 - - # When skipmissing=true was used, group 0 corresponds to missings to drop - # Otherwise it's empty - popfirst!(starts) - popfirst!(stops) - - return rperm, starts, stops -end
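A minimal sketch of the task-count heuristic added to the refpool-based `row_group_slots!` method, assuming the same names (`lg` for the row count, `ngroups`) and the 100_000-rows-per-task threshold that appear in the patch. The standalone helper `task_count` below is illustrative only and is not part of the package.

# Illustrative sketch (not package code): the thread-count logic used by the
# refpool-based row_group_slots! method in src/utils.jl.
function task_count(lg::Int, ngroups::Int)
    nt = Threads.nthreads()
    # spawn extra tasks only if each one gets at least ~100_000 rows
    if lg < 100_000 * nt
        nt = max(1, lg ÷ 100_000)
    end
    # with many small groups the per-task `seen` bookkeeping dominates,
    # so cap the task count based on the average number of rows per group
    if ngroups == 0
        nt = 1
    else
        nt = clamp(round(Int, (lg / 4) / ngroups - 2), 1, nt)
    end
    return nt
end

task_count(10_000_000, 10)         # plenty of rows per group: keeps several tasks
task_count(10_000_000, 5_000_000)  # tiny groups on average: falls back to 1 task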
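A hedged usage sketch of the `@spawn_for_chunks` macro from src/spawn.jl: the loop runs over chunks of at least `basesize` iterations, each in its own task, and falls back to a plain sequential loop when the range is short or only one thread is available. The hash-update body mirrors `hashrows_col!`; the `GroupedArrays.@spawn_for_chunks` access path is an assumption, since the macro is internal and not exported.

# Assumption: the internal macro is reachable as GroupedArrays.@spawn_for_chunks.
using GroupedArrays

h = zeros(UInt, 1_000_000)
v = rand(1_000_000)
# hash each element of v into h, processing chunks of >= 100_000 rows per task
GroupedArrays.@spawn_for_chunks 100_000 for i in eachindex(h)
    @inbounds h[i] = hash(v[i], h[i])
end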
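A worked example of `compute_indices` as defined in the first patch (the second patch removes it again), assuming that definition is in scope. The values in the comments were traced by hand from the code above, not taken from a test run.

groups = [2, 1, 0, 2]            # group index 0 marks a skipped (missing) row
rperm, starts, stops = compute_indices(groups, 2)
# rperm  == [3, 2, 1, 4]         counting-sort permutation; the group-0 row (row 3)
#                                comes first and is then dropped via starts/stops
# starts == [2, 3]               first position of groups 1 and 2 within rperm
# stops  == [2, 4]               last position of groups 1 and 2 within rperm
view(rperm, starts[2]:stops[2])  # rows belonging to group 2, i.e. [1, 4]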