Skip to content

Commit

Permalink
ConcurrentRequestQueue - Improved DequeueBatch performance when block…
Browse files Browse the repository at this point in the history
…ing mode (And unit test for deadlock issue)
  • Loading branch information
snakefoot committed May 23, 2019
1 parent ed250c1 commit 3c4cddf
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 42 deletions.
2 changes: 1 addition & 1 deletion src/NLog/Targets/Wrappers/AsyncRequestQueue-T.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ public override bool Enqueue(AsyncLogEventInfo logEventInfo)

case AsyncTargetWrapperOverflowAction.Grow:
InternalLogger.Debug("The overflow action is Grow, adding element anyway");
RequestLimit *= 2;
OnLogEventQueueGrows(RequestCount + 1);
RequestLimit *= 2;
break;

case AsyncTargetWrapperOverflowAction.Block:
Expand Down
14 changes: 8 additions & 6 deletions src/NLog/Targets/Wrappers/ConcurrentRequestQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public override bool Enqueue(AsyncLogEventInfo logEventInfo)
InternalLogger.Debug("Discarding one element from queue");
currentCount = Interlocked.Decrement(ref _count);
OnLogEventDropped(lostItem.LogEvent);
break;
break;
}
currentCount = Interlocked.Read(ref _count);
} while (currentCount > RequestLimit);
Expand All @@ -106,13 +106,15 @@ public override bool Enqueue(AsyncLogEventInfo logEventInfo)
break;
case AsyncTargetWrapperOverflowAction.Grow:
{
InternalLogger.Debug("The overflow action is Grow, adding element anyway");
OnLogEventQueueGrows(currentCount);
RequestLimit *= 2;
}
break;
}
}
_logEventInfoQueue.Enqueue(logEventInfo);
return currentCount == 1;
return currentCount == 1 || Interlocked.Read(ref _count) < currentCount; // Inserted first item in empty queue, or dequeue has been performed
}

private long WaitForBelowRequestLimit()
Expand All @@ -127,14 +129,15 @@ private long WaitForBelowRequestLimit()
// If yield did not help, then wait on a lock
while (currentCount > RequestLimit)
{
Interlocked.Decrement(ref _count);
InternalLogger.Debug("Blocking because the overflow action is Block...");
if (!lockTaken)
Monitor.Enter(_logEventInfoQueue);
else
Monitor.Wait(_logEventInfoQueue);
lockTaken = true;
InternalLogger.Trace("Entered critical section.");
currentCount = Interlocked.Read(ref _count);
currentCount = Interlocked.Increment(ref _count);
}

if (lockTaken)
Expand Down Expand Up @@ -223,11 +226,10 @@ public override void DequeueBatch(int count, IList<AsyncLogEventInfo> result)

if (dequeueBatch)
{
bool lockTaken = Monitor.TryEnter(_logEventInfoQueue); // Try to throttle
bool lockTaken = result.Count == count && Monitor.TryEnter(_logEventInfoQueue); // Only try throttle when falling behind
try
{
for (int i = 0; i < result.Count; ++i)
Interlocked.Decrement(ref _count);
Interlocked.Add(ref _count, -result.Count);
if (lockTaken)
Monitor.PulseAll(_logEventInfoQueue);
}
Expand Down
89 changes: 55 additions & 34 deletions tests/NLog.UnitTests/Targets/Wrappers/AsyncTargetWrapperTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ private void AsyncTargetWrapperSyncTest_WhenTimeToSleepBetweenBatchesIsEqualToZe
#endif
BatchSize = 3,
QueueLimit = 5, // Will make it "sleep" between every second write
FullBatchSizeWriteLimit = 1,
OverflowAction = AsyncTargetWrapperOverflowAction.Block
};
targetWrapper.Initialize(null);
Expand All @@ -108,35 +109,69 @@ private void AsyncTargetWrapperSyncTest_WhenTimeToSleepBetweenBatchesIsEqualToZe
int flushCounter = 0;
AsyncContinuation flushHandler = (ex) => { ++flushCounter; };

List<KeyValuePair<LogEventInfo, AsyncContinuation>> itemPrepareList = new List<KeyValuePair<LogEventInfo, AsyncContinuation>>(500);
List<int> itemWrittenList = new List<int>(itemPrepareList.Capacity);
for (int i = 0; i< itemPrepareList.Capacity; ++i)
var itemPrepareList = new List<AsyncLogEventInfo>(500);
var itemWrittenList = new List<int>(itemPrepareList.Capacity);
for (int i = 0; i < itemPrepareList.Capacity; ++i)
{
var logEvent = new LogEventInfo();
int sequenceID = logEvent.SequenceID;
itemPrepareList.Add(new KeyValuePair<LogEventInfo, AsyncContinuation>(logEvent, (ex) => itemWrittenList.Add(sequenceID)));
bool blockConsumer = (itemPrepareList.Capacity / 2) == i; // Force producers to get into blocking-mode
itemPrepareList.Add(logEvent.WithContinuation((ex) => { if (blockConsumer) Thread.Sleep(125); itemWrittenList.Add(sequenceID); }));
}

long startTicks = Environment.TickCount;
for (int i = 0; i < itemPrepareList.Count; ++i)
var eventProducer0 = new ManualResetEvent(false);
var eventProducer1 = new ManualResetEvent(false);
ParameterizedThreadStart producerMethod = (s) =>
{
var logEvent = itemPrepareList[i].Key;
targetWrapper.WriteAsyncLogEvent(logEvent.WithContinuation(itemPrepareList[i].Value));
}
var eventProducer = (ManualResetEvent)s;
if (eventProducer != null)
eventProducer.Set(); // Signal we are ready
int partitionNo = ReferenceEquals(eventProducer, eventProducer1) ? 1 : 0;
for (int i = 0; i < itemPrepareList.Count; ++i)
{
if (i % 2 == partitionNo)
targetWrapper.WriteAsyncLogEvent(itemPrepareList[i]);
}
};

