From a0b891ec368f0c0a4b605720e40670c7c43b6e9d Mon Sep 17 00:00:00 2001 From: chriselrod Date: Tue, 29 Jun 2021 12:50:50 -0400 Subject: [PATCH 1/2] Depend on ManualMemory.jl and yield to the task being waited on --- Project.toml | 4 ++ src/ThreadingUtilities.jl | 3 +- src/threadtasks.jl | 7 ++-- src/utils.jl | 84 -------------------------------------- test/threadingutilities.jl | 2 +- 5 files changed, 11 insertions(+), 89 deletions(-) delete mode 100644 src/utils.jl diff --git a/Project.toml b/Project.toml index 17ebc53..fa1d560 100644 --- a/Project.toml +++ b/Project.toml @@ -3,8 +3,12 @@ uuid = "8290d209-cae3-49c0-8002-c8c24d57dab5" authors = ["Chris Elrod and contributors"] version = "0.4.5" +[deps] +ManualMemory = "d125e4d3-2237-4719-b19c-fa641b8a4667" + [compat] Aqua = "0.5" +ManualMemory = "0.1.1" julia = "1.5" [extras] diff --git a/src/ThreadingUtilities.jl b/src/ThreadingUtilities.jl index 37dd7be..abf11d6 100644 --- a/src/ThreadingUtilities.jl +++ b/src/ThreadingUtilities.jl @@ -1,5 +1,7 @@ module ThreadingUtilities +using ManualMemory: load, store! + """ pause() @@ -26,7 +28,6 @@ const THREADPOOLPTR = Ref{Ptr{UInt}}(C_NULL); include("atomics.jl") include("threadtasks.jl") -include("utils.jl") include("warnings.jl") function initialize_task(tid::Int) diff --git a/src/threadtasks.jl b/src/threadtasks.jl index f990a5f..97d44b0 100644 --- a/src/threadtasks.jl +++ b/src/threadtasks.jl @@ -52,12 +52,13 @@ end end # 1-based tid -@inline wait(tid::Integer) = wait(taskpointer(tid)) -@inline function wait(p::Ptr{UInt}) +@inline wait(tid::Integer) = wait(taskpointer(tid), tid) +@inline wait(p::Ptr{UInt}) = wait(p, (reinterpret(Int, p) - reinterpret(Int, THREADPOOLPTR[])) ÷ (THREADBUFFERSIZE)) +@inline function wait(p::Ptr{UInt}, tid::Integer) counter = 0x00000000 while _atomic_state(p) == TASK pause() - ((counter += 0x00000001) > 0x00010000) && yield() + ((counter += 0x00000001) > 0x00010000) && yield(TASKS[tid]) end end diff --git a/src/utils.jl b/src/utils.jl deleted file mode 100644 index 05bee63..0000000 --- a/src/utils.jl +++ /dev/null @@ -1,84 +0,0 @@ -@generated function load(p::Ptr{T}) where {T} - if Base.allocatedinline(T) - Expr(:block, Expr(:meta,:inline), :(unsafe_load(p))) - else - Expr(:block, Expr(:meta,:inline), :(ccall(:jl_value_ptr, Ref{$T}, (Ptr{Cvoid},), unsafe_load(Base.unsafe_convert(Ptr{Ptr{Cvoid}}, p))))) - end -end -@inline load(p::Ptr{UInt}, ::Type{T}) where {T} = load(reinterpret(Ptr{T}, p)) -@generated function store!(p::Ptr{T}, v::T) where {T} - if Base.allocatedinline(T) - Expr(:block, Expr(:meta,:inline), :(unsafe_store!(p, v); return nothing)) - else - Expr(:block, Expr(:meta,:inline), :(unsafe_store!(Base.unsafe_convert(Ptr{Ptr{Cvoid}}, p), Base.pointer_from_objref(v)); return nothing)) - end -end -offsetsize(::Type{T}) where {T} = Base.allocatedinline(T) ? sizeof(T) : sizeof(Int) - -function load_aggregate(::Type{T}, offset::Int) where {T} - numfields = fieldcount(T) - call = (T <: Tuple) ? Expr(:tuple) : Expr(:new, T) - for f ∈ 1:numfields - TF = fieldtype(T, f) - if Base.issingletontype(TF) - push!(call.args, TF.instance) - elseif fieldcount(TF) ≡ 0 - ptr = :(p + (offset + $offset)) - ptr = TF === UInt ? ptr : :(reinterpret(Ptr{$TF}, $ptr)) - push!(call.args, :(load($ptr))) - offset += offsetsize(TF) - else - arg, offset = load_aggregate(TF, offset) - push!(call.args, arg) - end - end - return call, offset -end -@generated function load(p::Ptr{UInt}, ::Type{T}, offset::Int) where {T} - if Base.issingletontype(T) - call = Expr(:tuple, :offset, T.instance) - elseif fieldcount(T) ≡ 0 - ptr = :(p + offset) - ptr = T === UInt ? ptr : :(reinterpret(Ptr{$T}, $ptr)) - call = :(((offset + $(offsetsize(T)), load($ptr)))) - else - call, off = load_aggregate(T, 0) - call = Expr(:tuple, :(offset + $off), call) - end - Expr(:block, Expr(:meta,:inline), call) -end - -function store_aggregate!(q::Expr, sym, ::Type{T}, offset::Int) where {T} - gf = GlobalRef(Core,:getfield) - for f ∈ 1:fieldcount(T) - TF = fieldtype(T, f) - Base.issingletontype(TF) && continue - gfcall = Expr(:call, gf, sym, f) - if fieldcount(TF) ≡ 0 - ptr = :(p + (offset + $offset)) - ptr = TF === UInt ? ptr : :(reinterpret(Ptr{$TF}, $ptr)) - push!(q.args, :(store!($ptr, $gfcall))) - offset += offsetsize(TF) - else - newsym = gensym(sym) - push!(q.args, Expr(:(=), newsym, gfcall)) - offset = store_aggregate!(q, newsym, TF, offset) - end - end - return offset -end -@generated function store!(p::Ptr{UInt}, x::T, offset::Int) where {T} - Base.issingletontype(T) && return :offset - body = Expr(:block, Expr(:meta,:inline)) - if fieldcount(T) ≡ 0 - ptr = :(p + offset) - ptr = T === UInt ? ptr : :(reinterpret(Ptr{$T}, $ptr)) - push!(body.args, :(store!($ptr, x))) - off = offsetsize(T) - else - off = store_aggregate!(body, :x, T, 0) - end - push!(body.args, Expr(:call, +, :offset, off)) - return body -end - diff --git a/test/threadingutilities.jl b/test/threadingutilities.jl index aa2fe9c..9a274b8 100644 --- a/test/threadingutilities.jl +++ b/test/threadingutilities.jl @@ -44,7 +44,7 @@ function test_copy(tid, N = 100_000) ThreadingUtilities.wait(tid) launch_thread_copy!(tid, z, c) yield() - ThreadingUtilities.wait(tid) + ThreadingUtilities.wait(ThreadingUtilities.taskpointer(tid)) end @test a == x @test b == y From de8f0cc5fa7d0c472242523ddb22bae73c6cf695 Mon Sep 17 00:00:00 2001 From: chriselrod Date: Tue, 29 Jun 2021 13:42:31 -0400 Subject: [PATCH 2/2] don't yieldto --- src/threadtasks.jl | 7 +++---- test/threadingutilities.jl | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/threadtasks.jl b/src/threadtasks.jl index 97d44b0..f990a5f 100644 --- a/src/threadtasks.jl +++ b/src/threadtasks.jl @@ -52,13 +52,12 @@ end end # 1-based tid -@inline wait(tid::Integer) = wait(taskpointer(tid), tid) -@inline wait(p::Ptr{UInt}) = wait(p, (reinterpret(Int, p) - reinterpret(Int, THREADPOOLPTR[])) ÷ (THREADBUFFERSIZE)) -@inline function wait(p::Ptr{UInt}, tid::Integer) +@inline wait(tid::Integer) = wait(taskpointer(tid)) +@inline function wait(p::Ptr{UInt}) counter = 0x00000000 while _atomic_state(p) == TASK pause() - ((counter += 0x00000001) > 0x00010000) && yield(TASKS[tid]) + ((counter += 0x00000001) > 0x00010000) && yield() end end diff --git a/test/threadingutilities.jl b/test/threadingutilities.jl index 9a274b8..aa2fe9c 100644 --- a/test/threadingutilities.jl +++ b/test/threadingutilities.jl @@ -44,7 +44,7 @@ function test_copy(tid, N = 100_000) ThreadingUtilities.wait(tid) launch_thread_copy!(tid, z, c) yield() - ThreadingUtilities.wait(ThreadingUtilities.taskpointer(tid)) + ThreadingUtilities.wait(tid) end @test a == x @test b == y