Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 31 additions & 18 deletions src/dict.jl
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ end
@inline function cleanup_pairnode!(slots, pairnodes, index)
GC.@preserve slots begin
s2ptr = pointer(slots, 2 * index)
slotid = UnsafeAtomics.load(s2ptr)
slot2 = UnsafeAtomics.load(s2ptr)
slotid = slotid_from_slot2(slot2)
end
iszero(slotid) && unreachable()
ref = pairnodes[index]
Expand Down Expand Up @@ -253,6 +254,13 @@ function ConcurrentCollections.trypop!(d::LinearProbingDict, key)
end
end

@inline slotid_from_slot2(slot2::UInt64) = slot2 & (typemax(UInt64) >> LPD_NBITS)

@inline function reconstruct_full_hash(keyinfo::KeyInfo, slot2::UInt64)
hash2 = slot2 & ~(typemax(UInt64) >> LPD_NBITS)
return (keyinfo.keydata << LPD_NBITS) | (hash2 >> (64 - LPD_NBITS))
end

function ConcurrentCollections.modify!(
f,
dict::LinearProbingDict{Key,Value},
Expand All @@ -264,8 +272,9 @@ function ConcurrentCollections.modify!(

h = hash(key)

# The upper bits of hash that would be stored in `keyinfo.keydata`:
inlinedhash = h >> LPD_NBITS
# The upper and lower bits of hash:
hash1 = h >> LPD_NBITS # stored in slot 1 (`keyinfo.keydata`)
hash2 = h << (64 - LPD_NBITS) # stored in slot 2

if 4 * length_upper_bound(dict) > length(slots)
slots, pairnodes = expand!(dict, slots, pairnodes)
Expand Down Expand Up @@ -295,8 +304,9 @@ function ConcurrentCollections.modify!(
# TODO: Handle wrap-around of slotid? Reset it during migration?
end
prepare_pairnode!(pairnodes, index, newslotid, key, something(reply))
slot2 = hash2 | newslotid
oldslot = Pair(keybits, zero(keybits))
newslot = Pair(KeyInfo{UInt64}(LPD_HASKEY, inlinedhash).bits, newslotid)
newslot = Pair(KeyInfo{UInt64}(LPD_HASKEY, hash1).bits, slot2)
s12ptr = Ptr{typeof(oldslot)}(s1ptr)
found = UnsafeAtomics.cas!(s12ptr, oldslot, newslot)
if found === oldslot
Expand All @@ -321,10 +331,12 @@ function ConcurrentCollections.modify!(
# no need to retry.
end
elseif keyinfo.haskey
if keyinfo.keydata ==′ inlinedhash
slotid = UnsafeAtomics.load(s2ptr)
if keyinfo.keydata ==′ hash1
slot2 = UnsafeAtomics.load(s2ptr)
slotid = slotid_from_slot2(slot2)
stored_hash = reconstruct_full_hash(keyinfo, slot2)
node = load_pairnode(pairnodes, index, slotid)
if isequal(node.key, key)
if stored_hash == h && isequal(node.key, key)
vref = ValueRef{Value}(node)
while true
reply = f(vref)::Union{Keep,Nothing,Delete,Some}
Expand All @@ -339,8 +351,8 @@ function ConcurrentCollections.modify!(
end

# Deletion:
oldslot = Pair(keybits, slotid)
newslot = Pair(setstate(keyinfo, LPD_DELETED).bits, slotid)
oldslot = Pair(keybits, slot2)
newslot = Pair(setstate(keyinfo, LPD_DELETED).bits, slot2)
s12ptr = Ptr{typeof(oldslot)}(s1ptr)
if UnsafeAtomics.cas!(s12ptr, oldslot, newslot) === oldslot
ndeleted = Threads.atomic_add!(dict.ndeleted, 1) + 1
Expand Down Expand Up @@ -621,9 +633,9 @@ function unsafe_migrate!(
local tryset
let s2ptr = s2ptr
@inline function tryset(keyinfo, newstate)
local slotid = UnsafeAtomics.load(s2ptr)
local oldslot = Pair(keyinfo.bits, slotid)
local newslot = Pair(setstate(keyinfo, newstate).bits, slotid)
local slot2 = UnsafeAtomics.load(s2ptr)
local oldslot = Pair(keyinfo.bits, slot2)
local newslot = Pair(setstate(keyinfo, newstate).bits, slot2)
local s12ptr = Ptr{typeof(oldslot)}(s1ptr)
return UnsafeAtomics.cas!(s12ptr, oldslot, newslot) === oldslot
end
Expand Down Expand Up @@ -659,23 +671,23 @@ function unsafe_migrate!(
newslots === nothing && continue

newkeybits = setstate(keyinfo, LPD_HASKEY).bits
slotid = UnsafeAtomics.load(s2ptr)
slot2 = UnsafeAtomics.load(s2ptr)
slotid = slotid_from_slot2(slot2)
node = load_pairnode(pairnodes, i, slotid)
key = node.key
h = reconstruct_full_hash(keyinfo, slot2)

# Insertion to `newslots` does not have to use atomics since
# it's protected by the `.migration` lock.
c = length(newslots) ÷ 2
h = reinterpret(Int, hash(key))
offset = h & (c - 1) # h % c
offset = reinterpret(Int, h) & (c - 1) # h % c
nprobes = 0
while true
# TODO: non-atomic ordering
local keybits = @inbounds newslots[2*offset+1]
local keyinfo = KeyInfo(keybits)
if keyinfo.isempty
@inbounds newslots[2*offset+1] = newkeybits
@inbounds newslots[2*offset+2] = slotid
@inbounds newslots[2*offset+2] = slot2
ref = newpairnodes[offset+1]
@atomic ref.value = node
nadded += 1
Expand Down Expand Up @@ -711,7 +723,8 @@ function Base.iterate(::LinearProbingDict, (slots, pairnodes, index))
keybits = UnsafeAtomics.load(s1ptr)
keyinfo = KeyInfo(keybits)
if keyinfo.haskey | keyinfo.ismoved
slotid = UnsafeAtomics.load(s2ptr)
slot2 = UnsafeAtomics.load(s2ptr)
slotid = slotid_from_slot2(slot2)
node = load_pairnode(pairnodes, index, slotid)
key = node.key
value = @atomic node.value
Expand Down