Skip to content

Commit

Permalink
replace ConcurrentDictionary with SortedList and use a SpinLock to sy…
Browse files Browse the repository at this point in the history
…nchronize ( ported from https://github.com/danielcrenna/metrics-net/pull/36 by Antoine Souques

@razrin )
  • Loading branch information
etishor committed Sep 24, 2014
1 parent 9dc6b65 commit 1caa9d0
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 32 deletions.
80 changes: 50 additions & 30 deletions Src/Metrics/Core/ExponentiallyDecayingReservoir.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Metrics.Utils;

Expand All @@ -13,8 +11,20 @@ public sealed class ExponentiallyDecayingReservoir : Reservoir, IDisposable
private const double DefaultAlpha = 0.015;
private static readonly TimeSpan RescaleInterval = TimeSpan.FromHours(1);

private readonly ConcurrentDictionary<double, WeightedSample> values = new ConcurrentDictionary<double, WeightedSample>();
private readonly ReaderWriterLockSlim @lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
private class ReverseOrderDoubleComparer : IComparer<double>
{
public static readonly IComparer<double> Instance = new ReverseOrderDoubleComparer();

public int Compare(double x, double y)
{
return y.CompareTo(x);
}
}

private readonly SortedList<double, WeightedSample> values;

private SpinLock @lock = new SpinLock();

private readonly double alpha;
private readonly int size;
private AtomicLong count = new AtomicLong();
Expand Down Expand Up @@ -42,6 +52,8 @@ public ExponentiallyDecayingReservoir(int size, double alpha, Clock clock, Sched
this.alpha = alpha;
this.clock = clock;

this.values = new SortedList<double, WeightedSample>(size, ReverseOrderDoubleComparer.Instance);

this.rescaleScheduler = scheduler;
this.rescaleScheduler.Start(RescaleInterval, () => Rescale());

Expand All @@ -54,14 +66,18 @@ public Snapshot Snapshot
{
get
{
this.@lock.EnterReadLock();
bool lockTaken = false;
try
{
this.@lock.Enter(ref lockTaken);
return new WeightedSnapshot(this.values.Values);
}
finally
{
this.@lock.ExitReadLock();
if (lockTaken)
{
this.@lock.Exit();
}
}
}
}
Expand All @@ -73,58 +89,60 @@ public void Update(long value)

public void Reset()
{
this.@lock.EnterWriteLock();
bool lockTaken = false;
try
{
this.@lock.Enter(ref lockTaken);
this.values.Clear();
this.count.SetValue(0L);
this.startTime = new AtomicLong(this.clock.Seconds);
}
finally
{
this.@lock.ExitWriteLock();
if (lockTaken)
{
this.@lock.Exit();
}
}
}

private void Update(long value, long timestamp)
{
this.@lock.EnterReadLock();
bool lockTaken = false;
try
{
this.@lock.Enter(ref lockTaken);
double itemWeight = Math.Exp(alpha * (timestamp - startTime.Value));
var sample = new WeightedSample(value, itemWeight);
double priority = itemWeight / ThreadLocalRandom.NextDouble();

long newCount = count.Increment();
if (newCount <= size)
{
this.values.AddOrUpdate(priority, sample, (k, v) => sample);
this.values[priority] = sample;
}
else
{
var first = values.First().Key;
var first = this.values.Keys[values.Count - 1];
if (first < priority)
{
this.values.AddOrUpdate(priority, sample, (k, v) => v);

WeightedSample removed;
// ensure we always remove an item
while (!values.TryRemove(first, out removed))
{
first = values.First().Key;
}
this.values.Remove(first);
this.values[priority] = sample;
}
}
}
finally
{
this.@lock.ExitReadLock();
if (lockTaken)
{
this.@lock.Exit();
}
}
}

public void Dispose()
{
using (this.@lock) { }
using (this.rescaleScheduler) { }
}

///* "A common feature of the above techniques—indeed, the key technique that
Expand All @@ -147,9 +165,10 @@ public void Dispose()
// */
private void Rescale()
{
this.@lock.EnterWriteLock();
bool lockTaken = false;
try
{
this.@lock.Enter(ref lockTaken);
long oldStartTime = startTime.Value;
this.startTime.SetValue(this.clock.Seconds);

Expand All @@ -158,20 +177,21 @@ private void Rescale()
var keys = new List<double>(this.values.Keys);
foreach (var key in keys)
{
WeightedSample sample;
if (this.values.TryRemove(key, out sample))
{
double newKey = key * Math.Exp(-alpha * (startTime.Value - oldStartTime));
var newSample = new WeightedSample(sample.Value, sample.Weight * scalingFactor);
values.AddOrUpdate(newKey, newSample, (k, v) => newSample);
}
WeightedSample sample = this.values[key];
this.values.Remove(key);
double newKey = key * Math.Exp(-alpha * (startTime.Value - oldStartTime));
var newSample = new WeightedSample(sample.Value, sample.Weight * scalingFactor);
this.values[newKey] = newSample;
}
// make sure the counter is in sync with the number of stored samples.
this.count.SetValue(values.Count);
}
finally
{
this.@lock.ExitWriteLock();
if (lockTaken)
{
this.@lock.Exit();
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions Src/Metrics/Core/WeightedSnapshot.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
using System.Linq;
namespace Metrics.Core
{
public struct WeightedSample
public class WeightedSample
{
public readonly long Value;
public readonly double Weight;
Expand All @@ -24,7 +24,7 @@ public struct WeightedSnapshot : Snapshot

private class WeightedSampleComparer : IComparer<WeightedSample>
{
public static IComparer<WeightedSample> Instance = new WeightedSampleComparer();
public static readonly IComparer<WeightedSample> Instance = new WeightedSampleComparer();

public int Compare(WeightedSample x, WeightedSample y)
{
Expand Down

0 comments on commit 1caa9d0

Please sign in to comment.