Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions benchmark/ConcurrentCollectionsBenchmarks/Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ version = "0.1.0"
BangBang = "198e06fe-97b7-11e9-32a5-e1d131e6ad66"
BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf"
ConcurrentCollections = "5060bff5-0b44-40c5-b522-fcd3ca5cecdd"
Formatting = "59287772-0a20-5a39-b81b-1366585eb4c0"
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@ module ConcurrentCollectionsBenchmarks

using BenchmarkTools: Benchmark, BenchmarkGroup

include("utils.jl")
include("bench_dict_histogram.jl")
include("bench_dict_get_existing.jl")
include("bench_dict_migration.jl")
include("bench_queue_pushpop.jl")
include("bench_queue_hot_potato.jl")

function setup()
suite = BenchmarkGroup()
suite["DictHistogram"] = BenchDictHistogram.setup()
suite["DictGetExisting"] = BenchDictGetExisting.setup()
suite["DictMigration"] = BenchDictMigration.setup()
suite["QueuePushPop"] = BenchQueuePushPop.setup()
suite["HotPotato"] = BenchQueueHotPotato.setup()
return suite
end

Expand Down Expand Up @@ -40,6 +45,8 @@ function clear()
BenchDictHistogram.clear()
BenchDictGetExisting.clear()
BenchDictMigration.clear()
BenchQueuePushPop.clear()
BenchQueueHotPotato.clear()
end

end # module
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
module BenchQueueHotPotato

using BenchmarkTools
using ConcurrentCollections
using ConcurrentCollections.Implementations: atomic_modifyfield!
using Formatting: format
using ..Utils: maptasks

const HOTPOTATO = false
const NOTPOTATO = true

const Stat = typeof((npush = 0, npop = 0, t1 = time_ns(), t2 = time_ns()))

Base.@kwdef struct HotPotatoResult
duration::Union{Float64,Nothing}
nrepeat::Int
stats::Vector{Stat}
end

function unfair_sleep(seconds::Real)
t0 = time_ns()
ns = seconds * 1e9
while time_ns() - t0 < ns
GC.safepoint()
# yield()
end
end

@noinline function hotpotato!(
q;
ntasks::Integer = Threads.nthreads(),
duration::Union{Real,Nothing} = nothing,
nrepeat::Integer = duration === nothing ? 2^15 : typemax(Int64),
)
ntasks < 1 && error("require positive `ntasks`: got $ntasks")
local tasks
push!(q, HOTPOTATO)
t0 = time_ns()
tasks = maptasks(1:ntasks) do itask
# Core.print("$itask-th task started\n")
local t1 = time_ns()
local npush = 0
local npop = 0
for irepeat in 1:nrepeat
# ccall(:jl_breakpoint, Cvoid, (Any,), (; irepeat, itask))
if rand(Bool)
push!(q, NOTPOTATO)
npush += 1
else
y = popfirst!(q)
npop += 1
if y == HOTPOTATO
# Since there are GC pauses, maybe there's no need
# to add sleep here.
# unfair_sleep(0.001)
# sleep(0.001)
push!(q, y)
npush += 1
end
end
if duration !== nothing
if (time_ns() - t0) / 1e9 > duration
break
end
end
end
return (; npush, npop, t1, t2 = time_ns())::Stat
end
return HotPotatoResult(; nrepeat, duration, stats = map(fetch, tasks))
end

mutable struct AtomicRef{T}
@atomic x::T
end

const FAIStat = typeof((; nfai = 0, t1 = time_ns(), t2 = time_ns()))

Base.@kwdef struct FAIResult
duration::Union{Float64,Nothing}
nrepeat::Int
refvalue::Int
stats::Vector{FAIStat}
end

function fai_stats(;
ntasks::Integer = Threads.nthreads(),
duration::Union{Real,Nothing} = nothing,
nrepeat::Integer = duration === nothing ? 2^15 : typemax(Int64),
impl::Val = Val(:threads),
)
ntasks < 1 && error("require positive `ntasks`: got $ntasks")
local tasks
ref = if impl === Val(:threads)
Threads.Atomic{Int32}(0)
else
AtomicRef{Int32}(0)
end
t0 = time_ns()
tasks = maptasks(1:ntasks) do itask
local t1 = time_ns()
local nfai = 0
for irepeat in 1:nrepeat
if impl === Val(:threads)
Threads.atomic_add!(ref, Int32(1))
elseif impl === Val(:unsafe)
atomic_modifyfield!(ref, Val(:x), +, Int32(1))
else
@atomic ref.x += true
end
nfai += 1
if duration !== nothing
if (time_ns() - t0) / 1e9 > duration
break
end
end
end
return (; nfai, t1, t2 = time_ns())::FAIStat
end
if impl === Val(:threads)
refvalue = ref[]
else
refvalue = @atomic ref.x
end
return FAIResult(; nrepeat, duration, refvalue, stats = map(fetch, tasks))
end

