diff --git a/benchmark/ConcurrentCollectionsBenchmarks/src/bench_dict_migration.jl b/benchmark/ConcurrentCollectionsBenchmarks/src/bench_dict_migration.jl
index 4c0dbef..0ad3f09 100644
--- a/benchmark/ConcurrentCollectionsBenchmarks/src/bench_dict_migration.jl
+++ b/benchmark/ConcurrentCollectionsBenchmarks/src/bench_dict_migration.jl
@@ -3,7 +3,7 @@ module BenchDictMigration
 using BenchmarkTools
 using ConcurrentCollections
 using ConcurrentCollections.Implementations:
-    LINEAR_PROBING_DICT_EXPAND_BASESIZE, migrate_serial!, new_slots_and_pairnodes
+    LINEAR_PROBING_DICT_EXPAND_BASESIZE, migrate_serial!
 
 pad16(x) = string(x; pad = 16)
 
@@ -28,13 +28,11 @@ function setup(; generate_options...)
     for key in keys(CACHE[])
         CacheType = typeof(CACHE[][key])
         suite[key] = @benchmarkable(
-            migrate_serial!(newslots, newpairnodes, slots, pairnodes),
+            migrate_serial!(newslots, slots),
             setup = begin
                 dict = CACHE[][$key]::$CacheType
                 slots = copy(dict.slots)
-                pairnodes = copy(dict.pairnodes)
-                newslots, newpairnodes =
-                    new_slots_and_pairnodes(slots, pairnodes, true)
+                newslots = similar(slots, length(slots) * 2)
             end,
             evals = 1,
         )
diff --git a/src/ConcurrentCollections.jl b/src/ConcurrentCollections.jl
index 9611c38..24a0d11 100644
--- a/src/ConcurrentCollections.jl
+++ b/src/ConcurrentCollections.jl
@@ -63,6 +63,7 @@ using .UnsafeAtomics: acq_rel, acquire, monotonic, release, seq_cst, unordered
 include("utils.jl")
 include("cache.jl")
 include("atomicsutils.jl")
+include("promise.jl")
 include("dict.jl")
 include("workstealing.jl")
 include("msqueue.jl")
diff --git a/src/dict.jl b/src/dict.jl
index 9c8a329..318e760 100644
--- a/src/dict.jl
+++ b/src/dict.jl
@@ -1,94 +1,72 @@
-@enum LPDKeyState::UInt8 LPD_EMPTY LPD_DELETED LPD_MOVED_EMPTY LPD_MOVED LPD_HASKEY
-const LPD_NBITS = ceil(Int, log2(maximum(Int.(instances(LPDKeyState))) + 1))
-const LPD_BITMASK = ~(typemax(UInt8) << LPD_NBITS)
+struct Empty end
 
-struct KeyInfo{T<:Union{UInt32,UInt64}}
-    bits::T
-end
-
-@inline function LPDKeyState(ki::KeyInfo{T}) where {T}
-    bits = getfield(ki, :bits)
-    statebits = bits & T(LPD_BITMASK)
-    state = statebits % UInt8
-    if assertion_enabled()
-        return LPDKeyState(state)
-    end
-    if state <= UInt8(LPD_HASKEY)
-        return LPDKeyState(state)
-    else
-        return LPD_HASKEY
-    end
-end
-
-Base.zero(::Type{KeyInfo{T}}) where {T} = KeyInfo{T}(zero(T))
-
-KeyInfo{T}(state::LPDKeyState, keydata::T) where {T} =
-    setstate(KeyInfo(keydata << LPD_NBITS), state)
-
-@inline function Base.getproperty(ki::KeyInfo, name::Symbol)
-    bits = getfield(ki, :bits)
-    if name === :state
-        return LPDKeyState(ki)
-    elseif name === :isempty
-        if assertion_enabled()
-            @assert iszero(bits) == (LPDKeyState(ki) === LPD_EMPTY)
-        end
-        return iszero(bits)
-    elseif name === :isdeleted
-        return LPDKeyState(ki) === LPD_DELETED
-    elseif name === :ismovedempty
-        return LPDKeyState(ki) === LPD_MOVED_EMPTY
-    elseif name === :ismoved
-        return LPDKeyState(ki) === LPD_MOVED
-    elseif name === :haskey
-        return LPDKeyState(ki) === LPD_HASKEY
-    elseif name === :keydata
-        return bits >> LPD_NBITS
-    end
-    return getfield(ki, name)
-end
-
-@inline function setstate(ki::KeyInfo{T}, state::LPDKeyState) where {T}
-    bits = getfield(ki, :bits)
-    if state === LPD_EMPTY
-        return KeyInfo(zero(bits))
-    else
-        return KeyInfo((bits & ~T(LPD_BITMASK)) | UInt8(state))
-    end
-end
-
-@inline function setdata(ki::KeyInfo{T}, keydata::T) where {T}
-    bits = getfield(ki, :bits)
-    state = bits & T(LPD_BITMASK)
-    databits = keydata << LPD_NBITS
-    return KeyInfo(databits | state)
-end
-
-mutable struct AtomicRef{T}
-    @atomic value::Union{Nothing,T}
-end
-AtomicRef{T}() where {T} = AtomicRef{T}(nothing)
-Base.eltype(::Type{AtomicRef{T}}) where {T} = T
+# TODO: Rename to `Moving`?
+"""
+    Moved{Value}
 
-mutable struct PairNode{Key,Value}
-    slotid::UInt64
-    key::Key
-    @atomic value::Value
-    @atomic next::Union{PairNode{Key,Value},Nothing}
+A "tag" used for marking that this slot is being moved to a new `slots` vector.
+"""
+struct Moved{Value}
+    value::Value
 end
 
-mutable struct LinearProbingDict{Key,Value} <: ConcurrentDict{Key,Value}
-    # TODO: Use `Vector{UInt128}`
-    @atomic slots::Union{Vector{UInt64},Nothing}
-    # TODO: Use `PairNode{Key,Value}` if `Value` is too large?
-    @atomic pairnodes::Vector{AtomicRef{PairNode{Key,Value}}}
-    slotids::typeof(cacheline_padded_vector(UInt64, 1))
+const MOVED_EMPTY = Moved{Empty}(Empty())
+
+# TODO: Support "UInt62"? (i.e., `UInt64` but two bits used as tag)
+# TODO: Get rid of Moved{Value}
+const ValueUnion{Value} = Union{Value,Empty,Moved{Value},Moved{Empty}}
+
+# Since Julia does not have atomics for arrays, `LinearProbingDict` uses an
+# array filled with mutable struct `DictSlot` instead. This has the disadvantage
+# that it creates one more indirection. But a somewhat nice effect is that it
+# makes copying a key-value pair cheap and easy to do with a non-atomic write.
+# Re-using small mutable objects like this can be bad for memory locality but it
+# seems that the effect is negligible. The performance is much better than
+# creating small objects and copying the key and value separately.
+#
+# Due to the type tag, storing two UInt64 takes as much space as storing three
+# UInt64. So, we can store the hash for "free".
+# TODO: Check if it is actually better to store the hash in a `Vector{UInt}`.
+# Even though it will waste some memory, it will reduce indirection and should
+# make probing much more efficient.
+mutable struct DictSlot{
+    Key,
+    Value,
+    KeyStorage>:Union{Key,Empty},
+    ValueStorage>:ValueUnion{Value},
+}
+    @atomic hash::UInt
+    @atomic key::KeyStorage
+    @atomic value::ValueStorage
+
+    # Defining constructor manually to workaround:
+    # https://github.com/JuliaLang/julia/issues/42353
+    DictSlot{Key,Value,KeyStorage,ValueStorage}(
+        hash,
+        key,
+        value,
+    ) where {Key,Value,KeyStorage,ValueStorage} =
+        new{Key,Value,KeyStorage,ValueStorage}(hash, key, value)
+end
+
+DictSlot{Key,Value,KeyStorage,ValueStorage}() where {Key,Value,KeyStorage,ValueStorage} =
+    DictSlot{Key,Value,KeyStorage,ValueStorage}(zero(UInt), Empty(), Empty())
+
+mutable struct LinearProbingDict{Key,Value,KeyStorage,ValueStorage} <:
+               ConcurrentDict{Key,Value}
+    @atomic slots::Union{Vector{DictSlot{Key,Value,KeyStorage,ValueStorage}},Nothing}
     migration::ReentrantLock
     # TODO: per-thread non-atomic counter for approximating deleted elements
     nadded::Threads.Atomic{Int}
     ndeleted::Threads.Atomic{Int}
 end
 # TODO: inline the counters (and pad them)?
+# TODO: relaxed counter
+
+slots_type(
+    ::LinearProbingDict{Key,Value,KeyStorage,ValueStorage},
+) where {Key,Value,KeyStorage,ValueStorage} =
+    Vector{DictSlot{Key,Value,KeyStorage,ValueStorage}}
 
 ConcurrentCollections.length_upper_bound(dict::LinearProbingDict) =
     dict.nadded[] - dict.ndeleted[]
@@ -99,105 +77,40 @@ ConcurrentCollections.length_lower_bound(dict::LinearProbingDict) =
 
 ConcurrentCollections.ConcurrentDict{Key,Value}() where {Key,Value} =
     LinearProbingDict{Key,Value}()
 
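The comment block above describes the slot protocol in prose; the following is a hedged, self-contained sketch (an editorial illustration, not part of this patch) of the CAS transitions that a `DictSlot`-style atomic `value` field goes through. `ToySlot` and `Gone` are illustrative stand-ins for `DictSlot` and `Moved`:

```julia
# Illustrative stand-in for `Moved`: tags a value as belonging to a
# table that is being migrated.
struct Gone{T}
    value::T
end

# Illustrative stand-in for `DictSlot`; `nothing` plays the role of `Empty`.
mutable struct ToySlot{T}
    @atomic value::Union{Nothing,T,Gone{T}}
end

slot = ToySlot{Int}(nothing)

# Insertion: CAS the empty state to a concrete value.
_, ok = @atomicreplace slot.value nothing => 42
@assert ok

# Migration: CAS the current value to its tagged form so that concurrent
# writers observing the tag know to consult the new table instead.
old = (@atomic slot.value)::Int
_, ok = @atomicreplace slot.value old => Gone{Int}(old)
@assert ok
```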
+# Extracted as a function so that `T` is captured as a type parameter in the
+# closure for the generator.
+fillwith(::Type{T}, n) where {T} = [T() for _ in 1:n]
+
 function LinearProbingDict{Key,Value}() where {Key,Value}
     capacity = 4  # TODO: customizable init size?
-    # TODO: handle the case where key, value, and the metadata fits in an UInt
-    # TODO: check the availability of CAS2?
-    # TODO: use `PairNode{Key,Any}` if `Value` is too large
-    pairnodes = [AtomicRef{PairNode{Key,Value}}() for _ in 1:capacity]
-    slots = zeros(UInt64, 2 * capacity)
-    slotids = cacheline_padded_vector(UInt64, Threads.nthreads())
-    slotids .= eachindex(slotids)
-    return LinearProbingDict{Key,Value}(
+    if sizeof(Some{Union{Key,Empty}}) > sizeof(UInt)
+        KeyStorage = Any
+    else
+        KeyStorage = Union{Key,Empty}
+    end
+    if sizeof(Some{ValueUnion{Value}}) > sizeof(UInt)
+        ValueStorage = Any
+    else
+        ValueStorage = ValueUnion{Value}
+    end
+    slots = fillwith(DictSlot{Key,Value,KeyStorage,ValueStorage}, capacity)
+    return LinearProbingDict{Key,Value,KeyStorage,ValueStorage}(
         slots,
-        pairnodes,
-        slotids,
         ReentrantLock(),
         Threads.Atomic{Int}(0),
         Threads.Atomic{Int}(0),
     )
 end
 
-@inline function prepare_pairnode!(
-    pairnodes::Vector{AtomicRef{Node}},
-    index,
-    slotid,
-    key,
-    value,
-) where {Node<:PairNode}
-    # node = Node(slotid, key, value, nothing)
-    ref = pairnodes[index]
-    head = @atomic ref.value
-    while true
-        # Allocating `Node` each time instead of `@atomic node.next = head`
-        # below is better. It looks like avoiding `@atomic` and optimizing for
-        # the happy case is better for the performance?
-        node = Node(slotid, key, value, head)
-        # @atomic node.next = head
-        head, success = @atomicreplace ref.value head => node
-        success && return
-    end
-end
-
-@inline function cleanup_pairnode!(slots, pairnodes, index)
-    GC.@preserve slots begin
-        s2ptr = pointer(slots, 2 * index)
-        slot2 = UnsafeAtomics.load(s2ptr)
-        slotid = slotid_from_slot2(slot2)
-    end
-    iszero(slotid) && unreachable()
-    ref = pairnodes[index]
-    node = (@atomic ref.value)::PairNode
-    while true
-        node.slotid == slotid && break
-        next = @atomic node.next
-        old, success = @atomicreplace ref.value node => next
-        if success
-            node = next::PairNode
-        else
-            node = old::PairNode
-        end
-    end
-    next = @atomic node.next
-    if next !== nothing
-        @atomicreplace node.next next => nothing
-    end
-    return
-end
-
-@inline function load_pairnode(pairnodes, index, slotid)
-    ref = pairnodes[index]
-    node = (@atomic ref.value)::PairNode
-    while true
-        node.slotid == slotid && return node
-        node = (@atomic node.next)::PairNode
-    end
-end
-
-mutable struct ValueRef{Value,Node<:PairNode}
-    node::Node
-    isloaded::Bool
+struct ValueRef{Value}
     value::Value
-    ValueRef{Value}(node::Node) where {Value,Node} = new{Value,Node}(node, false)
 end
 
 # Note on `modify!` design: It looks like (even relaxed) atomic load is not
 # eliminated when the value is not used ().
 # So, let's pass a `Ref`-like object to `modify!` and so that load is not issued
 # when the user does not look at the value.
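For reference, this is the caller-side protocol that the design note refers to, shown as a hedged usage sketch (the `modify!`/`Keep`/`Delete`/`Some` API is this package's; the histogram use case is illustrative). `f` receives `nothing` for an absent key or a `Ref`-like view of the value, and `ref[]` is only loaded if the caller actually dereferences it:

```julia
using ConcurrentCollections

dict = ConcurrentDict{String,Int}()

# Histogram-style increment: insert 1 on first sight, else add 1.
modify!(dict, "key") do ref
    Some(ref === nothing ? 1 : ref[] + 1)
end

@assert dict["key"] == 1
```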
-@inline function Base.getindex(ref::ValueRef)
-    if !ref.isloaded
-        node = ref.node
-        ref.value = @atomic node.value
-        ref.isloaded = true
-    end
-    return ref.value
-end
-
-@inline function Base.setindex!(ref::ValueRef, x)
-    ref.value = x
-    ref.isloaded = true
-end
+@inline Base.getindex(ref::ValueRef) = ref.value
 
 function Base.getindex(d::LinearProbingDict{Key}, key) where {Key}
     y = tryget(d, key)
@@ -218,14 +131,14 @@ end
 Base.get(d::LinearProbingDict, key, default) =
     something(ConcurrentCollections.tryget(d, key), default)
 
-function ConcurrentCollections.tryget(d::LinearProbingDict, key)
+function ConcurrentCollections.tryget(d::LinearProbingDict{<:Any,V}, key) where {V}
     @inline f(::Nothing) = nothing
     @inline f(x) = Keep(x[])
     y = modify!(f, d, key)
     if y === nothing
         return nothing
     else
-        return Some(y.value)
+        return Some{V}(y.value::V)
     end
 end
 
@@ -254,227 +167,199 @@ function ConcurrentCollections.trypop!(d::LinearProbingDict, key)
     end
 end
 
-@inline slotid_from_slot2(slot2::UInt64) = slot2 & (typemax(UInt64) >> LPD_NBITS)
+function record_added!(dict)
+    Threads.atomic_add!(dict.nadded, 1) + 1
+end
 
-@inline function reconstruct_full_hash(keyinfo::KeyInfo, slot2::UInt64)
-    hash2 = slot2 & ~(typemax(UInt64) >> LPD_NBITS)
-    return (keyinfo.keydata << LPD_NBITS) | (hash2 >> (64 - LPD_NBITS))
+function record_deleted!(dict)
+    Threads.atomic_add!(dict.ndeleted, 1) + 1
 end
 
+# The main linearization points are the changes to the value, not to the key.
+# The CAS on the key only determines the internal "physical" location of the
+# slot.
 function ConcurrentCollections.modify!(
     f,
     dict::LinearProbingDict{Key,Value},
     key,
 ) where {Key,Value}
     key = convert(Key, key)
-    slots, pairnodes = slots_and_pairnodes(dict)
-    newslotid = UInt64(0)
-
+    slots = load_slots(dict)
     h = hash(key)
-    # The upper and lower bits of hash:
-    hash1 = h >> LPD_NBITS # stored in slot 1 (`keyinfo.keydata`)
-    hash2 = h << (64 - LPD_NBITS) # stored in slot 2
-
-    if 4 * length_upper_bound(dict) > length(slots)
-        slots, pairnodes = expand!(dict, slots, pairnodes)
+    if 1.5 * length_upper_bound(dict) > length(slots)
+        slots = expand!(dict, slots)
     end
 
     while true
-        c = length(slots) ÷ 2
+        @label restart
+        c = length(slots)
         offset = reinterpret(Int, h) & (c - 1) # h % c
         nprobes = 0
-        GC.@preserve slots begin
+        while true
+            index = offset + 1
+            offset = (offset + 1) & (c - 1) # (offset + 1) % c
+
+            ref = slots[index]
+            hstored = @atomic :monotonic ref.hash
+            if (!iszero(hstored)) & (h != hstored)
+                nprobes += 1
+                if nprobes > c ÷ 4
+                    @goto too_many_probes
+                end
+                continue # already occupied; continue probing
+            end
+            # It's possible to reach here even when the key matches (i.e., the
+            # matching key has been stored but the hash has not). This is fine
+            # since the key is compared directly with `isequal` below anyway.
+
+            kstored = (@atomic ref.key)::Union{Empty,Key}
+            if kstored isa Empty
+                reply = f(nothing)::Union{Nothing,Some}
+                reply === nothing && return nothing
+                oldkey, ok = @atomicreplace ref.key Empty() => key
+                if ok
+                    @atomic :monotonic ref.hash = h
+                else
+                    hstored = @atomic :monotonic ref.hash
+                    if (!iszero(hstored) & (h != hstored)) || !isequal(oldkey, key)
+                        continue # failed to obtain this slot; continue probing
+                    end
+                    # Reaching here means two tasks simultaneously inserted the
+                    # same key. The value must be inserted into this slot.
+                end
+                v = convert(Value, something(reply))
+                vstored, ok = @atomicreplace ref.value Empty() => v
+                if ok
+                    record_added!(dict)
+                    return reply
+                end
+                vstored = vstored::Union{Value,Moved{Value},Moved{Empty}}
+            elseif isequal(kstored, key)
+                vstored = (@atomic ref.value)::ValueUnion{Value}
+            else
+                continue # already occupied; continue probing
+            end
+            # Here, we know that this is the correct slot, but the value may
+            # not be stored yet.
             while true
-                index = offset + 1
-                s1ptr = pointer(slots, 2 * offset + 1)
-                s2ptr = pointer(slots, 2 * offset + 2)
-                keybits = UnsafeAtomics.load(s1ptr)
-                keyinfo = KeyInfo(keybits)
-
-                if keyinfo.ismoved || keyinfo.ismovedempty
-                    slots, pairnodes = finishmove!(dict, slots, pairnodes)
-                    break # restart
-                elseif keyinfo.isempty
+                if vstored isa Empty
                     reply = f(nothing)::Union{Nothing,Some}
-                    reply === nothing && return reply
-                    # Insertion:
-                    if iszero(newslotid)
-                        newslotid = dict.slotids[Threads.threadid()] += Threads.nthreads()
-                        # TODO: Handle wrap-around of slotid? Reset it during migration?
-                    end
-                    prepare_pairnode!(pairnodes, index, newslotid, key, something(reply))
-                    slot2 = hash2 | newslotid
-                    oldslot = Pair(keybits, zero(keybits))
-                    newslot = Pair(KeyInfo{UInt64}(LPD_HASKEY, hash1).bits, slot2)
-                    s12ptr = Ptr{typeof(oldslot)}(s1ptr)
-                    found = UnsafeAtomics.cas!(s12ptr, oldslot, newslot)
-                    if found === oldslot
-                        Threads.atomic_add!(dict.nadded, 1)
+                    reply === nothing && return nothing
+                    v = convert(Value, something(reply))
+                    vstored, ok = @atomicreplace ref.value Empty() => v
+                    if ok
+                        record_added!(dict)
                         return reply
                     end
-                    foundinfo = KeyInfo(first(found))
-                    if foundinfo.ismoved | foundinfo.ismovedempty
-                        slots, pairnodes = finishmove!(dict, slots, pairnodes)
-                        break # restart
-                    else
-                        # Failed to insert a new entry. It means that there was
-                        # another task successfully inserted a new slot. The
-                        # linked list in `pairnodes[index]` needs cleanup now
-                        # before continue probing.
-                        cleanup_pairnode!(slots, pairnodes, index)
-
-                        # Retrying on CAS failure since this key may be inserted
-                        # by another task.
-                        continue
-                        # TODO: Check the hash in `found`? If different, there's
-                        # no need to retry.
-                    end
-                elseif keyinfo.haskey
-                    if keyinfo.keydata == hash1
-                        slot2 = UnsafeAtomics.load(s2ptr)
-                        slotid = slotid_from_slot2(slot2)
-                        stored_hash = reconstruct_full_hash(keyinfo, slot2)
-                        node = load_pairnode(pairnodes, index, slotid)
-                        if stored_hash == h && isequal(node.key, key)
-                            vref = ValueRef{Value}(node)
-                            while true
-                                reply = f(vref)::Union{Keep,Nothing,Delete,Some}
-                                reply isa Keep && return reply
-                                reply isa Union{Nothing,Delete} && break
-                                # Update:
-                                old = vref[]
-                                new = something(reply::Some)
-                                old, success = @atomicreplace node.value old => new
-                                success && return reply
-                                vref[] = old
-                            end
-
-                            # Deletion:
-                            oldslot = Pair(keybits, slot2)
-                            newslot = Pair(setstate(keyinfo, LPD_DELETED).bits, slot2)
-                            s12ptr = Ptr{typeof(oldslot)}(s1ptr)
-                            if UnsafeAtomics.cas!(s12ptr, oldslot, newslot) === oldslot
-                                ndeleted = Threads.atomic_add!(dict.ndeleted, 1) + 1
-                                approx_len = dict.nadded[] - ndeleted
-                                half_len = length(slots) ÷ 4
-                                if length(slots) > 8 && approx_len < half_len
-                                    shrink!(dict, slots, pairnodes)
-                                end
-                                return reply
-                            else
-                                continue # CAS failed; retry
-                            end
-                        end
-                    end
-                    # Key doesn't match => continue probing
-                elseif keyinfo.isdeleted
-                    # => continue probing
-                else
-                    unexpected(keyinfo)
+                    vstored = vstored::Union{Value,Moved{Value},Moved{Empty}}
                 end
+                if vstored isa Moved
+                    slots = finishmove!(dict, slots)
+                    @goto restart
+                end
+                vref = ValueRef{Value}(vstored)
+                reply = f(vref)::Union{Keep,Nothing,Delete,Some}
+                if reply isa Keep
+                    # @show nprobes
+                    return reply
+                end
+                if reply isa Union{Nothing,Delete}
+                    # Deletion:
+                    vstored, ok = @atomicreplace ref.value vstored => Empty()
+                    if ok
+                        ndeleted = record_deleted!(dict)
+                        approx_len = dict.nadded[] - ndeleted
+                        half_len = length(slots) ÷ 2
+                        if length(slots) > 8 && approx_len < half_len
+                            shrink!(dict, slots)
+                        end
+                        return reply
+                    end
+                else
+                    # Update:
+                    new = convert(Value, something(reply::Some))
+                    vstored, ok = @atomicreplace ref.value vstored => new
+                    ok && return reply
+                end
+                vstored = vstored::ValueUnion{Value}
+            end
 
-            nprobes += 1
-            if nprobes > c ÷ 4
-                let newslots = @atomic dict.slots
-                    # Nonblocking check to see if the slots are migrated:
-                    if slots === newslots
-                        # @info "expand: length(slots) ≈ 2^$(floor(Int, log2(length(slots))))"
-                        # global DICT = dict
-                        # TODO: Check the approximated table size here. It's
-                        # possible that the hash table needs cleanup but not
-                        # resize (i.e., too many deleted slots).
-                        slots, pairnodes = expand!(dict, slots, pairnodes)
-                    else
-                        slots, pairnodes = slots_and_pairnodes(dict)
-                    end
+            nprobes += 1
+            if nprobes > c ÷ 4
+                @label too_many_probes
+                let newslots = @atomic dict.slots
+                    if (slots === newslots) || newslots === nothing
+                        # @info "expand: length(slots) ≈ 2^$(floor(Int, log2(length(slots))))"
+                        # global DICT = dict
+                        # TODO: Check the approximated table size here. It's
+                        # possible that the hash table needs cleanup but not
+                        # resize (i.e., too many deleted slots).
+                        slots = expand!(dict, slots)
+                    else
+                        slots = load_slots(dict)
+                    end
                 end
-                break # restart
+                @goto restart
             end
-
-            offset = (offset + 1) & (c - 1) # (offset + 1) % c
         end
-    end
-end
+        end # ...of the inner "CAS loop"
+    end # ...of the restart loop
 end
 
-expand!(dict, oldslots, oldpairnodes) = migrate!(dict, oldslots, oldpairnodes, true)
-shrink!(dict, oldslots, oldpairnodes) = migrate!(dict, oldslots, oldpairnodes, false)
+expand!(dict, oldslots) = migrate!(dict, oldslots, true)
+shrink!(dict, oldslots) = migrate!(dict, oldslots, false)
 
-function new_slots_and_pairnodes(slots, pairnodes, expand)
-    newslots = zeros(eltype(slots), expand ? length(slots) * 2 : length(slots) ÷ 2)
-    # newslots = Mmap.mmap(Vector{UInt64}, expand ? length(slots) * 2 : length(slots) ÷ 2)
-    newpairnodes = [eltype(pairnodes)() for _ in 1:length(newslots)÷2]
-    # TODO: Can refs (and not just nodes) be reused?
-    return (newslots, newpairnodes)
+function migrate!(dict::LinearProbingDict, expand::Bool; basesize = nothing)
+    slots = load_slots(dict)
+    return migrate!(dict, slots, expand; basesize)
 end
 
-function migrate!(dict::LinearProbingDict, expand::Bool; basesize = nothing)
-    slots, pairnodes = slots_and_pairnodes(dict)
-    return migrate!(dict, slots, pairnodes, expand; basesize)
+function load_slots(dict)
+    while true
+        slots = @atomic dict.slots
+        if slots === nothing
+            return finishmove!(dict, slots)
+        else
+            return slots
+        end
+    end
 end
 
-function migrate!(dict, oldslots, oldpairnodes, expand; basesize = nothing)
+function migrate!(dict::LinearProbingDict, oldslots, expand; basesize = nothing)
     # Since the migration is parallelized, workers running tasks blocked by the
     # lock actually will contribute to the forward progress of the entire
     # system. (Though the OS may suspend this worker thread before the tasks are
     # spawned.)
     lock(dict.migration) do
-        slots = (@atomic dict.slots)::Vector{UInt64}
-        pairnodes = @atomic dict.pairnodes
+        slots = (@atomic dict.slots)::slots_type(dict)
         if slots !== oldslots
-            return slots, pairnodes
+            return slots
         end
         @atomic dict.slots = nothing
-        @assert pairnodes === oldpairnodes
-        (newslots, newpairnodes) = new_slots_and_pairnodes(slots, pairnodes, expand)
+        newslots = similar(slots, expand ? length(slots) * 2 : length(slots) ÷ 2)
         if expand
-            nadded = expand_parallel!(newslots, newpairnodes, slots, pairnodes, basesize)
+            expand_parallel!(newslots, slots, basesize)
         else
-            nadded = migrate_serial!(newslots, newpairnodes, slots, pairnodes)
+            migrate_serial!(newslots, slots)
         end
         # TODO: parallelize `shrink!`
 
-        # At this point, no other thread can be mutating the counters (as they
-        # will observe `Moved`). Thus, it is safe to update the counter
-        # non-atomically:
-        dict.ndeleted[] = 0
-        dict.nadded[] = nadded
-
         # This is the atomic "publishing" operation that makes the `newslots`
         # accessible to any tasks (including the ones that are/were not trying
         # to acquire the `migration` lock).
-        @atomic dict.pairnodes = newpairnodes
         @atomic dict.slots = newslots
-        return newslots, newpairnodes
+        return newslots
     end
 end
 
-function finishmove!(dict, oldslots, oldpairnodes)
+function finishmove!(dict::LinearProbingDict, oldslots)
     lock(dict.migration) do
-        slots = (@atomic dict.slots)::Vector{UInt64}
-        pairnodes = @atomic dict.pairnodes
+        slots = (@atomic dict.slots)::slots_type(dict)
         # The caller observed `Moved` which only sets inside the `migration`
         # lock. Thus, the migration should be finished once this lock is
        # acquired:
         @assert oldslots !== slots
-        @assert oldpairnodes !== pairnodes
-        return slots, pairnodes
-    end
-end
-
-function slots_and_pairnodes(dict)
-    while true
-        pairnodes = @atomic dict.pairnodes
-        slots = @atomic dict.slots
-        if slots === nothing
-            return finishmove!(dict, slots, pairnodes)
-        else
-            if pairnodes === @atomic dict.pairnodes
-                return (slots, pairnodes)
-            end
-        end
+        return slots
     end
 end
 
@@ -484,7 +369,7 @@ struct Stopped
 end
 
 """
-    expand_parallel_basecase!(newslots, slots, basesize, start0) -> (nadded, seen_empty)
+    expand_parallel_basecase!(...) -> (nadded, seen_empty)
 
 Process all clusters started within `start0:(start0 + basesize)` (mod
 `length(slots)`).
@@ -497,56 +382,75 @@ are mapped to non-overlapping regions when the `slots` array is doubled in
 size.
 
 More precisely:
 
 1. Process all clusters started within `start0:(start0 + basesize - 1)`.
-2. If more than one cluster is processed, process the cluster that contains the
+2. If at least one cluster is processed, process the cluster that contains the
    start position of the next chunk `start0 + basesize` (mod `length(slots)`).
+
+This differs from the original trick mentioned in Maier et al. (2019) in that
+each `DictSlot` is reused in `newslots`. Since the slots are reused, they
+cannot serve as reliable markers for delimiting the clusters: other tasks can
+empty a slot at any moment. So, instead, each basecase sets its own promise
+`firstempties[ichunk]::Promise{Int}` to the index of the first empty slot that
+it sees and successfully CASes to `Moved{Empty}`. If no empty slot is observed,
+the promise is set to -1. When the basecase reaches the end of its chunk
+(`start0 + basesize`), it confirms the end of the cluster it owns by finding
+the first valid `firstempties[mod1((ichunk+i),end)]` with the smallest `i > 0`.
+If consecutive basecase tasks are started at the same time, the promise is
+likely already ready by the time a basecase task needs it, since setting the
+promise is the first thing each task does.
+
+---
+
+This trick is mentioned in:
+
+> Maier, Tobias, Peter Sanders, and Roman Dementiev. “Concurrent Hash Tables:
+> Fast and General(?)!” ACM Transactions on Parallel Computing 5, no. 4
+> (February 22, 2019): 16:1–16:32. https://doi.org/10.1145/3309206.
 """
-function expand_parallel_basecase!(
-    newslots,
-    newpairnodes,
-    slots,
-    pairnodes,
-    basesize,
-    start0,
-)
-    c = length(slots) ÷ 2
+function expand_parallel_basecase!(newslots, slots, basesize, start0, ichunk, firstempties)
+    c = length(slots)
     stop0 = min(start0 + basesize - 1, c)
-    stpd = migrate_serial!(nothing, nothing, slots, pairnodes, start0, stop0, Val(true))
+    stpd = migrate_serial_nofill!(nothing, slots, start0, stop0, Val(true))
     if stpd isa Int
         @assert stpd == 0
         # This chunk does not own any clusters.
+        put!(firstempties[ichunk], -1)
        return (0, false)
     end
 
-    migrate_between(start, stop, flag) =
-        migrate_serial!(newslots, newpairnodes, slots, pairnodes, start, stop, flag)
+    migrate_between(start, stop) =
+        migrate_serial_nofill!(newslots, slots, start, stop, Val(false))::Int
 
     # An empty slot is observed. There is at least one cluster started within
     # this chunk.
     stpd::Stopped
     @assert stpd.nadded == 0
-    nadded = migrate_between(stpd.i + 1, stop0, Val(false))::Int
+    put!(firstempties[ichunk], stpd.i)
+    nadded = migrate_between(stpd.i + 1, stop0)
 
     # Process the cluster that includes `start0 + basesize` (if any).
-    next_start = start0 + basesize
-    if next_start > c
-        next_start = 1
-    end
     chunk_starts = (
-        next_start:basesize:c,
-        1:basesize:next_start-1, # wrap around
+        start0+basesize:basesize:c,
+        1:basesize:start0-1, # wrap around
+    )
+    chunk_indices = (
+        ichunk+1:length(firstempties),
+        1:ichunk-1, # wrap around
     )
-    # Using `for half` so that the compiler does not unroll the loop.
-    # TODO: check if it is working
-    for half in 1:2, start in chunk_starts[half]
-        stop = min(start + basesize - 1, c)
-        stpd = migrate_between(start, stop, Val(true))
-        if stpd isa Stopped
-            nadded += stpd.nadded
-            return (nadded, true)
+    for half in 1:2
+        for (jchunk, start) in zip_strict(chunk_indices[half], chunk_starts[half])
+            nextempty = fetch(firstempties[jchunk])
+            if nextempty == -1 # next chunk is owned by this task
+                stop = min(start + basesize - 1, c)
+                nadded += migrate_between(start, stop)
+            else
+                nadded += migrate_between(start, nextempty - 1)
+                return (nadded, true)
+            end
         end
-        nadded += stpd::Int
     end
-    @static_error "unreachable: the empty slot disappeared?"
+    # Reaching here means there was only one empty slot.
+    nadded += migrate_between(start0, stpd.i - 1)
+    return (nadded, true)
 end
 
 plus_or((a, b), (c, d)) = (a + c, b | d)
 
@@ -554,114 +458,93 @@ plus_or((a, b), (c, d)) = (a + c, b | d)
 # See`BenchDictMigration` for benchmarking this:
 const LINEAR_PROBING_DICT_EXPAND_BASESIZE = Ref(2^13)
 
-function expand_parallel!(newslots, newpairnodes, slots, pairnodes, basesize)
+function expand_parallel!(newslots, slots, basesize)
     # TODO: Make the default `basesize` configurable?
     basesize = something(basesize, LINEAR_PROBING_DICT_EXPAND_BASESIZE[])
     @assert length(newslots) > length(slots)
-    length(slots) <= basesize &&
-        return migrate_serial!(newslots, newpairnodes, slots, pairnodes)
-    basesize = min(basesize, cld(length(slots), 2 * Threads.nthreads()))
-
-    c = length(slots) ÷ 2
-    nadded, seen_empty =
-        threaded_typed_mapreduce(Tuple{Int,Bool}, plus_or, 1:basesize:c) do start0
-            return expand_parallel_basecase!(
-                newslots,
-                newpairnodes,
-                slots,
-                pairnodes,
-                basesize,
-                start0,
-            )
-        end
+    if length(slots) <= basesize || Threads.nthreads() == 1
+        return migrate_serial!(newslots, slots)
+    end
+    basesize = min(basesize, cld(length(slots), Threads.nthreads()))
+
+    # TODO: since `nadded` from this is not used any more, `|` can be used
+    # instead of `plus_or`.
+    c = length(slots)
+    chunkstarts = 1:basesize:c
+    firstempties = [Promise{Int}() for _ in chunkstarts]
+    nadded, seen_empty = threaded_typed_mapreduce(
+        Tuple{Int,Bool}, # return type of the do block and the domain of `plus_or`
+        plus_or,
+        eachindex(chunkstarts),
+    ) do ichunk
+        start0 = chunkstarts[ichunk]
+        return expand_parallel_basecase!(
+            newslots,
+            slots,
+            basesize,
+            start0,
+            ichunk,
+            firstempties,
+        )
+    end
     if seen_empty
+        # fill_undef!(newslots) # TODO: parallel?
+        Threads.@threads for i in eachindex(newslots)
+            @inbounds fill_undef_at!(newslots, i)
+        end
         return nadded
     else
         # The `slots` are all non-empty. Fallback to serial migration:
-        return migrate_serial!(newslots, newpairnodes, slots, pairnodes)
+        return migrate_serial!(newslots, slots)
     end
 end
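The `firstempties` handoff above can be pictured with a toy, sequential model (an editorial sketch, not part of this patch): each chunk publishes the index of the first empty slot it claims, and the preceding chunk uses that index as the exclusive bound of the cluster crossing the chunk boundary.

```julia
# `true` = occupied, `false` = empty; two chunks with basesize = 4.
slots = [true, true, false, true, true, true, false, true]
basesize = 4
starts = 1:basesize:length(slots)
firstempty = map(starts) do s0
    chunk = s0:min(s0 + basesize - 1, length(slots))
    e = findfirst(j -> !slots[j], chunk)
    e === nothing ? -1 : chunk[e]
end
@assert firstempty == [3, 7]
# Chunk 1 owns the cluster starting at slot 4; it crosses into chunk 2
# and ends just before chunk 2's first empty slot (index 7), so chunk 1
# migrates slots 4:6 and stops there.
```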
 
-migrate_serial!(newslots, newpairnodes, slots, pairnodes) = migrate_serial!(
-    newslots,
-    newpairnodes,
-    slots,
-    pairnodes,
-    1,
-    length(slots) ÷ 2,
-    Val(false),
-)::Int
-
-function migrate_serial!(
-    newslots,
-    newpairnodes,
-    slots,
-    pairnodes,
-    start,
-    stop,
-    stop_on_empty,
-)
-    GC.@preserve newslots slots begin
-        nadded = unsafe_migrate!(
-            newslots,
-            newpairnodes,
-            slots,
-            pairnodes,
-            start,
-            stop,
-            stop_on_empty,
-        )
-    end
+function migrate_serial!(newslots, slots)
+    nadded = migrate_serial_nofill!(newslots, slots, 1, length(slots), Val(false))
+    nadded = nadded::Int
+    fill_undef!(newslots)
     return nadded
 end
 
-function unsafe_migrate!(
-    newslots::Union{AbstractVector{UInt64},Nothing},
-    newpairnodes::Union{AbstractVector{R},Nothing},
-    slots::AbstractVector{UInt64},
-    pairnodes::AbstractVector{R},
+Base.@propagate_inbounds function fill_undef_at!(newslots, i)
+    if !isassigned(newslots, i)
+        newslots[i] = eltype(newslots)()
+    end
+end
+
+function fill_undef!(newslots)
+    for i in eachindex(newslots)
+        @inbounds fill_undef_at!(newslots, i)
+    end
+    return newslots
+end
+
+# TODO: Create "defrag" version that actually copies keys/values into pre-filled
+# (contiguously allocated) slots?
+"""
+Migrate `DictSlot` entries from `slots` to `newslots`.
+"""
+function migrate_serial_nofill!(
+    newslots::Union{AbstractVector{Slot},Nothing},
+    slots::AbstractVector{Slot},
     start::Int,
     stop::Int,
     stop_on_empty::Union{Val{false},Val{true}},
-) where {R<:AtomicRef{<:PairNode}}
+) where {K,V,Slot<:DictSlot{K,V}}
     nadded = 0
     for i in start:stop
-        offset = i - 1
-        s1ptr = pointer(slots, 2 * offset + 1)
-        s2ptr = pointer(slots, 2 * offset + 2)
-        local tryset
-        let s2ptr = s2ptr
-            @inline function tryset(keyinfo, newstate)
-                local slot2 = UnsafeAtomics.load(s2ptr)
-                local oldslot = Pair(keyinfo.bits, slot2)
-                local newslot = Pair(setstate(keyinfo, newstate).bits, slot2)
-                local s12ptr = Ptr{typeof(oldslot)}(s1ptr)
-                return UnsafeAtomics.cas!(s12ptr, oldslot, newslot) === oldslot
-            end
-        end
-        local keyinfo
+        ref = slots[i]
         while true
-            keybits = UnsafeAtomics.load(s1ptr)
-            keyinfo = KeyInfo(keybits)
-            if keyinfo.isdeleted
-                break # next index
-            elseif keyinfo.ismovedempty
+            local value
+            value = @atomic ref.value
+            if value isa Empty
+                _, ok = @atomicreplace ref.value Empty() => MOVED_EMPTY
+                ok || continue
                 stop_on_empty == Val(true) && return Stopped(i, nadded)
                 break # next index
-            elseif keyinfo.isempty
-                # Mark that this slot is not usable anymore
-                if !tryset(keyinfo, LPD_MOVED_EMPTY)
-                    continue
-                end
-                stop_on_empty == Val(true) && return Stopped(i, nadded)
-                break # next index
-            elseif keyinfo.haskey
-                if !tryset(keyinfo, LPD_MOVED)
-                    continue
-                end
-            else
-                @assert keyinfo.ismoved
+            elseif value isa Moved{Empty}
+                @static_error("unexpected Moved{Empty}")
             end
             @goto move
         end
@@ -670,34 +553,32 @@
 
         newslots === nothing && continue
 
-        newkeybits = setstate(keyinfo, LPD_HASKEY).bits
-        slot2 = UnsafeAtomics.load(s2ptr)
-        slotid = slotid_from_slot2(slot2)
-        node = load_pairnode(pairnodes, i, slotid)
-        h = reconstruct_full_hash(keyinfo, slot2)
+        h = @atomic :monotonic ref.hash
+        if iszero(h)
+            key = @atomic ref.key
+            key = key::K
+            h = hash(key)
+        end
 
         # Insertion to `newslots` does not have to use atomics since
         # it's protected by the `.migration` lock.
-        c = length(newslots) ÷ 2
+        c = length(newslots)
         offset = reinterpret(Int, h) & (c - 1) # h % c
         nprobes = 0
         while true
-            # TODO: non-atomic ordering
-            local keybits = @inbounds newslots[2*offset+1]
-            local keyinfo = KeyInfo(keybits)
-            if keyinfo.isempty
-                @inbounds newslots[2*offset+1] = newkeybits
-                @inbounds newslots[2*offset+2] = slot2
-                ref = newpairnodes[offset+1]
-                @atomic ref.value = node
+            index = offset + 1
+            offset = (offset + 1) & (c - 1) # (offset + 1) % c
+
+            if !isassigned(newslots, index)
+                newslots[index] = ref
                 nadded += 1
                 break
             end
+
             nprobes += 1
             if nprobes > c
                 @static_error "unreachable: too many probings during migration"
             end
-            offset = (offset + 1) & (c - 1) # (offset + 1) % c
         end
     end
     return nadded
@@ -708,30 +589,26 @@
 Base.IteratorSize(::Type{<:Base.KeySet{<:Any,<:LinearProbingDict}}) = Base.SizeUnknown()
 Base.IteratorSize(::Type{<:Base.ValueIterator{<:LinearProbingDict}}) = Base.SizeUnknown()
 
 function Base.iterate(dict::LinearProbingDict)
-    slots, pairnodes = slots_and_pairnodes(dict)
-    return iterate(dict, (slots, pairnodes, 1))
+    slots = load_slots(dict)
+    return iterate(dict, (slots, 1))
 end
 
-function Base.iterate(::LinearProbingDict, (slots, pairnodes, index))
-    GC.@preserve slots begin
-        index < 1 && return nothing
-        while true
-            2 * index > length(slots) && return nothing
-            offset = index - 1
-            s1ptr = pointer(slots, 2 * offset + 1)
-            s2ptr = pointer(slots, 2 * offset + 2)
-            keybits = UnsafeAtomics.load(s1ptr)
-            keyinfo = KeyInfo(keybits)
-            if keyinfo.haskey | keyinfo.ismoved
-                slot2 = UnsafeAtomics.load(s2ptr)
-                slotid = slotid_from_slot2(slot2)
-                node = load_pairnode(pairnodes, index, slotid)
-                key = node.key
-                value = @atomic node.value
-                return (key => value), (slots, pairnodes, index + 1)
+function Base.iterate(::LinearProbingDict, (slots, index))
+    index < 1 && return nothing
+    while true
+        index > length(slots) && return nothing
+        ref = slots[index]
+        value = @atomic ref.value
+        if value isa Empty
+            # no value stored in this slot; skip it
+        elseif value isa Moved{Empty}
+            # slot was empty when it was migrated; skip it
+        else
+            if value isa Moved
+                value = value.value
             end
-            index += 1
+            key = @atomic ref.key
+            return (key => value), (slots, index + 1)
         end
+        index += 1
     end
 end
 
@@ -827,27 +704,29 @@ describe(map(length, cs))
 ```
 """
 clusters(d::LinearProbingDict) = clusters(d.slots)
-function clusters(slots::AbstractVector{UInt64})
+function clusters(slots::AbstractVector)
     cs = typeof(1:2)[]
     i = 1
     while true
         while true
-            2 * i > length(slots) && return cs
-            keyinfo = KeyInfo(slots[2*(i-1)+1])
+            i > length(slots) && return cs
+            ref = slots[i]
+            key = @atomic ref.key
             i += 1
-            if keyinfo.isempty | keyinfo.ismovedempty
+            if key isa Union{Empty,Moved{Empty}}
                 break
             end
         end
         start = i - 1
         while true
-            if 2 * i > length(slots)
+            if i > length(slots)
                 push!(cs, start:i-1)
                 return cs
             end
-            keyinfo = KeyInfo(slots[2*(i-1)+1])
+            ref = slots[i]
+            key = @atomic ref.key
            i += 1
-            if keyinfo.isempty | keyinfo.ismovedempty
+            if key isa Union{Empty,Moved{Empty}}
                break
            end
        end
diff --git a/src/promise.jl b/src/promise.jl
new file mode 100644
index 0000000..d76ed25
--- /dev/null
+++ b/src/promise.jl
@@ -0,0 +1,49 @@
+mutable struct Promise{T,S>:Union{Nothing,Some{T}}}
+    @atomic value::S
+    notify::Threads.Condition
+    # TODO: just use a stack of Tasks?
+
+    function Promise{T}() where {T}
+        if sizeof(Some{Union{Nothing,Some{T}}}) > sizeof(UInt64)
+            return new{T,Any}(nothing, Threads.Condition())
+        else
+            return new{T,Union{Nothing,Some{T}}}(nothing, Threads.Condition())
+        end
+    end
+end
+
+Promise() = Promise{Any}()
+
+function tryput!(p::Promise{T}, value) where {T}
+    new = Some{T}(value)
+    old, ok = @atomicreplace p.value nothing => new
+    if ok
+        lock(p.notify) do
+            notify(p.notify)
+        end
+    end
+    return old
+end
+
+function Base.put!(p::Promise{T}, value) where {T}
+    if tryput!(p, value) !== nothing
+        error("Promise already has a value")
+    end
+    return p
+end
+
+function Base.fetch(p::Promise{T}) where {T}
+    value = @atomic p.value
+    if value isa Some{T}
+        return something(value)
+    end
+    lock(p.notify) do
+        while true
+            local value = @atomic p.value
+            if value isa Some{T}
+                return something(value)
+            end
+            wait(p.notify)
+        end
+    end
+end
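A hedged usage sketch of the new `Promise` (editorial, not part of the patch; it is an internal type, assumed here to be reachable through the `Implementations` submodule like the other internals touched by this diff):

```julia
using ConcurrentCollections.Implementations: Promise

p = Promise{Int}()
t = Threads.@spawn fetch(p) + 1  # blocks until the promise is set
put!(p, 41)
@assert fetch(t) == 42
@assert fetch(p) == 41           # later fetches return the same value
```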
diff --git a/src/utils.jl b/src/utils.jl
index da3afd2..6d7d9e2 100644
--- a/src/utils.jl
+++ b/src/utils.jl
@@ -15,6 +15,11 @@ macro static_error(msg::AbstractString)
     :(static_error(Val{$(QuoteNode(sym))}()))
 end
 
+function noinline(f)
+    @noinline g() = f()
+    return g()
+end
+
 """
     is_pointerfree_type(T::Type) :: Bool
 
@@ -183,6 +188,14 @@ function threaded_typed_mapreduce(f, ::Type{T}, op, xs; kw...) where {T}
     return mapreduce(getindex, op, refs; kw...)
 end
 
+function zip_strict(a, args...)
+    lens = map(length, args)
+    all(==(length(a)), lens) || noinline() do
+        error("collections have non-identical length: ", length(a), ", ", join(lens, ", "))
+    end
+    return zip(a, args...)
+end
+
 function define_docstrings()
     docstrings = [:ConcurrentCollections => joinpath(dirname(@__DIR__), "README.md")]
     docsdir = joinpath(@__DIR__, "docs")
diff --git a/test/ConcurrentCollectionsTests/src/test_bench_dict_histogram.jl b/test/ConcurrentCollectionsTests/src/test_bench_dict_histogram.jl
index eb47854..21a3e16 100644
--- a/test/ConcurrentCollectionsTests/src/test_bench_dict_histogram.jl
+++ b/test/ConcurrentCollectionsTests/src/test_bench_dict_histogram.jl
@@ -9,15 +9,25 @@ function test()
     datasize_list = [10, 2^5, 2^10, 2^20]
     fulldata = generate(datasize = datasize_list[end])
     @testset for datasize in datasize_list
-        test(datasize, fulldata)
+        data = view(fulldata, 1:datasize)
+        test(data)
     end
 end
 
-function test(datasize, fulldata)
-    data = view(fulldata, 1:datasize)
+function test(data)
     dbase = hist_seq!(Dict{String,Int}(), data)
     @testset "seq" begin
         cdseq = hist_seq!(ConcurrentDict{String,Int}(), data)
+        @test sort(collect(setdiff(keys(dbase), keys(cdseq)))) == []
+        @test sort(collect(setdiff(keys(cdseq), keys(dbase)))) == []
+        diffvalues = []
+        for (key, expected) in dbase
+            actual = cdseq[key]
+            if actual != expected
+                push!(diffvalues, (; key, actual, expected))
+            end
+        end
+        @test diffvalues == []
         @test Dict(cdseq) == dbase
     end
     @testset for ntasks in default_ntasks_list()
diff --git a/test/ConcurrentCollectionsTests/src/test_dict.jl b/test/ConcurrentCollectionsTests/src/test_dict.jl
index 431793e..360e2c3 100644
--- a/test/ConcurrentCollectionsTests/src/test_dict.jl
+++ b/test/ConcurrentCollectionsTests/src/test_dict.jl
@@ -1,54 +1,9 @@
 module TestDict
 
 using ConcurrentCollections
-using ConcurrentCollections.Implementations:
-    LPDKeyState,
-    LPD_BITMASK,
-    LPD_DELETED,
-    LPD_EMPTY,
-    LPD_HASKEY,
-    LPD_MOVED,
-    LPD_MOVED_EMPTY,
-    LPD_NBITS,
-    KeyInfo,
-    clusters,
-    migrate!,
-    setdata,
-    setstate
+using ConcurrentCollections.Implementations: clusters, migrate!
 using Test
 
-function test_keyinfo()
-    @test KeyInfo(UInt64(0)).state === LPD_EMPTY
-    @testset for state in instances(LPDKeyState)
-        @test KeyInfo{UInt64}(state, 0x0123456789abcdef).state === state
-        if state !== LPD_EMPTY
-            @test KeyInfo{UInt64}(state, 0x0123456789abcdef).keydata === 0x0123456789abcdef
-        end
-        @test setstate(KeyInfo(rand(UInt64)), state).state === state
-        keydata = rand(UInt64) >> LPD_NBITS
-        @test setdata(KeyInfo(rand(UInt64)), keydata).keydata === keydata
-    end
-end
-
-function test_keyinfo_properties()
-    keyinfo = KeyInfo{UInt64}(rand(UInt64))
-    enum_to_property = Dict(
-        LPD_EMPTY => :isempty,
-        LPD_DELETED => :isdeleted,
-        LPD_MOVED_EMPTY => :ismovedempty,
-        LPD_MOVED => :ismoved,
-        LPD_HASKEY => :haskey,
-    )
-    properties = collect(values(enum_to_property))
-    @testset for state in instances(LPDKeyState), prop in properties
-        if enum_to_property[state] === prop
-            @test getproperty(setstate(keyinfo, state), prop)
-        else
-            @test !getproperty(setstate(keyinfo, state), prop)
-        end
-    end
-end
-
 function test_expand_and_shrink(n = 17)
     d = ConcurrentDict{Int,Int}()
     @testset "expand" begin