Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/dlcrq.jl
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,6 @@ const SmallUnsigned = Union{Bool,UInt8,UInt16}
const NullableInline32 = Union{SmallSigned,SmallUnsigned}

function IndirectMultiPolarityConcurrentRingQueueNode{T}(log2len::Int) where {T}
if !(0 <= log2len <= ITEMINDEX_BITS32)
error("Expected: 0 <= log2len <= $ITEMINDEX_BITS32; Got log2len = $log2len")
end
len = 2^log2len
if T <: NullableInline32 && isconcretetype(T)
data = nothing
Expand Down
114 changes: 42 additions & 72 deletions src/lcrq.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,6 @@ bool(x::Int32Bool) = x.bits > 0
setbool(x::Int32Bool, b::Bool) = Int32Bool(int(x), b)
setint(x::Int32Bool, i::Int32) = Int32Bool(i, bool(x))

struct ICRQIndex
bits::UInt32
end

const THREADINDEX_BITS32 = 8
const ITEMINDEX_BITS32 = 32 - THREADINDEX_BITS32

function ICRQIndex(; threadindex, itemindex)
local bits::UInt32
# threadindex ≥ 1 << THREADINDEX_BITS32 && error("threadindex too large $threadindex")
# itemindex ≥ 1 << ITEMINDEX_BITS32 && error("itemindex too large $itemindex")
bits = UInt32(threadindex) << ITEMINDEX_BITS32
bits |= UInt32(itemindex) & (typemax(UInt32) >> THREADINDEX_BITS32)
return ICRQIndex(bits)
end

@inline function Base.getproperty(idx::ICRQIndex, name::Symbol)
if name === :threadindex
bits = getfield(idx, :bits)
return (bits >> (32 - 8)) % Int32
elseif name === :itemindex
bits = getfield(idx, :bits)
return (bits & (typemax(UInt32) >> 8)) % Int32
else
return getfield(idx, name)
end
end

struct CRQSlot{S}
index_safe::Int32Bool
storage::S # sizeof(S) ≤ 4
Expand Down Expand Up @@ -74,17 +46,15 @@ mutable struct IndirectConcurrentRingQueueNode{T}
@atomic tail_closed::Int32
_tail_pad::PadAfter32
@atomic next::Union{Nothing,IndirectConcurrentRingQueueNode{T}}
ring::Vector{CRQSlot{ICRQIndex}}
ring::Vector{CRQSlot{UInt32}}
length::Int
buffers::Vector{Vector{T}}
buffertails::Vector{Int}
end
# TODO: pad

function IndirectConcurrentRingQueueNode{T}(len::Int) where {T}
buffers = [Vector{T}(undef, 2len) for _ in 1:Threads.nthreads()]
buffertails = fill(1, Threads.nthreads())
ring = [CRQSlot(Int32Bool(Int32(i), true), ICRQIndex(0)) for i in 1:len]
buffers = [Vector{T}(undef, len) for _ in 1:Threads.nthreads()]
ring = [CRQSlot(Int32Bool(Int32(i), true), UInt32(0)) for i in 1:len]
head = Int32(1)
tail_closed = Int32(1)
return IndirectConcurrentRingQueueNode(
Expand All @@ -96,7 +66,6 @@ function IndirectConcurrentRingQueueNode{T}(len::Int) where {T}
ring,
len,
buffers,
buffertails,
)
end

Expand Down Expand Up @@ -136,7 +105,7 @@ function trypush!(crq::IndirectConcurrentRingQueueNode, x)