Thread producer0 = new Thread(producerMethod);
producer0.IsBackground = true;
Thread producer1 = new Thread(producerMethod);
producer1.IsBackground = true;
producer1.Start(eventProducer0);
producer0.Start(eventProducer1);
Assert.True(eventProducer0.WaitOne(5000), "Producer0 Start Timeout");
Assert.True(eventProducer1.WaitOne(5000), "Producer1 Start Timeout");

long startTicks = Environment.TickCount;

Assert.True(producer0.Join(5000), "Producer0 Complete Timeout"); // Wait for producer0 to complete
Assert.True(producer1.Join(5000), "Producer1 Complete Timeout"); // Wait for producer1 to complete

long elapsedMilliseconds = Environment.TickCount - startTicks;

targetWrapper.Flush(flushHandler);

for (int i = 0; i < itemPrepareList.Count * 2 && itemWrittenList.Count != itemPrepareList.Count; ++i)
Thread.Sleep(1);

long elapsedMilliseconds = Environment.TickCount - startTicks;

Assert.Equal(itemPrepareList.Count, itemWrittenList.Count);
int prevSequenceID = 0;
for (int i = 0; i < itemWrittenList.Count; ++i)

int producer0sequenceID = 0;
int producer1sequenceID = 0;
for (int i = 1; i < itemWrittenList.Count; ++i)
{
Assert.True(prevSequenceID < itemWrittenList[i]);
prevSequenceID = itemWrittenList[i];
if (itemWrittenList[i] % 2 == 0)
{
Assert.True(producer0sequenceID < itemWrittenList[i], "Producer0 invalid sequence");
producer0sequenceID = itemWrittenList[i];
}
else
{
Assert.True(producer1sequenceID < itemWrittenList[i], "Producer1 invalid sequence");
producer1sequenceID = itemWrittenList[i];
}
}

#if NET4_5
Expand Down Expand Up @@ -475,7 +510,7 @@ public void LogEventDropped_OnRequestqueueOverflow()
int eventsCounter = 0;
var myTarget = new MyTarget();

var targetWrapper = new AsyncTargetWrapperForEventTests()
var targetWrapper = new AsyncTargetWrapper()
{
WrappedTarget = myTarget,
QueueLimit = queueLimit,
Expand Down Expand Up @@ -514,7 +549,7 @@ public void LogEventNotDropped_IfOverflowActionBlock()
int eventsCounter = 0;
var myTarget = new MyTarget();

var targetWrapper = new AsyncTargetWrapperForEventTests()
var targetWrapper = new AsyncTargetWrapper()
{
WrappedTarget = myTarget,
QueueLimit = queueLimit,
Expand Down Expand Up @@ -552,7 +587,7 @@ public void LogEventNotDropped_IfOverflowActionGrow()
int eventsCounter = 0;
var myTarget = new MyTarget();

var targetWrapper = new AsyncTargetWrapperForEventTests()
var targetWrapper = new AsyncTargetWrapper()
{
WrappedTarget = myTarget,
QueueLimit = queueLimit,
Expand Down Expand Up @@ -588,18 +623,12 @@ public void EventQueueGrow_OnQueueGrow()
int queueLimit = 2;
int loggedEventCount = 10;

int expectedGrowingNumber = 0;

#if NETCOREAPP2_0
expectedGrowingNumber = loggedEventCount - queueLimit;
#else
expectedGrowingNumber = 3;
#endif
int expectedGrowingNumber = 3;

int eventsCounter = 0;
var myTarget = new MyTarget();

var targetWrapper = new AsyncTargetWrapperForEventTests()
var targetWrapper = new AsyncTargetWrapper()
{
WrappedTarget = myTarget,
QueueLimit = queueLimit,
Expand Down Expand Up @@ -630,14 +659,6 @@ public void EventQueueGrow_OnQueueGrow()
}
}

private class AsyncTargetWrapperForEventTests : AsyncTargetWrapper
{
public void WriteEvent(AsyncLogEventInfo logEventInfo)
{
Write(logEventInfo);
}
}

private class MyAsyncTarget : Target
{
private readonly NLog.Internal.AsyncOperationCounter pendingWriteCounter = new NLog.Internal.AsyncOperationCounter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public void RaiseEventLogEventQueueGrow_OnLogItems()
{
const int RequestsLimit = 2;
const int EventsCount = 5;
const int ExpectedCountOfGrovingTimes = EventsCount - RequestsLimit;
const int ExpectedCountOfGrovingTimes = 2;
const int ExpectedFinalSize = 8;
int grovingItemsCount = 0;

ConcurrentRequestQueue requestQueue = new ConcurrentRequestQueue(RequestsLimit, AsyncTargetWrapperOverflowAction.Grow);
Expand All @@ -58,6 +59,7 @@ public void RaiseEventLogEventQueueGrow_OnLogItems()
}

Assert.Equal(ExpectedCountOfGrovingTimes, grovingItemsCount);
Assert.Equal(ExpectedFinalSize, requestQueue.RequestLimit);
}

[Fact]
Expand Down

0 comments on commit 3c4cddf

Please sign in to comment.