Skip to content

Commit

Permalink
Merge pull request #848 from lscpike/replication_delay
Browse files Browse the repository at this point in the history
Trigger cluster replication immediately after flush to speed up writes.
  • Loading branch information
gregoryyoung committed Mar 11, 2016
2 parents 6bdf09f + c2b0a14 commit 0ab5668
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class MasterReplicationService : IMonitoredQueue,
private volatile bool _newSubscriptions;
private TimeSpan _noQuorumTimestamp = TimeSpan.Zero;
private bool _noQuorumNotified;
private ManualResetEventSlim _flushSignal = new ManualResetEventSlim();

public MasterReplicationService(IPublisher publisher,
Guid instanceId,
Expand Down Expand Up @@ -328,22 +329,32 @@ private void MainLoop()
_queueStats.Start();
QueueMonitor.Default.Register(this);

_db.Config.WriterCheckpoint.Flushed += OnWriterFlushed;

while (!_stop)
{
try
{

_queueStats.EnterBusy();

_queueStats.ProcessingStarted(typeof(SendReplicationData), _subscriptions.Count);

_flushSignal.Reset(); // Reset the flush signal as we're about to read anyway. This could be closer to the actual read but no harm from too many checks.

var dataFound = ManageSubscriptions();
ManageNoQuorumDetection();
var newSubscriptions = _newSubscriptions;
_newSubscriptions = false;
ManageRoleAssignments(force: newSubscriptions);

_queueStats.ProcessingEnded(_subscriptions.Count);

if (!dataFound)
{
_queueStats.EnterIdle();
Thread.Sleep(1);

_flushSignal.Wait(TimeSpan.FromMilliseconds(500));
}
}
catch (Exception exc)
Expand All @@ -357,12 +368,19 @@ private void MainLoop()
subscription.Dispose();
}

_publisher.Publish(new SystemMessage.ServiceShutdown(Name));
_db.Config.WriterCheckpoint.Flushed -= OnWriterFlushed;

_publisher.Publish(new SystemMessage.ServiceShutdown(Name));

_queueStats.Stop();
QueueMonitor.Default.Unregister(this);
}

private void OnWriterFlushed(long obj)
{
_flushSignal.Set();
}

private bool ManageSubscriptions()
{
var dataFound = false;
Expand Down Expand Up @@ -600,6 +618,11 @@ private enum ReplicaState
Slave
}

private class SendReplicationData
{

}

private class ReplicaSubscription: IDisposable
{
public readonly byte[] DataBuffer = new byte[BulkSize];
Expand Down Expand Up @@ -670,6 +693,7 @@ public void Dispose()
if (bulkReader != null)
bulkReader.Release();
}

}
}
}
20 changes: 18 additions & 2 deletions src/EventStore.Core/Services/Storage/StorageChaser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ public class StorageChaser : IMonitoredQueue,

private static readonly int TicksPerMs = (int)(Stopwatch.Frequency / 1000);
private static readonly int MinFlushDelay = 2 * TicksPerMs;
private readonly ManualResetEventSlim _flushSignal = new ManualResetEventSlim();
private static readonly TimeSpan FlushWaitTimeout = TimeSpan.FromMilliseconds(100);


public string Name { get { return _queueStats.Name; } }

private readonly IPublisher _masterBus;
Expand Down Expand Up @@ -92,6 +94,8 @@ private void ChaseTransactionLog()

try
{
_writerCheckpoint.Flushed += OnWriterFlushed;

_chaser.Open();

// We rebuild index till the chaser position, because
Expand Down Expand Up @@ -121,17 +125,29 @@ private void ChaseTransactionLog()
}
_queueStats.ProcessingEnded(0);
}

_writerCheckpoint.Flushed -= OnWriterFlushed;

_chaser.Close();

_masterBus.Publish(new SystemMessage.ServiceShutdown(Name));

_queueStats.EnterIdle();
_queueStats.Stop();
QueueMonitor.Default.Unregister(this);
}

private void OnWriterFlushed(long obj)
{
_flushSignal.Set();
}

