Skip to content

Commit

Permalink
Merge pull request #46 from JuliaSIMD/catch
Browse files Browse the repository at this point in the history
Catch
  • Loading branch information
chriselrod committed Jun 28, 2023
2 parents e7f2f4b + 7946fe6 commit b2a7f66
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 24 deletions.
2 changes: 1 addition & 1 deletion Project.toml
@@ -1,7 +1,7 @@
name = "ThreadingUtilities"
uuid = "8290d209-cae3-49c0-8002-c8c24d57dab5"
authors = ["Chris Elrod <elrodc@gmail.com> and contributors"]
version = "0.5.1"
version = "0.5.2"

[deps]
ManualMemory = "d125e4d3-2237-4719-b19c-fa641b8a4667"
Expand Down
11 changes: 5 additions & 6 deletions src/threadtasks.jl
Expand Up @@ -3,7 +3,7 @@ struct ThreadTask
end
Base.pointer(tt::ThreadTask) = tt.p

@inline taskpointer(tid::T) where {T} = THREADPOOLPTR[] + tid*(THREADBUFFERSIZE%T)
@inline taskpointer(tid::T) where {T} = THREADPOOLPTR[] + tid * (THREADBUFFERSIZE % T)

@inline function _call(p::Ptr{UInt})
fptr = load(p + sizeof(UInt), Ptr{Cvoid})
Expand Down Expand Up @@ -41,7 +41,7 @@ end

function _sleep(p::Ptr{UInt})
_atomic_store!(p, WAIT)
Base.wait();
Base.wait()
return nothing
end

Expand All @@ -58,7 +58,7 @@ function sleep_all_tasks()
end

# 1-based tid, pushes into task 2-nthreads()
@noinline function wake_thread!(_tid::T) where {T <: Integer}
@noinline function wake_thread!(_tid::T) where {T<:Integer}
tid = _tid % Int
tidp1 = tid + one(tid)
assume(unsigned(length(Base.Workqueues)) > unsigned(tid))
Expand All @@ -70,17 +70,16 @@ end
@noinline function checktask(tid)
t = TASKS[tid]
if istaskfailed(t)
show(stderr, MIME"text/plain"(), t)
println()
initialize_task(tid)
return true
end
yield()
false
end
# 1-based tid
@inline tasktid(p::Ptr{UInt}) = (p - THREADPOOLPTR[]) ÷ (THREADBUFFERSIZE)
@inline wait(tid::Integer) = wait(taskpointer(tid), tid)
@inline wait(p::Ptr{UInt}) = wait(p, (p - THREADPOOLPTR[]) ÷ (THREADBUFFERSIZE))
@inline wait(p::Ptr{UInt}) = wait(p, tasktid(p))
@inline function wait(p::Ptr{UInt}, tid)
counter = 0x00000000
while _atomic_state(p) == TASK
Expand Down
43 changes: 26 additions & 17 deletions test/threadingutilities.jl
@@ -1,6 +1,6 @@
struct Copy{P} end
function (::Copy{P})(p::Ptr{UInt}) where {P}
_, (ptry,ptrx,N) = ThreadingUtilities.load(p, P, 2*sizeof(UInt))
_, (ptry, ptrx, N) = ThreadingUtilities.load(p, P, 2 * sizeof(UInt))
N > 0 || throw("This function throws if N == 0 for testing purposes.")
@simd ivdep for n 1:N
unsafe_store!(ptry, unsafe_load(ptrx, n), n)
Expand All @@ -20,18 +20,18 @@ function setup_copy!(p, y, x)
px = pointer(x)
fptr = copy_ptr(py, px)
offset = ThreadingUtilities.store!(p, fptr, sizeof(UInt))
ThreadingUtilities.store!(p, (py,px,N), offset)
ThreadingUtilities.store!(p, (py, px, N), offset)
end

@inline launch_thread_copy!(tid, y, x) = ThreadingUtilities.launch(setup_copy!, tid, y, x)

function test_copy(tid, N = 100_000)
a = rand(N);
b = rand(N);
c = rand(N);
x = similar(a) .= NaN;
y = similar(b) .= NaN;
z = similar(c) .= NaN;
a = rand(N)
b = rand(N)
c = rand(N)
x = similar(a) .= NaN
y = similar(b) .= NaN
z = similar(c) .= NaN
GC.@preserve a b c x y z begin
launch_thread_copy!(tid, x, a)
yield()
Expand All @@ -53,25 +53,34 @@ end
@test unsafe_load(Ptr{UInt32}(ThreadingUtilities.taskpointer(tid))) == 0x00000001
end
@test all(eachindex(ThreadingUtilities.TASKS)) do tid
ThreadingUtilities.load(ThreadingUtilities.taskpointer(tid), ThreadingUtilities.ThreadState) === ThreadingUtilities.WAIT
ThreadingUtilities.load(
ThreadingUtilities.taskpointer(tid),
ThreadingUtilities.ThreadState,
) === ThreadingUtilities.WAIT
end
@test all(eachindex(ThreadingUtilities.TASKS)) do tid
ThreadingUtilities._atomic_load(reinterpret(Ptr{UInt32}, ThreadingUtilities.taskpointer(tid))) === reinterpret(UInt32, ThreadingUtilities.WAIT)
ThreadingUtilities._atomic_load(
reinterpret(Ptr{UInt32}, ThreadingUtilities.taskpointer(tid)),
) === reinterpret(UInt32, ThreadingUtilities.WAIT)
end
foreach(test_copy, eachindex(ThreadingUtilities.TASKS))
x = rand(UInt, 3);

x = rand(UInt, 3)
GC.@preserve x begin
ThreadingUtilities._atomic_store!(pointer(x), zero(UInt))
@test ThreadingUtilities._atomic_xchg!(pointer(x), ThreadingUtilities.WAIT) == ThreadingUtilities.TASK
@test ThreadingUtilities._atomic_umax!(pointer(x), ThreadingUtilities.TASK) == ThreadingUtilities.WAIT
@test ThreadingUtilities._atomic_umax!(pointer(x), ThreadingUtilities.SPIN) == ThreadingUtilities.WAIT
@test ThreadingUtilities.load(pointer(x), ThreadingUtilities.ThreadState) == ThreadingUtilities.SPIN
@test ThreadingUtilities._atomic_xchg!(pointer(x), ThreadingUtilities.WAIT) ==
ThreadingUtilities.TASK
@test ThreadingUtilities._atomic_umax!(pointer(x), ThreadingUtilities.TASK) ==
ThreadingUtilities.WAIT
@test ThreadingUtilities._atomic_umax!(pointer(x), ThreadingUtilities.SPIN) ==
ThreadingUtilities.WAIT
@test ThreadingUtilities.load(pointer(x), ThreadingUtilities.ThreadState) ==
ThreadingUtilities.SPIN
end
for tid eachindex(ThreadingUtilities.TASKS)
launch_thread_copy!(tid, Float64[], Float64[])
end
yield()
sleep(1)
@test all(istaskfailed, ThreadingUtilities.TASKS)
@test all(ThreadingUtilities.wait, eachindex(ThreadingUtilities.TASKS))
@test !any(istaskfailed, ThreadingUtilities.TASKS)
Expand Down

2 comments on commit b2a7f66

@chriselrod
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/86463

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v0.5.2 -m "<description of version>" b2a7f6625b5c9fc19b3740b1e0961b558c311179
git push origin v0.5.2

Please sign in to comment.