From 1b68abfa30605b12c545c988b0f90f82a0cf9b1f Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Tue, 28 Sep 2021 19:41:18 -0400 Subject: [PATCH 1/3] Implement Scherer & Scott dual queue --- .../src/bench_queue_hot_potato.jl | 1 + src/ConcurrentCollections.jl | 3 + src/msqueue.jl | 1 + src/ssqueue.jl | 257 ++++++++++++++++++ .../src/ConcurrentCollectionsTests.jl | 1 + .../src/test_dlcrq.jl | 9 +- .../src/test_ssqueue.jl | 48 ++++ 7 files changed, 315 insertions(+), 5 deletions(-) create mode 100644 src/ssqueue.jl create mode 100644 test/ConcurrentCollectionsTests/src/test_ssqueue.jl diff --git a/benchmark/ConcurrentCollectionsBenchmarks/src/bench_queue_hot_potato.jl b/benchmark/ConcurrentCollectionsBenchmarks/src/bench_queue_hot_potato.jl index fe96f0c..36e8cc7 100644 --- a/benchmark/ConcurrentCollectionsBenchmarks/src/bench_queue_hot_potato.jl +++ b/benchmark/ConcurrentCollectionsBenchmarks/src/bench_queue_hot_potato.jl @@ -133,6 +133,7 @@ function setup(; kwargs...) suite["channel"] = @benchmarkable hotpotato!(Channel{Bool}(Inf); $kwargs...) suite["dlcrq"] = @benchmarkable hotpotato!(DualLinkedConcurrentRingQueue{Bool}(); $kwargs...) + suite["dlq"] = @benchmarkable hotpotato!(DualLinkedQueue{Bool}(); $kwargs...) return suite end diff --git a/src/ConcurrentCollections.jl b/src/ConcurrentCollections.jl index 24a0d11..80a1f46 100644 --- a/src/ConcurrentCollections.jl +++ b/src/ConcurrentCollections.jl @@ -7,6 +7,7 @@ export ConcurrentStack, Delete, DualLinkedConcurrentRingQueue, + DualLinkedQueue, Keep, LinkedConcurrentRingQueue, WorkStealingDeque, @@ -70,6 +71,7 @@ include("msqueue.jl") include("stack.jl") include("lcrq.jl") include("dlcrq.jl") +include("ssqueue.jl") include("misc.jl") end # module Implementations @@ -78,6 +80,7 @@ using .Implementations: ConcurrentQueue, ConcurrentStack, DualLinkedConcurrentRingQueue, + DualLinkedQueue, LinkedConcurrentRingQueue, WorkStealingDeque diff --git a/src/msqueue.jl b/src/msqueue.jl index 5995af3..3cbd6a1 100644 --- a/src/msqueue.jl +++ b/src/msqueue.jl @@ -6,6 +6,7 @@ mutable struct MSQNode{T} MSQNode{T}(next::Union{MSQNode{T},Nothing}, value::T) where {T} = new{T}(next, value) end +# TODO: rename it to ConcurrentLinkedQueue? mutable struct ConcurrentQueue{T} @atomic head::MSQNode{T} @atomic tail::MSQNode{T} diff --git a/src/ssqueue.jl b/src/ssqueue.jl new file mode 100644 index 0000000..13e37b4 --- /dev/null +++ b/src/ssqueue.jl @@ -0,0 +1,257 @@ +struct IsData end +struct IsAntiData end +const PolarityTrait = Union{IsData,IsAntiData} + +Base.adjoint(::IsData) = IsAntiData() +Base.adjoint(::IsAntiData) = IsData() + +abstract type AbstractSSQNode{T} end + +mutable struct SSQDataNode{T} <: AbstractSSQNode{T} + @atomic next::Union{AbstractSSQNode{T},Nothing} + value::T + + SSQDataNode{T}() where {T} = new{T}(nothing) + SSQDataNode{T}(next::Union{AbstractSSQNode{T},Nothing}, value::T) where {T} = + new{T}(next, value) +end + +mutable struct SSQWaiterNode{T} <: AbstractSSQNode{T} + @atomic next::Union{AbstractSSQNode{T},Nothing} + value::Waiter{T} + + SSQWaiterNode{T}() where {T} = new{T}(nothing) + SSQWaiterNode{T}(next::Union{AbstractSSQNode{T},Nothing}, value::Waiter{T}) where {T} = + new{T}(next, value) +end + +const SSQNode{T} = Union{SSQDataNode{T},SSQWaiterNode{T}} + +polarityof(::SSQDataNode) = IsData +polarityof(::SSQWaiterNode) = IsAntiData +ssqnodetype(::IsData, ::Type{T}) where {T} = SSQDataNode{T} +ssqnodetype(::IsAntiData, ::Type{T}) where {T} = SSQWaiterNode{T} + +#= +mutable struct SSQNode{Polarity<:PolarityTrait,T,V<:Union{T,Waiter{T}}} + @atomic next::Union{SSQNode{IsData,T,T},SSQNode{IsAntiData,T,Waiter{T}},Nothing} + value::V + + SSQNode{Polarity,T}() where {Polarity,T} = new{Polarity,T,vtype(Polarity(), T)}(nothing) + function SSQNode{Polarity,T}( + next::Union{SSQNode{<:Any,T},Nothing}, + value::Union{T,Waiter{T}}, + ) where {Polarity,T} + V = vtype(Polarity(), T) + value = value::V + return new{Polarity,T,V}(next, value) + end +end + +@inline SSQNode{Polarity,T,V}( + next::Union{SSQNode{<:Any,T},Nothing}, + value::Union{T,Waiter{T}}, +) where {Polarity,T,V} = SSQNode{Polarity,T}(next, value)::SSQNode{Polarity,T,V} + +polarityof(::SSQNode{Polarity}) where {Polarity} = Polarity +=# + +mutable struct DualLinkedQueue{T} + @atomic head::SSQNode{T} + @atomic tail::SSQNode{T} +end + +Base.eltype(::Type{DualLinkedQueue{T}}) where {T} = T + +DualLinkedQueue() = DualLinkedQueue{Any}() +function DualLinkedQueue{T}() where {T} + n = SSQDataNode{T}() + return DualLinkedQueue{T}(n, n) +end + +function denqueue!( + queue::DualLinkedQueue{T}, + x::Union{T,Waiter{T}}, + polarity::PolarityTrait, +) where {T} + local node::Union{Nothing,SSQNode{T}} = nothing + + head = @atomic queue.head + tail = @atomic queue.tail + while true + next = (@atomic head.next)::Union{Nothing,SSQNode{T}} + head′ = @atomic queue.head + if head !== head′ # snapshot failed + head = head′ + tail = @atomic queue.tail + continue + end + if head === tail + last = next + should_enqueue = true + elseif polarity isa polarityof(next) + last = @atomic tail.next + should_enqueue = true + else + last = nothing + should_enqueue = false + end + + if should_enqueue + tail′ = @atomic queue.tail + if tail′ !== tail # snapshot failed + tail = tail′ + continue + end + if last !== nothing + old, ok = @atomicreplace(queue.tail, tail => last) + tail = ok ? next : old + continue + end + node = if node === nothing + ssqnodetype(polarity, T)(nothing, x) + else + node + end::ssqnodetype(polarity, T) + last, ok = @atomicreplace(tail.next, nothing => node) + if ok + @atomicreplace(queue.tail, tail => node) + return nothing + end + last = last::SSQNode{T} # can be any polarity + old, ok = @atomicreplace(queue.tail, tail => last) + tail = ok ? last : old + else + next = next::ssqnodetype(polarity', T) + value = next.value + head, ok = @atomicreplace(queue.head, head => next) + if ok + return Some(value) + end + tail = @atomic queue.tail + end + end +end + +function Base.push!(queue::DualLinkedQueue{T}, x) where {T} + x = convert(T, x) + while true + y = denqueue!(queue, x, IsData()) + if y isa Some + w = something(y)::Waiter{T} + tryput!(w, x) || continue + else + y::Nothing + end + return queue + end +end + +function Base.popfirst!(queue::DualLinkedQueue{T}) where {T} + # TODO: cache Waiter + w = Waiter{T}() + y = denqueue!(queue, w, IsAntiData()) + if y isa Some + return something(y) + else + y::Nothing + x = fetch(w) + return x::T + end +end + +Base.IteratorSize(::Type{<:DualLinkedQueue}) = Base.SizeUnknown() + +function Base.iterate(queue::DualLinkedQueue{T}) where {T} + head = @atomic queue.head + node = @atomic head.next + if node === nothing + return nothing + else + if node isa SSQWaiterNode + return nothing + else + return (node.value, node) + end + end +end + +function Base.iterate(::DualLinkedQueue{T}, prev::SSQDataNode) where {T} + node = (@atomic prev.next)::Union{Nothing,SSQDataNode} + if node === nothing + return nothing + else + return (node.value, node) + end +end + +struct NodeIterator{T} + queue::T +end + +Base.IteratorSize(::Type{<:NodeIterator}) = Base.SizeUnknown() + +Base.eltype(::Type{NodeIterator{<:DualLinkedQueue{T}}}) where {T} = SSQNode{T} + +function Base.iterate( + iter::NodeIterator{DualLinkedQueue{T}}, + prev::SSQNode{T} = let queue = iter.queue + @atomic queue.head + end, +) where {T} + node = (@atomic prev.next)::Union{Nothing,SSQNode{T}} + if node === nothing + return nothing + else + return (node, node) + end +end + +function check_invariance(queue::DualLinkedQueue{T}) where {T} + isdata = nothing + for node in NodeIterator(queue) + if isdata === nothing + isdata = node isa SSQDataNode + elseif isdata !=′ (node isa SSQDataNode) + return false + end + end + return true +end + +function summaryinfo(queue::DualLinkedQueue{T}) where {T} + counter = Ref(0) + isdata = Ref(true) + for node in NodeIterator(queue) + isdata[] = !(node isa SSQWaiterNode) + counter[] += 1 + end + return (; nitems = counter[], isdata = isdata[]) +end + +function Base.summary(io::IO, queue::DualLinkedQueue) + show(io, MIME"text/plain"(), typeof(queue)) + nitems, isdata = summaryinfo(queue) + s = nitems > 1 ? "s" : "" + print(io, " with ", nitems, isdata ? " data item$s" : " waiter$s") +end + +function Base.show(io::IO, ::MIME"text/plain", queue::DualLinkedQueue) + summary(io, queue) + eio = IOContext(io, :typeinfo => eltype(queue), :limit => true, :compact => true) + n = 0 + for x in queue + if n == 0 + println(io, ":") + else + println(io) + end + n += 1 + if n > 3 + print(io, " ⋮") + return + end + print(io, " ") + show(eio, MIME"text/plain"(), x) + end +end diff --git a/test/ConcurrentCollectionsTests/src/ConcurrentCollectionsTests.jl b/test/ConcurrentCollectionsTests/src/ConcurrentCollectionsTests.jl index 868db06..c5ad63f 100644 --- a/test/ConcurrentCollectionsTests/src/ConcurrentCollectionsTests.jl +++ b/test/ConcurrentCollectionsTests/src/ConcurrentCollectionsTests.jl @@ -9,6 +9,7 @@ include("test_doctest.jl") include("test_lcrq.jl") include("test_mpcrq.jl") include("test_msqueue.jl") +include("test_ssqueue.jl") include("test_tsstack.jl") include("test_work_stealing_deque.jl") diff --git a/test/ConcurrentCollectionsTests/src/test_dlcrq.jl b/test/ConcurrentCollectionsTests/src/test_dlcrq.jl index 95c816e..e20d2c0 100644 --- a/test/ConcurrentCollectionsTests/src/test_dlcrq.jl +++ b/test/ConcurrentCollectionsTests/src/test_dlcrq.jl @@ -149,17 +149,16 @@ function test_concurrent_push_pop(ntrials = 100) @withprogress name = "concurrent push-pop" begin @testset for trial in 1:ntrials @logprogress (trial - 1) / ntrials - check_concurrent_push_pop() + q = DualLinkedConcurrentRingQueue{Int}(; log2ringsize = 5) + # q = Channel{Int}(Inf) + check_concurrent_push_pop!(q) end end end -function check_concurrent_push_pop() +function check_concurrent_push_pop!(q; nitems = 2^20) nsend = cld(Threads.nthreads(), 2) nrecv = max(1, Threads.nthreads() - nsend) - q = DualLinkedConcurrentRingQueue{Int}(; log2ringsize = 5) - # q = Channel{Int}(Inf) - nitems = 2^20 received = concurrent_push_pop!(q, nitems, nsend, nrecv) allreceived = reduce(vcat, received) sort!(allreceived) diff --git a/test/ConcurrentCollectionsTests/src/test_ssqueue.jl b/test/ConcurrentCollectionsTests/src/test_ssqueue.jl new file mode 100644 index 0000000..45d8f54 --- /dev/null +++ b/test/ConcurrentCollectionsTests/src/test_ssqueue.jl @@ -0,0 +1,48 @@ +module TestSSQueue + +using ConcurrentCollections +using ConcurrentCollections.Implementations: NodeIterator, check_invariance +using ProgressLogging: @logprogress, @withprogress +using Test +using ..TestDLCRQ: check_concurrent_push_pop! + +function test_push_pop_once_int() + q = DualLinkedQueue{Int}() + push!(q, 111) + @test popfirst!(q) == 111 +end + +function test_push_pop_once_any() + q = DualLinkedQueue() + push!(q, 111) + @test popfirst!(q) == 111 +end + +function test_error() + q = DualLinkedQueue{Int}() + t = @task popfirst!(q) + yield(t) + msg = "Interrupting DLQ @$(time_ns())" + schedule(t, ErrorException(msg); error = true) + err = try + Some(wait(t)) + catch e + e + end + @test err isa TaskFailedException + @test occursin(msg, sprint(showerror, err)) + push!(q, 111) # this dequeues but ignores the interrupted waiter + @test popfirst!(q) == 111 +end + +function test_concurrent_push_pop(ntrials = 100) + @withprogress name = "concurrent push-pop" begin + @testset for trial in 1:ntrials + @logprogress (trial - 1) / ntrials + q = DualLinkedQueue{Int}() + check_concurrent_push_pop!(q; nitems = 2^13) + end + end +end + +end # module From 6bd90b3310c91e8fe29cc198acf82828eadc4c35 Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Tue, 28 Sep 2021 19:57:11 -0400 Subject: [PATCH 2/3] Document DualLinkedQueue --- README.md | 1 + src/docs/DualLinkedConcurrentRingQueue.md | 2 +- src/docs/DualLinkedQueue.md | 37 +++++++++++++++++++++++ 3 files changed, 39 insertions(+), 1 deletion(-) create mode 100644 src/docs/DualLinkedQueue.md diff --git a/README.md b/README.md index b2fb4d6..f53a292 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@ ConcurrentCollections.jl provides the following lock-free collections for Julia ≥ 1.7: * [`DualLinkedConcurrentRingQueue`](https://juliaconcurrent.github.io/ConcurrentCollections.jl/dev/#ConcurrentCollections.DualLinkedConcurrentRingQueue) +* [`DualLinkedQueue`](https://juliaconcurrent.github.io/ConcurrentCollections.jl/dev/#ConcurrentCollections.DualLinkedQueue) * [`LinkedConcurrentRingQueue`](https://juliaconcurrent.github.io/ConcurrentCollections.jl/dev/#ConcurrentCollections.LinkedConcurrentRingQueue) * [`ConcurrentQueue`](https://juliaconcurrent.github.io/ConcurrentCollections.jl/dev/#ConcurrentCollections.ConcurrentQueue) * [`ConcurrentStack`](https://juliaconcurrent.github.io/ConcurrentCollections.jl/dev/#ConcurrentCollections.ConcurrentStack) diff --git a/src/docs/DualLinkedConcurrentRingQueue.md b/src/docs/DualLinkedConcurrentRingQueue.md index 9ddd8f7..f75a88d 100644 --- a/src/docs/DualLinkedConcurrentRingQueue.md +++ b/src/docs/DualLinkedConcurrentRingQueue.md @@ -3,7 +3,7 @@ A concurrent queue with "almost" nonblocking `push!` and `popfirst!`. Calling `popfirst!` on an empty queue waits for a `push!` in another task. -See also: [`LinkedConcurrentRingQueue`](@ref) +See also: [`LinkedConcurrentRingQueue`](@ref), [`DualLinkedQueue`](@ref) # Examples ```julia diff --git a/src/docs/DualLinkedQueue.md b/src/docs/DualLinkedQueue.md new file mode 100644 index 0000000..9ecbdb5 --- /dev/null +++ b/src/docs/DualLinkedQueue.md @@ -0,0 +1,37 @@ + DualLinkedQueue{T}() + +A concurrent queue with nonblocking `push!` and `popfirst!`. Calling +`popfirst!` on an empty queue waits for a `push!` in another task. + +[`DualLinkedConcurrentRingQueue`](@ref) provides a faster dual queue with a +larger memory footprint. + +# Examples +```julia +julia> using ConcurrentCollections + +julia> q = DualLinkedQueue{Int}(); + +julia> push!(q, 111); + +julia> push!(q, 222); + +julia> popfirst!(q) # first-in first-out +111 + +julia> popfirst!(q) +222 +``` + +# Extended help + +Since `popfirst!` blocks when called on an empty queue, a `DualLinkedQueue` acts +almost like an unbounded `Base.Channel`. However, `DualLinkedQueue` does not +support `close` or blocking on `push!` when exceeding a bound. + +`DualLinkedQueue` implements the dual queue by Scherer and Scott (2004): + +> Scherer, William N., and Michael L. Scott. “Nonblocking Concurrent Data +> Structures with Condition Synchronization.” In Distributed Computing, edited +> by Rachid Guerraoui, 174–87. Lecture Notes in Computer Science. Berlin, +> Heidelberg: Springer, 2004. . From 54e18edf15b9cbfb00140c054af0799785ff14aa Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Tue, 28 Sep 2021 20:12:13 -0400 Subject: [PATCH 3/3] List DualLinkedQueue in docs --- docs/src/index.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/src/index.md b/docs/src/index.md index b365082..dc7f7b5 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -7,6 +7,7 @@ ```@docs DualLinkedConcurrentRingQueue +DualLinkedQueue LinkedConcurrentRingQueue ConcurrentQueue ConcurrentStack