Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 35 additions & 66 deletions BitFaster.Caching/Lru/ConcurrentLruCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ public bool TryRemove(KeyValuePair<K, V> item)
if (((ICollection<KeyValuePair<K, I>>)this.dictionary).Remove(kvp))
#endif
{
OnRemove(item.Key, kvp.Value);
OnRemove(item.Key, kvp.Value, ItemRemovedReason.Removed);
return true;
}
}
Expand All @@ -333,7 +333,7 @@ public bool TryRemove(K key, out V value)
{
if (this.dictionary.TryRemove(key, out var item))
{
OnRemove(key, item);
OnRemove(key, item, ItemRemovedReason.Removed);
value = item.Value;
return true;
}
Expand All @@ -348,15 +348,16 @@ public bool TryRemove(K key)
return TryRemove(key, out _);
}

private void OnRemove(K key, I item)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void OnRemove(K key, I item, ItemRemovedReason reason)
{
// Mark as not accessed, it will later be cycled out of the queues because it can never be fetched
// from the dictionary. Note: Hot/Warm/Cold count will reflect the removed item until it is cycled
// from the queue.
item.WasAccessed = false;
item.WasRemoved = true;

this.telemetryPolicy.OnItemRemoved(key, item.Value, ItemRemovedReason.Removed);
this.telemetryPolicy.OnItemRemoved(key, item.Value, reason);

// serialize dispose (common case dispose not thread safe)
lock (item)
Expand All @@ -365,7 +366,6 @@ private void OnRemove(K key, I item)
}
}


///<inheritdoc/>
///<remarks>Note: Calling this method does not affect LRU order.</remarks>
public bool TryUpdate(K key, V value)
Expand Down Expand Up @@ -546,10 +546,8 @@ private void Cycle(int hotCount)
{
(var dest, var count) = CycleHot(hotCount);

const int maxAttempts = 5;
int attempts = 0;

while (attempts++ < maxAttempts)
int cycles = 0;
while (cycles++ < 3 && dest != ItemDestination.Remove)
{
if (dest == ItemDestination.Warm)
{
Expand All @@ -559,41 +557,17 @@ private void Cycle(int hotCount)
{
(dest, count) = CycleCold(count);
}
else
{
// If an item was removed, it is possible that the warm and cold queues are still oversize.
// Attempt to recover. It is possible that multiple threads read the same queue count here,
// so this process has races that could reduce cache size below capacity. This manifests
// in 'off by one' which is considered harmless.

(dest, count) = CycleCold(Volatile.Read(ref counter.cold));
if (dest != ItemDestination.Remove)
{
continue;
}

(dest, count) = CycleWarm(Volatile.Read(ref counter.warm));
if (dest != ItemDestination.Remove)
{
continue;
}

break;
}
}

// If we get here, we have cycled the queues multiple times and still have not removed an item.
// This can happen if the cache is full of items that are not discardable. In this case, we simply
// discard the coldest item to avoid unbounded growth.

// If nothing was removed yet, constrain the size of warm and cold by discarding the coldest item.
if (dest != ItemDestination.Remove)
{
// if an item was last moved into warm, move the last warm item to cold to prevent enlarging warm
if (dest == ItemDestination.Warm)
if (dest == ItemDestination.Warm && count > this.capacity.Warm)
{
LastWarmToCold();
count = LastWarmToCold();
}

RemoveCold(ItemRemovedReason.Evicted);
ConstrainCold(count, ItemRemovedReason.Evicted);
}
}
else
Expand All @@ -604,20 +578,6 @@ private void Cycle(int hotCount)
}
}

private void LastWarmToCold()
{
Interlocked.Decrement(ref this.counter.warm);

if (this.warmQueue.TryDequeue(out var item))
{
this.Move(item, ItemDestination.Cold, ItemRemovedReason.Evicted);
}
else
{
Interlocked.Increment(ref this.counter.warm);
}
}

private void CycleDuringWarmup(int hotCount)
{
// do nothing until hot is full
Expand Down Expand Up @@ -750,17 +710,29 @@ private void CycleDuringWarmup(int hotCount)
}
}

private void RemoveCold(ItemRemovedReason removedReason)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private int LastWarmToCold()
{
Interlocked.Decrement(ref this.counter.cold);
Interlocked.Decrement(ref this.counter.warm);

if (this.coldQueue.TryDequeue(out var item))
if (this.warmQueue.TryDequeue(out var item))
{
this.Move(item, ItemDestination.Remove, removedReason);
return this.Move(item, ItemDestination.Cold, ItemRemovedReason.Evicted);
}
else
{
Interlocked.Increment(ref this.counter.cold);
Interlocked.Increment(ref this.counter.warm);
return 0;
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ConstrainCold(int coldCount, ItemRemovedReason removedReason)
{
if (coldCount > this.capacity.Cold && this.coldQueue.TryDequeue(out var item))
{
Interlocked.Decrement(ref this.counter.cold);
this.Move(item, ItemDestination.Remove, removedReason);
}
}

Expand All @@ -781,18 +753,14 @@ private int Move(I item, ItemDestination where, ItemRemovedReason removedReason)

var kvp = new KeyValuePair<K, I>(item.Key, item);

// hidden atomic remove
#if NET6_0_OR_GREATER
if (this.dictionary.TryRemove(kvp))
#else
// https://devblogs.microsoft.com/pfxteam/little-known-gems-atomic-conditional-removals-from-concurrentdictionary/
if (((ICollection<KeyValuePair<K, I>>)this.dictionary).Remove(kvp))
#endif
{
item.WasRemoved = true;

this.telemetryPolicy.OnItemRemoved(item.Key, item.Value, removedReason);

lock (item)
{
Disposer<V>.Dispose(item.Value);
}
OnRemove(item.Key, item, removedReason);
}
break;
}
Expand Down Expand Up @@ -826,6 +794,7 @@ private static CachePolicy CreatePolicy(ConcurrentLruCore<K, V, I, P, T> lru)
// it becomes immutable. However, this object is then somewhere else on the
// heap, which slows down the policies with hit counter logic in benchmarks. Likely
// this approach keeps the structs data members in the same CPU cache line as the LRU.
// backcompat: remove conditional compile
#if NETCOREAPP3_0_OR_GREATER
[DebuggerDisplay("Hit = {Hits}, Miss = {Misses}, Upd = {Updated}, Evict = {Evicted}")]
#else
Expand Down