Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "ThreadingUtilities"
uuid = "8290d209-cae3-49c0-8002-c8c24d57dab5"
authors = ["Chris Elrod <elrodc@gmail.com> and contributors"]
version = "0.5.1"
version = "0.5.2"

[deps]
ManualMemory = "d125e4d3-2237-4719-b19c-fa641b8a4667"
Expand Down
11 changes: 5 additions & 6 deletions src/threadtasks.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ struct ThreadTask
end
Base.pointer(tt::ThreadTask) = tt.p

@inline taskpointer(tid::T) where {T} = THREADPOOLPTR[] + tid*(THREADBUFFERSIZE%T)
@inline taskpointer(tid::T) where {T} = THREADPOOLPTR[] + tid * (THREADBUFFERSIZE % T)

@inline function _call(p::Ptr{UInt})
fptr = load(p + sizeof(UInt), Ptr{Cvoid})
Expand Down Expand Up @@ -41,7 +41,7 @@ end

function _sleep(p::Ptr{UInt})
_atomic_store!(p, WAIT)
Base.wait();
Base.wait()
return nothing
end

Expand All @@ -58,7 +58,7 @@ function sleep_all_tasks()
end

# 1-based tid, pushes into task 2-nthreads()
@noinline function wake_thread!(_tid::T) where {T <: Integer}
@noinline function wake_thread!(_tid::T) where {T<:Integer}
tid = _tid % Int
tidp1 = tid + one(tid)
assume(unsigned(length(Base.Workqueues)) > unsigned(tid))
Expand All @@ -70,17 +70,16 @@ end
@noinline function checktask(tid)
t = TASKS[tid]
if istaskfailed(t)
show(stderr, MIME"text/plain"(), t)
println()
initialize_task(tid)
return true
end
yield()
false
end
# 1-based tid
@inline tasktid(p::Ptr{UInt}) = (p - THREADPOOLPTR[]) ÷ (THREADBUFFERSIZE)
@inline wait(tid::Integer) = wait(taskpointer(tid), tid)
@inline wait(p::Ptr{UInt}) = wait(p, (p - THREADPOOLPTR[]) ÷ (THREADBUFFERSIZE))
@inline wait(p::Ptr{UInt}) = wait(p, tasktid(p))
@inline function wait(p::Ptr{UInt}, tid)
counter = 0x00000000
while _atomic_state(p) == TASK
Expand Down
43 changes: 26 additions & 17 deletions test/threadingutilities.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
struct Copy{P} end
function (::Copy{P})(p::Ptr{UInt}) where {P}
_, (ptry,ptrx,N) = ThreadingUtilities.load(p, P, 2*sizeof(UInt))
_, (ptry, ptrx, N) = ThreadingUtilities.load(p, P, 2 * sizeof(UInt))
N > 0 || throw("This function throws if N == 0 for testing purposes.")
@simd ivdep for n ∈ 1:N
unsafe_store!(ptry, unsafe_load(ptrx, n), n)
Expand All @@ -20,18 +20,18 @@ function setup_copy!(p, y, x)
px = pointer(x)
fptr = copy_ptr(py, px)
offset = ThreadingUtilities.store!(p, fptr, sizeof(UInt))
ThreadingUtilities.store!(p, (py,px,N), offset)
ThreadingUtilities.store!(p, (py, px, N), offset)
end

@inline launch_thread_copy!(tid, y, x) = ThreadingUtilities.launch(setup_copy!, tid, y, x)

function test_copy(tid, N = 100_000)
a = rand(N);
b = rand(N);
c = rand(N);
x = similar(a) .= NaN;
y = similar(b) .= NaN;
z = similar(c) .= NaN;
a = rand(N)
b = rand(N)
c = rand(N)
x = similar(a) .= NaN
y = similar(b) .= NaN
z = similar(c) .= NaN
GC.@preserve a b c x y z begin
launch_thread_copy!(tid, x, a)
yield()
Expand All @@ -53,25 +53,34 @@ end
@test unsafe_load(Ptr{UInt32}(ThreadingUtilities.taskpointer(tid))) == 0x00000001
end
@test all(eachindex(ThreadingUtilities.TASKS)) do tid
ThreadingUtilities.load(ThreadingUtilities.taskpointer(tid), ThreadingUtilities.ThreadState) === ThreadingUtilities.WAIT
ThreadingUtilities.load(
ThreadingUtilities.taskpointer(tid),
ThreadingUtilities.ThreadState,
) === ThreadingUtilities.WAIT
end
@test all(eachindex(ThreadingUtilities.TASKS)) do tid
ThreadingUtilities._atomic_load(reinterpret(Ptr{UInt32}, ThreadingUtilities.taskpointer(tid))) === reinterpret(UInt32, ThreadingUtilities.WAIT)
ThreadingUtilities._atomic_load(
reinterpret(Ptr{UInt32}, ThreadingUtilities.taskpointer(tid)),
) === reinterpret(UInt32, ThreadingUtilities.WAIT)
end
foreach(test_copy, eachindex(ThreadingUtilities.TASKS))
x = rand(UInt, 3);

x = rand(UInt, 3)
GC.@preserve x begin
ThreadingUtilities._atomic_store!(pointer(x), zero(UInt))
@test ThreadingUtilities._atomic_xchg!(pointer(x), ThreadingUtilities.WAIT) == ThreadingUtilities.TASK
@test ThreadingUtilities._atomic_umax!(pointer(x), ThreadingUtilities.TASK) == ThreadingUtilities.WAIT
@test ThreadingUtilities._atomic_umax!(pointer(x), ThreadingUtilities.SPIN) == ThreadingUtilities.WAIT
@test ThreadingUtilities.load(pointer(x), ThreadingUtilities.ThreadState) == ThreadingUtilities.SPIN
@test ThreadingUtilities._atomic_xchg!(pointer(x), ThreadingUtilities.WAIT) ==
ThreadingUtilities.TASK
@test ThreadingUtilities._atomic_umax!(pointer(x), ThreadingUtilities.TASK) ==
ThreadingUtilities.WAIT
@test ThreadingUtilities._atomic_umax!(pointer(x), ThreadingUtilities.SPIN) ==
ThreadingUtilities.WAIT
@test ThreadingUtilities.load(pointer(x), ThreadingUtilities.ThreadState) ==
ThreadingUtilities.SPIN
end
for tid ∈ eachindex(ThreadingUtilities.TASKS)
launch_thread_copy!(tid, Float64[], Float64[])
end
yield()
sleep(1)
@test all(istaskfailed, ThreadingUtilities.TASKS)
@test all(ThreadingUtilities.wait, eachindex(ThreadingUtilities.TASKS))
@test !any(istaskfailed, ThreadingUtilities.TASKS)
Expand Down