From 22086f85233622ecb672821bfe298dd9016de430 Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Sun, 12 Sep 2021 22:55:23 -0400 Subject: [PATCH 1/4] Re-enable LCRQ concurrent push pop test --- test/test_lcrq.jl | 56 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 39 insertions(+), 17 deletions(-) diff --git a/test/test_lcrq.jl b/test/test_lcrq.jl index c6da109..bf1c06e 100644 --- a/test/test_lcrq.jl +++ b/test/test_lcrq.jl @@ -1,5 +1,6 @@ module TestLCRQ +using Base.Experimental: @sync using ConcurrentCollections using ConcurrentCollections.Implementations: ICRQIndex using Test @@ -29,12 +30,18 @@ end function concurrent_push_pop!(q, nitems::Integer, nsend::Integer, nrecv::Integer) received = Vector{Int}[] + activesenders = Threads.Atomic{Int}(nsend) @sync begin for t in 1:nsend Threads.@spawn begin for i in t:nsend:nitems push!(q, i) end + if Threads.atomic_sub!(activesenders, 1) == 1 + for _ in 1:nrecv + push!(q, -1) + end + end end end for _ in 1:nrecv @@ -47,10 +54,8 @@ function concurrent_push_pop!(q, nitems::Integer, nsend::Integer, nrecv::Integer yield() else i = something(y) + i == -1 && break push!(ys, i) - if i > nitems - nrecv - break - end end end end @@ -59,23 +64,40 @@ function concurrent_push_pop!(q, nitems::Integer, nsend::Integer, nrecv::Integer return received end +function check_consecutive(xs) + notfound = Int[] + dups = Int[] + pre = xs[begin] - 1 + for x in xs + e = pre + 1 + append!(notfound, e:x-1) + append!(dups, x:e-1) + pre = x + end + return (; notfound, dups) +end + @testset "concurrent push-pop" begin - @test_broken false - #= if Threads.nthreads() > 1 - nsend = cld(Threads.nthreads(), 2) - nrecv = Threads.nthreads() - nsend - @assert nsend ≥ 1 - @assert nrecv ≥ 1 - q = LinkedConcurrentRingQueue{Int}(32) - nitems = 2^20 - received = concurrent_push_pop!(q, nitems, nsend, nrecv) - allreceived = reduce(vcat, received) - @test length(allreceived) == nitems - sort!(allreceived) - @test allreceived == 1:nitems + @testset 
for trial in 1:100 + # @show trial + nsend = cld(Threads.nthreads(), 2) + nrecv = Threads.nthreads() - nsend + @assert nsend ≥ 1 + @assert nrecv ≥ 1 + q = LinkedConcurrentRingQueue{Int16}(32) + # nitems = 2^20 + nitems = typemax(Int16) + received = concurrent_push_pop!(q, nitems, nsend, nrecv) + allreceived = reduce(vcat, received) + @test length(allreceived) == nitems + sort!(allreceived) + (; notfound, dups) = check_consecutive(allreceived) + @test notfound == [] + @test dups == [] + @test allreceived == 1:nitems + end end - =# end end # module From 6d934a26615f454aef4fb98e8109f84e83e4a9fd Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Sun, 12 Sep 2021 23:21:52 -0400 Subject: [PATCH 2/4] Revert the remnant of debugging --- test/test_lcrq.jl | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/test/test_lcrq.jl b/test/test_lcrq.jl index bf1c06e..4534ba8 100644 --- a/test/test_lcrq.jl +++ b/test/test_lcrq.jl @@ -85,9 +85,8 @@ end nrecv = Threads.nthreads() - nsend @assert nsend ≥ 1 @assert nrecv ≥ 1 - q = LinkedConcurrentRingQueue{Int16}(32) - # nitems = 2^20 - nitems = typemax(Int16) + q = LinkedConcurrentRingQueue{Int}(32) + nitems = 2^20 received = concurrent_push_pop!(q, nitems, nsend, nrecv) allreceived = reduce(vcat, received) @test length(allreceived) == nitems From db948aa6f216b1c52aa45bfd9da799e54642ef32 Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Sun, 12 Sep 2021 22:56:54 -0400 Subject: [PATCH 3/4] Fix LCRQ --- src/dlcrq.jl | 3 -- src/lcrq.jl | 96 +++++++++++------------------------------------ test/test_lcrq.jl | 7 ---- 3 files changed, 22 insertions(+), 84 deletions(-) diff --git a/src/dlcrq.jl b/src/dlcrq.jl index 753cf6f..a8e764b 100644 --- a/src/dlcrq.jl +++ b/src/dlcrq.jl @@ -165,9 +165,6 @@ const SmallUnsigned = Union{Bool,UInt8,UInt16} const NullableInline32 = Union{SmallSigned,SmallUnsigned} function IndirectMultiPolarityConcurrentRingQueueNode{T}(log2len::Int) where {T} - if !(0 <= log2len <= ITEMINDEX_BITS32) 
- error("Expected: 0 <= log2len <= $ITEMINDEX_BITS32; Got log2len = $log2len") - end len = 2^log2len if T <: NullableInline32 && isconcretetype(T) data = nothing diff --git a/src/lcrq.jl b/src/lcrq.jl index ee5fb3e..d0e5a5f 100644 --- a/src/lcrq.jl +++ b/src/lcrq.jl @@ -10,34 +10,6 @@ bool(x::Int32Bool) = x.bits > 0 setbool(x::Int32Bool, b::Bool) = Int32Bool(int(x), b) setint(x::Int32Bool, i::Int32) = Int32Bool(i, bool(x)) -struct ICRQIndex - bits::UInt32 -end - -const THREADINDEX_BITS32 = 8 -const ITEMINDEX_BITS32 = 32 - THREADINDEX_BITS32 - -function ICRQIndex(; threadindex, itemindex) - local bits::UInt32 - # threadindex ≥ 1 << THREADINDEX_BITS32 && error("threadindex too large $threadindex") - # itemindex ≥ 1 << ITEMINDEX_BITS32 && error("itemindex too large $itemindex") - bits = UInt32(threadindex) << ITEMINDEX_BITS32 - bits |= UInt32(itemindex) & (typemax(UInt32) >> THREADINDEX_BITS32) - return ICRQIndex(bits) -end - -@inline function Base.getproperty(idx::ICRQIndex, name::Symbol) - if name === :threadindex - bits = getfield(idx, :bits) - return (bits >> (32 - 8)) % Int32 - elseif name === :itemindex - bits = getfield(idx, :bits) - return (bits & (typemax(UInt32) >> 8)) % Int32 - else - return getfield(idx, name) - end -end - struct CRQSlot{S} index_safe::Int32Bool storage::S # sizeof(S) ≤ 4 @@ -74,17 +46,15 @@ mutable struct IndirectConcurrentRingQueueNode{T} @atomic tail_closed::Int32 _tail_pad::PadAfter32 @atomic next::Union{Nothing,IndirectConcurrentRingQueueNode{T}} - ring::Vector{CRQSlot{ICRQIndex}} + ring::Vector{CRQSlot{UInt32}} length::Int buffers::Vector{Vector{T}} - buffertails::Vector{Int} end # TODO: pad function IndirectConcurrentRingQueueNode{T}(len::Int) where {T} - buffers = [Vector{T}(undef, 2len) for _ in 1:Threads.nthreads()] - buffertails = fill(1, Threads.nthreads()) - ring = [CRQSlot(Int32Bool(Int32(i), true), ICRQIndex(0)) for i in 1:len] + buffers = [Vector{T}(undef, len) for _ in 1:Threads.nthreads()] + ring = 
[CRQSlot(Int32Bool(Int32(i), true), UInt32(0)) for i in 1:len] head = Int32(1) tail_closed = Int32(1) return IndirectConcurrentRingQueueNode( @@ -96,7 +66,6 @@ function IndirectConcurrentRingQueueNode{T}(len::Int) where {T} ring, len, buffers, - buffertails, ) end @@ -136,7 +105,7 @@ function trypush!(crq::IndirectConcurrentRingQueueNode, x) starvation_ctr = 0 while true - t = @atomic crq.tail_closed += true + t = (@atomic crq.tail_closed += true) - true if isclosed(crq, t) return false # closed elseif t ≥ typemax(Int32) - Threads.nthreads() @@ -146,21 +115,21 @@ function trypush!(crq::IndirectConcurrentRingQueueNode, x) _close(crq, t) return false # closed end - slotptr = pointer(crq.ring, mod1(t, crq.length)) - slot = UnsafeAtomics.load(slotptr)::CRQSlot{ICRQIndex} + # TODO: use shift like DLCRQ + itemindex = mod1(t, crq.length) + slotptr = pointer(crq.ring, itemindex) + slot = UnsafeAtomics.load(slotptr)::CRQSlot{UInt32} (; index, safe, storage) = slot - if iszero(storage.bits) + if iszero(storage) if (index ≤ t) && (safe || (@atomic crq.head) ≤ t) tid = Threads.threadid() buffer = crq.buffers[tid] - localtail = crq.buffertails[tid] + 1 - itemindex = mod1(localtail, length(buffer)) - buffer[itemindex] = x # [^itemindex] - bidx = ICRQIndex(; threadindex = tid, itemindex) - newslot = CRQSlot(; safe = true, index = t, storage = bidx) + buffer[itemindex] = x + storage = UInt32(tid) + # storage = _embed(UInt32, x) + newslot = CRQSlot(; safe = true, index = t, storage) old = UnsafeAtomics.cas!(slotptr, slot, newslot) if old == slot - crq.buffertails[tid] = localtail return true end end @@ -173,27 +142,20 @@ function trypush!(crq::IndirectConcurrentRingQueueNode, x) GC.safepoint() # must not yield end end -# [^itemindex]: `buffer[itemindex] = x` is OK since there is no other thread -# that can write to this memory location. 
Using `length(ring) == -# length(buffer)` is OK since at this point we know that there is at least one -# empty slot and only other threads can make the `ring` full. However, other -# threads need to put the item in their own buffer. -# TODO: So, 2len -> len -# TODO: This reasoning was wrong in the Dual LCRQ. It may be due to calls to -# denqueue stalled right after FAI (handled with the `safe` bit). Check if -# something similar is possible in LCRQ and stop using `ICRQIndex` if so. function ConcurrentCollections.trypopfirst!(crq::IndirectConcurrentRingQueueNode) while true - h = @atomic crq.head += true - slotptr = pointer(crq.ring, mod1(h, crq.length)) + h = (@atomic crq.head += true) - true + itemindex = mod1(h, crq.length) # TODO: shift + slotptr = pointer(crq.ring, itemindex) while true - slot = UnsafeAtomics.load(slotptr)::CRQSlot{ICRQIndex} + slot = UnsafeAtomics.load(slotptr)::CRQSlot{UInt32} (; index, safe, storage) = slot - if !iszero(storage.bits) + if !iszero(storage) if index == h - (; threadindex, itemindex) = storage + threadindex = storage x = crq.buffers[threadindex][itemindex] + # x = _extract(eltype(crq), storage) # Above load requires "tearable atomics" for immutables; for # boxed Julia objects (i.e., pointer), it's probably # equivalent to asking the compiler to not root `x` until @@ -201,8 +163,8 @@ function ConcurrentCollections.trypopfirst!(crq::IndirectConcurrentRingQueueNode # before reading the type tag. (That said, currently it may # not be a problem since GC stops all tasks?) - newslot = - CRQSlot(; safe, index = h + crq.length, storage = ICRQIndex(0)) + # newslot = CRQSlot(; safe, index = h + crq.length, storage = UInt32(0)) + newslot = CRQSlot(; safe, index = h + crq.length, storage = UInt32(0)) old = UnsafeAtomics.cas!(slotptr, slot, newslot) if old === slot return Some{eltype(crq)}(x) @@ -324,17 +286,3 @@ function Base.show(io::IO, ::MIME"text/plain", crq::IndirectConcurrentRingQueueN status = isclosed(crq) ? 
"closed" : "open" print(io, "CRQ: $nitems item(s) (status: $status, head: $h, tail: $t)") end - -function Base.show(io::IO, ::MIME"text/plain", index::ICRQIndex) - if get(io, :typeinfo, Any) !== typeof(index) - invoke(show, Tuple{IO,MIME"text/plain",Any}, io, MIME"text/plain"(), index) - return - end - if iszero(index.bits) - print(io, "#null") - return - end - (; threadindex, itemindex) = index - nt = (; threadindex, itemindex) - show(io, MIME"text/plain"(), nt) -end diff --git a/test/test_lcrq.jl b/test/test_lcrq.jl index 4534ba8..9ca75df 100644 --- a/test/test_lcrq.jl +++ b/test/test_lcrq.jl @@ -2,15 +2,8 @@ module TestLCRQ using Base.Experimental: @sync using ConcurrentCollections -using ConcurrentCollections.Implementations: ICRQIndex using Test -@testset "ICRQIndex" begin - idx = ICRQIndex(threadindex = 111, itemindex = 222) - @test idx.threadindex == 111 - @test idx.itemindex == 222 -end - @testset "push-pop once" begin q = LinkedConcurrentRingQueue{Int}() push!(q, 111) From 1f9b3f1fa93cb27843a18e7add67b423cb2267e0 Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Sun, 12 Sep 2021 23:33:18 -0400 Subject: [PATCH 4/4] Really fix LCRQ --- src/lcrq.jl | 26 +++++++- test/test_crq.jl | 158 ++++++++++++++++++++++++++++++++++++++++++++++ test/test_lcrq.jl | 1 - 3 files changed, 182 insertions(+), 3 deletions(-) create mode 100644 test/test_crq.jl diff --git a/src/lcrq.jl b/src/lcrq.jl index d0e5a5f..1ae19ac 100644 --- a/src/lcrq.jl +++ b/src/lcrq.jl @@ -127,6 +127,7 @@ function trypush!(crq::IndirectConcurrentRingQueueNode, x) buffer[itemindex] = x storage = UInt32(tid) # storage = _embed(UInt32, x) + @assert !iszero(storage) newslot = CRQSlot(; safe = true, index = t, storage) old = UnsafeAtomics.cas!(slotptr, slot, newslot) if old == slot @@ -163,7 +164,6 @@ function ConcurrentCollections.trypopfirst!(crq::IndirectConcurrentRingQueueNode # before reading the type tag. (That said, currently it may # not be a problem since GC stops all tasks?) 
- # newslot = CRQSlot(; safe, index = h + crq.length, storage = UInt32(0)) newslot = CRQSlot(; safe, index = h + crq.length, storage = UInt32(0)) old = UnsafeAtomics.cas!(slotptr, slot, newslot) if old === slot @@ -177,7 +177,8 @@ function ConcurrentCollections.trypopfirst!(crq::IndirectConcurrentRingQueueNode end end else # empty slot - newslot = CRQSlot(; safe, index = index + crq.length, storage) + # `max(h, index)` for dealing with tasks stalled after FAI [^maxh] + newslot = CRQSlot(; safe, index = max(h, index) + crq.length, storage) old = UnsafeAtomics.cas!(slotptr, slot, newslot) if old == slot break @@ -194,6 +195,15 @@ function ConcurrentCollections.trypopfirst!(crq::IndirectConcurrentRingQueueNode end end end +# [^maxh]: `max(h, index)` was not mentioned in Morrison and Afek (2013) (or its +# revised version) so it's not clear if this is needed. However, it is possible +# that multiple enqueuers and/or dequeuers that would have incremented the +# `slot.index` are suspended just after the FAI. If so, `slot.index` can be a +# round (or even multiple rounds) behind the current `crq.head`. So, it seems +# like we need to prevent enqueuing to this index (hence missing the item) +# by moving this to the next round. +# +# TODO: Is it OK to skip updating the slot if `index > h`? function fixstate!(crq::IndirectConcurrentRingQueueNode) while true @@ -286,3 +296,15 @@ function Base.show(io::IO, ::MIME"text/plain", crq::IndirectConcurrentRingQueueN status = isclosed(crq) ? 
"closed" : "open" print(io, "CRQ: $nitems item(s) (status: $status, head: $h, tail: $t)") end + +function Base.NamedTuple(slot::CRQSlot) + (; index, safe, storage) = slot + return (; index, safe, storage) +end + +function Base.show(io::IO, ::MIME"text/plain", slot::CRQSlot) + if get(io, :typeinfo, Any) !== typeof(slot) + show(io, MIME"text/plain"(), typeof(slot)) + end + show(io, MIME"text/plain"(), NamedTuple(slot)) +end diff --git a/test/test_crq.jl b/test/test_crq.jl new file mode 100644 index 0000000..e90899f --- /dev/null +++ b/test/test_crq.jl @@ -0,0 +1,158 @@ +module TestCRQ + +using Base.Experimental: @sync +using ConcurrentCollections +using ConcurrentCollections.Implementations: + CRQSlot, IndirectConcurrentRingQueueNode, trypush!, isclosed +using Test + +@testset "CRQSlot" begin + for index in [111, 222], + safe in [false, true], + storage in UInt32[0xaaa, 0xbbb] + + @test NamedTuple(CRQSlot(; index, safe, storage)) == (; index, safe, storage) + end +end + +function unfair_sleep(seconds::Real) + t0 = time_ns() + ns = seconds * 1e9 + while time_ns() - t0 < ns + GC.safepoint() + end +end + +function concurrent_denqueue!( + crq::IndirectConcurrentRingQueueNode, + nitems::Integer, + nsend::Integer, + nrecv::Integer, +) + @assert nsend > 0 + @assert nrecv > 0 + use_yield = nsend + nrecv > Threads.nthreads() + # TODO: If `!use_yield`, make sure the tasks are spawned in different + # threads and they are sticky. 
+ function poll() + if use_yield + yield() + else + GC.safepoint() + end + rand() < 0.1 && unfair_sleep(rand() * 1e-6) + end + + received = Vector{Int}[] + senders = Task[] + receivers = Task[] + global SENDERS = senders + global RECEIVERS = receivers + ref = Threads.Atomic{Int}(0) + activesenders = Threads.Atomic{Int}(nsend) + @sync begin + for offset in 1:nsend + t = Threads.@spawn try + local y = nothing + local i = 0 + local allpushed = true + for outer i in offset:nsend:nitems + local x = eltype(crq)(i) + if !trypush!(crq, x) + allpushed = false + break + end + local s = Threads.atomic_add!(ref, 1) + if s > nsend + while ref[] > 1 + poll() + end + end + end + return (; y, i, allpushed) + finally + Threads.atomic_sub!(activesenders, 1) + end + push!(senders, t) + end + for _ in 1:nrecv + ys = Int[] + push!(received, ys) + t = Threads.@spawn begin + local ys_nb = Int[] + local ys_b = Int[] + while true + local y = trypopfirst!(crq) + if y === nothing + if activesenders[] == 0 + y = trypopfirst!(crq) + if y === nothing # Confirm that CRQ is empty + return y + end + # Reaching here means that there were some enqueues + # between our `trypopfirst!(crq) === nothing` and + # `activesenders[] == 0`. 
+ else + poll() + continue + end + end + local i = something(y) + push!(ys, i) + + local s = Threads.atomic_sub!(ref, 1) + if s < -nrecv + while ref[] < -1 + poll() + end + end + end + end + push!(receivers, t) + end + end + return received, senders, receivers +end + +function check_consecutive(xs) + notfound = Int[] + dups = Int[] + pre = xs[begin] - 1 + for x in xs + e = pre + 1 + append!(notfound, e:x-1) + append!(dups, x:e-1) + pre = x + end + return (; notfound, dups) +end + +@testset "concurrent push-pop" begin + @testset for trial in 1:100 + global received, notfound, dups, allreceived + nsend = cld(Threads.nthreads(), 2) + nrecv = max(1, Threads.nthreads() - nsend) + crq = IndirectConcurrentRingQueueNode{Int16}(32) + global CRQ = crq + nitems = 2^20 + nitems = typemax(Int16) + received, senders, receivers = concurrent_denqueue!(crq, nitems, nsend, nrecv) + allreceived = reduce(vcat, received) + + if isclosed(crq) + @info "CRQ closed. Skipping the tests..." + continue + end + + @test [fetch(t).allpushed for t in senders] == fill(true, nsend) + + sort!(allreceived) + (; notfound, dups) = check_consecutive(allreceived) + @test length(allreceived) == nitems + @test notfound == [] + @test dups == [] + @test allreceived == 1:nitems + end +end + +end # module diff --git a/test/test_lcrq.jl b/test/test_lcrq.jl index 9ca75df..3bc000a 100644 --- a/test/test_lcrq.jl +++ b/test/test_lcrq.jl @@ -73,7 +73,6 @@ end @testset "concurrent push-pop" begin if Threads.nthreads() > 1 @testset for trial in 1:100 - # @show trial nsend = cld(Threads.nthreads(), 2) nrecv = Threads.nthreads() - nsend @assert nsend ≥ 1