diff --git a/src/dlcrq.jl b/src/dlcrq.jl index 196a0e2..753cf6f 100644 --- a/src/dlcrq.jl +++ b/src/dlcrq.jl @@ -25,6 +25,19 @@ Waiter{T}(task::Task = current_task()) where {T} = Waiter{T}( # PadAfter64(), ) +function reinit!(w::Waiter) + if assertion_enabled() + state = @atomic w.state + @assert state in (WAITER_TRASHED, WAITER_FINISHED) + @assert w.task === nothing + @assert w.value === nothing + end + w.task = current_task() + w.value === nothing + @atomic :monotonic w.state = WAITER_INIT + return w +end + function Base.fetch(w::Waiter{T}; nspins::Integer = 0) where {T} for _ in 1:nspins if (@atomic w.state) == WAITER_SATISFIED @@ -462,7 +475,8 @@ mutable struct DualLinkedConcurrentRingQueue{ _data_pad::PadAfter64 @atomic antidata::CRQ _antidata_pad::PadAfter64 - cache::ThreadLocalCache{CRQ} + crqcache::ThreadLocalCache{CRQ} + waitercache::ThreadLocalCache{Waiter{T}} end DualLinkedConcurrentRingQueue(; kwargs...) = DualLinkedConcurrentRingQueue{Any}(; kwargs...) @@ -475,6 +489,7 @@ function DualLinkedConcurrentRingQueue{T}(; log2ringsize = 11) where {T} node, PadAfter64(), ThreadLocalCache{typeof(node)}(), + ThreadLocalCache{Waiter{T}}(), )::DualLinkedConcurrentRingQueue{T} end @@ -492,14 +507,22 @@ function Base.push!(lcrq::DualLinkedConcurrentRingQueue{T}, x) where {T} end function Base.popfirst!(lcrq::DualLinkedConcurrentRingQueue{T}) where {T} - w = Waiter{eltype(lcrq)}() + w = let cached = maybepop!(lcrq.waitercache) + if cached === nothing + Waiter{eltype(lcrq)}() + else + reinit!(something(cached)) + end + end y = denqueue!(lcrq, w) if y === MPCRQ_ENQUEUED - return fetch(w) + x = fetch(w) else trash!(w) - return something(y::Some{T}) + x = something(y::Some{T}) end + trypush!(lcrq.waitercache, w) + return x::T end # const CLOSED_X = Vector{Int}[Int[] for _ in 1:Threads.nthreads()] @@ -550,8 +573,8 @@ function denqueue!(lcrq::DualLinkedConcurrentRingQueue{T}, x::Union{T,Waiter{T}} end end -function make_newcrq!(lcrq, crq) - oldcrq = maybepop!(lcrq.cache) +function make_newcrq!(lcrq::DualLinkedConcurrentRingQueue, crq) + oldcrq = maybepop!(lcrq.crqcache) if oldcrq === nothing return similar(crq) else @@ -560,7 +583,7 @@ function make_newcrq!(lcrq, crq) end function cache_crq!(lcrq::DualLinkedConcurrentRingQueue, crq) - trypush!(lcrq.cache, empty!(crq)) + trypush!(lcrq.crqcache, empty!(crq)) return end diff --git a/src/utils.jl b/src/utils.jl index 332b7b5..1653d0c 100644 --- a/src/utils.jl +++ b/src/utils.jl @@ -2,6 +2,7 @@ !=′(x::T, y::T) where {T} = x != y assertion_enabled() = false +# assertion_enabled() = true @noinline unreachable() = error("unreachable reached")