Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci-julia-nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ on:
- '.github/workflows/TagBot.yml'
jobs:
test-julia-nightly:
timeout-minutes: 10
timeout-minutes: 30
name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }}
runs-on: ${{ matrix.os }}
strategy:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ on:
- '.github/workflows/TagBot.yml'
jobs:
test:
timeout-minutes: 10
timeout-minutes: 30
name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }}
runs-on: ${{ matrix.os }}
strategy:
Expand Down
5 changes: 3 additions & 2 deletions Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "ThreadingUtilities"
uuid = "8290d209-cae3-49c0-8002-c8c24d57dab5"
authors = ["Chris Elrod <elrodc@gmail.com> and contributors"]
version = "0.4.7"
version = "0.5.0"

[deps]
ManualMemory = "d125e4d3-2237-4719-b19c-fa641b8a4667"
Expand All @@ -13,9 +13,10 @@ julia = "1.5"

[extras]
Aqua = "4c88cf16-eb10-579e-8560-4a9242c79595"
BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf"
InteractiveUtils = "b77e0a4c-d291-57a0-90e8-8db25a27a240"
StaticArrays = "90137ffa-7385-5640-81b9-e52037218182"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[targets]
test = ["Aqua", "InteractiveUtils", "StaticArrays", "Test"]
test = ["Aqua", "BenchmarkTools", "InteractiveUtils", "StaticArrays", "Test"]
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@
Utilities for low overhead threading in Julia.

