diff --git a/Project.toml b/Project.toml index 158a714..e2f7e97 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "ThreadingUtilities" uuid = "8290d209-cae3-49c0-8002-c8c24d57dab5" authors = ["Chris Elrod and contributors"] -version = "0.5.1" +version = "0.5.2" [deps] ManualMemory = "d125e4d3-2237-4719-b19c-fa641b8a4667" diff --git a/src/threadtasks.jl b/src/threadtasks.jl index e9976e7..4e8c272 100644 --- a/src/threadtasks.jl +++ b/src/threadtasks.jl @@ -3,7 +3,7 @@ struct ThreadTask end Base.pointer(tt::ThreadTask) = tt.p -@inline taskpointer(tid::T) where {T} = THREADPOOLPTR[] + tid*(THREADBUFFERSIZE%T) +@inline taskpointer(tid::T) where {T} = THREADPOOLPTR[] + tid * (THREADBUFFERSIZE % T) @inline function _call(p::Ptr{UInt}) fptr = load(p + sizeof(UInt), Ptr{Cvoid}) @@ -41,7 +41,7 @@ end function _sleep(p::Ptr{UInt}) _atomic_store!(p, WAIT) - Base.wait(); + Base.wait() return nothing end @@ -58,7 +58,7 @@ function sleep_all_tasks() end # 1-based tid, pushes into task 2-nthreads() -@noinline function wake_thread!(_tid::T) where {T <: Integer} +@noinline function wake_thread!(_tid::T) where {T<:Integer} tid = _tid % Int tidp1 = tid + one(tid) assume(unsigned(length(Base.Workqueues)) > unsigned(tid)) @@ -70,8 +70,6 @@ end @noinline function checktask(tid) t = TASKS[tid] if istaskfailed(t) - show(stderr, MIME"text/plain"(), t) - println() initialize_task(tid) return true end @@ -79,8 +77,9 @@ end false end # 1-based tid +@inline tasktid(p::Ptr{UInt}) = (p - THREADPOOLPTR[]) ÷ (THREADBUFFERSIZE) @inline wait(tid::Integer) = wait(taskpointer(tid), tid) -@inline wait(p::Ptr{UInt}) = wait(p, (p - THREADPOOLPTR[]) ÷ (THREADBUFFERSIZE)) +@inline wait(p::Ptr{UInt}) = wait(p, tasktid(p)) @inline function wait(p::Ptr{UInt}, tid) counter = 0x00000000 while _atomic_state(p) == TASK diff --git a/test/threadingutilities.jl b/test/threadingutilities.jl index bc6767b..bb43d74 100644 --- a/test/threadingutilities.jl +++ b/test/threadingutilities.jl @@ -1,6 +1,6 @@ struct Copy{P} end function (::Copy{P})(p::Ptr{UInt}) where {P} - _, (ptry,ptrx,N) = ThreadingUtilities.load(p, P, 2*sizeof(UInt)) + _, (ptry, ptrx, N) = ThreadingUtilities.load(p, P, 2 * sizeof(UInt)) N > 0 || throw("This function throws if N == 0 for testing purposes.") @simd ivdep for n ∈ 1:N unsafe_store!(ptry, unsafe_load(ptrx, n), n) @@ -20,18 +20,18 @@ function setup_copy!(p, y, x) px = pointer(x) fptr = copy_ptr(py, px) offset = ThreadingUtilities.store!(p, fptr, sizeof(UInt)) - ThreadingUtilities.store!(p, (py,px,N), offset) + ThreadingUtilities.store!(p, (py, px, N), offset) end @inline launch_thread_copy!(tid, y, x) = ThreadingUtilities.launch(setup_copy!, tid, y, x) function test_copy(tid, N = 100_000) - a = rand(N); - b = rand(N); - c = rand(N); - x = similar(a) .= NaN; - y = similar(b) .= NaN; - z = similar(c) .= NaN; + a = rand(N) + b = rand(N) + c = rand(N) + x = similar(a) .= NaN + y = similar(b) .= NaN + z = similar(c) .= NaN GC.@preserve a b c x y z begin launch_thread_copy!(tid, x, a) yield() @@ -53,25 +53,34 @@ end @test unsafe_load(Ptr{UInt32}(ThreadingUtilities.taskpointer(tid))) == 0x00000001 end @test all(eachindex(ThreadingUtilities.TASKS)) do tid - ThreadingUtilities.load(ThreadingUtilities.taskpointer(tid), ThreadingUtilities.ThreadState) === ThreadingUtilities.WAIT + ThreadingUtilities.load( + ThreadingUtilities.taskpointer(tid), + ThreadingUtilities.ThreadState, + ) === ThreadingUtilities.WAIT end @test all(eachindex(ThreadingUtilities.TASKS)) do tid - ThreadingUtilities._atomic_load(reinterpret(Ptr{UInt32}, ThreadingUtilities.taskpointer(tid))) === reinterpret(UInt32, ThreadingUtilities.WAIT) + ThreadingUtilities._atomic_load( + reinterpret(Ptr{UInt32}, ThreadingUtilities.taskpointer(tid)), + ) === reinterpret(UInt32, ThreadingUtilities.WAIT) end foreach(test_copy, eachindex(ThreadingUtilities.TASKS)) - - x = rand(UInt, 3); + + x = rand(UInt, 3) GC.@preserve x begin ThreadingUtilities._atomic_store!(pointer(x), zero(UInt)) - @test ThreadingUtilities._atomic_xchg!(pointer(x), ThreadingUtilities.WAIT) == ThreadingUtilities.TASK - @test ThreadingUtilities._atomic_umax!(pointer(x), ThreadingUtilities.TASK) == ThreadingUtilities.WAIT - @test ThreadingUtilities._atomic_umax!(pointer(x), ThreadingUtilities.SPIN) == ThreadingUtilities.WAIT - @test ThreadingUtilities.load(pointer(x), ThreadingUtilities.ThreadState) == ThreadingUtilities.SPIN + @test ThreadingUtilities._atomic_xchg!(pointer(x), ThreadingUtilities.WAIT) == + ThreadingUtilities.TASK + @test ThreadingUtilities._atomic_umax!(pointer(x), ThreadingUtilities.TASK) == + ThreadingUtilities.WAIT + @test ThreadingUtilities._atomic_umax!(pointer(x), ThreadingUtilities.SPIN) == + ThreadingUtilities.WAIT + @test ThreadingUtilities.load(pointer(x), ThreadingUtilities.ThreadState) == + ThreadingUtilities.SPIN end for tid ∈ eachindex(ThreadingUtilities.TASKS) launch_thread_copy!(tid, Float64[], Float64[]) end - yield() + sleep(1) @test all(istaskfailed, ThreadingUtilities.TASKS) @test all(ThreadingUtilities.wait, eachindex(ThreadingUtilities.TASKS)) @test !any(istaskfailed, ThreadingUtilities.TASKS)