function setup(; kwargs...)
suite = BenchmarkGroup()
suite["channel"] = @benchmarkable hotpotato!(Channel{Bool}(Inf); $kwargs...)
suite["dlcrq"] =
@benchmarkable hotpotato!(DualLinkedConcurrentRingQueue{Bool}(); $kwargs...)
return suite
end

function clear() end

function summarize(result::HotPotatoResult)
npush = sum(stat.npush for stat in result.stats)
npop = sum(stat.npop for stat in result.stats)
t1min = minimum(stat.t1 for stat in result.stats)
t2max = maximum(stat.t2 for stat in result.stats)
duration = (t2max - t1min) / 1e9
throughput = (npush + npop) / duration
return (; npush, npop, duration, throughput)
end

function Base.show(io::IO, ::MIME"text/plain", result::HotPotatoResult)
(; npush, npop, duration, throughput) = summarize(result)
println(io, "Hot potato benchmark with ", length(result.stats), " tasks")
println(
io,
format(npush; commas = true),
" pushes and ",
format(npop; commas = true),
" pops in ",
duration,
" seconds",
)
print(
io,
"throughput = ",
format(floor(Int, throughput); commas = true),
" ops/seconds",
)
end

function summarize(result::FAIResult)
nfai = sum(stat.nfai for stat in result.stats)
t1min = minimum(stat.t1 for stat in result.stats)
t2max = maximum(stat.t2 for stat in result.stats)
duration = (t2max - t1min) / 1e9
throughput = nfai / duration
return (; nfai, duration, throughput)
end

function Base.show(io::IO, ::MIME"text/plain", result::FAIResult)
(; nfai, duration, throughput) = summarize(result)
println(io, "FAI benchmark with ", length(result.stats), " tasks")
println(io, format(nfai; commas = true), " FAIs in ", duration, " seconds")
print(
io,
"throughput = ",
format(floor(Int, throughput); commas = true),
" ops/seconds",
)
end

function timings(result::HotPotatoResult)
t1min = minimum(stat.t1 for stat in result.stats)
table =
Iterators.map(enumerate(result.stats)) do (taskid, stat)
(; t1, t2) = stat
return (
(; t = (t1 - t1min) / 1e9, event = :begin, taskid),
(; t = (t2 - t1min) / 1e9, event = :end, taskid),
)
end |>
Iterators.flatten |>
collect
sort!(table; by = row -> row.t)
return table
end

end # module
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
module BenchQueuePushPop

using BenchmarkTools
using ConcurrentCollections

function pushpop!(
q;
nrepeat::Integer = 2^15,
nitems::Integer = 11,
ntasks::Integer = Threads.nthreads(),
nspins::Integer = 100,
)
ntasks < 1 && error("require positive `ntasks`: got $ntasks")
local tasks
@sync begin
tasks = map(1:ntasks) do _
Threads.@spawn begin
nyields = 0
for _ in 1:nrepeat
for i in 1:nitems
push!(q, i)
end
for _ in 1:nitems
while true
y = nothing
for _ in 1:nspins
y = trypopfirst!(q)
y === nothing || break
end
y === nothing || break
yield()
nyields += 1
if nyields > 2^20
@error "Too many yields!"
error("Too many yields!")
end
end
end
end
return nyields
end
end
end
return (; nrepeat, nitems, nyields = map(fetch, tasks))
end

function setup(; kwargs...)
suite = BenchmarkGroup()
suite["channel"] = @benchmarkable pushpop!(Channel{Int}(Inf); $kwargs...)
suite["msqueue"] = @benchmarkable pushpop!(ConcurrentQueue{Int}(); $kwargs...)
suite["lcrq"] = @benchmarkable pushpop!(LinkedConcurrentRingQueue{Int}(); $kwargs...)
return suite
end

function clear() end

end # module
15 changes: 15 additions & 0 deletions benchmark/ConcurrentCollectionsBenchmarks/src/utils.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module Utils

function maptasks(f, xs)
tasks = Task[]
for (tid, x) in enumerate(xs)
t = @task f(x)
t.sticky = false
ccall(:jl_set_task_tid, Cvoid, (Any, Cint), t, mod1(tid, Threads.nthreads()) - 1)
schedule(t)
push!(tasks, t)
end
return tasks
end

end # module
1 change: 1 addition & 0 deletions benchmark/hotpotato/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/build
Loading