Permalink
Browse files

Fixed: waiting idle state in clientAPI based states

  • Loading branch information...
1 parent 87a1cff commit bc0720fbb7119ea1255246659cf852254bb93cc5 @ysw ysw committed Feb 14, 2014
View
17 src/EventStore/EventStore.Core/Bus/QueueStatsCollector.cs
@@ -224,12 +224,14 @@ public QueueStats GetStatistics(int currentQueueLength)
private static int _nonIdle = 0;
private static ICheckpoint _writerCheckpoint;
private static ICheckpoint _chaserCheckpoint;
+ private static int _length;
public static void InitializeIdleDetection(bool enable = true)
{
if (enable)
{
_nonIdle = 0;
+ _length = 0;
_notifyLock = new object();
}
else
@@ -242,11 +244,10 @@ public static void InitializeIdleDetection(bool enable = true)
[Conditional("DEBUG")]
public static void WaitIdle()
{
- throw new NotImplementedException("Must account for real queue length as well. for instance interlocked on enqueue and processed");
#if DEBUG
lock (_notifyLock)
{
- while (_nonIdle > 0 || _writerCheckpoint.Read() != _chaserCheckpoint.Read())
+ while (_nonIdle > 0 || _length > 0 || _writerCheckpoint.Read() != _chaserCheckpoint.Read())
{
if (!Monitor.Wait(_notifyLock, 100))
Console.WriteLine("Waiting for IDLE state...");
@@ -261,6 +262,18 @@ public static void InitializeCheckpoints(ICheckpoint writerCheckpoint, ICheckpoi
_writerCheckpoint = writerCheckpoint;
}
#endif
+
+ [Conditional("DEBUG")]
+ public void Enqueued()
+ {
+ Interlocked.Increment(ref _length);
+ }
+
+ [Conditional("DEBUG")]
+ public void Dequeued()
+ {
+ Interlocked.Decrement(ref _length);
+ }
}
}
View
6 src/EventStore/EventStore.Core/Bus/QueuedHandlerAutoReset.cs
@@ -146,6 +146,9 @@ private void ReadFromQueue(object o)
else
{
_queueStats.EnterBusy();
+#if DEBUG
+ _queueStats.Dequeued();
+#endif
iterationsCount = 0;
@@ -191,6 +194,9 @@ private void ReadFromQueue(object o)
public void Publish(Message message)
{
//Ensure.NotNull(message, "message");
+#if DEBUG
+ _queueStats.Enqueued();
+#endif
_queue.Enqueue(message);
if (_starving)
_msgAddEvent.Set();
View
6 src/EventStore/EventStore.Core/Bus/QueuedHandlerMRES.cs
@@ -132,6 +132,9 @@ private void ReadFromQueue(object o)
else
{
_queueStats.EnterBusy();
+#if DEBUG
+ _queueStats.Dequeued();
+#endif
var cnt = _queue.Count;
_queueStats.ProcessingStarted(msg.GetType(), cnt);
@@ -175,6 +178,9 @@ private void ReadFromQueue(object o)
public void Publish(Message message)
{
//Ensure.NotNull(message, "message");
+#if DEBUG
+ _queueStats.Enqueued();
+#endif
_queue.Enqueue(message);
if (_starving)
_msgAddEvent.Set();
View
6 src/EventStore/EventStore.Core/Bus/QueuedHandlerPulse.cs
@@ -134,6 +134,9 @@ private void ReadFromQueue(object o)
}
_queueStats.EnterBusy();
+#if DEBUG
+ _queueStats.Dequeued();
+#endif
var cnt = _queue.Count;
_queueStats.ProcessingStarted(msg.GetType(), cnt);
@@ -176,6 +179,9 @@ private void ReadFromQueue(object o)
public void Publish(Message message)
{
//Ensure.NotNull(message, "message");
+#if DEBUG
+ _queueStats.Enqueued();
+#endif
_queue.Enqueue(message);
if (_starving)
{
View
6 src/EventStore/EventStore.Core/Bus/QueuedHandlerSleep.cs
@@ -137,6 +137,9 @@ private void ReadFromQueue(object o)
else
{
_queueStats.EnterBusy();
+#if DEBUG
+ _queueStats.Dequeued();
+#endif
var cnt = _queue.Count;
_queueStats.ProcessingStarted(msg.GetType(), cnt);
@@ -180,6 +183,9 @@ private void ReadFromQueue(object o)
public void Publish(Message message)
{
//Ensure.NotNull(message, "message");
+#if DEBUG
+ _queueStats.Enqueued();
+#endif
_queue.Enqueue(message);
}
View
6 src/EventStore/EventStore.Core/Bus/QueuedHandlerThreadPool.cs
@@ -115,6 +115,9 @@ private void ReadFromQueue(object o)
Message msg;
while (!_stop && _queue.TryDequeue(out msg))
{
+#if DEBUG
+ _queueStats.Dequeued();
+#endif
try
{
var queueCnt = _queue.Count;
@@ -161,6 +164,9 @@ private void ReadFromQueue(object o)
public void Publish(Message message)
{
//Ensure.NotNull(message, "message");
+#if DEBUG
+ _queueStats.Enqueued();
+#endif
_queue.Enqueue(message);
if (Interlocked.CompareExchange(ref _isRunning, 1, 0) == 0)
ThreadPool.QueueUserWorkItem(ReadFromQueue);

0 comments on commit bc0720f

Please sign in to comment.