Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async / buffering wrapper: Improve performance when blocking on NetCore #3416

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/NLog/Targets/Wrappers/AsyncRequestQueue-T.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ public override bool Enqueue(AsyncLogEventInfo logEventInfo)

case AsyncTargetWrapperOverflowAction.Grow:
InternalLogger.Debug("The overflow action is Grow, adding element anyway");
RequestLimit *= 2;
OnLogEventQueueGrows(RequestCount + 1);
RequestLimit *= 2;
break;

case AsyncTargetWrapperOverflowAction.Block:
Expand Down
31 changes: 18 additions & 13 deletions src/NLog/Targets/Wrappers/ConcurrentRequestQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,10 @@ public override bool Enqueue(AsyncLogEventInfo logEventInfo)
InternalLogger.Debug("Discarding one element from queue");
currentCount = Interlocked.Decrement(ref _count);
OnLogEventDropped(lostItem.LogEvent);
break;
break;
}
currentCount = Interlocked.Read(ref _count);
Interlocked.Decrement(ref _count);
currentCount = Interlocked.Increment(ref _count);
} while (currentCount > RequestLimit);
}
break;
Expand All @@ -106,13 +107,15 @@ public override bool Enqueue(AsyncLogEventInfo logEventInfo)
break;
case AsyncTargetWrapperOverflowAction.Grow:
{
InternalLogger.Debug("The overflow action is Grow, adding element anyway");
OnLogEventQueueGrows(currentCount);
RequestLimit *= 2;
}
break;
}
}
_logEventInfoQueue.Enqueue(logEventInfo);
return currentCount == 1;
return currentCount == 1; // Inserted first item in empty queue
}

private long WaitForBelowRequestLimit()
Expand All @@ -127,19 +130,15 @@ private long WaitForBelowRequestLimit()
// If yield did not help, then wait on a lock
while (currentCount > RequestLimit)
{
Interlocked.Decrement(ref _count);
InternalLogger.Debug("Blocking because the overflow action is Block...");
if (!lockTaken)
Monitor.Enter(_logEventInfoQueue);
else
Monitor.Wait(_logEventInfoQueue);
lockTaken = true;
InternalLogger.Trace("Entered critical section.");
currentCount = Interlocked.Read(ref _count);
}

if (lockTaken)
{
Monitor.PulseAll(_logEventInfoQueue);
currentCount = Interlocked.Increment(ref _count);
}
}
finally
Expand All @@ -158,6 +157,7 @@ private long TrySpinWaitForLowerCount()
SpinWait spinWait = new SpinWait();
for (int i = 0; i <= 20; ++i)
{
Interlocked.Decrement(ref _count);
if (spinWait.NextSpinWillYield)
{
if (firstYield)
Expand All @@ -166,7 +166,7 @@ private long TrySpinWaitForLowerCount()
}

spinWait.SpinOnce();
currentCount = Interlocked.Read(ref _count);
currentCount = Interlocked.Increment(ref _count);
if (currentCount <= RequestLimit)
break;
}
Expand Down Expand Up @@ -223,11 +223,16 @@ public override void DequeueBatch(int count, IList<AsyncLogEventInfo> result)

if (dequeueBatch)
{
bool lockTaken = Monitor.TryEnter(_logEventInfoQueue); // Try to throttle
bool lockTaken = false;
if (result.Count == count)
{
Monitor.Enter(_logEventInfoQueue); // Only try throttle when falling behind
lockTaken = true;
}

try
{
for (int i = 0; i < result.Count; ++i)
Interlocked.Decrement(ref _count);
Interlocked.Add(ref _count, -result.Count);
if (lockTaken)
Monitor.PulseAll(_logEventInfoQueue);
}
Expand Down
89 changes: 55 additions & 34 deletions tests/NLog.UnitTests/Targets/Wrappers/AsyncTargetWrapperTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ private void AsyncTargetWrapperSyncTest_WhenTimeToSleepBetweenBatchesIsEqualToZe
#endif
BatchSize = 3,
QueueLimit = 5, // Will make it "sleep" between every second write
FullBatchSizeWriteLimit = 1,
OverflowAction = AsyncTargetWrapperOverflowAction.Block
};
targetWrapper.Initialize(null);
Expand All @@ -108,35 +109,69 @@ private void AsyncTargetWrapperSyncTest_WhenTimeToSleepBetweenBatchesIsEqualToZe
int flushCounter = 0;
AsyncContinuation flushHandler = (ex) => { ++flushCounter; };

List<KeyValuePair<LogEventInfo, AsyncContinuation>> itemPrepareList = new List<KeyValuePair<LogEventInfo, AsyncContinuation>>(500);
List<int> itemWrittenList = new List<int>(itemPrepareList.Capacity);
for (int i = 0; i< itemPrepareList.Capacity; ++i)
var itemPrepareList = new List<AsyncLogEventInfo>(500);
var itemWrittenList = new List<int>(itemPrepareList.Capacity);
for (int i = 0; i < itemPrepareList.Capacity; ++i)
{
var logEvent = new LogEventInfo();
int sequenceID = logEvent.SequenceID;
itemPrepareList.Add(new KeyValuePair<LogEventInfo, AsyncContinuation>(logEvent, (ex) => itemWrittenList.Add(sequenceID)));
bool blockConsumer = (itemPrepareList.Capacity / 2) == i; // Force producers to get into blocking-mode
itemPrepareList.Add(logEvent.WithContinuation((ex) => { if (blockConsumer) Thread.Sleep(125); itemWrittenList.Add(sequenceID); }));
}

