diff --git a/benchmark/ConcurrentCollectionsBenchmarks/Project.toml b/benchmark/ConcurrentCollectionsBenchmarks/Project.toml index 9bc079c..b6c2d00 100644 --- a/benchmark/ConcurrentCollectionsBenchmarks/Project.toml +++ b/benchmark/ConcurrentCollectionsBenchmarks/Project.toml @@ -7,3 +7,4 @@ version = "0.1.0" BangBang = "198e06fe-97b7-11e9-32a5-e1d131e6ad66" BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf" ConcurrentCollections = "5060bff5-0b44-40c5-b522-fcd3ca5cecdd" +Formatting = "59287772-0a20-5a39-b81b-1366585eb4c0" diff --git a/benchmark/ConcurrentCollectionsBenchmarks/src/ConcurrentCollectionsBenchmarks.jl b/benchmark/ConcurrentCollectionsBenchmarks/src/ConcurrentCollectionsBenchmarks.jl index cc589f8..9a675e0 100644 --- a/benchmark/ConcurrentCollectionsBenchmarks/src/ConcurrentCollectionsBenchmarks.jl +++ b/benchmark/ConcurrentCollectionsBenchmarks/src/ConcurrentCollectionsBenchmarks.jl @@ -2,15 +2,20 @@ module ConcurrentCollectionsBenchmarks using BenchmarkTools: Benchmark, BenchmarkGroup +include("utils.jl") include("bench_dict_histogram.jl") include("bench_dict_get_existing.jl") include("bench_dict_migration.jl") +include("bench_queue_pushpop.jl") +include("bench_queue_hot_potato.jl") function setup() suite = BenchmarkGroup() suite["DictHistogram"] = BenchDictHistogram.setup() suite["DictGetExisting"] = BenchDictGetExisting.setup() suite["DictMigration"] = BenchDictMigration.setup() + suite["QueuePushPop"] = BenchQueuePushPop.setup() + suite["HotPotato"] = BenchQueueHotPotato.setup() return suite end @@ -40,6 +45,8 @@ function clear() BenchDictHistogram.clear() BenchDictGetExisting.clear() BenchDictMigration.clear() + BenchQueuePushPop.clear() + BenchQueueHotPotato.clear() end end # module diff --git a/benchmark/ConcurrentCollectionsBenchmarks/src/bench_queue_hot_potato.jl b/benchmark/ConcurrentCollectionsBenchmarks/src/bench_queue_hot_potato.jl new file mode 100644 index 0000000..cc505bd --- /dev/null +++ b/benchmark/ConcurrentCollectionsBenchmarks/src/bench_queue_hot_potato.jl @@ -0,0 +1,204 @@ +module BenchQueueHotPotato + +using BenchmarkTools +using ConcurrentCollections +using ConcurrentCollections.Implementations: atomic_modifyfield! +using Formatting: format +using ..Utils: maptasks + +const HOTPOTATO = false +const NOTPOTATO = true + +const Stat = typeof((npush = 0, npop = 0, t1 = time_ns(), t2 = time_ns())) + +Base.@kwdef struct HotPotatoResult + duration::Union{Float64,Nothing} + nrepeat::Int + stats::Vector{Stat} +end + +function unfair_sleep(seconds::Real) + t0 = time_ns() + ns = seconds * 1e9 + while time_ns() - t0 < ns + GC.safepoint() + # yield() + end +end + +@noinline function hotpotato!( + q; + ntasks::Integer = Threads.nthreads(), + duration::Union{Real,Nothing} = nothing, + nrepeat::Integer = duration === nothing ? 2^15 : typemax(Int64), +) + ntasks < 1 && error("require positive `ntasks`: got $ntasks") + local tasks + push!(q, HOTPOTATO) + t0 = time_ns() + tasks = maptasks(1:ntasks) do itask + # Core.print("$itask-th task started\n") + local t1 = time_ns() + local npush = 0 + local npop = 0 + for irepeat in 1:nrepeat + # ccall(:jl_breakpoint, Cvoid, (Any,), (; irepeat, itask)) + if rand(Bool) + push!(q, NOTPOTATO) + npush += 1 + else + y = popfirst!(q) + npop += 1 + if y == HOTPOTATO + # Since there are GC pauses, maybe there's no need + # to add sleep here. 
+ # unfair_sleep(0.001) + # sleep(0.001) + push!(q, y) + npush += 1 + end + end + if duration !== nothing + if (time_ns() - t0) / 1e9 > duration + break + end + end + end + return (; npush, npop, t1, t2 = time_ns())::Stat + end + return HotPotatoResult(; nrepeat, duration, stats = map(fetch, tasks)) +end + +mutable struct AtomicRef{T} + @atomic x::T +end + +const FAIStat = typeof((; nfai = 0, t1 = time_ns(), t2 = time_ns())) + +Base.@kwdef struct FAIResult + duration::Union{Float64,Nothing} + nrepeat::Int + refvalue::Int + stats::Vector{FAIStat} +end + +function fai_stats(; + ntasks::Integer = Threads.nthreads(), + duration::Union{Real,Nothing} = nothing, + nrepeat::Integer = duration === nothing ? 2^15 : typemax(Int64), + impl::Val = Val(:threads), +) + ntasks < 1 && error("require positive `ntasks`: got $ntasks") + local tasks + ref = if impl === Val(:threads) + Threads.Atomic{Int32}(0) + else + AtomicRef{Int32}(0) + end + t0 = time_ns() + tasks = maptasks(1:ntasks) do itask + local t1 = time_ns() + local nfai = 0 + for irepeat in 1:nrepeat + if impl === Val(:threads) + Threads.atomic_add!(ref, Int32(1)) + elseif impl === Val(:unsafe) + atomic_modifyfield!(ref, Val(:x), +, Int32(1)) + else + @atomic ref.x += true + end + nfai += 1 + if duration !== nothing + if (time_ns() - t0) / 1e9 > duration + break + end + end + end + return (; nfai, t1, t2 = time_ns())::FAIStat + end + if impl === Val(:threads) + refvalue = ref[] + else + refvalue = @atomic ref.x + end + return FAIResult(; nrepeat, duration, refvalue, stats = map(fetch, tasks)) +end + +function setup(; kwargs...) + suite = BenchmarkGroup() + suite["channel"] = @benchmarkable hotpotato!(Channel{Bool}(Inf); $kwargs...) + suite["dlcrq"] = + @benchmarkable hotpotato!(DualLinkedConcurrentRingQueue{Bool}(); $kwargs...) 
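+    # For a one-off run outside BenchmarkTools, `hotpotato!` can also be called
+    # directly, e.g. `hotpotato!(DualLinkedConcurrentRingQueue{Bool}(); ntasks = 4, duration = 1)`
+    # (usage sketch; the keyword arguments are the ones accepted by `hotpotato!` above).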
+ return suite +end + +function clear() end + +function summarize(result::HotPotatoResult) + npush = sum(stat.npush for stat in result.stats) + npop = sum(stat.npop for stat in result.stats) + t1min = minimum(stat.t1 for stat in result.stats) + t2max = maximum(stat.t2 for stat in result.stats) + duration = (t2max - t1min) / 1e9 + throughput = (npush + npop) / duration + return (; npush, npop, duration, throughput) +end + +function Base.show(io::IO, ::MIME"text/plain", result::HotPotatoResult) + (; npush, npop, duration, throughput) = summarize(result) + println(io, "Hot potato benchmark with ", length(result.stats), " tasks") + println( + io, + format(npush; commas = true), + " pushes and ", + format(npop; commas = true), + " pops in ", + duration, + " seconds", + ) + print( + io, + "throughput = ", + format(floor(Int, throughput); commas = true), + " ops/seconds", + ) +end + +function summarize(result::FAIResult) + nfai = sum(stat.nfai for stat in result.stats) + t1min = minimum(stat.t1 for stat in result.stats) + t2max = maximum(stat.t2 for stat in result.stats) + duration = (t2max - t1min) / 1e9 + throughput = nfai / duration + return (; nfai, duration, throughput) +end + +function Base.show(io::IO, ::MIME"text/plain", result::FAIResult) + (; nfai, duration, throughput) = summarize(result) + println(io, "FAI benchmark with ", length(result.stats), " tasks") + println(io, format(nfai; commas = true), " FAIs in ", duration, " seconds") + print( + io, + "throughput = ", + format(floor(Int, throughput); commas = true), + " ops/seconds", + ) +end + +function timings(result::HotPotatoResult) + t1min = minimum(stat.t1 for stat in result.stats) + table = + Iterators.map(enumerate(result.stats)) do (taskid, stat) + (; t1, t2) = stat + return ( + (; t = (t1 - t1min) / 1e9, event = :begin, taskid), + (; t = (t2 - t1min) / 1e9, event = :end, taskid), + ) + end |> + Iterators.flatten |> + collect + sort!(table; by = row -> row.t) + return table +end + +end # module diff --git a/benchmark/ConcurrentCollectionsBenchmarks/src/bench_queue_pushpop.jl b/benchmark/ConcurrentCollectionsBenchmarks/src/bench_queue_pushpop.jl new file mode 100644 index 0000000..1465c1b --- /dev/null +++ b/benchmark/ConcurrentCollectionsBenchmarks/src/bench_queue_pushpop.jl @@ -0,0 +1,57 @@ +module BenchQueuePushPop + +using BenchmarkTools +using ConcurrentCollections + +function pushpop!( + q; + nrepeat::Integer = 2^15, + nitems::Integer = 11, + ntasks::Integer = Threads.nthreads(), + nspins::Integer = 100, +) + ntasks < 1 && error("require positive `ntasks`: got $ntasks") + local tasks + @sync begin + tasks = map(1:ntasks) do _ + Threads.@spawn begin + nyields = 0 + for _ in 1:nrepeat + for i in 1:nitems + push!(q, i) + end + for _ in 1:nitems + while true + y = nothing + for _ in 1:nspins + y = trypopfirst!(q) + y === nothing || break + end + y === nothing || break + yield() + nyields += 1 + if nyields > 2^20 + @error "Too many yields!" + error("Too many yields!") + end + end + end + end + return nyields + end + end + end + return (; nrepeat, nitems, nyields = map(fetch, tasks)) +end + +function setup(; kwargs...) + suite = BenchmarkGroup() + suite["channel"] = @benchmarkable pushpop!(Channel{Int}(Inf); $kwargs...) + suite["msqueue"] = @benchmarkable pushpop!(ConcurrentQueue{Int}(); $kwargs...) + suite["lcrq"] = @benchmarkable pushpop!(LinkedConcurrentRingQueue{Int}(); $kwargs...) 
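+    # A single trial can also be run outside BenchmarkTools, e.g.
+    # `pushpop!(ConcurrentQueue{Int}(); nrepeat = 2^10)` (usage sketch).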
+ return suite +end + +function clear() end + +end # module diff --git a/benchmark/ConcurrentCollectionsBenchmarks/src/utils.jl b/benchmark/ConcurrentCollectionsBenchmarks/src/utils.jl new file mode 100644 index 0000000..b7aea6a --- /dev/null +++ b/benchmark/ConcurrentCollectionsBenchmarks/src/utils.jl @@ -0,0 +1,15 @@ +module Utils + +function maptasks(f, xs) + tasks = Task[] + for (tid, x) in enumerate(xs) + t = @task f(x) + t.sticky = false + ccall(:jl_set_task_tid, Cvoid, (Any, Cint), t, mod1(tid, Threads.nthreads()) - 1) + schedule(t) + push!(tasks, t) + end + return tasks +end + +end # module diff --git a/benchmark/hotpotato/.gitignore b/benchmark/hotpotato/.gitignore new file mode 100644 index 0000000..796b96d --- /dev/null +++ b/benchmark/hotpotato/.gitignore @@ -0,0 +1 @@ +/build diff --git a/benchmark/hotpotato/plot.jl b/benchmark/hotpotato/plot.jl new file mode 100644 index 0000000..3f56c43 --- /dev/null +++ b/benchmark/hotpotato/plot.jl @@ -0,0 +1,114 @@ +using DataFrames +using FileIO +using JSON +using Statistics +using VegaLite + +rawdata = JSON.parsefile(joinpath(@__DIR__, "build/results.json")) + +potatos = map(rawdata["potatos"]) do info + result = info["result"] + ntasks = info["ntasks"] + impl = Symbol(info["impl"]) + trialid = info["trialid"] + stats = result["stats"] + npush = sum(stat["npush"] for stat in stats) + npop = sum(stat["npop"] for stat in stats) + t1min = minimum(stat["t1"] for stat in stats) + t2max = maximum(stat["t2"] for stat in stats) + duration = (t2max - t1min) / 1e9 + nops = npush + npop + throughput = nops / duration + return (; trialid, ntasks, impl, throughput, duration, nops, npush, npop) +end + +fais = map(rawdata["fais"]) do info + result = info["result"] + ntasks = info["ntasks"] + impl = :fai + trialid = info["trialid"] + stats = result["stats"] + nops = sum(stat["nfai"] for stat in stats) + t1min = minimum(stat["t1"] for stat in stats) + t2max = maximum(stat["t2"] for stat in stats) + duration = (t2max - t1min) / 1e9 + throughput = nops / duration + return (; trialid, ntasks, impl, throughput, duration, nops) +end + +datadf = vcat(DataFrame(potatos), DataFrame(fais); cols = :union) + +impllabels = Dict( + # :impl -> :Implementation (for plot) + :base => "Base (Channel)", + :dlcrq => "Dual LCRQ", + :fai => "Hardware \"limit\" (FAI)", +) + +datadf[!, :Implementation] = getindex.(Ref(impllabels), datadf.impl) + +datadf[!, :diff_push_pop] = datadf.npush .- datadf.npop + +plt = @vlplot( + :point, + x = {:ntasks, title = "Number of Tasks"}, + y = {:throughput, title = "Throughput [#OP/second]"}, + color = :Implementation, + title = "Hot Potato Benchmark", + data = datadf, +) + +save(joinpath(@__DIR__, "build/results.png"), plt) + +summarydf = let + idx = datadf.impl .!= :fai + df = datadf[idx, :] + + sdf = unstack( + combine(groupby(df, [:impl, :ntasks]), :throughput => median => :throughput), + :impl, + :throughput, + ) + sdf[!, :speedup] = sdf.dlcrq ./ sdf.base + sdf +end + +plt_speedup = @vlplot( + :point, + x = {:ntasks, title = "Number of Tasks"}, + y = {:speedup, title = "Speedup [DLCRQ/Base]"}, + title = "Hot Potato Benchmark (Speedup)", + data = summarydf, +) + +save(joinpath(@__DIR__, "build/speedup.png"), plt_speedup) + +plt_trialid = @vlplot( + :point, + x = {:ntasks, title = "Number of Tasks"}, + y = {:throughput, title = "Throughput [#OP/second]"}, + color = { + :trialid, + scale = {scheme = :magma}, + }, + title = "Hot Potato Benchmark", + data = datadf[datadf.impl .== :base, :], +) + +save(joinpath(@__DIR__, 
"build/trial_dependencies.png"), plt_trialid) + +plt_diff_push_pop = @vlplot( + :point, + x = :diff_push_pop, + y = :throughput, + color = { + :ntasks, + # "ntasks:o", + scale = {scheme = :viridis}, + }, + shape = :impl, + column = :impl, + data = datadf, +) + +save(joinpath(@__DIR__, "build/diff_push_pop.png"), plt_diff_push_pop) diff --git a/benchmark/hotpotato/run.jl b/benchmark/hotpotato/run.jl new file mode 100644 index 0000000..c479267 --- /dev/null +++ b/benchmark/hotpotato/run.jl @@ -0,0 +1,54 @@ +using ConcurrentCollections: DualLinkedConcurrentRingQueue +using ConcurrentCollectionsBenchmarks.BenchQueueHotPotato: hotpotato!, fai_stats +using JSON + +function sweep(; repeat = 10, duration = 1, maxntasks = Threads.nthreads()) + # cooldown() = sleep(0.1) + # cooldown() = GC.gc() + cooldown() = nothing + potatos = [] + fais = [] + for trialid in 1:repeat + for ntasks in 1:maxntasks + println(stderr, "Trial $trialid/$repeat #Tasks $ntasks/$maxntasks") + + cooldown() + result = hotpotato!(DualLinkedConcurrentRingQueue{Bool}(); ntasks, duration) + push!(potatos, (; trialid, result, ntasks, impl = :dlcrq)) + cooldown() + result = hotpotato!(Channel{Bool}(Inf); ntasks, duration) + push!(potatos, (; trialid, result, ntasks, impl = :base)) + + cooldown() + result = fai_stats(; ntasks, duration) + push!(fais, (; trialid, result, ntasks)) + end + end + return (; potatos, fais, repeat, duration) +end + +function git_info(dir = @__DIR__) + git(cmd) = strip(read(setenv(`git $cmd`; dir), String)) + return (; + revision = git(`rev-parse HEAD`), + status = git(`status --short --untracked-files=no --porcelain`), + ) +end + +function main(args = ARGS) + output = get(args, 1, joinpath(@__DIR__, "build", "results.json")) + mkpath(dirname(output)) + git = git_info() + @info "Warmup..." + sweep(; repeat = 1, duration = 0.1, maxntasks = 1) + @info "Benchmarking..." + results = sweep() + results = (; results..., git) + open(output, write = true) do io + JSON.print(io, results) + end +end + +if abspath(PROGRAM_FILE) == @__FILE__ + main() +end diff --git a/licenses/README.md b/licenses/README.md new file mode 100644 index 0000000..9f14fba --- /dev/null +++ b/licenses/README.md @@ -0,0 +1,10 @@ +# Licenses associated with the original implementations + +* [lcrq.txt](lcrq.txt) (BSD-3-Clause): + List of Concurrent Ring Queues. + . + +* [duals.txt](duals.txt) (Apache-2.0): + Dual data structures associated with "Generality and Speed in Nonblocking Dual + Containers" by Izraelevitz and Scott (TOPC). + . diff --git a/licenses/duals.txt b/licenses/duals.txt new file mode 100644 index 0000000..8dada3e --- /dev/null +++ b/licenses/duals.txt @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. 
For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. 
This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
diff --git a/licenses/lcrq.txt b/licenses/lcrq.txt new file mode 100644 index 0000000..29f5bff --- /dev/null +++ b/licenses/lcrq.txt @@ -0,0 +1,28 @@ +Copyright (c) 2013, Adam Morrison and Yehuda Afek. +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the + distribution. + * Neither the name of the Tel Aviv University nor the names of the + author of this software may be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/src/ConcurrentCollections.jl b/src/ConcurrentCollections.jl index 77a3b4c..9611c38 100644 --- a/src/ConcurrentCollections.jl +++ b/src/ConcurrentCollections.jl @@ -6,7 +6,9 @@ export ConcurrentQueue, ConcurrentStack, Delete, + DualLinkedConcurrentRingQueue, Keep, + LinkedConcurrentRingQueue, WorkStealingDeque, length_upper_bound, length_upper_bound, @@ -59,15 +61,24 @@ include("UnsafeAtomics.jl") using .UnsafeAtomics: acq_rel, acquire, monotonic, release, seq_cst, unordered include("utils.jl") +include("cache.jl") include("atomicsutils.jl") include("dict.jl") include("workstealing.jl") include("msqueue.jl") include("stack.jl") +include("lcrq.jl") +include("dlcrq.jl") +include("misc.jl") end # module Implementations -using .Implementations: ConcurrentQueue, ConcurrentStack, WorkStealingDeque +using .Implementations: + ConcurrentQueue, + ConcurrentStack, + DualLinkedConcurrentRingQueue, + LinkedConcurrentRingQueue, + WorkStealingDeque Implementations.define_docstrings() diff --git a/src/UnsafeAtomics.jl b/src/UnsafeAtomics.jl index b812a3f..656ff87 100644 --- a/src/UnsafeAtomics.jl +++ b/src/UnsafeAtomics.jl @@ -13,11 +13,35 @@ const seq_cst = Val{:seq_cst}() const orderings = [:unordered, :monotonic, :acquire, :release, :acq_rel, :seq_cst] +@inline load(x) = load(x, seq_cst) +@inline store!(x, v) = store!(x, v, seq_cst) +@inline cas!(x, cmp, new) = cas!(x, cmp, new, seq_cst, seq_cst) +@inline modify!(ptr, op, x) = modify!(ptr, op, x, seq_cst) + +right(_, x) = x + +const OP_RMW_TABLE = [ + (+) => :add, + (-) => :sub, + right => :xchg, + (&) => :and, + (⊼) => :nand, + (|) => :or, + (⊻) => xor, + max => :max, + min => :min, +] + +for (op, rmwop) in OP_RMW_TABLE + fn = Symbol(rmwop, "!") + @eval @inline $fn(x, v) = $fn(x, v, seq_cst) + @eval @inline modify!(ptr, ::typeof($op), x, ord::Val) = $fn(ptr, x, ord) +end + 
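+# Illustration (a sketch of what the `OP_RMW_TABLE` loop above generates): the
+# `(+) => :add` entry expands to roughly
+#
+#     add!(x, v) = add!(x, v, seq_cst)
+#     modify!(ptr, ::typeof(+), x, ord::Val) = add!(ptr, x, ord)
+#
+# so `modify!(ptr, +, one(T))` is a sequentially consistent fetch-and-add that
+# returns the old value.
+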
for typ in inttypes lt = llvmtypes[typ] rt = "$lt, $lt*" - @eval @inline load(x::Ptr{$typ}) = load(x, seq_cst) for ord in orderings ord in [:release, :acq_rel] && continue @@ -35,7 +59,6 @@ for typ in inttypes end end - @eval @inline store!(x::Ptr{$typ}, v::$typ) = store!(x, v, seq_cst) for ord in orderings ord in [:acquire, :acq_rel] && continue @@ -54,8 +77,6 @@ for typ in inttypes end end - @eval @inline cas!(x::Ptr{$typ}, cmp::$typ, new::$typ) = - cas!(x, cmp, new, seq_cst, seq_cst) for success_ordering in orderings[2:end], failure_ordering in [:monotonic, :acquire, :seq_cst] @@ -91,7 +112,6 @@ for typ in inttypes # LLVM distinguishes signedness in the operation, not the integer type. rmw = "u" * rmw end - @eval @inline $fn(x::Ptr{$typ}, v::$typ) = $fn(x, v, seq_cst) for ord in orderings @eval function $fn(x::Ptr{$typ}, v::$typ, ::$(Val{ord})) return llvmcall( diff --git a/src/atomicsutils.jl b/src/atomicsutils.jl index 11feea9..a8d7820 100644 --- a/src/atomicsutils.jl +++ b/src/atomicsutils.jl @@ -61,3 +61,15 @@ end end return found == cmpint end + +@inline atomic_modifyfield!(obj, field::Val, op, x) = + atomic_modifyfield!(obj, field, op, x, seq_cst) +@inline function atomic_modifyfield!(obj, field::Val, op::OP, x, order) where {OP} + FieldType = fieldtype(typeof(obj), fieldindex(obj, field)) + fptr = Ptr{FieldType}(fieldpointer(obj, field)) + v = convert(FieldType, x) + GC.@preserve obj begin + old = UnsafeAtomics.modify!(fptr, op, v, order) + end + return old +end diff --git a/src/cache.jl b/src/cache.jl new file mode 100644 index 0000000..2be9fe6 --- /dev/null +++ b/src/cache.jl @@ -0,0 +1,28 @@ +struct ThreadLocalCache{T} + cache::Vector{Vector{T}} + size::Int +end + +function ThreadLocalCache{T}(; size::Integer = 4) where {T} + cache = [empty!(Vector{T}(undef, size)) for _ in 1:Threads.nthreads()] + return ThreadLocalCache(cache, size) +end + +function maybepop!(cache::ThreadLocalCache) + buffer = cache.cache[Threads.threadid()] + if isempty(buffer) + return nothing + else + return pop!(buffer) + end +end + +function trypush!(cache::ThreadLocalCache{T}, x::T) where {T} + buffer = cache.cache[Threads.threadid()] + if length(buffer) ≥ cache.size + return false + else + push!(buffer, x) + return true + end +end diff --git a/src/dlcrq.jl b/src/dlcrq.jl new file mode 100644 index 0000000..196a0e2 --- /dev/null +++ b/src/dlcrq.jl @@ -0,0 +1,671 @@ +@enum Polarity::Bool DATA ANTIDATA + +@enum WaiterState begin + WAITER_INIT + WAITER_WAITING + WAITER_NOTIFYING + WAITER_SATISFIED + WAITER_FINISHED + WAITER_TRASHED +end + +mutable struct Waiter{T} + # _tag_pad::PadAfter32 + task::Union{Task,Nothing} + value::Union{Nothing,T} + @atomic state::WaiterState + # _state_pad::PadAfter64 # TODO: smaller pad +end + +Waiter{T}(task::Task = current_task()) where {T} = Waiter{T}( + # PadAfter32(), + task, + nothing, + WAITER_INIT, + # PadAfter64(), +) + +function Base.fetch(w::Waiter{T}; nspins::Integer = 0) where {T} + for _ in 1:nspins + if (@atomic w.state) == WAITER_SATISFIED + @goto satisifed + end + GC.safepoint() + end + + _, success = @atomicreplace w.state WAITER_INIT => WAITER_WAITING + success && wait() + + @label satisifed + let old = @atomic w.state + @assert old in (WAITER_SATISFIED, WAITER_NOTIFYING) + old, success = @atomicreplace w.state old => WAITER_FINISHED + if !success + error("more than two duals detected: old = $old") + end + end + + x = w.value + # Help GC: + w.task = nothing + w.value = nothing + return x::T +end + +function Base.put!(w::Waiter{T}, x::T) where {T} + # Before 
the CAS, `put!` owns the field `.value`: + w.value = x + + old = WAITER_WAITING # initial guess + while true + if old == WAITER_WAITING + new = WAITER_NOTIFYING + else + @assert old == WAITER_INIT "required old == WAITER_INIT but old = $old" + new = WAITER_SATISFIED + end + old, success = @atomicreplace w.state old => new + if success + if old == WAITER_WAITING + schedule(w.task::Task) + end + return + end + end +end + +function trash!(w::Waiter) + if assertion_enabled() + old, success = @atomicreplace w.state WAITER_INIT => WAITER_TRASHED + if !success + error("unshared waiter mutaed: state: $old") + end + else + @atomic :monotonic w.state = WAITER_TRASHED + end + w.task = nothing +end + +struct MPCRQSlot{S} + index_safe_polarity::UInt32 + storage::S # sizeof(S) ≤ 4 +end + +const MPCRQSLOT_MAX_INDEX = Int32(typemax(UInt32) >> 2) + +@inline function MPCRQSlot(; index::Integer, safe::Bool, polarity::Polarity, storage) + # index ≤ typemax(UInt32) >> 2 || error("index $index too large") + index_safe_polarity = (index % UInt32) << 2 + index_safe_polarity |= (safe % UInt32) << 1 + index_safe_polarity |= UInt32(polarity) + return MPCRQSlot(index_safe_polarity, storage) +end + +@inline MPCRQSlot{T}(; + index::Integer, + safe::Bool, + polarity::Polarity, + storage::T = zero(T), +) where {T<:Number} = MPCRQSlot(; index, safe, polarity, storage) + +@inline function Base.getproperty(slot::MPCRQSlot, name::Symbol) + index_safe_polarity = getfield(slot, :index_safe_polarity) + if name === :index + return (index_safe_polarity >> 2) % Int32 + elseif name === :safe + return ((index_safe_polarity >> 1) & 0x01) % Bool + elseif name === :polarity + return Polarity((index_safe_polarity & 0x01) % Bool) + end + return getfield(slot, name) +end + +@enum MPCRQResult MPCRQ_CLOSED MPCRQ_ENQUEUED + +# See: IndirectConcurrentRingQueueNode +mutable struct IndirectMultiPolarityConcurrentRingQueueNode{ + T, + Data<:Union{Nothing,Vector{Vector{T}}}, +} + _tag_pad::PadAfter32 + @atomic data_idx::Int32 + _data_idx_pad::PadAfter32 + @atomic antidata_idx::Int32 + _antidata_idx_pad::PadAfter32 + @atomic closed_info::Int32 + _closed_info_pad::PadAfter32 + @atomic next::Union{Nothing,IndirectMultiPolarityConcurrentRingQueueNode{T}} + _next_pad::PadAfter64 + ring::typeof(cacheline_padded_vector(MPCRQSlot{UInt32}, 1)) + log2len::Int + # buffers::Vector{Vector{Some{T},Waiter{T}}} + data::Data + waiters::Vector{Vector{Waiter{T}}} +end + +inlinedata(::IndirectMultiPolarityConcurrentRingQueueNode{<:Any,Data}) where {Data} = + Data === Nothing + +# TODO: more types +const SmallSigned = Union{Int8,Int16} +const SmallUnsigned = Union{Bool,UInt8,UInt16} +const NullableInline32 = Union{SmallSigned,SmallUnsigned} + +function IndirectMultiPolarityConcurrentRingQueueNode{T}(log2len::Int) where {T} + if !(0 <= log2len <= ITEMINDEX_BITS32) + error("Expected: 0 <= log2len <= $ITEMINDEX_BITS32; Got log2len = $log2len") + end + len = 2^log2len + if T <: NullableInline32 && isconcretetype(T) + data = nothing + else + data = [Vector{T}(undef, len) for _ in 1:Threads.nthreads()] + end + waiters = [Vector{Waiter{T}}(undef, len) for _ in 1:Threads.nthreads()] + ring = init_ring!(cacheline_padded_vector(MPCRQSlot{UInt32}, len)) + data_idx = Int32(1) + antidata_idx = Int32(1) + closed_info = Int32(1) + return IndirectMultiPolarityConcurrentRingQueueNode( + PadAfter32(), + data_idx, + PadAfter32(), + antidata_idx, + PadAfter32(), + closed_info, + PadAfter32(), + nothing, + PadAfter64(), + ring, + log2len, + data, + waiters, + ) +end +# It looks like some 
non-neglegible amount of time is spent in +# `Vector{Waiter{T}}(undef, _)`, presumably zeroing out the elements. A tricky +# part of this is detecting that the CRQ is no longer used. Since the CRQ may be +# closed just after the FAI and before the CAS on the slot is complete, (naively +# thinking) linear scan the `.ring` array is required for checking that the CRQ +# is not longer used (without adding other mechanisms like ref conting). So, +# ATM, only the newcrq that lost CAS is cached (which still helps the +# performance). + +function init_ring!(ring) + for index in eachindex(ring) + ring[index] = MPCRQSlot{UInt32}(; index, safe = true, polarity = DATA) + end + return ring +end + +function Base.empty!(crq::IndirectMultiPolarityConcurrentRingQueueNode) + @atomic crq.data_idx = Int32(1) + @atomic crq.antidata_idx = Int32(1) + @atomic crq.closed_info = Int32(1) + @atomic crq.next = nothing + init_ring!(crq.ring) + # for waiters in crq.waiters + # n = length(waiters) + # resize!(empty!(waiters), n) + # end + return crq +end + +Base.similar(crq::IndirectMultiPolarityConcurrentRingQueueNode{T}) where {T} = + IndirectMultiPolarityConcurrentRingQueueNode{T}(crq.log2len) + +Base.eltype(::Type{<:IndirectMultiPolarityConcurrentRingQueueNode{T}}) where {T} = T + +#= +function Base.isempty(crq::IndirectMultiPolarityConcurrentRingQueueNode) + closed_info = @atomic crq.closed_info + closed_info_idx = extract_index(closed_info) + isclosed = closed_info < 0 + isclosed || return false + data_idx = extract_index(@atomic crq.data_idx) + data_idx ≥ closed_info_idx || return false + antidata_idx = extract_index(@atomic crq.antidata_idx) + antidata_idx ≥ closed_info_idx || return false + # TODO: relaxed loads plus fence + max_idx = max(data_idx, antidata_idx) + for slot in crq.ring + (; index, safe) = slot + safe && index ≤ max_idx && return false + end + return true +end +=# + +@inline function nonnegative(x) + if assertion_enabled() + @assert x ≥ 0 + end + return x +end + +@inline function negative(x) + if assertion_enabled() + @assert x < 0 + end + return x +end + +@inline set_closing(i) = negative(ifelse(i < zero(i), i, typemin(i) + i)) +@inline extract_index(i) = nonnegative(ifelse(i ≥ zero(i), i, i - typemin(i))) + +@inline _mod1(i::Int32, crq) = _mod1log2(i, crq.log2len) +@inline _mod1log2(i::Int32, log2x) = + (UInt32(i - 1) & (typemax(UInt32) >> (32 - log2x))) % Int32 + Int32(1) + +_embed(::Type{UInt32}, x::SmallSigned) = _embed(UInt32, unsigned(x)) +_embed(::Type{UInt32}, x::SmallUnsigned) = (x % UInt32) + UInt32(1) +_extract(::Type{T}, x::UInt32) where {T<:SmallSigned} = signed(_extract(unsigned(T), x))::T +_extract(::Type{T}, x::UInt32) where {T<:SmallUnsigned} = (x - UInt32(1)) % T + +""" + denqueue!(crq::IMPCRQ{T}, x::T) -> MPCRQ_CLOSED or MPCRQ_ENQUEUED or y::Waiter + denqueue!(crq::IMPCRQ{T}, x::Waiter{T}) -> MPCRQ_CLOSED or MPCRQ_ENQUEUED or y::Some{T} + where MPCRQ = IndirectMultiPolarityConcurrentRingQueueNode +""" +function denqueue!( + crq::IndirectMultiPolarityConcurrentRingQueueNode{T}, + x::Union{T,Waiter{T}}, +) where {T} + nthreads = Int32(Threads.nthreads()) + + local polarity::Polarity = x isa Waiter{T} ? 
ANTIDATA : DATA + R = 2^crq.log2len + starvation_counter = 0 + while true + if x isa Waiter{T} + # p, _ = modifyfield!(crq, :antidata_idx, +, Int32(1), :sequentially_consistent) + p = atomic_modifyfield!(crq, Val(:antidata_idx), +, true)::Int32 + else + # p, _ = modifyfield!(crq, :data_idx, +, Int32(1), :sequentially_consistent) + p = atomic_modifyfield!(crq, Val(:data_idx), +, true)::Int32 + end + p_closing = p < 0 + p_idx = extract_index(p) + if p_closing + if discovered_closing(crq) ≤ p_idx + return MPCRQ_CLOSED + end + elseif p_idx ≥ MPCRQSLOT_MAX_INDEX - nthreads + discovered_closing(crq) + error("NOT WELL TESTED") + return MPCRQ_CLOSED + end + itemindex = _mod1(p_idx, crq) + slotptr = pointer(crq.ring, itemindex) + # ncas = 0 + while true + # ncas += 1 + # if ncas > 10000 + # let msg = "too many retires tid=$(Threads.threadid())" + # print(stderr, "$msg\n") + # error(msg) + # end + # end + # ccall(:jl_breakpoint, Cvoid, (Any,), slotptr) + slot = UnsafeAtomics.load(slotptr)::MPCRQSlot{UInt32} + (; index, safe, storage) = slot + if !iszero(storage) + if index == p_idx && slot.polarity !=′ polarity + threadindex = storage % Int + if x isa Waiter{T} + if inlinedata(crq) + y = _extract(T, storage) + else + y = crq.data[threadindex][itemindex] + end + else + y = crq.waiters[threadindex][itemindex] + end + + newslot = MPCRQSlot{UInt32}(; safe, index = p_idx + R, polarity) + old = UnsafeAtomics.cas!(slotptr, slot, newslot) + if old == slot + if x isa Waiter{T} + return Some(y) + else + return y + end + end + else + newslot = MPCRQSlot(; safe = false, index, slot.polarity, storage) + old = UnsafeAtomics.cas!(slotptr, slot, newslot) + if old == slot + break + end + end + # If no `break` or `return`, reload the slot. + else + # Getting here means slot is empty. Try to enqueue `x`. + if x isa Waiter{T} + q = @atomic crq.data_idx + else + q = @atomic crq.antidata_idx + end + q_idx = extract_index(q) + if safe # || q_idx ≤ p_idx + # if x isa Waiter{T} + # GC.safepoint() + # continue # spin + # end + threadindex = Threads.threadid() + storage = threadindex % UInt32 + if x isa Waiter{T} + crq.waiters[threadindex][itemindex] = x + else + if inlinedata(crq) + storage = _embed(UInt32, x) + else + crq.data[threadindex][itemindex] = x + end + end + + @assert !iszero(storage) + newslot = MPCRQSlot(; safe = true, index = p_idx, polarity, storage) + old = UnsafeAtomics.cas!(slotptr, slot, newslot) + if old == slot + # if slot.index > p_idx + # global CRQ = crq + # end + return MPCRQ_ENQUEUED + # else: reload the slot + end + else + break + end + end + GC.safepoint() # cannot use yield + end # of the CAS loop on the slot + # ccall(:jl_breakpoint, Cvoid, (Any,), nothing) + + # Reaching here means that a slot is successfully marked as unsafe. + # Check if the CRQ is full and detect starvation. + + # Load the opposite index: + if x isa Waiter{T} + q = @atomic crq.data_idx + else + q = @atomic crq.antidata_idx + end + q_idx = extract_index(q) + + starvation_counter += 1 + if (p_idx - q_idx ≥ R) || starvation_counter > CRQ_STARVATION + if x isa Waiter{T} + set_closing!(crq, Val(:antidata_idx)) + else + set_closing!(crq, Val(:data_idx)) + end + if discovered_closing(crq) ≤ p_idx + return MPCRQ_CLOSED + end + end + GC.safepoint() # cannot use yield + end +end + +# TODO: use bit-wise or? 
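+# Currently the closing flag is folded in by adding `typemin`, which on two's
+# complement integers is the same as setting the sign bit.  A worked example
+# (assuming `Int32` counters, as used for `data_idx` and `antidata_idx`):
+#
+#     set_closing(Int32(5))             == typemin(Int32) + Int32(5) == Int32(-2147483643)
+#     extract_index(Int32(-2147483643)) == Int32(5)
+#
+# i.e. a negative counter means "closing/closed" while still carrying the index.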
+function set_closing!( + crq::IndirectMultiPolarityConcurrentRingQueueNode, + ::Val{field}, +) where {field} + idx = getfield(crq, field, :sequentially_consistent) + idx < 0 && return idx + while true + new = set_closing(idx) + idx, success = replacefield!( + crq, + field, + idx, + new, + :sequentially_consistent, + :sequentially_consistent, + ) + success && return new + idx < 0 && return idx + end +end + +@noinline function discovered_closing(crq::IndirectMultiPolarityConcurrentRingQueueNode) + closed_info = @atomic crq.closed_info + closed_info_idx = extract_index(closed_info) + isclosed = closed_info < 0 + isclosed && return closed_info_idx + + antidata_idx = set_closing!(crq, Val(:antidata_idx)) + data_idx = set_closing!(crq, Val(:data_idx)) + + # @assert crq.antidata_idx < 0 + # @assert crq.data_idx < 0 + + new = set_closing(max(extract_index(data_idx), extract_index(antidata_idx))) + # @assert new < 0 + closed_info, success = @atomicreplace crq.closed_info closed_info => new + # @assert Implementations.isclosed(crq) + if success + return extract_index(new) + else + return extract_index(closed_info) + end +end + +mutable struct DualLinkedConcurrentRingQueue{ + T, + CRQ<:IndirectMultiPolarityConcurrentRingQueueNode{T}, +} + # _tag_pad::PadAfter32 + @atomic data::CRQ + _data_pad::PadAfter64 + @atomic antidata::CRQ + _antidata_pad::PadAfter64 + cache::ThreadLocalCache{CRQ} +end + +DualLinkedConcurrentRingQueue(; kwargs...) = DualLinkedConcurrentRingQueue{Any}(; kwargs...) +function DualLinkedConcurrentRingQueue{T}(; log2ringsize = 11) where {T} + node = IndirectMultiPolarityConcurrentRingQueueNode{T}(log2ringsize) + return DualLinkedConcurrentRingQueue( + # PadAfter32(), + node, + PadAfter64(), + node, + PadAfter64(), + ThreadLocalCache{typeof(node)}(), + )::DualLinkedConcurrentRingQueue{T} +end + +Base.eltype(::Type{<:DualLinkedConcurrentRingQueue{T}}) where {T} = T + +function Base.push!(lcrq::DualLinkedConcurrentRingQueue{T}, x) where {T} + x = convert(eltype(lcrq), x) + y = denqueue!(lcrq, x) + if y isa Waiter{T} + put!(y, x) + else + @assert y === MPCRQ_ENQUEUED + end + return lcrq +end + +function Base.popfirst!(lcrq::DualLinkedConcurrentRingQueue{T}) where {T} + w = Waiter{eltype(lcrq)}() + y = denqueue!(lcrq, w) + if y === MPCRQ_ENQUEUED + return fetch(w) + else + trash!(w) + return something(y::Some{T}) + end +end + +# const CLOSED_X = Vector{Int}[Int[] for _ in 1:Threads.nthreads()] + +""" + denqueue!(lcrq::DualLCRQ{T}, x::T) -> MPCRQ_ENQUEUED or y::Waiter + denqueue!(lcrq::DualLCRQ{T}, x::Waiter{T}) -> MPCRQ_ENQUEUED or y::Some{T} + where DualLCRQ = DualLinkedConcurrentRingQueue +""" +function denqueue!(lcrq::DualLinkedConcurrentRingQueue{T}, x::Union{T,Waiter{T}}) where {T} + while true + crq = if x isa Waiter{T} + @atomic lcrq.antidata + else + @atomic lcrq.data + end + y = denqueue!(crq, x) + y === MPCRQ_CLOSED || return y + + # crq is closed (but may NOT be empty) + next = @atomic crq.next + if next !== nothing + if x isa Waiter{T} + @atomicreplace lcrq.antidata crq => next + else + @atomicreplace lcrq.data crq => next + end + # success && isempty(crq) && cache_crq!(lcrq, crq) + else + newcrq = make_newcrq!(lcrq, crq) + # newcrq = similar(crq) + y = denqueue!(newcrq, x) + @assert y === MPCRQ_ENQUEUED + _, success = @atomicreplace crq.next nothing => newcrq + if success + if x isa Waiter{T} + @atomicreplace lcrq.antidata crq => newcrq + return y + else + @atomicreplace lcrq.data crq => newcrq + return y + end + else + cache_crq!(lcrq, newcrq) + end + end + GC.safepoint() # 
yield? + end +end + +function make_newcrq!(lcrq, crq) + oldcrq = maybepop!(lcrq.cache) + if oldcrq === nothing + return similar(crq) + else + return oldcrq + end +end + +function cache_crq!(lcrq::DualLinkedConcurrentRingQueue, crq) + trypush!(lcrq.cache, empty!(crq)) + return +end + +function nopop_foreach(f, crq::IndirectMultiPolarityConcurrentRingQueueNode) + i = extract_index(@atomic crq.antidata_idx) + while true + slot = crq.ring[_mod1(i, crq)] + (; index, polarity, storage) = slot + if index == i && polarity ==′ DATA + (; threadindex, itemindex) = storage + x = crq.data[threadindex][itemindex] + f(x) + i += one(i) + else + break + end + end +end + +function nopop_foreach(f, lcrq::DualLinkedConcurrentRingQueue) + crq = @atomic lcrq.antidata + while true + nopop_foreach(f, crq) + crq = @atomic crq.next + crq === nothing && return + end +end + +datalength(crq::IndirectMultiPolarityConcurrentRingQueueNode) = + extract_index(@atomic crq.data_idx) - extract_index(@atomic crq.antidata_idx) + +isclosed(crq::IndirectMultiPolarityConcurrentRingQueueNode) = (@atomic crq.closed_info) < 0 + +function Base.show(io::IO, ::MIME"text/plain", lcrq::DualLinkedConcurrentRingQueue) + data = @atomic lcrq.data + antidata = @atomic lcrq.antidata + if (@atomic antidata.next) === nothing && datalength(antidata) < 0 + node = data # head of antidata + polarity = ANTIDATA + else + node = antidata # head of data + polarity = DATA + end + n = 0 + ncrqs = 0 + while true + n += datalength(node) + ncrqs += 1 + node = @atomic node.next + node === nothing && break + end + if polarity == DATA + items = "$n data item" * (n > 1 ? "s" : "") + else + items = "$(-n) waiter" * (n < -1 ? "s" : "") + end + crqs = "$ncrqs node" * (ncrqs > 1 ? "s" : "") + print(io, "Dual LCRQ: $items in $crqs") +end + +function Base.show( + io::IO, + ::MIME"text/plain", + crq::IndirectMultiPolarityConcurrentRingQueueNode, +) + data_idx = extract_index(@atomic crq.data_idx) + antidata_idx = extract_index(@atomic crq.antidata_idx) + n = data_idx - antidata_idx + if n == 0 + items = "empty" + elseif n > 0 + items = "$n data item" * (n > 1 ? "s" : "") + else + items = "$(-n) waiter" * (n < -1 ? "s" : "") + end + status = isclosed(crq) ? 
"closed" : "open" + print( + io, + "MP-CRQ: $items (status: $status, data_idx: $data_idx, antidata_idx: $antidata_idx)", + ) +end + +function Base.NamedTuple(slot::MPCRQSlot) + (; index, safe, polarity, storage) = slot + return (; index, safe, polarity, storage) +end + +function Base.show(io::IO, ::MIME"text/plain", slot::MPCRQSlot) + if get(io, :typeinfo, Any) !== typeof(slot) + show(io, MIME"text/plain"(), typeof(slot)) + end + + (; index, safe, polarity, storage) = slot + nt = (; + index, + safe, + polarity = Text(string(polarity)), + storage = Text( + sprint( + show, + MIME"text/plain"(), + storage; + context = IOContext(io, :typeinfo => typeof(storage)), + ), + ), + ) + show(io, MIME"text/plain"(), nt) +end diff --git a/src/lcrq.jl b/src/lcrq.jl new file mode 100644 index 0000000..ee5fb3e --- /dev/null +++ b/src/lcrq.jl @@ -0,0 +1,340 @@ +struct Int32Bool + bits::Int32 +end + +Int32Bool(i::Int32, b::Bool) = Int32Bool(Int32(2b - 1) * abs(i)) + +int(x::Int32Bool) = abs(x.bits) +bool(x::Int32Bool) = x.bits > 0 + +setbool(x::Int32Bool, b::Bool) = Int32Bool(int(x), b) +setint(x::Int32Bool, i::Int32) = Int32Bool(i, bool(x)) + +struct ICRQIndex + bits::UInt32 +end + +const THREADINDEX_BITS32 = 8 +const ITEMINDEX_BITS32 = 32 - THREADINDEX_BITS32 + +function ICRQIndex(; threadindex, itemindex) + local bits::UInt32 + # threadindex ≥ 1 << THREADINDEX_BITS32 && error("threadindex too large $threadindex") + # itemindex ≥ 1 << ITEMINDEX_BITS32 && error("itemindex too large $itemindex") + bits = UInt32(threadindex) << ITEMINDEX_BITS32 + bits |= UInt32(itemindex) & (typemax(UInt32) >> THREADINDEX_BITS32) + return ICRQIndex(bits) +end + +@inline function Base.getproperty(idx::ICRQIndex, name::Symbol) + if name === :threadindex + bits = getfield(idx, :bits) + return (bits >> (32 - 8)) % Int32 + elseif name === :itemindex + bits = getfield(idx, :bits) + return (bits & (typemax(UInt32) >> 8)) % Int32 + else + return getfield(idx, name) + end +end + +struct CRQSlot{S} + index_safe::Int32Bool + storage::S # sizeof(S) ≤ 4 +end + +@inline CRQSlot(; index::Integer, safe::Bool, storage) = + CRQSlot(Int32Bool(Int32(index), safe), storage) + +@inline function Base.getproperty(slot::CRQSlot, name::Symbol) + if name === :index + return int(getfield(slot, :index_safe)) + elseif name === :safe + return bool(getfield(slot, :index_safe)) + end + return getfield(slot, name) +end + +""" + IndirectConcurrentRingQueueNode{T} + +Concurrent Ring Queue (CRQ) node which stores 64 bit slot in the `.ring`. Each +slot contains a 32 bit value that locates the actual value in the thread-local +`.buffers`; hence _indirect_. + +This approach wastes memory but it is (more or less) implementable without +atomics support of boxed element in array. + +TODO: Create a 128 bit slot "direct" version that directly points to a boxed +Julia value. 
+""" +mutable struct IndirectConcurrentRingQueueNode{T} + @atomic head::Int32 + _head_pad::PadAfter32 + @atomic tail_closed::Int32 + _tail_pad::PadAfter32 + @atomic next::Union{Nothing,IndirectConcurrentRingQueueNode{T}} + ring::Vector{CRQSlot{ICRQIndex}} + length::Int + buffers::Vector{Vector{T}} + buffertails::Vector{Int} +end +# TODO: pad + +function IndirectConcurrentRingQueueNode{T}(len::Int) where {T} + buffers = [Vector{T}(undef, 2len) for _ in 1:Threads.nthreads()] + buffertails = fill(1, Threads.nthreads()) + ring = [CRQSlot(Int32Bool(Int32(i), true), ICRQIndex(0)) for i in 1:len] + head = Int32(1) + tail_closed = Int32(1) + return IndirectConcurrentRingQueueNode( + head, + PadAfter32(), + tail_closed, + PadAfter32(), + nothing, + ring, + len, + buffers, + buffertails, + ) +end + +Base.eltype(::Type{<:IndirectConcurrentRingQueueNode{T}}) where {T} = T + +const CRQ_STARVATION = 128 # ??? + +function _close(crq::IndirectConcurrentRingQueueNode, old::Int32) + while old > 0 + new = typemin(Int32) + old + old, success = @atomicreplace crq.tail_closed old => new + success && return + end +end + +function tail_index(crq::IndirectConcurrentRingQueueNode, t = @atomic crq.tail_closed) + if t < 0 + t = t - typemin(Int32) + end + return t +end + +isclosed(crq::IndirectConcurrentRingQueueNode, t = @atomic crq.tail_closed) = t ≤ 0 + +function trypush!(crq::IndirectConcurrentRingQueueNode, x) + x = convert(eltype(crq), x) + + # TODO: create a Int64 version that that does not require this? + let t = @atomic crq.tail_closed + if isclosed(crq, t) + return false # closed + elseif t ≥ typemax(Int32) - Threads.nthreads() + _close(crq, t) + return false # closed + end + end + + starvation_ctr = 0 + while true + t = @atomic crq.tail_closed += true + if isclosed(crq, t) + return false # closed + elseif t ≥ typemax(Int32) - Threads.nthreads() + # Note: this condition is relying on that there are only at most + # `nthreads()` instances on `trypush!` calls at once; i.e., no yield + # point. + _close(crq, t) + return false # closed + end + slotptr = pointer(crq.ring, mod1(t, crq.length)) + slot = UnsafeAtomics.load(slotptr)::CRQSlot{ICRQIndex} + (; index, safe, storage) = slot + if iszero(storage.bits) + if (index ≤ t) && (safe || (@atomic crq.head) ≤ t) + tid = Threads.threadid() + buffer = crq.buffers[tid] + localtail = crq.buffertails[tid] + 1 + itemindex = mod1(localtail, length(buffer)) + buffer[itemindex] = x # [^itemindex] + bidx = ICRQIndex(; threadindex = tid, itemindex) + newslot = CRQSlot(; safe = true, index = t, storage = bidx) + old = UnsafeAtomics.cas!(slotptr, slot, newslot) + if old == slot + crq.buffertails[tid] = localtail + return true + end + end + end + starvation_ctr += 1 + if (t - (@atomic crq.head) ≥ crq.length) || starvation_ctr > CRQ_STARVATION + _close(crq, t) + return false # closed + end + GC.safepoint() # must not yield + end +end +# [^itemindex]: `buffer[itemindex] = x` is OK since there is no other thread +# that can write to this memory location. Using `length(ring) == +# length(buffer)` is OK since at this point we know that there is at least one +# empty slot and only other threads can make the `ring` full. However, other +# threads need to put the item in their own buffer. +# TODO: So, 2len -> len +# TODO: This reasoning was wrong in the Dual LCRQ. It may be due to calls to +# denqueue stalled right after FAI (handled with the `safe` bit). Check if +# something similar is possible in LCRQ and stop using `ICRQIndex` if so. 
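+
+# A rough usage sketch of this internal node API (not a public interface):
+#
+#     node = IndirectConcurrentRingQueueNode{Int}(8)  # ring with 8 slots
+#     trypush!(node, 1)        # true, or false once the node is closed
+#     trypopfirst!(node)       # Some(1), or nothing when the node is empty
+#
+# For reference, the 32-bit `ICRQIndex` stored in each slot packs the thread index
+# into the high 8 bits and the buffer position into the low 24 bits, e.g.
+#
+#     idx = ICRQIndex(threadindex = 3, itemindex = 42)   # bits == 0x0300002a
+#     (idx.threadindex, idx.itemindex) == (Int32(3), Int32(42))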
+ +function ConcurrentCollections.trypopfirst!(crq::IndirectConcurrentRingQueueNode) + while true + h = @atomic crq.head += true + slotptr = pointer(crq.ring, mod1(h, crq.length)) + while true + slot = UnsafeAtomics.load(slotptr)::CRQSlot{ICRQIndex} + (; index, safe, storage) = slot + if !iszero(storage.bits) + if index == h + (; threadindex, itemindex) = storage + x = crq.buffers[threadindex][itemindex] + # Above load requires "tearable atomics" for immutables; for + # boxed Julia objects (i.e., pointer), it's probably + # equivalent to asking the compiler to not root `x` until + # the CAS succeed so that there is an acquire ordering + # before reading the type tag. (That said, currently it may + # not be a problem since GC stops all tasks?) + + newslot = + CRQSlot(; safe, index = h + crq.length, storage = ICRQIndex(0)) + old = UnsafeAtomics.cas!(slotptr, slot, newslot) + if old === slot + return Some{eltype(crq)}(x) + end + else + newslot = CRQSlot(; safe = false, index, storage) + old = UnsafeAtomics.cas!(slotptr, slot, newslot) + if old == slot + break + end + end + else # empty slot + newslot = CRQSlot(; safe, index = index + crq.length, storage) + old = UnsafeAtomics.cas!(slotptr, slot, newslot) + if old == slot + break + end + end + GC.safepoint() + end + GC.safepoint() + + # Failed to dequeue. Bail out if empty. + if tail_index(crq) ≤ h + 1 + fixstate!(crq) + return nothing + end + end +end + +function fixstate!(crq::IndirectConcurrentRingQueueNode) + while true + h = @atomic crq.head + tail_closed = @atomic crq.tail_closed + t = tail_index(crq, tail_closed) + h ≤ t && return + isclosed(crq, tail_closed) && return # closed and empty + _, success = @atomicreplace crq.tail_closed tail_closed => h + success && return + end +end + +mutable struct LinkedConcurrentRingQueue{T} + @atomic head::IndirectConcurrentRingQueueNode{T} + @atomic tail::IndirectConcurrentRingQueueNode{T} +end + +# lcrq_default_ringsize() = 2^17 +lcrq_default_ringsize() = 2^10 +# lcrq_default_ringsize() = 2^5 + +LinkedConcurrentRingQueue(ringsize::Integer = lcrq_default_ringsize()) = + LinkedConcurrentRingQueue{Any}(ringsize) +function LinkedConcurrentRingQueue{T}(ringsize::Integer = lcrq_default_ringsize()) where {T} + node = IndirectConcurrentRingQueueNode{T}(ringsize) + return LinkedConcurrentRingQueue(node, node) +end + +Base.eltype(::Type{<:LinkedConcurrentRingQueue{T}}) where {T} = T + +const NEW_CRQ = Threads.Atomic{Int}(0) + +function Base.push!(lcrq::LinkedConcurrentRingQueue{T}, x) where {T} + x = convert(T, x) + while true + crq = @atomic lcrq.tail + next = @atomic crq.next + if next !== nothing + @atomicreplace lcrq.tail crq => next + continue + end + + trypush!(crq, x) && return lcrq + + Threads.atomic_add!(NEW_CRQ, 1) + newcrq = IndirectConcurrentRingQueueNode{T}(length(crq.ring)) + ok = trypush!(newcrq, x) + @assert ok + _, success = @atomicreplace crq.next nothing => newcrq + if success + @atomicreplace lcrq.tail crq => newcrq + return lcrq + end + end +end + +function ConcurrentCollections.trypopfirst!(lcrq::LinkedConcurrentRingQueue) + while true + crq = @atomic lcrq.head + x = trypopfirst!(crq) + x === nothing || return x + next = @atomic crq.next + next === nothing && return nothing + + x = trypopfirst!(crq) + x === nothing || return x + @atomicreplace lcrq.head crq => next + end +end + +function Base.show(io::IO, ::MIME"text/plain", lcrq::LinkedConcurrentRingQueue) + nitems = 0 + ncrqs = 0 + crq = @atomic lcrq.head + while crq !== nothing + h = @atomic crq.head + t = tail_index(crq) + nitems += 
diff --git a/src/misc.jl b/src/misc.jl
new file mode 100644
index 0000000..43e7c46
--- /dev/null
+++ b/src/misc.jl
@@ -0,0 +1,5 @@
+# Blocking fallback so the benchmarks can treat `Channel` like the lock-free
+# queues; `iterate` waits until an item is available or the channel is closed.
+function ConcurrentCollections.trypopfirst!(ch::Channel)
+    y = iterate(ch)
+    y === nothing && return nothing
+    return Some(first(y))
+end
diff --git a/src/utils.jl b/src/utils.jl
index de7e483..332b7b5 100644
--- a/src/utils.jl
+++ b/src/utils.jl
@@ -1,3 +1,8 @@
+==′(x::T, y::T) where {T} = x == y
+!=′(x::T, y::T) where {T} = x != y
+
+assertion_enabled() = false
+
 @noinline unreachable() = error("unreachable reached")
 
 @noinline unexpected(x) = error("unexpected value: $x")
@@ -125,6 +130,26 @@ function from_bytes(::Type{T}, uint::UIntType) where {T,UIntType}
     end
 end
 
+# Atomic load/store/CAS on a raw pointer, implemented by reinterpreting the
+# value as an unsigned integer of the same size (see `uint_for`/`from_bytes`).
+function UnsafeAtomics.load(p::Ptr{T}, ord::Val) where {T}
+    q = Ptr{uint_for(T)}(p)
+    uint = UnsafeAtomics.load(q, ord)
+    return from_bytes(T, uint)
+end
+
+function UnsafeAtomics.store!(p::Ptr{T}, v::T, ord::Val) where {T}
+    uint = uint_from(v)
+    q = Ptr{typeof(uint)}(p)
+    UnsafeAtomics.store!(q, uint, ord)
+end
+
+function UnsafeAtomics.cas!(p::Ptr{T}, cmp::T, new::T, so::Val, fo::Val) where {T}
+    ci = uint_from(cmp)
+    ni = uint_from(new)
+    q = Ptr{typeof(ci)}(p)
+    oi = UnsafeAtomics.cas!(q, ci, ni, so, fo)
+    return from_bytes(T, oi)
+end
+
 @inline isinlinable(::Type{Inlined{T}}) where {T} = isinlinable(T)
 @inline isinlinable(T::Type) = Base.isbitstype(T) || Base.isbitsunion(T)
@@ -184,6 +209,36 @@ end
 end
 
+# Read /sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size?
+const CACHELINE_SIZE = 64
+
+# 448 bits = 56 bytes: together with an 8-byte field this fills one cache line.
+primitive type PadAfter64 448 end
+PadAfter64() = Ref{PadAfter64}()[]
+
+mutable struct CheckPadAfter64
+    a::UInt64
+    pad::PadAfter64
+    b::UInt64
+end
+@assert fieldoffset(CheckPadAfter64, 3) == CACHELINE_SIZE
+
+const PadAfter32 = PadAfter64
+
+mutable struct CheckPadAfter32
+    a::UInt32
+    pad::PadAfter32
+    b::UInt32
+end
+@assert fieldoffset(CheckPadAfter32, 3) == CACHELINE_SIZE
+
+function cacheline_padded_vector(::Type{T}, n::Integer) where {T}
+    # Stride (in elements) so that consecutive used slots land on different
+    # cache lines.
+    cacheline = cld(CACHELINE_SIZE, sizeof(T))
+    xs = Vector{T}(undef, cacheline * (n + 1))
+    ys = view(xs, cacheline+1:cacheline:length(xs))
+    @assert length(ys) == n
+    return ys
+end
+
 function threaded_foreach(f, xs)
     y = iterate(xs)
     y === nothing && return
@@ -232,7 +287,7 @@ function define_docstrings()
         stem, ext = splitext(filename)
         ext == ".md" || continue
         name = Symbol(stem)
-        name in names(ConcurrentCollections, all=true) || continue
+        name in names(ConcurrentCollections, all = true) || continue
         push!(docstrings, name => joinpath(docsdir, filename))
     end
     for (name, path) in docstrings
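A sketch of how the padded vector is meant to be used: one slot per task, each on its own cache line, so concurrent updates avoid false sharing. This assumes `cacheline_padded_vector` is reachable from `ConcurrentCollections.Implementations` (it is an internal helper, not exported) and that the stride computed above is at least one cache line:

    using ConcurrentCollections.Implementations: cacheline_padded_vector  # assumed path

    counters = cacheline_padded_vector(Int, Threads.nthreads())
    fill!(counters, 0)
    @sync for t in 1:Threads.nthreads()
        Threads.@spawn begin
            for _ in 1:1_000_000
                counters[t] += 1   # each task updates only its own, padded slot
            end
        end
    end
    @assert sum(counters) == Threads.nthreads() * 1_000_000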
diff --git a/test/Project.toml b/test/Project.toml
index 535a09b..c0ffb52 100644
--- a/test/Project.toml
+++ b/test/Project.toml
@@ -3,4 +3,5 @@ BangBang = "198e06fe-97b7-11e9-32a5-e1d131e6ad66"
 BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf"
 ConcurrentCollections = "5060bff5-0b44-40c5-b522-fcd3ca5cecdd"
 Documenter = "e30172f5-a6a5-5a46-863b-614d45cd2de4"
+Formatting = "59287772-0a20-5a39-b81b-1366585eb4c0"
 Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
diff --git a/test/test_dlcrq.jl b/test/test_dlcrq.jl
new file mode 100644
index 0000000..de9bcb7
--- /dev/null
+++ b/test/test_dlcrq.jl
@@ -0,0 +1,143 @@
+module TestDLCRQ
+
+using Base.Experimental: @sync
+using ConcurrentCollections
+using ConcurrentCollections.Implementations:
+    MPCRQSlot, DATA, ANTIDATA, denqueue!, MPCRQ_ENQUEUED
+using Test
+
+@testset "MPCRQSlot" begin
+    for index in [111, 222],
+        safe in [false, true],
+        polarity in [DATA, ANTIDATA],
+        storage in UInt32[0xaaa, 0xbbb]
+
+        @test NamedTuple(MPCRQSlot(; index, safe, polarity, storage)) ==
+              (; index, safe, polarity, storage)
+    end
+end
+
+@testset "push-pop once" begin
+    q = DualLinkedConcurrentRingQueue{Int}()
+    push!(q, 111)
+    @test popfirst!(q) == 111
+end
+
+@testset "push-pop 100" begin
+    n = 100
+    q = DualLinkedConcurrentRingQueue{Int}(; log2ringsize = 3)
+    foldl(push!, 1:n; init = q)
+    ys = Int[]
+    for _ in 1:n
+        y = popfirst!(q)
+        push!(ys, y)
+    end
+    @test ys == 1:n
+end
+
+@testset "push-pop 100 wait first" begin
+    n = 100
+    q = DualLinkedConcurrentRingQueue{Int}(; log2ringsize = 3)
+    task = Threads.@spawn begin
+        ys = Int[]
+        for _ in 1:n
+            y = popfirst!(q)
+            push!(ys, y)
+        end
+        return ys
+    end
+    sleep(0.01)
+    foldl(push!, 1:n; init = q)
+    ys = fetch(task)
+    @test ys == 1:n
+end
+
+@testset "push-pop 100 inline" begin
+    n = 100
+    q = DualLinkedConcurrentRingQueue{Int16}(; log2ringsize = 3)
+    @test q.data.data === nothing
+    foldl(push!, 1:n; init = q)
+    ys = Int[]
+    for _ in 1:n
+        y = popfirst!(q)
+        push!(ys, y)
+    end
+    @test ys == 1:n
+end
+
+function unfair_sleep(seconds::Real)
+    t0 = time_ns()
+    ns = seconds * 1e9
+    while time_ns() - t0 < ns
+        GC.safepoint()
+    end
+end
+
+function concurrent_push_pop!(q, nitems::Integer, nsend::Integer, nrecv::Integer)
+    received = Vector{Int}[]
+    tasks = Task[]
+    global TASKS = tasks  # kept in a global for debugging if the test hangs
+    activesenders = Threads.Atomic{Int}(nsend)
+    @sync begin
+        for offset in 1:nsend
+            t = Threads.@spawn begin
+                for i in offset:nsend:nitems
+                    push!(q, i)
+                end
+                # `atomic_sub!` returns the old value, so the last sender to
+                # finish pushes one sentinel (-1) per receiver.
+                if Threads.atomic_sub!(activesenders, 1) == 1
+                    for _ in 1:nrecv
+                        push!(q, -1)
+                    end
+                end
+            end
+            push!(tasks, t)
+        end
+        for _ in 1:nrecv
+            ys = Int[]
+            push!(received, ys)
+            t = Threads.@spawn begin
+                while true
+                    y = popfirst!(q)
+                    y == -1 && break
+                    push!(ys, y)
+                end
+            end
+            push!(tasks, t)
+        end
+    end
+    return received
+end
+
+# Given a sorted vector, report missing values (`notfound`) and duplicated
+# values (`dups`).
+function check_consecutive(xs)
+    notfound = Int[]
+    dups = Int[]
+    pre = xs[begin] - 1
+    for x in xs
+        e = pre + 1
+        append!(notfound, e:x-1)
+        append!(dups, x:e-1)
+        pre = x
+    end
+    return (; notfound, dups)
+end
+
+@testset "concurrent push-pop" begin
+    @testset for trial in 1:100
+        # @show trial
+        nsend = cld(Threads.nthreads(), 2)
+        nrecv = max(1, Threads.nthreads() - nsend)
+        q = DualLinkedConcurrentRingQueue{Int}(; log2ringsize = 5)
+        # q = Channel{Int}(Inf)
+        nitems = 2^20
+        received = concurrent_push_pop!(q, nitems, nsend, nrecv)
+        allreceived = reduce(vcat, received)
+        sort!(allreceived)
+        (; notfound, dups) = check_consecutive(allreceived)
+        @test length(allreceived) == nitems
+        @test notfound == []
+        @test dups == []
+        @test allreceived == 1:nitems
+    end
+end
+
+end # module
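The `check_consecutive` helper above assumes a sorted input and reports gaps and duplicates; a small worked example of its output:

    julia> check_consecutive([1, 2, 2, 4])
    (notfound = [3], dups = [2])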
diff --git a/test/test_lcrq.jl b/test/test_lcrq.jl
new file mode 100644
index 0000000..c6da109
--- /dev/null
+++ b/test/test_lcrq.jl
@@ -0,0 +1,81 @@
+module TestLCRQ
+
+using ConcurrentCollections
+using ConcurrentCollections.Implementations: ICRQIndex
+using Test
+
+@testset "ICRQIndex" begin
+    idx = ICRQIndex(threadindex = 111, itemindex = 222)
+    @test idx.threadindex == 111
+    @test idx.itemindex == 222
+end
+
+@testset "push-pop once" begin
+    q = LinkedConcurrentRingQueue{Int}()
+    push!(q, 111)
+    @test trypopfirst!(q) == Some(111)
+end
+
+@testset "push-pop 100" begin
+    n = 100
+    q = LinkedConcurrentRingQueue{Int}()
+    foldl(push!, 1:n; init = q)
+    ys = Int[]
+    while (y = trypopfirst!(q)) !== nothing
+        push!(ys, something(y))
+    end
+    @test ys == 1:n
+end
+
+function concurrent_push_pop!(q, nitems::Integer, nsend::Integer, nrecv::Integer)
+    received = Vector{Int}[]
+    @sync begin
+        for t in 1:nsend
+            Threads.@spawn begin
+                for i in t:nsend:nitems
+                    push!(q, i)
+                end
+            end
+        end
+        for _ in 1:nrecv
+            ys = Int[]
+            push!(received, ys)
+            Threads.@spawn begin
+                while true
+                    y = trypopfirst!(q)
+                    if y === nothing
+                        yield()
+                    else
+                        i = something(y)
+                        push!(ys, i)
+                        # Each receiver stops once it has seen one of the last
+                        # `nrecv` items.
+                        if i > nitems - nrecv
+                            break
+                        end
+                    end
+                end
+            end
+        end
+    end
+    return received
+end
+
+@testset "concurrent push-pop" begin
+    @test_broken false
+    #=
+    if Threads.nthreads() > 1
+        nsend = cld(Threads.nthreads(), 2)
+        nrecv = Threads.nthreads() - nsend
+        @assert nsend ≥ 1
+        @assert nrecv ≥ 1
+        q = LinkedConcurrentRingQueue{Int}(32)
+        nitems = 2^20
+        received = concurrent_push_pop!(q, nitems, nsend, nrecv)
+        allreceived = reduce(vcat, received)
+        @test length(allreceived) == nitems
+        sort!(allreceived)
+        @test allreceived == 1:nitems
+    end
+    =#
+end
+
+end # module
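For reference, the non-blocking consumption pattern these tests rely on, extracted into a helper (a sketch only; `drain!` is a hypothetical name, not part of the patch): `nothing` means "currently empty", while an item `x` comes back wrapped as `Some(x)`.

    function drain!(q)
        ys = eltype(q)[]
        while (y = trypopfirst!(q)) !== nothing
            push!(ys, something(y))
        end
        return ys
    end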
diff --git a/test/test_mpcrq.jl b/test/test_mpcrq.jl
new file mode 100644
index 0000000..7df6482
--- /dev/null
+++ b/test/test_mpcrq.jl
@@ -0,0 +1,162 @@
+module TestMPCRQ
+
+using Base.Experimental: @sync
+using ConcurrentCollections
+using ConcurrentCollections.Implementations:
+    IndirectMultiPolarityConcurrentRingQueueNode,
+    MPCRQ_CLOSED,
+    MPCRQ_ENQUEUED,
+    Waiter,
+    denqueue!
+using Test
+
+@testset "close" begin
+    # Ring of 2^3 = 8 slots: the 9th enqueue closes the node.
+    crq = IndirectMultiPolarityConcurrentRingQueueNode{Int}(3)
+    @testset for i in 1:8
+        @test denqueue!(crq, i) === MPCRQ_ENQUEUED
+    end
+    @test denqueue!(crq, 9) === MPCRQ_CLOSED
+    local ys, ylast
+    @sync begin
+        @async begin
+            ys = [denqueue!(crq, Waiter{Int}())::Some{Int} for _ in 1:8]
+            ylast = denqueue!(crq, Waiter{Int}())
+        end
+    end
+    @test something.(ys) == 1:8
+    @test ylast === MPCRQ_CLOSED
+end
+
+function unfair_sleep(seconds::Real)
+    t0 = time_ns()
+    ns = seconds * 1e9
+    while time_ns() - t0 < ns
+        GC.safepoint()
+    end
+end
+
+function concurrent_denqueue!(
+    crq::IndirectMultiPolarityConcurrentRingQueueNode,
+    nitems::Integer,
+    nsend::Integer,
+    nrecv::Integer,
+)
+    @assert nsend > 0
+    @assert nrecv > 0
+    use_yield = nsend + nrecv > Threads.nthreads()
+    # TODO: If `!use_yield`, make sure the tasks are spawned in different
+    # threads and they are sticky.
+
+    received = Vector{Int}[]
+    senders = Task[]
+    receivers = Task[]
+    global SENDERS = senders
+    global RECEIVERS = receivers
+    # `ref` ≈ (enqueue attempts) - (dequeue attempts); each side backs off when
+    # it gets too far ahead, so both the fast path and the waiter path are
+    # exercised.
+    ref = Threads.Atomic{Int}(0)
+    @sync begin
+        for offset in 1:nsend
+            t = Threads.@spawn begin
+                local y = nothing
+                local i = 0
+                for outer i in offset:nsend:nitems
+                    local s = Threads.atomic_add!(ref, 1)
+                    if s > nsend
+                        while ref[] > 1
+                            if use_yield
+                                yield()
+                            else
+                                GC.safepoint()
+                            end
+                        end
+                    end
+                    local x = eltype(crq)(i)
+                    y = denqueue!(crq, x)
+                    y === MPCRQ_CLOSED && break
+                    y === MPCRQ_ENQUEUED && continue
+                    # A `Waiter` result means a receiver was already waiting;
+                    # hand the item over directly.
+                    put!(y::Waiter, x)
+                end
+                return y, i
+            end
+            push!(senders, t)
+        end
+        for _ in 1:nrecv
+            ys = Int[]
+            push!(received, ys)
+            t = Threads.@spawn begin
+                local ys_nb = Int[]
+                local ys_b = Int[]
+                while true
+                    local s = Threads.atomic_sub!(ref, 1)
+                    if s < -nrecv
+                        while ref[] < -1
+                            if use_yield
+                                yield()
+                            else
+                                GC.safepoint()
+                            end
+                        end
+                    end
+                    local w = Waiter{eltype(crq)}()
+                    local y = denqueue!(crq, w)
+                    if y === MPCRQ_CLOSED
+                        return (; y, ys_nb, ys_b)
+                    end
+                    local i
+                    if y isa Some
+                        i = something(y)
+                        push!(ys, i)
+                        push!(ys_nb, i)
+                    else
+                        @assert y === MPCRQ_ENQUEUED
+                        i = fetch(w)
+                        push!(ys, i)
+                        push!(ys_b, i)
+                    end
+
+                    if i > nitems - nrecv
+                        return (; y, ys_nb, ys_b)
+                    end
+                end
+            end
+            push!(receivers, t)
+        end
+    end
+    return received, senders, receivers
+end
+
+# Given a sorted vector, report missing values (`notfound`) and duplicated
+# values (`dups`).
+function check_consecutive(xs)
+    notfound = Int[]
+    dups = Int[]
+    pre = xs[begin] - 1
+    for x in xs
+        e = pre + 1
+        append!(notfound, e:x-1)
+        append!(dups, x:e-1)
+        pre = x
+    end
+    return (; notfound, dups)
+end
+
+@testset "concurrent push-pop" begin
+    @testset for trial in 1:100
+        # @show trial
+        global received, notfound, dups, allreceived
+        nsend = cld(Threads.nthreads(), 2)
+        nrecv = max(1, Threads.nthreads() - nsend)
+        crq = IndirectMultiPolarityConcurrentRingQueueNode{Int}(7)
+        global CRQ = crq
+        nitems = 2^20
+        received, senders, receivers = concurrent_denqueue!(crq, nitems, nsend, nrecv)
+        allreceived = reduce(vcat, received)
+
+        @test length(allreceived) == nitems
+        sort!(allreceived)
+        (; notfound, dups) = check_consecutive(allreceived)
+        @test length(allreceived) == nitems
+        @test notfound == []
+        @test dups == []
+        @test allreceived == 1:nitems
+    end
+end
+
+end # module
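Finally, the blocking hand-off that test/test_dlcrq.jl's "push-pop 100 wait first" testset exercises, shown in isolation (illustrative sketch; `DualLinkedConcurrentRingQueue`, `push!`, and `popfirst!` are the names used by the tests in this patch):

    using ConcurrentCollections

    q = DualLinkedConcurrentRingQueue{Int}()
    consumer = Threads.@spawn popfirst!(q)   # blocks until an item arrives
    push!(q, 42)
    @assert fetch(consumer) == 42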