Permalink
Browse files

Added padded classes to the Atomic project and ported the disruptor c…

…ode to use the new constructs
  • Loading branch information...
1 parent f9300fe commit f9763cf1200f9bffbc78b1e8fa95a4ec42df1d33 @odeheurles odeheurles committed Dec 31, 2011
@@ -130,6 +130,10 @@
<Compile Include="UniCast1P1C\UniCast1P1CDisruptorPerfTest.cs" />
</ItemGroup>
<ItemGroup>
+ <ProjectReference Include="..\Atomic\Atomic.csproj">
+ <Project>{BDB4336E-54F2-475D-8193-B730C522261F}</Project>
+ <Name>Atomic</Name>
+ </ProjectReference>
<ProjectReference Include="..\Disruptor.Scheduler\Disruptor.Scheduler.csproj">
<Project>{7B1F3DC3-5DA8-4E36-A03B-837D90D2A51D}</Project>
<Name>Disruptor.Scheduler</Name>
@@ -1,5 +1,4 @@
using System.Threading;
-using Disruptor.MemoryLayout;
namespace Disruptor.PerfTests.Support
{
@@ -8,19 +7,19 @@ public class FizzBuzzEventHandler : IEventHandler<FizzBuzzEvent>
private readonly FizzBuzzStep _fizzBuzzStep;
private readonly long _iterations;
private readonly ManualResetEvent _mru;
- private PaddedLong _fizzBuzzCounter;
+ private Volatile.PaddedLong _fizzBuzzCounter;
public long FizzBuzzCounter
{
- get { return _fizzBuzzCounter.Value; }
+ get { return _fizzBuzzCounter.ReadUnfenced(); }
}
public FizzBuzzEventHandler(FizzBuzzStep fizzBuzzStep, long iterations, ManualResetEvent mru)
{
_fizzBuzzStep = fizzBuzzStep;
_iterations = iterations;
_mru = mru;
- _fizzBuzzCounter = new PaddedLong(0);
+ _fizzBuzzCounter = new Volatile.PaddedLong(0);
}
public void OnNext(FizzBuzzEvent data, long sequence, bool endOfBatch)
@@ -37,7 +36,7 @@ public void OnNext(FizzBuzzEvent data, long sequence, bool endOfBatch)
case FizzBuzzStep.FizzBuzz:
if (data.Fizz && data.Buzz)
{
- ++_fizzBuzzCounter.Value;
+ _fizzBuzzCounter.WriteUnfenced(_fizzBuzzCounter.ReadUnfenced() + 1);
}
break;
}
@@ -1,18 +1,17 @@
using System.Threading;
-using Disruptor.MemoryLayout;
namespace Disruptor.PerfTests.Support
{
public class FunctionEventHandler : IEventHandler<FunctionEvent>
{
private readonly FunctionStep _functionStep;
- private PaddedLong _stepThreeCounter;
+ private Volatile.PaddedLong _stepThreeCounter = default(Volatile.PaddedLong);
private readonly long _iterations;
private readonly ManualResetEvent _mru;
public long StepThreeCounter
{
- get { return _stepThreeCounter.Value; }
+ get { return _stepThreeCounter.ReadUnfenced(); }
}
public FunctionEventHandler(FunctionStep functionStep, long iterations, ManualResetEvent mru)
@@ -36,7 +35,7 @@ public void OnNext(FunctionEvent data, long sequence, bool endOfBatch)
case FunctionStep.Three:
if ((data.StepTwoResult & 4L) == 4L)
{
- _stepThreeCounter.Value++;
+ _stepThreeCounter.WriteUnfenced(_stepThreeCounter.ReadUnfenced() + 1);
}
break;
}
@@ -1,12 +1,11 @@
using System.Threading;
-using Disruptor.MemoryLayout;
namespace Disruptor.PerfTests.Support
{
public class ValueAdditionEventHandler : IEventHandler<ValueEvent>
{
private readonly long _iterations;
- private PaddedLong _value;
+ private Volatile.PaddedLong _value;
private readonly ManualResetEvent _mru;
public ValueAdditionEventHandler(long iterations, ManualResetEvent mru)
@@ -17,12 +16,12 @@ public ValueAdditionEventHandler(long iterations, ManualResetEvent mru)
public long Value
{
- get { return _value.Value; }
+ get { return _value.ReadUnfenced(); }
}
public void OnNext(ValueEvent value, long sequence, bool endOfBatch)
{
- _value.Value += value.Value;
+ _value.WriteUnfenced(_value.ReadUnfenced() + value.Value);
if(sequence == _iterations - 1)
{
@@ -1,12 +1,11 @@
using System.Threading;
-using Disruptor.MemoryLayout;
namespace Disruptor.PerfTests.Support
{
public class ValueMutationEventHandler : IEventHandler<ValueEvent>
{
private readonly Operation _operation;
- private PaddedLong _value;
+ private Volatile.PaddedLong _value = new Volatile.PaddedLong(0);
private readonly long _iterations;
private readonly CountdownEvent _latch;
@@ -19,12 +18,12 @@ public ValueMutationEventHandler(Operation operation, long iterations, Countdown
public long Value
{
- get { return _value.Value; }
+ get { return _value.ReadUnfenced(); }
}
public void OnNext(ValueEvent data, long sequence, bool endOfBatch)
{
- _value.Value = _operation.Op(_value.Value, data.Value);
+ _value.WriteUnfenced(_operation.Op(_value.ReadUnfenced(), data.Value));
if (sequence == _iterations - 1)
{
@@ -65,14 +65,11 @@
<Compile Include="ProcessingSequenceBarrier.cs" />
<Compile Include="BatchEventProcessor.cs" />
<Compile Include="Collections\Histogram.cs" />
- <Compile Include="MemoryLayout\PaddedAtomicLong.cs" />
- <Compile Include="MemoryLayout\PaddedLong.cs" />
<Compile Include="Dsl\EventHandlerGroup.cs" />
<Compile Include="Dsl\EventProcessorInfo.cs" />
<Compile Include="Dsl\EventProcessorRepository.cs" />
<Compile Include="IEventHandlerGroup.cs" />
<Compile Include="ILifecycleAware.cs" />
- <Compile Include="MemoryLayout\CacheLine.cs" />
<Compile Include="IEventHandler.cs" />
<Compile Include="IClaimStrategy.cs" />
<Compile Include="IEventProcessor.cs" />
@@ -1,13 +0,0 @@
-namespace Disruptor.MemoryLayout
-{
- ///<summary>
- /// Constant definition of cache line size
- ///</summary>
- internal static class CacheLine
- {
- ///<summary>
- /// Size of a cache line in bytes
- ///</summary>
- public const int Size = 64;
- }
-}
@@ -1,67 +0,0 @@
-using System.Runtime.InteropServices;
-using System.Threading;
-
-namespace Disruptor.MemoryLayout
-{
- /// <summary>
- /// A <see cref="long"/> wrapped in PaddedLong is guaranteed to live on its own cache line
- /// </summary>
- [StructLayout(LayoutKind.Explicit, Size = 2 * CacheLine.Size)]
- internal struct PaddedAtomicLong
- {
- [FieldOffset(CacheLine.Size)]
- private long _value;
-
- ///<summary>
- /// Initialise a new instance of CacheLineStorage
- ///</summary>
- ///<param name="value">default value</param>
- public PaddedAtomicLong(long value)
- {
- _value = value;
- }
-
- ///<summary>
- /// Increments a specified variable and stores the result, as an atomic operation.
- ///</summary>
- ///<returns>incremented result</returns>
- public long IncrementAndGet()
- {
- return Interlocked.Increment(ref _value);
- }
-
- /// <summary>
- /// Increments a specified variable and stores the result, as an atomic operation.
- /// </summary>
- /// <param name="delta"></param>
- /// <returns></returns>
- public long IncrementAndGet(int delta)
- {
- return Interlocked.Add(ref _value, delta);
- }
-
- /// <summary>
- /// Expose data with full fence on read and write
- /// </summary>
- public long Value
- {
- get { return Thread.VolatileRead(ref _value); }
- set { Thread.VolatileWrite(ref _value, value); }
- }
-
- public void LazySet(long value)
- {
- _value = value;
- }
-
- public bool CompareAndSet(long comparand, long value)
- {
- return Interlocked.CompareExchange(ref _value, value, comparand) == comparand;
- }
-
- public long AddAndGet(int delta)
- {
- return Interlocked.Add(ref _value, delta);
- }
- }
-}
@@ -1,32 +0,0 @@
-using System.Runtime.InteropServices;
-
-namespace Disruptor.MemoryLayout
-{
- /// <summary>
- /// A <see cref="long"/> wrapped in PaddedLong is guaranteed to live on its own cache line
- /// </summary>
- [StructLayout(LayoutKind.Explicit, Size = 2 * CacheLine.Size)]
- public struct PaddedLong
- {
- [FieldOffset(CacheLine.Size)]
- private long _value;
-
- ///<summary>
- /// Initialise a new instance of CacheLineStorage
- ///</summary>
- ///<param name="value">default value of Value</param>
- public PaddedLong(long value)
- {
- _value = value;
- }
-
- /// <summary>
- /// Expose Value with full fence on read and write
- /// </summary>
- public long Value
- {
- get { return _value; }
- set { _value = value; }
- }
- }
-}
@@ -1,5 +1,4 @@
using System.Threading;
-using Disruptor.MemoryLayout;
namespace Disruptor
{
@@ -14,7 +13,7 @@ namespace Disruptor
public class MultiThreadedClaimStrategy : IClaimStrategy
{
private readonly int _bufferSize;
- private PaddedAtomicLong _claimSequence = new PaddedAtomicLong(Sequencer.InitialCursorValue);
+ private Volatile.PaddedLong _claimSequence = new Volatile.PaddedLong(Sequencer.InitialCursorValue);
private readonly Volatile.LongArray _pendingPublication;
private readonly int _pendingMask;
private readonly ThreadLocal<MutableLong> _minGatingSequenceThreadLocal = new ThreadLocal<MutableLong>(() => new MutableLong(Sequencer.InitialCursorValue));
@@ -44,7 +43,7 @@ public int BufferSize
/// </summary>
public long Sequence
{
- get { return _claimSequence.Value; }
+ get { return _claimSequence.ReadFullFence(); }
}
/// <summary>
@@ -55,7 +54,7 @@ public long Sequence
/// <returns>true if the buffer has capacity for the requested sequence.</returns>
public bool HasAvailableCapacity(int availableCapacity, Sequence[] dependentSequences)
{
- long wrapPoint = (_claimSequence.Value + availableCapacity) - _bufferSize;
+ long wrapPoint = (_claimSequence.ReadFullFence() + availableCapacity) - _bufferSize;
MutableLong minGatingSequence = _minGatingSequenceThreadLocal.Value;
if (wrapPoint > minGatingSequence.Value)
{
@@ -82,7 +81,7 @@ public long IncrementAndGet(Sequence[] dependentSequences)
MutableLong minGatingSequence = _minGatingSequenceThreadLocal.Value;
WaitForCapacity(dependentSequences, minGatingSequence);
- long nextSequence = _claimSequence.IncrementAndGet();
+ long nextSequence = _claimSequence.AtomicIncrementAndGet();
WaitForFreeSlotAt(nextSequence, dependentSequences, minGatingSequence);
return nextSequence;
@@ -97,7 +96,7 @@ public long IncrementAndGet(Sequence[] dependentSequences)
///<returns>the result after incrementing.</returns>
public long IncrementAndGet(int delta, Sequence[] dependentSequences)
{
- long nextSequence = _claimSequence.AddAndGet(delta);
+ long nextSequence = _claimSequence.AtomicAddAndGet(delta);
WaitForFreeSlotAt(nextSequence, dependentSequences, _minGatingSequenceThreadLocal.Value);
return nextSequence;
@@ -111,7 +110,7 @@ public long IncrementAndGet(int delta, Sequence[] dependentSequences)
/// <param name="dependentSequences">dependentSequences to be checked for range.</param>
public void SetSequence(long sequence, Sequence[] dependentSequences)
{
- _claimSequence.LazySet(sequence);
+ _claimSequence.WriteCompilerOnlyFence(sequence);
WaitForFreeSlotAt(sequence, dependentSequences, _minGatingSequenceThreadLocal.Value);
}
@@ -154,7 +153,7 @@ public void SerialisePublishing(long sequence, Sequence cursor, long batchSize)
private void WaitForCapacity(Sequence[] dependentSequences, MutableLong minGatingSequence)
{
- long wrapPoint = (_claimSequence.Value + 1L) - _bufferSize;
+ long wrapPoint = (_claimSequence.ReadFullFence() + 1L) - _bufferSize;
if (wrapPoint > minGatingSequence.Value)
{
var spinWait = default(SpinWait);
Oops, something went wrong.

0 comments on commit f9763cf

Please sign in to comment.