starvation_ctr = 0
while true
t = @atomic crq.tail_closed += true
t = (@atomic crq.tail_closed += true) - true
if isclosed(crq, t)
return false # closed
elseif t ≥ typemax(Int32) - Threads.nthreads()
Expand All @@ -146,21 +115,22 @@ function trypush!(crq::IndirectConcurrentRingQueueNode, x)
_close(crq, t)
return false # closed
end
slotptr = pointer(crq.ring, mod1(t, crq.length))
slot = UnsafeAtomics.load(slotptr)::CRQSlot{ICRQIndex}
# TODO: use shift like DLCRQ
itemindex = mod1(t, crq.length)
slotptr = pointer(crq.ring, itemindex)
slot = UnsafeAtomics.load(slotptr)::CRQSlot{UInt32}
(; index, safe, storage) = slot
if iszero(storage.bits)
if iszero(storage)
if (index ≤ t) && (safe || (@atomic crq.head) ≤ t)
tid = Threads.threadid()
buffer = crq.buffers[tid]
localtail = crq.buffertails[tid] + 1
itemindex = mod1(localtail, length(buffer))
buffer[itemindex] = x # [^itemindex]
bidx = ICRQIndex(; threadindex = tid, itemindex)
newslot = CRQSlot(; safe = true, index = t, storage = bidx)
buffer[itemindex] = x
storage = UInt32(tid)
# storage = _embed(UInt32, x)
@assert !iszero(storage)
newslot = CRQSlot(; safe = true, index = t, storage)
old = UnsafeAtomics.cas!(slotptr, slot, newslot)
if old == slot
crq.buffertails[tid] = localtail
return true
end
end
Expand All @@ -173,36 +143,28 @@ function trypush!(crq::IndirectConcurrentRingQueueNode, x)
GC.safepoint() # must not yield
end
end
# [^itemindex]: `buffer[itemindex] = x` is OK since there is no other thread
# that can write to this memory location. Using `length(ring) ==
# length(buffer)` is OK since at this point we know that there is at least one
# empty slot and only other threads can make the `ring` full. However, other
# threads need to put the item in their own buffer.
# TODO: So, 2len -> len
# TODO: This reasoning was wrong in the Dual LCRQ. It may be due to calls to
# denqueue stalled right after FAI (handled with the `safe` bit). Check if
# something similar is possible in LCRQ and stop using `ICRQIndex` if so.

function ConcurrentCollections.trypopfirst!(crq::IndirectConcurrentRingQueueNode)
while true
h = @atomic crq.head += true
slotptr = pointer(crq.ring, mod1(h, crq.length))
h = (@atomic crq.head += true) - true
itemindex = mod1(h, crq.length) # TODO: shift
slotptr = pointer(crq.ring, itemindex)
while true
slot = UnsafeAtomics.load(slotptr)::CRQSlot{ICRQIndex}
slot = UnsafeAtomics.load(slotptr)::CRQSlot{UInt32}
(; index, safe, storage) = slot
if !iszero(storage.bits)
if !iszero(storage)
if index == h
(; threadindex, itemindex) = storage
threadindex = storage
x = crq.buffers[threadindex][itemindex]
# x = _extract(eltype(crq), storage)
# Above load requires "tearable atomics" for immutables; for
# boxed Julia objects (i.e., pointer), it's probably
# equivalent to asking the compiler to not root `x` until
# the CAS succeed so that there is an acquire ordering
# before reading the type tag. (That said, currently it may
# not be a problem since GC stops all tasks?)