long startTicks = Environment.TickCount;
for (int i = 0; i < itemPrepareList.Count; ++i)
var eventProducer0 = new ManualResetEvent(false);
var eventProducer1 = new ManualResetEvent(false);
ParameterizedThreadStart producerMethod = (s) =>
{
var logEvent = itemPrepareList[i].Key;
targetWrapper.WriteAsyncLogEvent(logEvent.WithContinuation(itemPrepareList[i].Value));
}
var eventProducer = (ManualResetEvent)s;
if (eventProducer != null)
eventProducer.Set(); // Signal we are ready

int partitionNo = ReferenceEquals(eventProducer, eventProducer1) ? 1 : 0;
for (int i = 0; i < itemPrepareList.Count; ++i)
{
if (i % 2 == partitionNo)
targetWrapper.WriteAsyncLogEvent(itemPrepareList[i]);
}
};

Thread producer0 = new Thread(producerMethod);
producer0.IsBackground = true;
Thread producer1 = new Thread(producerMethod);
producer1.IsBackground = true;
producer1.Start(eventProducer0);
producer0.Start(eventProducer1);
Assert.True(eventProducer0.WaitOne(5000), "Producer0 Start Timeout");
Assert.True(eventProducer1.WaitOne(5000), "Producer1 Start Timeout");

long startTicks = Environment.TickCount;

Assert.True(producer0.Join(5000), "Producer0 Complete Timeout"); // Wait for producer0 to complete
Assert.True(producer1.Join(5000), "Producer1 Complete Timeout"); // Wait for producer1 to complete

long elapsedMilliseconds = Environment.TickCount - startTicks;

targetWrapper.Flush(flushHandler);

for (int i = 0; i < itemPrepareList.Count * 2 && itemWrittenList.Count != itemPrepareList.Count; ++i)
Thread.Sleep(1);

long elapsedMilliseconds = Environment.TickCount - startTicks;

Assert.Equal(itemPrepareList.Count, itemWrittenList.Count);
int prevSequenceID = 0;
for (int i = 0; i < itemWrittenList.Count; ++i)

int producer0sequenceID = 0;
int producer1sequenceID = 0;
for (int i = 1; i < itemWrittenList.Count; ++i)
{
Assert.True(prevSequenceID < itemWrittenList[i]);
prevSequenceID = itemWrittenList[i];
if (itemWrittenList[i] % 2 == 0)
{
Assert.True(producer0sequenceID < itemWrittenList[i], "Producer0 invalid sequence");
producer0sequenceID = itemWrittenList[i];
}
else
{
Assert.True(producer1sequenceID < itemWrittenList[i], "Producer1 invalid sequence");
producer1sequenceID = itemWrittenList[i];
}
}

#if NET4_5
Expand Down Expand Up @@ -475,7 +510,7 @@ public void LogEventDropped_OnRequestqueueOverflow()
int eventsCounter = 0;
var myTarget = new MyTarget();

var targetWrapper = new AsyncTargetWrapperForEventTests()
var targetWrapper = new AsyncTargetWrapper()
{
WrappedTarget = myTarget,
QueueLimit = queueLimit,
Expand Down Expand Up @@ -514,7 +549,7 @@ public void LogEventNotDropped_IfOverflowActionBlock()
int eventsCounter = 0;
var myTarget = new MyTarget();

var targetWrapper = new AsyncTargetWrapperForEventTests()
var targetWrapper = new AsyncTargetWrapper()
{
WrappedTarget = myTarget,
QueueLimit = queueLimit,
Expand Down Expand Up @@ -552,7 +587,7 @@ public void LogEventNotDropped_IfOverflowActionGrow()
int eventsCounter = 0;
var myTarget = new MyTarget();

var targetWrapper = new AsyncTargetWrapperForEventTests()
var targetWrapper = new AsyncTargetWrapper()
{
WrappedTarget = myTarget,
QueueLimit = queueLimit,
Expand Down Expand Up @@ -588,18 +623,12 @@ public void EventQueueGrow_OnQueueGrow()
int queueLimit = 2;
int loggedEventCount = 10;

int expectedGrowingNumber = 0;

#if NETCOREAPP2_0
expectedGrowingNumber = loggedEventCount - queueLimit;
#else
expectedGrowingNumber = 3;
#endif
int expectedGrowingNumber = 3;

int eventsCounter = 0;
var myTarget = new MyTarget();

var targetWrapper = new AsyncTargetWrapperForEventTests()
var targetWrapper = new AsyncTargetWrapper()
{
WrappedTarget = myTarget,
QueueLimit = queueLimit,
Expand Down Expand Up @@ -630,14 +659,6 @@ public void EventQueueGrow_OnQueueGrow()
}
}

private class AsyncTargetWrapperForEventTests : AsyncTargetWrapper
{
public void WriteEvent(AsyncLogEventInfo logEventInfo)
{
Write(logEventInfo);
}
}

private class MyAsyncTarget : Target
{
private readonly NLog.Internal.AsyncOperationCounter pendingWriteCounter = new NLog.Internal.AsyncOperationCounter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public void RaiseEventLogEventQueueGrow_OnLogItems()
{
const int RequestsLimit = 2;
const int EventsCount = 5;
const int ExpectedCountOfGrovingTimes = EventsCount - RequestsLimit;
const int ExpectedCountOfGrovingTimes = 2;
const int ExpectedFinalSize = 8;
int grovingItemsCount = 0;

ConcurrentRequestQueue requestQueue = new ConcurrentRequestQueue(RequestsLimit, AsyncTargetWrapperOverflowAction.Grow);
Expand All @@ -58,6 +59,7 @@ public void RaiseEventLogEventQueueGrow_OnLogItems()
}

Assert.Equal(ExpectedCountOfGrovingTimes, grovingItemsCount);
Assert.Equal(ExpectedFinalSize, requestQueue.RequestLimit);
}

[Fact]
Expand Down