Please see the [documentation](https://JuliaSIMD.github.io/ThreadingUtilities.jl/stable).

If you're using Windows, please note that Windows often allocates memory when neither Mac nor Linux does. I do not know why. If you can help diagnose or fix the problem, please take a look at `count_allocated()` in `/test/staticarrays.jl`.

15 changes: 2 additions & 13 deletions src/ThreadingUtilities.jl
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,16 @@ function initialize_task(tid)
return nothing
end

if Sys.WORD_SIZE == 32
retnothing(::Ptr{UInt}) = nothing
end
function __init__()
_print_exclusivity_warning()
nt = min(Threads.nthreads(), (Sys.CPU_THREADS)::Int) - 1
sys_threads::Int = parse(Bool, get(ENV, "GITHUB_ACTIONS", "false")) ? Threads.nthreads() : (Sys.CPU_THREADS)::Int
nt = min(Threads.nthreads(), sys_threads) - 1
resize!(THREADPOOL, (THREADBUFFERSIZE ÷ sizeof(UInt)) * nt + (LINESPACING ÷ sizeof(UInt)) - 1)
copyto!(THREADPOOL, zero(UInt))
# align to LINESPACING boundary, and then subtract THREADBUFFERSIZE to make the pointer 1-indexed
THREADPOOLPTR[] = reinterpret(Ptr{UInt}, (reinterpret(UInt, pointer(THREADPOOL))+LINESPACING-1) & (-LINESPACING)) - THREADBUFFERSIZE
resize!(TASKS, nt)
foreach(initialize_task, 1:nt)
@static if Sys.WORD_SIZE == 32
if nt > 0
fptr = @cfunction(retnothing, Cvoid, (Ptr{UInt},))
store!(taskpointer(1), fptr, sizeof(UInt))
_atomic_xchg!(taskpointer(1), TASK)
wake_thread!(1)
wait(1)
end
end
end

end # module
22 changes: 20 additions & 2 deletions src/threadtasks.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ end
@inline function launch(f::F, tid::Integer, args::Vararg{Any,K}) where {F,K}
p = taskpointer(tid)
f(p, args...)
state = _atomic_xchg!(p, TASK) # exchange must happen atomically, to prevent it from switching to `WAIT` after reading
# exchange must happen atomically, to prevent it from switching to `WAIT` after reading
state = _atomic_xchg!(p, TASK)
state == WAIT && wake_thread!(tid)
return nothing
end
Expand All @@ -26,7 +27,6 @@ function (tt::ThreadTask)()
while true
if _atomic_state(p) == TASK
_call(p)
_atomic_store!(p, SPIN)
wait_counter = zero(UInt32)
continue
end
Expand All @@ -39,6 +39,24 @@ function (tt::ThreadTask)()
end
end

# Task body used to put a worker to sleep: atomically store `WAIT` into the
# state word at `p`, then suspend the current task via `Base.wait()`.
# NOTE(review): presumably the task is resumed later through `wake_thread!`;
# confirm against the scheduler code.
function _sleep(p::Ptr{UInt})
    _atomic_store!(p, WAIT)
    Base.wait()
    return nothing
end

# Ask every task in `TASKS` to run `_sleep`, then wait on each of them.
#
# For each task id, the `_sleep` C function pointer is written into that task's
# buffer at byte offset `sizeof(UInt)`, and the state word is compare-and-swapped
# from `SPIN` to `TASK`. Once all requests are posted, each task id is waited on
# in turn.
# NOTE(review): a task whose state is not `SPIN` is left untouched by the CAS;
# presumably it is already waiting — confirm.
function sleep_all_tasks()
    sleeper = @cfunction(_sleep, Cvoid, (Ptr{UInt},))
    task_ids = eachindex(TASKS)
    for id in task_ids
        ptr = taskpointer(id)
        ThreadingUtilities.store!(ptr, sleeper, sizeof(UInt))
        _atomic_cas_cmp!(ptr, SPIN, TASK)
    end
    foreach(wait, task_ids)
    return nothing
end

# 1-based tid, pushes into task 2-nthreads()
@noinline function wake_thread!(_tid::T) where {T <: Integer}
tid = _tid % Int
Expand Down
4 changes: 2 additions & 2 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ include("test-suite-preamble.jl")

include("internals.jl")
include("threadingutilities.jl")
if (!parse(Bool, get(ENV, "GITHUB_ACTIONS", "false"))) && Threads.nthreads() > 3
include("staticarrays.jl")
if Threads.nthreads() > 3
include("staticarrays.jl")
end
include("threadpool.jl")
include("warnings.jl")
Expand Down
26 changes: 21 additions & 5 deletions test/staticarrays.jl
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
using StaticArrays, ThreadingUtilities
using StaticArrays, ThreadingUtilities, BenchmarkTools
# Callable singleton whose type parameter `P` describes the payload layout
# read back from the task buffer.
struct MulStaticArray{P} end

# Load a (destination, source) pointer pair from the buffer at `p` (starting at
# byte offset `2 * sizeof(UInt)`), write the source value scaled by 2.7 to the
# destination, and then atomically set the state word at `p` to `SPIN`.
# NOTE(review): the `SPIN` store presumably signals completion to the launcher,
# so it must stay after the data store — confirm.
function (::MulStaticArray{P})(p::Ptr{UInt}) where {P}
    _, (dst, src) = ThreadingUtilities.load(p, P, 2 * sizeof(UInt))
    scaled = unsafe_load(src) * 2.7
    unsafe_store!(dst, scaled)
    ThreadingUtilities._atomic_store!(p, ThreadingUtilities.SPIN)
    return nothing
end
@generated function mul_staticarray_ptr(::A, ::B) where {A,B}
Expand All @@ -24,7 +25,7 @@ end

function waste_time(a, b)
s = a * b'
for i ∈ 1:00
for _ ∈ 1:0
s += a * b'
end
s
Expand All @@ -48,26 +49,41 @@ function mul_svector_threads(a::T, b::T, c::T) where {T}
end
rx[], ry[], rz[], w
end
# Build three random 16-element static vectors and return the allocation figure
# that `@ballocated` reports for `mul_svector_threads` applied to them.
function count_allocated()
    x = @SVector rand(16)
    y = @SVector rand(16)
    z = @SVector rand(16)
    return @ballocated(mul_svector_threads($x, $y, $z))
end


@testset "SVector Test" begin
a = @SVector rand(16);
b = @SVector rand(16);
c = @SVector rand(16);
w,x,y,z = mul_svector_threads(a, b, c)
@test @allocated(mul_svector_threads(a, b, c)) == 0
count_allocated()
if !Sys.iswindows()
@test count_allocated() == 0
else
@show count_allocated()
end
@test w == a*2.7
@test x == b*2.7
@test y == c*2.7
@test z ≈ waste_time(a, b)
A = @SMatrix rand(7,9);
B = @SMatrix rand(7,9);
C = @SMatrix rand(7,9);
Wans = A*2.7; Xans = B*2.7; Yans = C*2.7; Zans = waste_time(A, B)
Wans = A*2.7; Xans = B*2.7; Yans = C*2.7;
for i ∈ 1:100 # repeat rapdily
C, A, B = A, B, C
W,X,Y,Z = mul_svector_threads(A, B, C)
iseven(i) && ThreadingUtilities.sleep_all_tasks()
(Yans, Wans, Xans) = Wans, Xans, Yans
@test W == Wans
@test X == Xans
@test Y == Yans
@test Z ≈ Zans
@test Z ≈ waste_time(A, B)
end
end
1 change: 1 addition & 0 deletions test/threadingutilities.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ function (::Copy{P})(p::Ptr{UInt}) where {P}
@simd ivdep for n ∈ 1:N
unsafe_store!(ptry, unsafe_load(ptrx, n), n)
end
ThreadingUtilities._atomic_store!(p, ThreadingUtilities.SPIN)
end
@generated function copy_ptr(::A, ::B) where {A,B}
c = Copy{Tuple{A,B,Int}}()
Expand Down
9 changes: 5 additions & 4 deletions test/threadpool.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
@testset "THREADPOOL" begin
@test isconst(ThreadingUtilities, :THREADPOOL) # test that ThreadingUtilities.THREADPOOL is a constant
@test ThreadingUtilities.THREADPOOL isa Vector{UInt}
@test eltype(ThreadingUtilities.THREADPOOL) === UInt
@test length(ThreadingUtilities.THREADPOOL) == (ThreadingUtilities.THREADBUFFERSIZE÷sizeof(UInt)) * (min(Threads.nthreads(),(Sys.CPU_THREADS)::Int) - 1) + (256 ÷ sizeof(UInt)) - 1
@test isconst(ThreadingUtilities, :THREADPOOL) # test that ThreadingUtilities.THREADPOOL is a constant
@test ThreadingUtilities.THREADPOOL isa Vector{UInt}
@test eltype(ThreadingUtilities.THREADPOOL) === UInt
sys_threads::Int = parse(Bool, get(ENV, "GITHUB_ACTIONS", "false")) ? Threads.nthreads() : (Sys.CPU_THREADS)::Int
@test length(ThreadingUtilities.THREADPOOL) == (ThreadingUtilities.THREADBUFFERSIZE÷sizeof(UInt)) * (min(Threads.nthreads(),sys_threads) - 1) + (256 ÷ sizeof(UInt)) - 1
end