From 31402c8094a90beb5240d7b0fe81493c0859cce5 Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Sun, 19 Sep 2021 21:46:14 -0400 Subject: [PATCH 1/8] Add a bug reproducer for WorkStealingDeque --- .../src/test_work_stealing_deque.jl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl b/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl index c737c90..1cccb5a 100644 --- a/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl +++ b/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl @@ -63,8 +63,10 @@ function random_pushpop(xs, ntasks = Threads.nthreads() - 1) end function test_random_push_pop() - @testset for T in [Int, Any, Int, Any] - test_random_push_pop(T) + @testset for trial in 1:100 + @testset for T in [Int, Any, Int, Any] + test_random_push_pop(T) + end end end From ac86acaa64c37224b7e4aa47d07d7c615455a94f Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Sun, 19 Sep 2021 21:41:38 -0400 Subject: [PATCH 2/8] Fix WorkStealingDeque --- src/workstealing.jl | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/src/workstealing.jl b/src/workstealing.jl index ee18c19..12400a9 100644 --- a/src/workstealing.jl +++ b/src/workstealing.jl @@ -30,6 +30,8 @@ Base.@propagate_inbounds function Base.setindex!(A::CircularVector, v, i::Int) A.data[indexof(A, i)] = v end +Base.pointer(A::CircularVector, i::Integer) = pointer(A.data, indexof(A, i)) + function tryresize(A::CircularVector, log2inc::Integer, indices) log2length = A.log2length + log2inc n = 1 << log2length @@ -134,11 +136,23 @@ function ConcurrentCollections.trypopfirst!(deque::WorkStealingDeque) if current_size <= 0 return nothing end - r = Some(buffer[top]) - if @atomicreplace(deque.top, top => top + 1)[2] - return r + if Base.allocatedinline(eltype(buffer)) + r = Some(buffer[top]) + if @atomicreplace(deque.top, top => top + 1)[2] + return r + else + return nothing + end else - return nothing + ptr = UnsafeAtomics.load(Ptr{Ptr{Cvoid}}(pointer(buffer, top)), monotonic) + if @atomicreplace(deque.top, top => top + 1)[2] + GC.@preserve buffer begin + r = Some(unsafe_pointer_to_objref(ptr)) + end + return r + else + return nothing + end end end From 60a8c7c7a241408a3fb1afd8dd3548bb39492fd9 Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Sun, 19 Sep 2021 22:12:02 -0400 Subject: [PATCH 3/8] Remove a redundant loop --- test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl b/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl index 1cccb5a..4f70a65 100644 --- a/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl +++ b/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl @@ -64,7 +64,7 @@ end function test_random_push_pop() @testset for trial in 1:100 - @testset for T in [Int, Any, Int, Any] + @testset for T in [Int, Any] test_random_push_pop(T) end end From 9e39e4c1841bf083989535b259e732c58d40861e Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Sun, 19 Sep 2021 22:13:58 -0400 Subject: [PATCH 4/8] Add a safety comment --- src/workstealing.jl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/workstealing.jl b/src/workstealing.jl index 12400a9..679811e 100644 --- a/src/workstealing.jl +++ b/src/workstealing.jl @@ -146,6 +146,9 @@ function ConcurrentCollections.trypopfirst!(deque::WorkStealingDeque) else ptr = UnsafeAtomics.load(Ptr{Ptr{Cvoid}}(pointer(buffer, top)), monotonic) if @atomicreplace(deque.top, top => top + 1)[2] + # Safety: The above CAS verifies that the slot `buffer[top]` + # contained the valid element. We can now materialize it as a Julia + # value. GC.@preserve buffer begin r = Some(unsafe_pointer_to_objref(ptr)) end From 95a3ceb8db8b6af9109aad2653417bc5d7420cfd Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Sun, 19 Sep 2021 22:23:05 -0400 Subject: [PATCH 5/8] Better test reporting --- .../src/test_work_stealing_deque.jl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl b/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl index 4f70a65..3caaeeb 100644 --- a/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl +++ b/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl @@ -79,7 +79,9 @@ function test_random_push_pop(T::Type, xs = 1:2^10) @test all(allunique, yss) @debug "random_pushpop(xs)" length(zs) length.(yss) ys = sort!(foldl(append!, yss; init = copy(zs))) - @debug "random_pushpop(xs)" setdiff(ys, xs) setdiff(xs, ys) length(xs) length(ys) + @test length(ys) == length(xs) + @test setdiff(ys, xs) == [] + @test setdiff(xs, ys) == [] @test ys == xs end From 063ca28e3fd46981b1fe1dff79169d6b9e933c3a Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Sun, 19 Sep 2021 22:28:38 -0400 Subject: [PATCH 6/8] Insert safepoint to avoid deadlock --- test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl b/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl index 3caaeeb..2ab0c99 100644 --- a/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl +++ b/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl @@ -36,6 +36,7 @@ function random_pushpop(xs, ntasks = Threads.nthreads() - 1) while true r = trypopfirst!(deque) if r === nothing + GC.safepoint() done[] && break continue end @@ -51,6 +52,7 @@ function random_pushpop(xs, ntasks = Threads.nthreads() - 1) # continue if mod(i, 8) == 0 r = trypop!(deque) + GC.safepoint() r === nothing && continue push!(zs, something(r)) end From 31ba47233b7c7bccecfc55274d82da1747c8f1fb Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Sun, 19 Sep 2021 22:38:33 -0400 Subject: [PATCH 7/8] Return states from test_random_push_pop for debugging --- test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl | 1 + 1 file changed, 1 insertion(+) diff --git a/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl b/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl index 2ab0c99..333a9b1 100644 --- a/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl +++ b/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl @@ -85,6 +85,7 @@ function test_random_push_pop(T::Type, xs = 1:2^10) @test setdiff(ys, xs) == [] @test setdiff(xs, ys) == [] @test ys == xs + return (; zs, yss) end end # module From 1f2df1567556d4b8d7b5e982d4f57b9146bb0f9e Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Sun, 19 Sep 2021 22:46:33 -0400 Subject: [PATCH 8/8] Fix random_pushpop --- .../src/test_work_stealing_deque.jl | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl b/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl index 333a9b1..773212a 100644 --- a/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl +++ b/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl @@ -28,19 +28,19 @@ function random_pushpop(xs, ntasks = Threads.nthreads() - 1) deque = WorkStealingDeque{eltype(xs)}() local tasks, zs - done = Threads.Atomic{Bool}(false) try tasks = map(1:ntasks) do _ Threads.@spawn begin local ys = eltype(xs)[] while true - r = trypopfirst!(deque) + local r = trypopfirst!(deque) if r === nothing GC.safepoint() - done[] && break continue end - push!(ys, something(r)) + local y = something(r) + y == -1 && break + push!(ys, y) end ys end @@ -58,7 +58,9 @@ function random_pushpop(xs, ntasks = Threads.nthreads() - 1) end end finally - done[] = true + for _ in 1:ntasks + push!(deque, -1) + end end return zs, fetch.(tasks)