Skip to content

Commit

Permalink
Ported r497: MultiThreadedClaimStrategy renamed MultiThreadedLowConte…
Browse files Browse the repository at this point in the history
…ntionStrategy and added the new MultiThreadedClaimStrategy

The new MultiThreadedClaimStrategy is marked Obselete for the time beeing: I have seen it deadlock and need to find a fix
  • Loading branch information
Olivier Deheurles committed Dec 23, 2011
1 parent d81136b commit 1d6d87c
Show file tree
Hide file tree
Showing 13 changed files with 570 additions and 50 deletions.
5 changes: 3 additions & 2 deletions Backlog.txt
Expand Up @@ -2,10 +2,11 @@
- generate API documentation in the build process
- add proper documentation in the wiki

- port experimentals
- port OnePublisherToOneProcessorUniCastRawThroughputTest
- port OnePublisherToThreeWorkerPoolThroughputTest
- find a more flexible way to start threads (Disruptor.Start and WorkerPool.Start)
- find an equivalent for LockSupport.parkNanos(1L); (Search TODOs)
- port the DSL Tests
- port changes from r495
- pull request from TimGebhardt
- create new assembly for Atomics/Padded
- Processor affinity thread pool
Expand Up @@ -20,7 +20,7 @@ public Sequencer3P1CDisruptorPerfTest()
: base(20 * Million)
{
_disruptor = new Disruptor<ValueEvent>(()=>new ValueEvent(),
new MultiThreadedClaimStrategy(Size),
new MultiThreadedLowContentionClaimStrategy(Size),
new YieldingWaitStrategy());
_mru = new ManualResetEvent(false);
_eventHandler = new ValueAdditionEventHandler(Iterations * NumProducers, _mru);
Expand Down
3 changes: 2 additions & 1 deletion Disruptor.Tests/Disruptor.Tests.csproj
Expand Up @@ -58,6 +58,7 @@
<Compile Include="BatchEventProcessorTests.cs">
<SubType>Code</SubType>
</Compile>
<Compile Include="MultiThreadedClaimStrategyTests.cs" />
<Compile Include="SequenceBarrierTests.cs">
<SubType>Code</SubType>
</Compile>
Expand All @@ -67,7 +68,7 @@
<Compile Include="LifecycleAwareTests.cs">
<SubType>Code</SubType>
</Compile>
<Compile Include="MultiTreadedClaimStrategyTests.cs" />
<Compile Include="MultiThreadedLowContentionClaimStrategyTests.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="RingBufferTests.cs">
<SubType>Code</SubType>
Expand Down
283 changes: 283 additions & 0 deletions Disruptor.Tests/MultiThreadedClaimStrategyTests.cs
@@ -0,0 +1,283 @@
using System.Collections.Concurrent;
using System.Threading;
using Disruptor.Atomic;
using Moq;
using NUnit.Framework;

namespace Disruptor.Tests
{
[TestFixture]
public class MultiThreadedClaimStrategyTests
{
private const int BufferSize = 8;
private IClaimStrategy _claimStrategy;

[SetUp]
public void SetUp()
{
_claimStrategy = new MultiThreadedClaimStrategy(BufferSize);
}

[Test]
public void ShouldGetCorrectBufferSize()
{
Assert.AreEqual(BufferSize, _claimStrategy.BufferSize);
}

[Test]
public void ShouldGetInitialSequence()
{
Assert.AreEqual(Sequencer.InitialCursorValue, _claimStrategy.Sequence);
}

[Test]
public void ShouldClaimInitialSequence()
{
var dependentSequence = new Mock<Sequence>();

Sequence[] dependentSequences = { dependentSequence.Object };
const long expectedSequence = Sequencer.InitialCursorValue + 1L;

Assert.AreEqual(expectedSequence, _claimStrategy.IncrementAndGet(dependentSequences));
Assert.AreEqual(expectedSequence, _claimStrategy.Sequence);
}

[Test]
public void ShouldClaimInitialBatchOfSequences()
{
var dependentSequence = new Mock<Sequence>();

Sequence[] dependentSequences = { dependentSequence.Object };
const int batchSize = 5;
const long expectedSequence = Sequencer.InitialCursorValue + batchSize;

Assert.AreEqual(expectedSequence, _claimStrategy.IncrementAndGet(batchSize, dependentSequences));
Assert.AreEqual(expectedSequence, _claimStrategy.Sequence);
}

[Test]
public void ShouldSetSequenceToValue()
{
var dependentSequence = new Mock<Sequence>();

Sequence[] dependentSequences = { dependentSequence.Object };
const int expectedSequence = 5;
_claimStrategy.SetSequence(expectedSequence, dependentSequences);

Assert.AreEqual(expectedSequence, _claimStrategy.Sequence);
}

[Test]
public void ShouldHaveInitialAvailableCapacity()
{
var dependentSequence = new Mock<Sequence>();

Sequence[] dependentSequences = { dependentSequence.Object };

Assert.IsTrue(_claimStrategy.HasAvailableCapacity(1, dependentSequences));
}

[Test]
public void ShouldNotHaveAvailableCapacityWhenBufferIsFull()
{
var dependentSequence = new Mock<Sequence>();
dependentSequence.Setup(ds => ds.Value).Returns(Sequencer.InitialCursorValue);

Sequence[] dependentSequences = { dependentSequence.Object };

_claimStrategy.SetSequence(_claimStrategy.BufferSize - 1L, dependentSequences);

Assert.IsFalse(_claimStrategy.HasAvailableCapacity(1, dependentSequences));
}

[Test]
public void ShouldNotReturnNextClaimSequenceUntilBufferHasReserve()
{
var dependentSequence = new Sequence(Sequencer.InitialCursorValue);
Sequence[] dependentSequences = { dependentSequence };
_claimStrategy.SetSequence(_claimStrategy.BufferSize - 1L, dependentSequences);

var done = new AtomicBool(false);
var beforeLatch = new ManualResetEvent(false);
var afterLatch = new ManualResetEvent(false);

new Thread(
()=>
{
beforeLatch.Set();
Assert.AreEqual(_claimStrategy.BufferSize, _claimStrategy.IncrementAndGet(dependentSequences));
done.Value = true;
afterLatch.Set();
}).Start();

beforeLatch.WaitOne();

Thread.Sleep(1000);
Assert.IsFalse(done.Value);

dependentSequence.Value = dependentSequence.Value + 1L;

afterLatch.WaitOne();
Assert.AreEqual(_claimStrategy.BufferSize, _claimStrategy.Sequence);
}

[Test]
public void ShouldSerialisePublishingOnTheCursor()
{
var dependentSequence = new Sequence(Sequencer.InitialCursorValue);
Sequence[] dependentSequences = { dependentSequence };

long sequence = _claimStrategy.IncrementAndGet(dependentSequences);

var cursor = new Sequence(Sequencer.InitialCursorValue);
_claimStrategy.SerialisePublishing(sequence, cursor, 1);

Assert.AreEqual(sequence, cursor.Value);
}

[Test]
public void ShouldSerialisePublishingOnTheCursorWhenTwoThreadsArePublishing()
{
var dependentSequence = new Sequence(Sequencer.InitialCursorValue);
var dependentSequences = new[] { dependentSequence };

var threadSequences = new ConcurrentDictionary<long, string>();
var cursor = new SequenceStub(Sequencer.InitialCursorValue, threadSequences);

var mre = new ManualResetEvent(false);

var t1 = new Thread(
() =>
{
var sequence = _claimStrategy.IncrementAndGet(dependentSequences);
mre.Set();
Thread.Sleep(1000);
_claimStrategy.SerialisePublishing(sequence, cursor, 1);
});

var t2 = new Thread(
() =>
{
mre.WaitOne();
var sequence = _claimStrategy.IncrementAndGet(dependentSequences);
_claimStrategy.SerialisePublishing(sequence, cursor, 1);
});

t1.Name = "tOne";
t2.Name = "tTwo";
t1.Start();
t2.Start();
t1.Join();
t2.Join();

Assert.IsNotNull(threadSequences[0]);
Assert.IsNotNull(threadSequences[1]);
}

public class SequenceStub : Sequence
{
private readonly ConcurrentDictionary<long, string> _threadSequences = new ConcurrentDictionary<long, string>();

public SequenceStub(long initialValue, ConcurrentDictionary<long, string> threadSequences)
: base(initialValue)
{
_threadSequences = threadSequences;
}

public override bool CompareAndSet(long expectedSequence, long nextSequence)
{
var threadName = Thread.CurrentThread.Name;
if ("tOne" == threadName || "tTwo" == threadName)
{
_threadSequences[nextSequence] = threadName;
}
return base.CompareAndSet(expectedSequence, nextSequence);
}
}

//[Test]
//public void shouldSerialisePublishingOnTheCursorWhenTwoThreadsArePublishing()
//{
// Sequence dependentSequence = new Sequence(Sequencer.InitialCursorValue);
// Sequence[] dependentSequences = { dependentSequence };

// AtomicReferenceArray<String> threadSequences = new AtomicReferenceArray<String>(2);

// Sequence cursor = new Sequence(Sequencer.InitialCursorValue)
// {
// @Override
// public boolean compareAndSet(long expectedSequence, long nextSequence)
// {
// String threadName = Thread.currentThread().getName();
// if ("tOne".equals(threadName) || "tTwo".equals(threadName))
// {
// threadSequences.set((int)nextSequence, threadName);
// }

// return super.compareAndSet(expectedSequence, nextSequence);
// }
// };

// CountDownLatch orderingLatch = new CountDownLatch(1);

// Runnable publisherOne = new Runnable()
// {
// @Override
// public void run()
// {
// long sequence = _claimStrategy.IncrementAndGet(dependentSequences);
// orderingLatch.countDown();

// try
// {
// Thread.sleep(1000L);
// }
// catch (InterruptedException e)
// {
// // don't care
// }

// _claimStrategy.SerialisePublishing(sequence, cursor, 1);
// }
// };

// Runnable publisherTwo = new Runnable()
// {
// @Override
// public void run()
// {
// try
// {
// orderingLatch.await();
// }
// catch (InterruptedException e)
// {
// e.printStackTrace();
// }

// long sequence = _claimStrategy.IncrementAndGet(dependentSequences);

// _claimStrategy.SerialisePublishing(sequence, cursor, 1);
// }
// };

// Thread tOne = new Thread(publisherOne);
// Thread tTwo = new Thread(publisherTwo);
// tOne.setName("tOne");
// tTwo.setName("tTwo");
// tOne.start();
// tTwo.start();
// tOne.join();
// tTwo.join();

// // One thread can end up setting both sequences.
// assertThat(threadSequences.get(0), is(notNullValue()));
// assertThat(threadSequences.get(1), is(notNullValue()));
//}
}
}
Expand Up @@ -7,15 +7,15 @@
namespace Disruptor.Tests
{
[TestFixture]
public class MultiTreadedClaimStrategyTests
public class MultiThreadedLowContentionClaimStrategyTests
{
private const int BufferSize = 8;
private IClaimStrategy _claimStrategy;

[SetUp]
public void SetUp()
{
_claimStrategy = new MultiThreadedClaimStrategy(BufferSize);
_claimStrategy = new MultiThreadedLowContentionClaimStrategy(BufferSize);
}

[Test]
Expand Down
25 changes: 25 additions & 0 deletions Disruptor/Atomic/AtomicLongArray.cs
@@ -0,0 +1,25 @@
using System.Threading;

namespace Disruptor.Atomic
{
internal class AtomicLongArray
{
private readonly long[] _array;

public AtomicLongArray(int length)
{
_array = new long[length];
}

public long this[int index]
{
get { return Thread.VolatileRead(ref _array[index]); }
set { Thread.VolatileWrite(ref _array[index], value); }
}

public int Length
{
get { return _array.Length; }
}
}
}
3 changes: 3 additions & 0 deletions Disruptor/Disruptor.csproj
Expand Up @@ -51,6 +51,7 @@
<Compile Include="AggregateEventHandler.cs" />
<Compile Include="AlertException.cs" />
<Compile Include="Atomic\AtomicBool.cs" />
<Compile Include="Atomic\AtomicLongArray.cs" />
<Compile Include="Atomic\AtomicReference.cs" />
<Compile Include="BlockingWaitStrategy.cs" />
<Compile Include="BusySpinWaitStrategy.cs" />
Expand All @@ -63,6 +64,8 @@
<Compile Include="IgnoreExceptionHandler.cs" />
<Compile Include="IWorkHandler.cs" />
<Compile Include="MultiThreadedClaimStrategy.cs" />
<Compile Include="MultiThreadedLowContentionClaimStrategy.cs" />
<Compile Include="MutableLong.cs" />
<Compile Include="ProcessingSequenceBarrier.cs" />
<Compile Include="BatchEventProcessor.cs" />
<Compile Include="Collections\Histogram.cs" />
Expand Down

0 comments on commit 1d6d87c

Please sign in to comment.