44 changes: 23 additions & 21 deletions .github/workflows/ci.yml
@@ -1,12 +1,9 @@
name: CI
on:
pull_request:
branches:
- main
push:
branches:
- main
tags: '*'
branches: [main]
tags: ['*']
jobs:
test:
name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }}
@@ -15,31 +12,36 @@ jobs:
fail-fast: false
matrix:
version:
- '1' # Replace this with the minimum Julia version that your package supports. E.g. if your package requires Julia 1.5 or higher, change this to '1.5'.
- '1' # Leave this line unchanged. '1' will automatically expand to the latest stable 1.x release of Julia.
- '1.4'
- '1' # automatically expands to the latest stable 1.x release of Julia
os:
- ubuntu-latest
arch:
- x64
include:
- os: windows-latest
version: '1'
arch: x86
- os: macos-latest
version: '1'
arch: aarch64
- os: ubuntu-latest
version: 'nightly'
arch: x64
allow_failure: true
steps:
- uses: actions/checkout@v2
- uses: julia-actions/setup-julia@v1
- uses: actions/checkout@v5
- uses: julia-actions/setup-julia@v2
with:
version: ${{ matrix.version }}
arch: ${{ matrix.arch }}
- uses: actions/cache@v1
env:
cache-name: cache-artifacts
with:
path: ~/.julia/artifacts
key: ${{ runner.os }}-test-${{ env.cache-name }}-${{ hashFiles('**/Project.toml') }}
restore-keys: |
${{ runner.os }}-test-${{ env.cache-name }}-
${{ runner.os }}-test-
${{ runner.os }}-
- uses: julia-actions/cache@v2
- uses: julia-actions/julia-buildpkg@v1
- uses: julia-actions/julia-runtest@v1
env:
JULIA_NUM_THREADS: 4,1
- uses: julia-actions/julia-processcoverage@v1
- uses: codecov/codecov-action@v1
- uses: codecov/codecov-action@v5
with:
file: lcov.info
files: lcov.info
token: ${{ secrets.CODECOV_TOKEN }}
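
Note on the environment added to the test step: setting JULIA_NUM_THREADS to 4,1 requests 4 threads in the default pool plus 1 interactive thread. A minimal sketch (not part of the PR; assumes Julia 1.9 or newer, where thread pools exist) that could check this from inside a test run:

# Sketch: inspect the thread pools requested by JULIA_NUM_THREADS=4,1
# (assumes Julia >= 1.9, where interactive thread pools are available)
using Base.Threads

println("default pool threads:     ", nthreads(:default))      # expected 4
println("interactive pool threads: ", nthreads(:interactive))  # expected 1
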
4 changes: 2 additions & 2 deletions Project.toml
@@ -1,7 +1,7 @@
name = "GroupedArrays"
uuid = "6407cd72-fade-4a84-8a1e-56e431fc1533"
authors = ["matthieugomez <gomez.matthieu@gmail.com>"]
version = "0.3.3"
version = "0.3.4"

[deps]
DataAPI = "9a962f9c-6df0-11e9-0e5d-c546b8b5ee8a"
@@ -10,7 +10,7 @@ Missings = "e1d29d7a-bbdc-5cf2-9ac0-f12de2c33e28"
[compat]
DataAPI = "1"
Missings = "1"
julia = "1"
julia = "1.4"

[extras]
CategoricalArrays = "324d7699-5711-5eae-9e2f-1d82baa6b597"
13 changes: 8 additions & 5 deletions src/GroupedArrays.jl
@@ -88,26 +88,29 @@ function GroupedArray(args...; coalesce = false, sort = true)
s = size(first(args))
all(size(x) == s for x in args) || throw(DimensionMismatch("cannot match array sizes"))
groups = Vector{Int}(undef, prod(s))
ngroups, rhashes, gslots, sorted = row_group_slots(vec.(args), Val(false), groups, !coalesce, sort)
ngroups, rhashes, gslots, sorted = row_group_slots!(vec.(args), Val(false), groups, !coalesce, sort, true)
# sort groups if row_group_slots hasn't already done that
if sort === true && !sorted
idx = find_index(GroupedVector{Int}(groups, ngroups))
group_invperm = invperm(sortperm(collect(zip(map(x -> view(x, idx), args)...))))
@inbounds for (i, gix) in enumerate(groups)
groups[i] = gix > 0 ? group_invperm[gix] : 0
end
@inbounds for i in eachindex(groups)
gix = groups[i]
groups[i] = gix == 0 ? 0 : group_invperm[gix]
end
end
T = !coalesce && any(eltype(x) >: Missing for x in args) ? Union{Int, Missing} : Int
GroupedArray{T, length(s)}(reshape(groups, s), ngroups)
end

# Find index of representative row for each group
# now in fillfirst!
function find_index(g::GroupedArray)
groups, ngroups = g.groups, g.ngroups
idx = Vector{Int}(undef, ngroups)
filled = fill(false, ngroups)
nfilled = 0
@inbounds for (i, gix) in enumerate(groups)
@inbounds for i in 1:length(groups)
gix = groups[i]
if gix > 0 && !filled[gix]
filled[gix] = true
idx[gix] = i
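
As context for the rewritten relabelling loop above: when sort is requested but row_group_slots! has not already sorted, the group codes are remapped through invperm(sortperm(...)) so that group 1 corresponds to the smallest group key. A self-contained toy sketch of that remapping (hypothetical values, not the package's API):

# Toy sketch of the group-relabelling step (hypothetical data, not GroupedArrays API)
vals   = ["b", "a", "c", "a", "b"]
groups = [1, 2, 3, 2, 1]             # codes in first-appearance order; 0 would mean missing
ngroups = 3

# first (representative) index of each group, as find_index would return
idx = [findfirst(==(g), groups) for g in 1:ngroups]      # [1, 2, 3]

# permutation mapping old codes to codes ordered by the representative values
group_invperm = invperm(sortperm(vals[idx]))              # [2, 1, 3]

sorted_groups = [g == 0 ? 0 : group_invperm[g] for g in groups]
# sorted_groups == [2, 1, 3, 1, 2]: "a" is now group 1, "b" group 2, "c" group 3
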
71 changes: 19 additions & 52 deletions src/spawn.jl
@@ -1,22 +1,6 @@
# This code is taken from DataFrames.jl/src/other/utils.jl

if VERSION >= v"1.3"
using Base.Threads: @spawn
else
# This is the definition of @async in Base
macro spawn(expr)
thunk = esc(:(()->($expr)))
var = esc(Base.sync_varname)
quote
local task = Task($thunk)
if $(Expr(:isdefined, var))
push!($var, task)
end
schedule(task)
end
end
end

using Base.Threads: @spawn

# Compute chunks of indices, each with at least `basesize` entries
# This method ensures balanced sizes by avoiding a small last chunk
@@ -36,51 +20,34 @@ function split_to_chunks(len::Integer, np::Integer)
return (Int(1 + ((i - 1) * len′) ÷ np):Int((i * len′) ÷ np) for i in 1:np)
end

if VERSION >= v"1.4"
function _spawn_for_chunks_helper(iter, lbody, basesize)
lidx = iter.args[1]
range = iter.args[2]
quote
let x = $(esc(range)), basesize = $(esc(basesize))
@assert firstindex(x) == 1
function _spawn_for_chunks_helper(iter, lbody, basesize)
lidx = iter.args[1]
range = iter.args[2]
quote
let x = $(esc(range)), basesize = $(esc(basesize))
@assert firstindex(x) == 1

nt = Threads.nthreads()
len = length(x)
if nt > 1 && len > basesize
tasks = [Threads.@spawn begin
for i in p
local $(esc(lidx)) = @inbounds x[i]
$(esc(lbody))
end
nt = Threads.nthreads()
len = length(x)
if nt > 1 && len > basesize
tasks = [@spawn begin
for i in p
local $(esc(lidx)) = @inbounds x[i]
$(esc(lbody))
end
for p in split_indices(len, basesize)]
foreach(wait, tasks)
else
for i in eachindex(x)
local $(esc(lidx)) = @inbounds x[i]
$(esc(lbody))
end
end
end
nothing
end
end
else
function _spawn_for_chunks_helper(iter, lbody, basesize)
lidx = iter.args[1]
range = iter.args[2]
quote
let x = $(esc(range))
end
for p in split_indices(len, basesize)]
foreach(wait, tasks)
else
for i in eachindex(x)
local $(esc(lidx)) = @inbounds x[i]
$(esc(lbody))
end
end
nothing
end
nothing
end
end

"""
@spawn_for_chunks basesize for i in range ... end
Parallelize a `for` loop by spawning separate tasks
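
The diff is truncated here, but the call form shown in the docstring above suggests a usage sketch along these lines (the loop body and basesize are illustrative, not taken from the package):

# Illustrative use of @spawn_for_chunks; each chunk of at least 1_000 iterations
# runs in its own task, and the body writes to disjoint indices, so tasks don't race
out = zeros(Int, 10_000)
@spawn_for_chunks 1_000 for i in 1:10_000
    out[i] = i^2
end
@assert out[7] == 49
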