diff --git a/benchmark/ConcurrentCollectionsBenchmarks/src/bench_queue_pushpop.jl b/benchmark/ConcurrentCollectionsBenchmarks/src/bench_queue_pushpop.jl index 1465c1b..4028201 100644 --- a/benchmark/ConcurrentCollectionsBenchmarks/src/bench_queue_pushpop.jl +++ b/benchmark/ConcurrentCollectionsBenchmarks/src/bench_queue_pushpop.jl @@ -24,7 +24,7 @@ function pushpop!( while true y = nothing for _ in 1:nspins - y = trypopfirst!(q) + y = maybepopfirst!(q) y === nothing || break end y === nothing || break diff --git a/docs/src/index.md b/docs/src/index.md index dc7f7b5..6d8fc99 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -12,8 +12,8 @@ LinkedConcurrentRingQueue ConcurrentQueue ConcurrentStack WorkStealingDeque -trypop! -trypopfirst! +maybepop! +maybepopfirst! ``` ## Hash table @@ -21,7 +21,7 @@ trypopfirst! ```@docs ConcurrentDict modify! -tryget +maybeget Keep Delete ``` diff --git a/src/ConcurrentCollections.jl b/src/ConcurrentCollections.jl index 80a1f46..3e5c2e1 100644 --- a/src/ConcurrentCollections.jl +++ b/src/ConcurrentCollections.jl @@ -14,9 +14,9 @@ export length_upper_bound, length_upper_bound, modify!, - tryget, - trypop!, - trypopfirst! + maybeget, + maybepop!, + maybepopfirst! import Base @@ -31,9 +31,9 @@ end abstract type ConcurrentDict{Key,Value} <: Base.AbstractDict{Key,Value} end function modify! end -function trypop! end -function trypopfirst! end -function tryget end +function maybepop! end +function maybepopfirst! end +function maybeget end function length_lower_bound end function length_upper_bound end @@ -54,9 +54,9 @@ using ..ConcurrentCollections: length_lower_bound, length_upper_bound, modify!, - tryget, - trypop!, - trypopfirst! + maybeget, + maybepop!, + maybepopfirst! include("UnsafeAtomics.jl") using .UnsafeAtomics: acq_rel, acquire, monotonic, release, seq_cst, unordered diff --git a/src/cache.jl b/src/cache.jl index 2be9fe6..ac01e11 100644 --- a/src/cache.jl +++ b/src/cache.jl @@ -8,7 +8,7 @@ function ThreadLocalCache{T}(; size::Integer = 4) where {T} return ThreadLocalCache(cache, size) end -function maybepop!(cache::ThreadLocalCache) +function _maybepop!(cache::ThreadLocalCache) buffer = cache.cache[Threads.threadid()] if isempty(buffer) return nothing diff --git a/src/dict.jl b/src/dict.jl index bb0724a..05716d6 100644 --- a/src/dict.jl +++ b/src/dict.jl @@ -113,7 +113,7 @@ end @inline Base.getindex(ref::ValueRef) = ref.value function Base.getindex(d::LinearProbingDict{Key}, key) where {Key} - y = tryget(d, key) + y = maybeget(d, key) if y === nothing throw(KeyError(key)) else @@ -129,9 +129,9 @@ function Base.haskey(d::LinearProbingDict, key) end Base.get(d::LinearProbingDict, key, default) = - something(ConcurrentCollections.tryget(d, key), default) + something(ConcurrentCollections.maybeget(d, key), default) -function ConcurrentCollections.tryget(d::LinearProbingDict{<:Any,V}, key) where {V} +function ConcurrentCollections.maybeget(d::LinearProbingDict{<:Any,V}, key) where {V} @inline f(::Nothing) = nothing @inline f(x) = Keep(x[]) y = modify!(f, d, key) @@ -149,14 +149,14 @@ function Base.setindex!(d::LinearProbingDict{Key,Value}, v, k) where {Key,Value} return d end -Base.pop!(d::LinearProbingDict, key, default) = something(trypop!(d, key), default) +Base.pop!(d::LinearProbingDict, key, default) = something(maybepop!(d, key), default) function Base.pop!(d::LinearProbingDict, key) - value = trypop!(d, key) + value = maybepop!(d, key) value === nothing && throw(KeyError(key)) return something(value) end -function ConcurrentCollections.trypop!(d::LinearProbingDict, key) +function ConcurrentCollections.maybepop!(d::LinearProbingDict, key) @inline f(::Nothing) = nothing @inline f(ref) = Delete(ref[]) y = modify!(f, d, key) diff --git a/src/dlcrq.jl b/src/dlcrq.jl index 19f7e29..14bc713 100644 --- a/src/dlcrq.jl +++ b/src/dlcrq.jl @@ -533,7 +533,7 @@ function Base.push!(lcrq::DualLinkedConcurrentRingQueue{T}, x) where {T} end function Base.popfirst!(lcrq::DualLinkedConcurrentRingQueue{T}) where {T} - w = let cached = maybepop!(lcrq.waitercache) + w = let cached = _maybepop!(lcrq.waitercache) if cached === nothing Waiter{eltype(lcrq)}() else @@ -600,7 +600,7 @@ function denqueue!(lcrq::DualLinkedConcurrentRingQueue{T}, x::Union{T,Waiter{T}} end function make_newcrq!(lcrq::DualLinkedConcurrentRingQueue, crq) - oldcrq = maybepop!(lcrq.crqcache) + oldcrq = _maybepop!(lcrq.crqcache) if oldcrq === nothing return similar(crq) else diff --git a/src/docs/ConcurrentQueue.md b/src/docs/ConcurrentQueue.md index bf15a37..3871f3e 100644 --- a/src/docs/ConcurrentQueue.md +++ b/src/docs/ConcurrentQueue.md @@ -2,7 +2,7 @@ Concurrent queue of objects of type `T`. -Use `push!` to insert an element at the tail and [`trypopfirst!`](@ref) to +Use `push!` to insert an element at the tail and [`maybepopfirst!`](@ref) to retrieve and remove an element at the head. Implementation detail: It implements the Michael and Scott queue. @@ -21,8 +21,8 @@ julia> push!(queue, 2); julia> popfirst!(queue) 1 -julia> trypopfirst!(queue) +julia> maybepopfirst!(queue) Some(2) -julia> trypopfirst!(queue) # returns nothing +julia> maybepopfirst!(queue) # returns nothing ``` diff --git a/src/docs/ConcurrentStack.md b/src/docs/ConcurrentStack.md index 1d8fd0a..63688ee 100644 --- a/src/docs/ConcurrentStack.md +++ b/src/docs/ConcurrentStack.md @@ -2,7 +2,7 @@ Concurrent stack of objects of type `T`. -Use `push!` to insert an element and [`trypop!`](@ref) to retrieve and remove an +Use `push!` to insert an element and [`maybepop!`](@ref) to retrieve and remove an element. It implements the Treiber stack. @@ -21,8 +21,8 @@ julia> push!(stack, 2); julia> pop!(stack) 2 -julia> trypop!(stack) +julia> maybepop!(stack) Some(1) -julia> trypop!(stack) # returns nothing +julia> maybepop!(stack) # returns nothing ``` diff --git a/src/docs/LinkedConcurrentRingQueue.md b/src/docs/LinkedConcurrentRingQueue.md index 2ab1b3f..2176215 100644 --- a/src/docs/LinkedConcurrentRingQueue.md +++ b/src/docs/LinkedConcurrentRingQueue.md @@ -1,6 +1,6 @@ LinkedConcurrentRingQueue{T}() -A concurrent queue with nonblocking `push!` and [`trypopfirst!`](@ref). +A concurrent queue with nonblocking `push!` and [`maybepopfirst!`](@ref). See also: [`DualLinkedConcurrentRingQueue`](@ref) @@ -14,13 +14,13 @@ julia> push!(q, 111); julia> push!(q, 222); -julia> trypopfirst!(q) # first-in first-out +julia> maybepopfirst!(q) # first-in first-out Some(111) -julia> trypopfirst!(q) +julia> maybepopfirst!(q) Some(222) -julia> trypopfirst!(q) === nothing # queue is empty +julia> maybepopfirst!(q) === nothing # queue is empty true ``` diff --git a/src/docs/WorkStealingDeque.md b/src/docs/WorkStealingDeque.md index b9022b1..ce275c0 100644 --- a/src/docs/WorkStealingDeque.md +++ b/src/docs/WorkStealingDeque.md @@ -4,9 +4,9 @@ Concurrent work-stealing "deque" of objects of type `T`. This is not a full deque in the sense that: -* `push!` and [`trypop!`](@ref) operating at the tail of the collection can +* `push!` and [`maybepop!`](@ref) operating at the tail of the collection can only be executed by a single task. -* [`trypopfirst!`](@ref) (aka steal) for retrieving and removing an element at +* [`maybepopfirst!`](@ref) (aka steal) for retrieving and removing an element at the head can be invoked from any tasks. However, there is no `pushfirst!`. Implementation detail: It implements the dynamic circular work-stealing deque by @@ -25,14 +25,14 @@ julia> push!(deque, 2); julia> push!(deque, 3); -julia> trypop!(deque) +julia> maybepop!(deque) Some(3) -julia> fetch(Threads.@spawn trypopfirst!(deque)) +julia> fetch(Threads.@spawn maybepopfirst!(deque)) Some(1) julia> fetch(Threads.@spawn popfirst!(deque)) 2 -julia> trypopfirst!(deque) # returns nothing +julia> maybepopfirst!(deque) # returns nothing ``` diff --git a/src/docs/maybeget.md b/src/docs/maybeget.md new file mode 100644 index 0000000..5d9e978 --- /dev/null +++ b/src/docs/maybeget.md @@ -0,0 +1,2 @@ + maybeget(dict::ConcurrentDict{K,V}, key) -> Some(value::T) or nothing + diff --git a/src/docs/trypop!.md b/src/docs/maybepop!.md similarity index 67% rename from src/docs/trypop!.md rename to src/docs/maybepop!.md index ad7d3c8..547dbf9 100644 --- a/src/docs/trypop!.md +++ b/src/docs/maybepop!.md @@ -1,4 +1,4 @@ - trypop!(collection) -> Some(value::T) or nothing + maybepop!(collection) -> Some(value::T) or nothing Try to pop a `value` from the tail of `collection`. Return `Some(value)` if it is non-empty. Return `nothing` if empty. @@ -12,8 +12,8 @@ julia> stack = ConcurrentStack{Int}(); julia> push!(stack, 1); -julia> trypop!(stack) +julia> maybepop!(stack) Some(1) -julia> trypop!(stack) # returns nothing +julia> maybepop!(stack) # returns nothing ``` diff --git a/src/docs/trypopfirst!.md b/src/docs/maybepopfirst!.md similarity index 65% rename from src/docs/trypopfirst!.md rename to src/docs/maybepopfirst!.md index 78f237d..500aaba 100644 --- a/src/docs/trypopfirst!.md +++ b/src/docs/maybepopfirst!.md @@ -1,4 +1,4 @@ - trypopfirst!(collection) -> Some(value::T) or nothing + maybepopfirst!(collection) -> Some(value::T) or nothing Try to pop a `value` from the head of `collection`. Return `Some(value)` if it @@ -13,8 +13,8 @@ julia> queue = ConcurrentQueue{Int}(); julia> push!(queue, 1); -julia> trypopfirst!(queue) +julia> maybepopfirst!(queue) Some(1) -julia> trypopfirst!(queue) # returns nothing +julia> maybepopfirst!(queue) # returns nothing ``` diff --git a/src/docs/tryget.md b/src/docs/tryget.md deleted file mode 100644 index c2cb890..0000000 --- a/src/docs/tryget.md +++ /dev/null @@ -1,2 +0,0 @@ - tryget(dict::ConcurrentDict{K,V}, key) -> Some(value::T) or nothing - diff --git a/src/lcrq.jl b/src/lcrq.jl index 574fae5..c9ba49c 100644 --- a/src/lcrq.jl +++ b/src/lcrq.jl @@ -144,7 +144,7 @@ function trypush!(crq::IndirectConcurrentRingQueueNode, x) end end -function ConcurrentCollections.trypopfirst!(crq::IndirectConcurrentRingQueueNode) +function ConcurrentCollections.maybepopfirst!(crq::IndirectConcurrentRingQueueNode) while true h = (@atomic crq.head += true) - true itemindex = mod1(h, crq.length) # TODO: shift @@ -261,15 +261,15 @@ function Base.push!(lcrq::LinkedConcurrentRingQueue{T}, x) where {T} end end -function ConcurrentCollections.trypopfirst!(lcrq::LinkedConcurrentRingQueue) +function ConcurrentCollections.maybepopfirst!(lcrq::LinkedConcurrentRingQueue) while true crq = @atomic lcrq.head - x = trypopfirst!(crq) + x = maybepopfirst!(crq) x === nothing || return x next = @atomic crq.next next === nothing && return nothing - x = trypopfirst!(crq) + x = maybepopfirst!(crq) x === nothing || return x @atomicreplace lcrq.head crq => next end diff --git a/src/misc.jl b/src/misc.jl index 43e7c46..377d324 100644 --- a/src/misc.jl +++ b/src/misc.jl @@ -1,4 +1,4 @@ -function ConcurrentCollections.trypopfirst!(ch::Channel) +function ConcurrentCollections.maybepopfirst!(ch::Channel) y = iterate(ch) y === nothing && return nothing return Some(first(y)) diff --git a/src/msqueue.jl b/src/msqueue.jl index 3cbd6a1..7285cfa 100644 --- a/src/msqueue.jl +++ b/src/msqueue.jl @@ -43,7 +43,7 @@ function Base.push!(queue::ConcurrentQueue{T}, v) where {T} end end -function ConcurrentCollections.trypopfirst!(queue::ConcurrentQueue) +function ConcurrentCollections.maybepopfirst!(queue::ConcurrentQueue) head = @atomic queue.head tail = @atomic queue.tail while true @@ -71,7 +71,7 @@ function ConcurrentCollections.trypopfirst!(queue::ConcurrentQueue) end function Base.popfirst!(queue::ConcurrentQueue) - r = trypopfirst!(queue) + r = maybepopfirst!(queue) if r === nothing error("queue is empty") else diff --git a/src/stack.jl b/src/stack.jl index 4af2152..ab1853a 100644 --- a/src/stack.jl +++ b/src/stack.jl @@ -25,7 +25,7 @@ function Base.push!(stack::ConcurrentStack{T}, v) where {T} return stack end -function ConcurrentCollections.trypop!(stack::ConcurrentStack) +function ConcurrentCollections.maybepop!(stack::ConcurrentStack) while true node = @atomic stack.next node === nothing && return nothing @@ -39,7 +39,7 @@ function ConcurrentCollections.trypop!(stack::ConcurrentStack) end function Base.pop!(stack::ConcurrentStack) - r = trypop!(stack) + r = maybepop!(stack) if r === nothing error("stack is empty") else diff --git a/src/workstealing.jl b/src/workstealing.jl index ee83078..9f15f76 100644 --- a/src/workstealing.jl +++ b/src/workstealing.jl @@ -117,7 +117,7 @@ function Base.push!(deque::WorkStealingDeque, v) return deque end -function ConcurrentCollections.trypop!(deque::WorkStealingDeque) +function ConcurrentCollections.maybepop!(deque::WorkStealingDeque) bottom = @atomic deque.bottom buffer = @atomic deque.buffer bottom -= 1 @@ -142,7 +142,7 @@ function ConcurrentCollections.trypop!(deque::WorkStealingDeque) return r end -function ConcurrentCollections.trypopfirst!(deque::WorkStealingDeque{T}) where {T} +function ConcurrentCollections.maybepopfirst!(deque::WorkStealingDeque{T}) where {T} top = @atomic deque.top bottom = @atomic deque.bottom buffer = @atomic deque.buffer @@ -187,7 +187,7 @@ end # Atomics](http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p0690r1.html) function Base.pop!(deque::WorkStealingDeque) - r = trypop!(deque) + r = maybepop!(deque) if r === nothing error("deque is empty") else @@ -196,7 +196,7 @@ function Base.pop!(deque::WorkStealingDeque) end function Base.popfirst!(deque::WorkStealingDeque) - r = trypopfirst!(deque) + r = maybepopfirst!(deque) if r === nothing error("deque is empty") else diff --git a/test/ConcurrentCollectionsTests/src/test_crq.jl b/test/ConcurrentCollectionsTests/src/test_crq.jl index 41767f7..8eb52ae 100644 --- a/test/ConcurrentCollectionsTests/src/test_crq.jl +++ b/test/ConcurrentCollectionsTests/src/test_crq.jl @@ -83,15 +83,15 @@ function concurrent_denqueue!( local ys_nb = Int[] local ys_b = Int[] while true - local y = trypopfirst!(crq) + local y = maybepopfirst!(crq) if y === nothing if activesenders[] == 0 - y = trypopfirst!(crq) + y = maybepopfirst!(crq) if y === nothing # Confirm that CRQ is empty return y end # Reaching here means that there were some enqueues - # between our `trypopfirst!(crq) === nothing` and + # between our `maybepopfirst!(crq) === nothing` and # `activesenders[] == 0`. else poll() diff --git a/test/ConcurrentCollectionsTests/src/test_dict.jl b/test/ConcurrentCollectionsTests/src/test_dict.jl index 0e165f4..6efc695 100644 --- a/test/ConcurrentCollectionsTests/src/test_dict.jl +++ b/test/ConcurrentCollectionsTests/src/test_dict.jl @@ -70,11 +70,11 @@ function test_dict(npairs) @test vs == reverse((1:npairs) .* -1) end end - @testset "trypop!" begin - @test tryget(d, 1) === Some(-1) - @test trypop!(d, 1) === Some(-1) - @test tryget(d, 1) === nothing - @test trypop!(d, 1) === nothing + @testset "maybepop!" begin + @test maybeget(d, 1) === Some(-1) + @test maybepop!(d, 1) === Some(-1) + @test maybeget(d, 1) === nothing + @test maybepop!(d, 1) === nothing @test 1 ∉ sort!(collect(keys(d))) end @testset "length_upper_bound" begin @@ -116,11 +116,11 @@ function test_dict(npairs) @test kvs == [str(i) => -i for i in 1:npairs] end end - @testset "trypop!" begin - @test tryget(d, "001") === Some(-1) - @test trypop!(d, "001") === Some(-1) - @test tryget(d, "001") === nothing - @test trypop!(d, "001") === nothing + @testset "maybepop!" begin + @test maybeget(d, "001") === Some(-1) + @test maybepop!(d, "001") === Some(-1) + @test maybeget(d, "001") === nothing + @test maybepop!(d, "001") === nothing @test "001" ∉ sort!(collect(keys(d))) end @testset "clusters" begin @@ -173,7 +173,7 @@ function random_mutation!(dict; nkeys = 8, repeat = 2^20, ntasks = Threads.nthre for _ in 1:repeat k = rand(ks) if rand(Bool) - y = trypop!(dict, k) + y = maybepop!(dict, k) if y !== nothing popped[k] += something(y) end @@ -236,7 +236,7 @@ function phased_push_pop!( spin = 10_000 # spin for a few μs cycle!(barrier[itask], spin) for k in ks - popped[k] += something(trypop!(dict, k), 0) + popped[k] += something(maybepop!(dict, k), 0) end end end diff --git a/test/ConcurrentCollectionsTests/src/test_lcrq.jl b/test/ConcurrentCollectionsTests/src/test_lcrq.jl index b34f478..b34ff5c 100644 --- a/test/ConcurrentCollectionsTests/src/test_lcrq.jl +++ b/test/ConcurrentCollectionsTests/src/test_lcrq.jl @@ -8,13 +8,13 @@ using Test function test_push_pop_once_int() q = LinkedConcurrentRingQueue{Int}() push!(q, 111) - @test trypopfirst!(q) == Some(111) + @test maybepopfirst!(q) == Some(111) end function test_push_pop_once_any() q = LinkedConcurrentRingQueue() push!(q, 111) - @test trypopfirst!(q) == Some{Any}(111) + @test maybepopfirst!(q) == Some{Any}(111) end function test_push_pop_100() @@ -22,7 +22,7 @@ function test_push_pop_100() q = LinkedConcurrentRingQueue{Int}() foldl(push!, 1:n; init = q) ys = Int[] - while (y = trypopfirst!(q)) !== nothing + while (y = maybepopfirst!(q)) !== nothing push!(ys, something(y)) end @test ys == 1:n @@ -49,7 +49,7 @@ function concurrent_push_pop!(q, nitems::Integer, nsend::Integer, nrecv::Integer push!(received, ys) Threads.@spawn begin while true - y = trypopfirst!(q) + y = maybepopfirst!(q) if y === nothing yield() else diff --git a/test/ConcurrentCollectionsTests/src/test_msqueue.jl b/test/ConcurrentCollectionsTests/src/test_msqueue.jl index 53b07ce..6a98d24 100644 --- a/test/ConcurrentCollectionsTests/src/test_msqueue.jl +++ b/test/ConcurrentCollectionsTests/src/test_msqueue.jl @@ -8,7 +8,7 @@ function test_simple() xs = 1:10 foldl(push!, xs; init = q) @test [popfirst!(q) for _ in xs] == xs - @test trypopfirst!(q) === nothing + @test maybepopfirst!(q) === nothing end function pushpop(xs, ntasks = Threads.nthreads()) @@ -21,7 +21,7 @@ function pushpop(xs, ntasks = Threads.nthreads()) Threads.@spawn begin local ys = eltype(xs)[] while true - r = trypopfirst!(queue) + r = maybepopfirst!(queue) if r === nothing done[] && break continue diff --git a/test/ConcurrentCollectionsTests/src/test_tsstack.jl b/test/ConcurrentCollectionsTests/src/test_tsstack.jl index b8d6bf6..a4d1619 100644 --- a/test/ConcurrentCollectionsTests/src/test_tsstack.jl +++ b/test/ConcurrentCollectionsTests/src/test_tsstack.jl @@ -8,7 +8,7 @@ function test_simple() xs = 1:10 foldl(push!, xs; init = stack) @test [pop!(stack) for _ in xs] == reverse(xs) - @test trypop!(stack) === nothing + @test maybepop!(stack) === nothing end function pushpop(xs, ntasks = Threads.nthreads()) @@ -21,7 +21,7 @@ function pushpop(xs, ntasks = Threads.nthreads()) Threads.@spawn begin local ys = eltype(xs)[] while true - r = trypop!(stack) + r = maybepop!(stack) if r === nothing done[] && break continue diff --git a/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl b/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl index c9d7097..ca6afb5 100644 --- a/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl +++ b/test/ConcurrentCollectionsTests/src/test_work_stealing_deque.jl @@ -8,12 +8,12 @@ function test_single_thread_push_pop() xs = 1:50 foldl(push!, xs; init = deque) @test [pop!(deque) for _ in xs] == reverse(xs) - @test trypop!(deque) === nothing - @test trypopfirst!(deque) === nothing + @test maybepop!(deque) === nothing + @test maybepopfirst!(deque) === nothing push!(deque, 1) - @test trypopfirst!(deque) === Some(1) - @test trypopfirst!(deque) === nothing - @test trypop!(deque) === nothing + @test maybepopfirst!(deque) === Some(1) + @test maybepopfirst!(deque) === nothing + @test maybepop!(deque) === nothing foldl(push!, xs; init = deque) n = length(deque.buffer) @@ -34,7 +34,7 @@ function random_pushpop(xs; ntasks = Threads.nthreads() - 1, sentinel = -1) Threads.@spawn begin local ys = eltype(xs)[] while true - local r = trypopfirst!(deque) + local r = maybepopfirst!(deque) if r === nothing GC.safepoint() continue @@ -52,7 +52,7 @@ function random_pushpop(xs; ntasks = Threads.nthreads() - 1, sentinel = -1) push!(deque, x) # continue if mod(i, 8) == 0 - r = trypop!(deque) + r = maybepop!(deque) GC.safepoint() r === nothing && continue push!(zs, something(r))