newslot =
CRQSlot(; safe, index = h + crq.length, storage = ICRQIndex(0))
newslot = CRQSlot(; safe, index = h + crq.length, storage = UInt32(0))
old = UnsafeAtomics.cas!(slotptr, slot, newslot)
if old === slot
return Some{eltype(crq)}(x)
Expand All @@ -215,7 +177,8 @@ function ConcurrentCollections.trypopfirst!(crq::IndirectConcurrentRingQueueNode
end
end
else # empty slot
newslot = CRQSlot(; safe, index = index + crq.length, storage)
# `max(h, index)` for dealing with tasks stalled after FAI [^maxh]
newslot = CRQSlot(; safe, index = max(h, index) + crq.length, storage)
old = UnsafeAtomics.cas!(slotptr, slot, newslot)
if old == slot
break
Expand All @@ -232,6 +195,15 @@ function ConcurrentCollections.trypopfirst!(crq::IndirectConcurrentRingQueueNode
end
end
end
# [^maxh]: `max(h, index)` was not mentioned in Morrison and Afek (2013) (or its
# revised version) so it's not clear if this is needed. However, it is possible
# that multiple enqueuers and/or dequeuers that would have incremented the
# `slot.index` is suspended just after the FAI. If so, `slot.index` can be a
# round (or even multiple rounds) behind the current `crq.head`. So, it seems
# like we need to prevent the enqueuing to this index (hence missing the item)
# by moving this to the next round.
#
# TODO: Is it OK to skip updating the slot if `index > h`?

function fixstate!(crq::IndirectConcurrentRingQueueNode)
while true
Expand Down Expand Up @@ -325,16 +297,14 @@ function Base.show(io::IO, ::MIME"text/plain", crq::IndirectConcurrentRingQueueN
print(io, "CRQ: $nitems item(s) (status: $status, head: $h, tail: $t)")
end

function Base.show(io::IO, ::MIME"text/plain", index::ICRQIndex)
if get(io, :typeinfo, Any) !== typeof(index)
invoke(show, Tuple{IO,MIME"text/plain",Any}, io, MIME"text/plain"(), index)
return
end
if iszero(index.bits)
print(io, "#null")
return
# Convert a `CRQSlot` into a plain named tuple of its logical fields
# (`index`, `safe`, `storage`) — convenient for tests and for display code.
function Base.NamedTuple(slot::CRQSlot)
    return (; index = slot.index, safe = slot.safe, storage = slot.storage)
end

function Base.show(io::IO, ::MIME"text/plain", slot::CRQSlot)
if get(io, :typeinfo, Any) !== typeof(slot)
show(io, MIME"text/plain"(), typeof(slot))
end
(; threadindex, itemindex) = index
nt = (; threadindex, itemindex)
show(io, MIME"text/plain"(), nt)
show(io, MIME"text/plain"(), NamedTuple(slot))
end
158 changes: 158 additions & 0 deletions test/test_crq.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
module TestCRQ

using Base.Experimental: @sync
using ConcurrentCollections
using ConcurrentCollections.Implementations:
CRQSlot, IndirectConcurrentRingQueueNode, trypush!, isclosed
using Test

# Round-trip check: constructing a `CRQSlot` from keyword fields and
# converting it back with `NamedTuple` must reproduce the same field values.
@testset "CRQSlot" begin
    for index in [111, 222], safe in [false, true], storage in UInt32[0xaaa, 0xbbb]
        slot = CRQSlot(; index, safe, storage)
        @test NamedTuple(slot) == (; index, safe, storage)
    end
end

"""
    unfair_sleep(seconds::Real)

Busy-wait for at least `seconds` without yielding to the Julia scheduler
(unlike `Base.sleep`, which suspends the task). `GC.safepoint()` is invoked
on every spin iteration so that garbage collection can still make progress
while this task occupies its thread.
"""
function unfair_sleep(seconds::Real)
    deadline = time_ns() + seconds * 1e9
    while time_ns() < deadline
        GC.safepoint()
    end
    return nothing
end

"""
    concurrent_denqueue!(crq, nitems, nsend, nrecv) -> (received, senders, receivers)

Stress a single CRQ node: `nsend` producer tasks push the integers
`1:nitems` (converted to `eltype(crq)`) while `nrecv` consumer tasks pop
concurrently.

Returns:
- `received`: one `Vector{Int}` per consumer holding the items it popped.
- `senders`: the producer `Task`s; `fetch` yields `(; y, i, allpushed)` where
  `allpushed` is `false` iff `trypush!` failed (CRQ closed) before finishing.
- `receivers`: the consumer `Task`s.

A shared counter `ref` (pushes increment, pops decrement) throttles whichever
side runs ahead, so pushes and pops stay interleaved rather than one side
racing to completion.
"""
function concurrent_denqueue!(
    crq::IndirectConcurrentRingQueueNode,
    nitems::Integer,
    nsend::Integer,
    nrecv::Integer,
)
    @assert nsend > 0
    @assert nrecv > 0
    # With more tasks than threads, tasks must cooperatively `yield` or they
    # can starve each other; otherwise a GC safepoint suffices.
    use_yield = nsend + nrecv > Threads.nthreads()
    # TODO: If `!use_yield`, make sure the tasks are spawned in different
    # threads and they are sticky.
    function poll()
        if use_yield
            yield()
        else
            GC.safepoint()
        end
        # Occasionally busy-wait a little to randomize task interleavings.
        rand() < 0.1 && unfair_sleep(rand() * 1e-6)
    end

    received = Vector{Int}[]
    senders = Task[]
    receivers = Task[]
    # Globals are set so the tasks can be inspected from the REPL after a
    # failed/hung run.
    global SENDERS = senders
    global RECEIVERS = receivers
    # Backlog counter: #pushes - #pops observed so far (approximately).
    ref = Threads.Atomic{Int}(0)
    # Consumers keep polling until every producer has finished.
    activesenders = Threads.Atomic{Int}(nsend)
    @sync begin
        for offset in 1:nsend
            # Producer `offset` pushes the arithmetic subsequence
            # `offset, offset + nsend, ...` so the producers partition
            # `1:nitems` without coordination.
            t = Threads.@spawn try
                local y = nothing  # NOTE(review): always `nothing`; kept as a placeholder in the result
                local i = 0
                local allpushed = true
                for outer i in offset:nsend:nitems
                    local x = eltype(crq)(i)
                    if !trypush!(crq, x)
                        # CRQ was closed; report partial progress via `i`.
                        allpushed = false
                        break
                    end
                    local s = Threads.atomic_add!(ref, 1)
                    if s > nsend
                        # Producers are too far ahead; spin until consumers
                        # have drained the backlog down to ~1.
                        while ref[] > 1
                            poll()
                        end
                    end
                end
                return (; y, i, allpushed)
            finally
                Threads.atomic_sub!(activesenders, 1)
            end
            push!(senders, t)
        end
        for _ in 1:nrecv
            ys = Int[]
            push!(received, ys)
            t = Threads.@spawn begin
                local ys_nb = Int[]  # NOTE(review): unused — leftover scratch?
                local ys_b = Int[]   # NOTE(review): unused — leftover scratch?
                while true
                    local y = trypopfirst!(crq)
                    if y === nothing
                        if activesenders[] == 0
                            # Re-check after observing all producers done:
                            # an item may have been pushed in between.
                            y = trypopfirst!(crq)
                            if y === nothing # Confirm that CRQ is empty
                                return y
                            end
                            # Reaching here means that there were some enqueues
                            # between our `trypopfirst!(crq) === nothing` and
                            # `activesenders[] == 0`.
                        else
                            poll()
                            continue
                        end
                    end
                    local i = something(y)
                    push!(ys, i)

                    local s = Threads.atomic_sub!(ref, 1)
                    if s < -nrecv
                        # Consumers are too far ahead; wait for producers to
                        # rebuild some backlog.
                        while ref[] < -1
                            poll()
                        end
                    end
                end
            end
            push!(receivers, t)
        end
    end
    return received, senders, receivers
end

"""
    check_consecutive(xs) -> (; notfound, dups)

Given a *sorted* integer collection `xs`, return the values missing from
(`notfound`) and duplicated within (`dups`) the run `xs[begin]:xs[end]`.
Both vectors are empty iff `xs` is exactly a run of consecutive integers.
An empty `xs` yields two empty vectors (previously this threw a
`BoundsError` on `xs[begin]`).
"""
function check_consecutive(xs)
    notfound = Int[]
    dups = Int[]
    # Guard: `xs[begin]` below would throw on an empty collection.
    isempty(xs) && return (; notfound, dups)
    pre = xs[begin] - 1
    for x in xs
        e = pre + 1               # the value we expect to see next
        append!(notfound, e:x-1)  # non-empty iff there is a gap (x > e)
        append!(dups, x:e-1)      # non-empty iff x repeats pre (x < e)
        pre = x
    end
    return (; notfound, dups)
end

# End-to-end stress test: every pushed item must be popped exactly once.
@testset "concurrent push-pop" begin
    @testset for trial in 1:100
        # Globals are set for post-mortem inspection in the REPL when a
        # trial fails or hangs.
        global received, notfound, dups, allreceived
        # Split the available threads between producers and consumers.
        nsend = cld(Threads.nthreads(), 2)
        nrecv = max(1, Threads.nthreads() - nsend)
        crq = IndirectConcurrentRingQueueNode{Int16}(32)
        global CRQ = crq
        # `nitems` must be representable in the element type (`Int16`).
        # (A dead `nitems = 2^20` assignment was removed here: it was
        # immediately overwritten, and 2^20 would overflow `Int16` anyway.)
        nitems = typemax(Int16)
        received, senders, receivers = concurrent_denqueue!(crq, nitems, nsend, nrecv)
        allreceived = reduce(vcat, received)

        if isclosed(crq)
            # A closed CRQ legitimately rejects pushes; nothing to verify.
            @info "CRQ closed. Skipping the tests..."
            continue
        end

        @test [fetch(t).allpushed for t in senders] == fill(true, nsend)

        # Exactly the items 1:nitems must come out: no losses, no duplicates.
        sort!(allreceived)
        (; notfound, dups) = check_consecutive(allreceived)
        @test length(allreceived) == nitems
        @test notfound == []
        @test dups == []
        @test allreceived == 1:nitems
    end
end

end # module
Loading