Skip to content

Commit

Permalink
ConcurrentRequestQueue - Improved DequeueBatch performance when block…
Browse files Browse the repository at this point in the history
…ing mode (And unit test for deadlock issue)
  • Loading branch information
snakefoot committed May 23, 2019
1 parent ed250c1 commit 48d12d4
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 22 deletions.
8 changes: 4 additions & 4 deletions src/NLog/Targets/Wrappers/ConcurrentRequestQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,15 @@ private long WaitForBelowRequestLimit()
// If yield did not help, then wait on a lock
while (currentCount > RequestLimit)
{
Interlocked.Decrement(ref _count);
InternalLogger.Debug("Blocking because the overflow action is Block...");
if (!lockTaken)
Monitor.Enter(_logEventInfoQueue);
else
Monitor.Wait(_logEventInfoQueue);
lockTaken = true;
InternalLogger.Trace("Entered critical section.");
currentCount = Interlocked.Read(ref _count);
currentCount = Interlocked.Increment(ref _count);
}

if (lockTaken)
Expand Down Expand Up @@ -223,11 +224,10 @@ public override void DequeueBatch(int count, IList<AsyncLogEventInfo> result)

if (dequeueBatch)
{
bool lockTaken = Monitor.TryEnter(_logEventInfoQueue); // Try to throttle
bool lockTaken = result.Count == count && Monitor.TryEnter(_logEventInfoQueue); // Only try throttle when falling behind
try
{
for (int i = 0; i < result.Count; ++i)
Interlocked.Decrement(ref _count);
Interlocked.Add(ref _count, -result.Count);
if (lockTaken)
Monitor.PulseAll(_logEventInfoQueue);
}
Expand Down
66 changes: 48 additions & 18 deletions tests/NLog.UnitTests/Targets/Wrappers/AsyncTargetWrapperTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ private void AsyncTargetWrapperSyncTest_WhenTimeToSleepBetweenBatchesIsEqualToZe
#endif
BatchSize = 3,
QueueLimit = 5, // Will make it "sleep" between every second write
FullBatchSizeWriteLimit = 1,
OverflowAction = AsyncTargetWrapperOverflowAction.Block
};
targetWrapper.Initialize(null);
Expand All @@ -108,35 +109,64 @@ private void AsyncTargetWrapperSyncTest_WhenTimeToSleepBetweenBatchesIsEqualToZe
int flushCounter = 0;
AsyncContinuation flushHandler = (ex) => { ++flushCounter; };

List<KeyValuePair<LogEventInfo, AsyncContinuation>> itemPrepareList = new List<KeyValuePair<LogEventInfo, AsyncContinuation>>(500);
List<int> itemWrittenList = new List<int>(itemPrepareList.Capacity);
for (int i = 0; i< itemPrepareList.Capacity; ++i)
var itemPrepareList = new List<AsyncLogEventInfo>(500);
var itemWrittenList = new List<int>(itemPrepareList.Capacity);
for (int i = 0; i < itemPrepareList.Capacity; ++i)
{
var logEvent = new LogEventInfo();
int sequenceID = logEvent.SequenceID;
itemPrepareList.Add(new KeyValuePair<LogEventInfo, AsyncContinuation>(logEvent, (ex) => itemWrittenList.Add(sequenceID)));
bool blockConsumer = (itemPrepareList.Capacity / 2) == i; // Force producers to get into blocking-mode
itemPrepareList.Add(logEvent.WithContinuation((ex) => { if (blockConsumer) Thread.Sleep(125); itemWrittenList.Add(sequenceID); }));
}

long startTicks = Environment.TickCount;
for (int i = 0; i < itemPrepareList.Count; ++i)
var eventProducer1 = new AutoResetEvent(false);
ParameterizedThreadStart producerMethod = (s) =>
{
var logEvent = itemPrepareList[i].Key;
targetWrapper.WriteAsyncLogEvent(logEvent.WithContinuation(itemPrepareList[i].Value));
}
AutoResetEvent eventProducer = (AutoResetEvent)s;
if (eventProducer != null)
eventProducer.Set(); // Signal we are ready
int partitionNo = ReferenceEquals(eventProducer, eventProducer1) ? 1 : 0;
for (int i = 0; i < itemPrepareList.Count; ++i)
{
if (i % 2 == partitionNo)
targetWrapper.WriteAsyncLogEvent(itemPrepareList[i]);
}
};

Thread producer1 = new Thread(producerMethod);
producer1.IsBackground = true;
producer1.Start(eventProducer1);
Assert.True(eventProducer1.WaitOne(5000), "Producer1 Start Timeout");

long startTicks = Environment.TickCount;

producerMethod(null); // Execute concurrently with producer1
Assert.True(producer1.Join(5000), "Producer1 Complete Timeout"); // Wait for producer1 to complete

long elapsedMilliseconds = Environment.TickCount - startTicks;

targetWrapper.Flush(flushHandler);

for (int i = 0; i < itemPrepareList.Count * 2 && itemWrittenList.Count != itemPrepareList.Count; ++i)
Thread.Sleep(1);

long elapsedMilliseconds = Environment.TickCount - startTicks;

Assert.Equal(itemPrepareList.Count, itemWrittenList.Count);
int prevSequenceID = 0;
for (int i = 0; i < itemWrittenList.Count; ++i)

int producer0sequenceID = 0;
int producer1sequenceID = 0;
for (int i = 1; i < itemWrittenList.Count; ++i)
{
Assert.True(prevSequenceID < itemWrittenList[i]);
prevSequenceID = itemWrittenList[i];
if (itemWrittenList[i] % 2 == 0)
{
Assert.True(producer0sequenceID < itemWrittenList[i], "Producer0 invalid sequence");
producer0sequenceID = itemWrittenList[i];
}
else
{
Assert.True(producer1sequenceID < itemWrittenList[i], "Producer1 invalid sequence");
producer1sequenceID = itemWrittenList[i];
}
}

#if NET4_5
Expand Down Expand Up @@ -590,11 +620,11 @@ public void EventQueueGrow_OnQueueGrow()

int expectedGrowingNumber = 0;

#if NETCOREAPP2_0
#if NETCOREAPP2_0
expectedGrowingNumber = loggedEventCount - queueLimit;
#else
#else
expectedGrowingNumber = 3;
#endif
#endif

int eventsCounter = 0;
var myTarget = new MyTarget();
Expand Down

0 comments on commit 48d12d4

Please sign in to comment.