private void ChaserIteration()
{
_queueStats.EnterBusy();

_flushSignal.Reset(); // Reset the flush signal just before a read to reduce pointless reads from [flush flush read] patterns.

var result = _chaser.TryReadNext();

if (result.Success)
Expand Down Expand Up @@ -159,9 +175,9 @@ private void ChaserIteration()
if (!result.Success)
{
_queueStats.EnterIdle();
//Thread.Sleep(1);

var startwait = _watch.ElapsedTicks;
_writerCheckpoint.WaitForFlush(FlushWaitTimeout);
_flushSignal.Wait(FlushWaitTimeout);
HistogramService.SetValue(_chaserWaitHistogram,
(long)((((double)_watch.ElapsedTicks - startwait) / Stopwatch.Frequency) * 1000000000));
}
Expand Down
25 changes: 11 additions & 14 deletions src/EventStore.Core/TransactionLog/Checkpoint/FileCheckpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ public class FileCheckpoint : ICheckpoint
private readonly BinaryWriter _writer;
private readonly BinaryReader _reader;

private readonly object _flushLocker = new object();

public FileCheckpoint(string filename)
: this(filename, Guid.NewGuid().ToString())
{
Expand Down Expand Up @@ -83,11 +81,8 @@ public void Flush()

_fileStream.FlushToDisk();
Interlocked.Exchange(ref _lastFlushed, last);

lock (_flushLocker)
{
Monitor.PulseAll(_flushLocker);
}

OnFlushed(last);
}

public long Read()
Expand All @@ -101,17 +96,19 @@ public long
return Interlocked.Read(ref _last);
}

public bool WaitForFlush(TimeSpan timeout)
{
lock (_flushLocker)
{
return Monitor.Wait(_flushLocker, timeout);
}
}
public event Action<long> Flushed;

public void Dispose()
{
Close();
}

protected virtual void OnFlushed(long obj)
{
var onFlushed = Flushed;
if (onFlushed != null)
onFlushed.Invoke(obj);
}

}
}
3 changes: 1 addition & 2 deletions src/EventStore.Core/TransactionLog/Checkpoint/ICheckpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ public interface ICheckpoint: IDisposable
void Write(long checkpoint);
void Flush();
void Close();

long Read();
long ReadNonFlushed();

bool WaitForFlush(TimeSpan timeout);
event Action<long> Flushed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,16 @@ public void Flush()

Interlocked.Exchange(ref _lastFlushed, last);

lock (_flushLocker)
{
Monitor.PulseAll(_flushLocker);
}
OnFlushed(last);
}

public event Action<long> Flushed;

public bool WaitForFlush(TimeSpan timeout)
protected virtual void OnFlushed(long obj)
{
lock (_flushLocker)
{
return Monitor.Wait(_flushLocker, timeout);
}
var onFlushed = Flushed;
if (onFlushed != null)
onFlushed.Invoke(obj);
}

public void Close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ public class MemoryMappedFileCheckpoint : ICheckpoint
private long _lastFlushed;
private readonly MemoryMappedViewAccessor _accessor;

private readonly object _flushLocker = new object();

public MemoryMappedFileCheckpoint(string filename): this(filename, Guid.NewGuid().ToString(), false)
{
}
Expand Down Expand Up @@ -81,10 +79,7 @@ public void Flush()

Interlocked.Exchange(ref _lastFlushed, last);

lock (_flushLocker)
{
Monitor.PulseAll(_flushLocker);
}
OnFlushed(last);
}

public long Read()
Expand All @@ -101,13 +96,14 @@ public long ReadNonFlushed()
{
return Interlocked.Read(ref _last);
}

public event Action<long> Flushed;

public bool WaitForFlush(TimeSpan timeout)
protected virtual void OnFlushed(long obj)
{
lock (_flushLocker)
{
return Monitor.Wait(_flushLocker, timeout);
}
var onFlushed = Flushed;
if (onFlushed != null)
onFlushed.Invoke(obj);
}

public void Dispose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ public void Flush()
_stream.Write(buffer, 0, buffer.Length);

Interlocked.Exchange(ref _lastFlushed, last);

OnFlushed(last);
//FlushFileBuffers(_file.SafeMemoryMappedFileHandle.DangerousGetHandle());
}

Expand All @@ -118,10 +120,14 @@ public long ReadNonFlushed()
{
return Interlocked.Read(ref _last);
}

public event Action<long> Flushed;

public bool WaitForFlush(TimeSpan timeout)
protected virtual void OnFlushed(long obj)
{
throw new NotImplementedException();
var onFlushed = Flushed;
if (onFlushed != null)
onFlushed.Invoke(obj);
}

public void Dispose()
Expand Down

0 comments on commit 0ab5668

Please sign in to comment.