diff --git a/.github/workflows/ci-julia-nightly.yml b/.github/workflows/ci-julia-nightly.yml index eb12df8..3eaf9a1 100644 --- a/.github/workflows/ci-julia-nightly.yml +++ b/.github/workflows/ci-julia-nightly.yml @@ -17,7 +17,7 @@ on: - '.github/workflows/TagBot.yml' jobs: test-julia-nightly: - timeout-minutes: 10 + timeout-minutes: 30 name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }} runs-on: ${{ matrix.os }} strategy: diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 330fba1..e0257b1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,7 +17,7 @@ on: - '.github/workflows/TagBot.yml' jobs: test: - timeout-minutes: 10 + timeout-minutes: 30 name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }} runs-on: ${{ matrix.os }} strategy: diff --git a/Project.toml b/Project.toml index ac2849e..22488e4 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "ThreadingUtilities" uuid = "8290d209-cae3-49c0-8002-c8c24d57dab5" authors = ["Chris Elrod and contributors"] -version = "0.4.7" +version = "0.5.0" [deps] ManualMemory = "d125e4d3-2237-4719-b19c-fa641b8a4667" @@ -13,9 +13,10 @@ julia = "1.5" [extras] Aqua = "4c88cf16-eb10-579e-8560-4a9242c79595" +BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf" InteractiveUtils = "b77e0a4c-d291-57a0-90e8-8db25a27a240" StaticArrays = "90137ffa-7385-5640-81b9-e52037218182" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [targets] -test = ["Aqua", "InteractiveUtils", "StaticArrays", "Test"] +test = ["Aqua", "BenchmarkTools", "InteractiveUtils", "StaticArrays", "Test"] diff --git a/README.md b/README.md index c8a3e3a..7338417 100644 --- a/README.md +++ b/README.md @@ -9,3 +9,6 @@ Utilities for low overhead threading in Julia. Please see the [documentation](https://JuliaSIMD.github.io/ThreadingUtilities.jl/stable). + +If you're using Windows, please note that Windows often allocates memory when neither Mac or Linux do. I do not know why. If you can help diagnose/fix the problem, please take a look at `count_allocated()` in `/test/staticarrays.jl`. + diff --git a/src/ThreadingUtilities.jl b/src/ThreadingUtilities.jl index 8fdd567..509bf23 100644 --- a/src/ThreadingUtilities.jl +++ b/src/ThreadingUtilities.jl @@ -40,27 +40,16 @@ function initialize_task(tid) return nothing end -if Sys.WORD_SIZE == 32 - retnothing(::Ptr{UInt}) = nothing -end function __init__() _print_exclusivity_warning() - nt = min(Threads.nthreads(), (Sys.CPU_THREADS)::Int) - 1 + sys_threads::Int = parse(Bool, get(ENV, "GITHUB_ACTIONS", "false")) ? Threads.nthreads() : (Sys.CPU_THREADS)::Int + nt = min(Threads.nthreads(), sys_threads) - 1 resize!(THREADPOOL, (THREADBUFFERSIZE ÷ sizeof(UInt)) * nt + (LINESPACING ÷ sizeof(UInt)) - 1) copyto!(THREADPOOL, zero(UInt)) # align to LINESPACING boundary, and then subtract THREADBUFFERSIZE to make the pointer 1-indexed THREADPOOLPTR[] = reinterpret(Ptr{UInt}, (reinterpret(UInt, pointer(THREADPOOL))+LINESPACING-1) & (-LINESPACING)) - THREADBUFFERSIZE resize!(TASKS, nt) foreach(initialize_task, 1:nt) - @static if Sys.WORD_SIZE == 32 - if nt > 0 - fptr = @cfunction(retnothing, Cvoid, (Ptr{UInt},)) - store!(taskpointer(1), fptr, sizeof(UInt)) - _atomic_xchg!(taskpointer(1), TASK) - wake_thread!(1) - wait(1) - end - end end end # module diff --git a/src/threadtasks.jl b/src/threadtasks.jl index c7bd3ce..5475990 100644 --- a/src/threadtasks.jl +++ b/src/threadtasks.jl @@ -13,7 +13,8 @@ end @inline function launch(f::F, tid::Integer, args::Vararg{Any,K}) where {F,K} p = taskpointer(tid) f(p, args...) - state = _atomic_xchg!(p, TASK) # exchange must happen atomically, to prevent it from switching to `WAIT` after reading + # exchange must happen atomically, to prevent it from switching to `WAIT` after reading + state = _atomic_xchg!(p, TASK) state == WAIT && wake_thread!(tid) return nothing end @@ -26,7 +27,6 @@ function (tt::ThreadTask)() while true if _atomic_state(p) == TASK _call(p) - _atomic_store!(p, SPIN) wait_counter = zero(UInt32) continue end @@ -39,6 +39,24 @@ function (tt::ThreadTask)() end end +function _sleep(p::Ptr{UInt}) + _atomic_store!(p, WAIT) + Base.wait(); + return nothing +end + +function sleep_all_tasks() + fptr = @cfunction(_sleep, Cvoid, (Ptr{UInt},)) + for tid ∈ eachindex(TASKS) + p = taskpointer(tid) + ThreadingUtilities.store!(p, fptr, sizeof(UInt)) + _atomic_cas_cmp!(p, SPIN, TASK) + end + for tid ∈ eachindex(TASKS) + wait(tid) + end +end + # 1-based tid, pushes into task 2-nthreads() @noinline function wake_thread!(_tid::T) where {T <: Integer} tid = _tid % Int diff --git a/test/runtests.jl b/test/runtests.jl index 79ffaf5..2748b9b 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -4,8 +4,8 @@ include("test-suite-preamble.jl") include("internals.jl") include("threadingutilities.jl") -if (!parse(Bool, get(ENV, "GITHUB_ACTIONS", "false"))) && Threads.nthreads() > 3 - include("staticarrays.jl") +if Threads.nthreads() > 3 + include("staticarrays.jl") end include("threadpool.jl") include("warnings.jl") diff --git a/test/staticarrays.jl b/test/staticarrays.jl index f82577b..366af15 100644 --- a/test/staticarrays.jl +++ b/test/staticarrays.jl @@ -1,8 +1,9 @@ -using StaticArrays, ThreadingUtilities +using StaticArrays, ThreadingUtilities, BenchmarkTools struct MulStaticArray{P} end function (::MulStaticArray{P})(p::Ptr{UInt}) where {P} _, (ptry,ptrx) = ThreadingUtilities.load(p, P, 2*sizeof(UInt)) unsafe_store!(ptry, unsafe_load(ptrx) * 2.7) + ThreadingUtilities._atomic_store!(p, ThreadingUtilities.SPIN) nothing end @generated function mul_staticarray_ptr(::A, ::B) where {A,B} @@ -24,7 +25,7 @@ end function waste_time(a, b) s = a * b' - for i ∈ 1:00 + for _ ∈ 1:0 s += a * b' end s @@ -48,13 +49,25 @@ function mul_svector_threads(a::T, b::T, c::T) where {T} end rx[], ry[], rz[], w end +function count_allocated() + a = @SVector rand(16); + b = @SVector rand(16); + c = @SVector rand(16); + @ballocated(mul_svector_threads($a,$b,$c)) +end + @testset "SVector Test" begin a = @SVector rand(16); b = @SVector rand(16); c = @SVector rand(16); w,x,y,z = mul_svector_threads(a, b, c) - @test @allocated(mul_svector_threads(a, b, c)) == 0 + count_allocated() + if !Sys.iswindows() + @test count_allocated() == 0 + else + @show count_allocated() + end @test w == a*2.7 @test x == b*2.7 @test y == c*2.7 @@ -62,12 +75,15 @@ end A = @SMatrix rand(7,9); B = @SMatrix rand(7,9); C = @SMatrix rand(7,9); - Wans = A*2.7; Xans = B*2.7; Yans = C*2.7; Zans = waste_time(A, B) + Wans = A*2.7; Xans = B*2.7; Yans = C*2.7; for i ∈ 1:100 # repeat rapdily + C, A, B = A, B, C W,X,Y,Z = mul_svector_threads(A, B, C) + iseven(i) && ThreadingUtilities.sleep_all_tasks() + (Yans, Wans, Xans) = Wans, Xans, Yans @test W == Wans @test X == Xans @test Y == Yans - @test Z ≈ Zans + @test Z ≈ waste_time(A, B) end end diff --git a/test/threadingutilities.jl b/test/threadingutilities.jl index 6215c1b..bc6767b 100644 --- a/test/threadingutilities.jl +++ b/test/threadingutilities.jl @@ -5,6 +5,7 @@ function (::Copy{P})(p::Ptr{UInt}) where {P} @simd ivdep for n ∈ 1:N unsafe_store!(ptry, unsafe_load(ptrx, n), n) end + ThreadingUtilities._atomic_store!(p, ThreadingUtilities.SPIN) end @generated function copy_ptr(::A, ::B) where {A,B} c = Copy{Tuple{A,B,Int}}() diff --git a/test/threadpool.jl b/test/threadpool.jl index 47ab8a1..06e50f8 100644 --- a/test/threadpool.jl +++ b/test/threadpool.jl @@ -1,6 +1,7 @@ @testset "THREADPOOL" begin - @test isconst(ThreadingUtilities, :THREADPOOL) # test that ThreadingUtilities.THREADPOOL is a constant - @test ThreadingUtilities.THREADPOOL isa Vector{UInt} - @test eltype(ThreadingUtilities.THREADPOOL) === UInt - @test length(ThreadingUtilities.THREADPOOL) == (ThreadingUtilities.THREADBUFFERSIZE÷sizeof(UInt)) * (min(Threads.nthreads(),(Sys.CPU_THREADS)::Int) - 1) + (256 ÷ sizeof(UInt)) - 1 + @test isconst(ThreadingUtilities, :THREADPOOL) # test that ThreadingUtilities.THREADPOOL is a constant + @test ThreadingUtilities.THREADPOOL isa Vector{UInt} + @test eltype(ThreadingUtilities.THREADPOOL) === UInt + sys_threads::Int = parse(Bool, get(ENV, "GITHUB_ACTIONS", "false")) ? Threads.nthreads() : (Sys.CPU_THREADS)::Int + @test length(ThreadingUtilities.THREADPOOL) == (ThreadingUtilities.THREADBUFFERSIZE÷sizeof(UInt)) * (min(Threads.nthreads(),sys_threads) - 1) + (256 ÷ sizeof(UInt)) - 1 end