From 0cc2059ce00c270d577df5df5dad1ad0fd5cde08 Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Sun, 19 Sep 2021 23:44:06 -0400 Subject: [PATCH] Support immutables containing boxed values --- src/workstealing.jl | 17 ++++++++++--- .../src/test_work_stealing_deque.jl | 25 ++++++++++++++----- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/src/workstealing.jl b/src/workstealing.jl index 679811e..c64f022 100644 --- a/src/workstealing.jl +++ b/src/workstealing.jl @@ -43,14 +43,22 @@ function tryresize(A::CircularVector, log2inc::Integer, indices) return B end -mutable struct WorkStealingDeque{T} - @atomic buffer::CircularVector{T} +mutable struct WorkStealingDeque{T,S} + @atomic buffer::CircularVector{S} @atomic top::Int @atomic bottom::Int # TODO: pad end -WorkStealingDeque{T}() where {T} = WorkStealingDeque{T}(CircularVector{T}(4), 1, 1) +function WorkStealingDeque{T}() where {T} + if isbitstype(T) || Base.isbitsunion(T) + # TODO: support Some{Union{Int,Nothing}} etc. + S = T + else + S = Any + end + return WorkStealingDeque{T,S}(CircularVector{S}(4), 1, 1) +end Base.eltype(::Type{WorkStealingDeque{T}}) where {T} = T @@ -98,6 +106,8 @@ function Base.push!(deque::WorkStealingDeque, v) buffer = grow!(deque, buffer, top, bottom) @atomic deque.buffer = buffer end + # TODO: Technically, this should use atomic store. However, there is no way + # to do this in a GC-compatible way at the moment. buffer[bottom] = v bottom += 1 @atomic deque.bottom = bottom @@ -115,6 +125,7 @@ function ConcurrentCollections.trypop!(deque::WorkStealingDeque) @atomic deque.bottom = top return nothing end + # TODO: Technically, this should also use atomic load r = Some(buffer[bottom]) if next_size > 0 tryshrink!(deque, buffer, top, bottom) diff --git a/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl b/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl index 773212a..c9d7097 100644 --- a/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl +++ b/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl @@ -22,7 +22,8 @@ function test_single_thread_push_pop() @test [pop!(deque) for _ in xs] == reverse(xs) end -function random_pushpop(xs, ntasks = Threads.nthreads() - 1) +function random_pushpop(xs; ntasks = Threads.nthreads() - 1, sentinel = -1) + sentinel = convert(eltype(xs), sentinel) # verify argument ntasks = max(1, ntasks) deque = WorkStealingDeque{eltype(xs)}() @@ -39,7 +40,7 @@ function random_pushpop(xs, ntasks = Threads.nthreads() - 1) continue end local y = something(r) - y == -1 && break + y == $sentinel && break push!(ys, y) end ys @@ -59,7 +60,7 @@ function random_pushpop(xs, ntasks = Threads.nthreads() - 1) end finally for _ in 1:ntasks - push!(deque, -1) + push!(deque, sentinel) end end @@ -68,17 +69,29 @@ end function test_random_push_pop() @testset for trial in 1:100 - @testset for T in [Int, Any] + @testset for T in [Int, Any, Pair{Any,Int}] test_random_push_pop(T) end end end -function test_random_push_pop(T::Type, xs = 1:2^10) +function test_random_push_pop(T::Type) + sentinel = -1 + xs = 1:2^10 + return test_random_push_pop(T::Type, xs, sentinel) +end + +function test_random_push_pop(T::Type{Pair{Any,Int}}) + sentinel = T(-1, -1) + xs = [T(x, x) for x in 1:2^10] + return test_random_push_pop(T::Type, xs, sentinel) +end + +function test_random_push_pop(T::Type, xs, sentinel) if T !== eltype(xs) xs = collect(T, xs) end - zs, yss = random_pushpop(xs) + zs, yss = random_pushpop(xs; sentinel) @test allunique(zs) @test all(allunique, yss) @debug "random_pushpop(xs)" length(zs) length.(yss)