ConcurrentRequestQueue depends on .NET ConcurrentQueue (#2650)
snakefoot authored and 304NotModified committed Aug 14, 2018
1 parent 287a52d commit df0b95f
Showing 5 changed files with 393 additions and 21 deletions.
src/NLog/Targets/Wrappers/AsyncRequestQueue-T.cs: 6 changes (4 additions & 2 deletions)
@@ -35,12 +35,12 @@ namespace NLog.Targets.Wrappers
 {
     using System;
     using System.Collections.Generic;
-    using Common;
+    using NLog.Common;
 
     /// <summary>
     /// Asynchronous request queue.
     /// </summary>
-    internal class AsyncRequestQueue
+    internal class AsyncRequestQueue : IAsyncRequestQueue
     {
         private readonly Queue<AsyncLogEventInfo> _logEventInfoQueue = new Queue<AsyncLogEventInfo>();
@@ -80,6 +80,8 @@ public int RequestCount
             }
         }
 
+        public bool IsEmpty => RequestCount == 0;
+
         /// <summary>
         /// Enqueues another item. If the queue is overflown the appropriate
         /// action is taken as specified by <see cref="OnOverflow"/>.
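The IAsyncRequestQueue abstraction that AsyncRequestQueue now implements is introduced in one of the three changed files not shown on this page. Judging only from the members exercised in this diff (OnOverflow, RequestLimit, RequestCount, IsEmpty, Enqueue, DequeueBatch, Clear), the interface presumably looks something like the sketch below; treat the exact shape as an assumption:

using System.Collections.Generic;
using NLog.Common;

namespace NLog.Targets.Wrappers
{
    // Hypothetical reconstruction from the call sites visible in this commit,
    // not the committed source of IAsyncRequestQueue.
    internal interface IAsyncRequestQueue
    {
        AsyncTargetWrapperOverflowAction OnOverflow { get; set; }
        int RequestLimit { get; set; }
        int RequestCount { get; }
        bool IsEmpty { get; }

        /// <summary>Returns true when the queue was empty before this enqueue.</summary>
        bool Enqueue(AsyncLogEventInfo logEventInfo);

        AsyncLogEventInfo[] DequeueBatch(int count);
        void DequeueBatch(int count, IList<AsyncLogEventInfo> result);
        void Clear();
    }
}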
src/NLog/Targets/Wrappers/AsyncTargetWrapper.cs: 77 changes (62 additions & 15 deletions)
@@ -118,7 +118,15 @@ public AsyncTargetWrapper(Target wrappedTarget)
         /// <param name="overflowAction">The action to be taken when the queue overflows.</param>
         public AsyncTargetWrapper(Target wrappedTarget, int queueLimit, AsyncTargetWrapperOverflowAction overflowAction)
         {
-            RequestQueue = new AsyncRequestQueue(10000, AsyncTargetWrapperOverflowAction.Discard);
+#if NETSTANDARD2_0
+            // NetStandard20 includes many optimizations for ConcurrentQueue:
+            // - See: https://blogs.msdn.microsoft.com/dotnet/2017/06/07/performance-improvements-in-net-core/
+            // Net40 ConcurrentQueue can appear to leak, because it doesn't clear its slots properly on dequeue:
+            // - See: https://blogs.msdn.microsoft.com/pfxteam/2012/05/08/concurrentqueuet-holding-on-to-a-few-dequeued-elements/
+            _requestQueue = new ConcurrentRequestQueue(10000, AsyncTargetWrapperOverflowAction.Discard);
+#else
+            _requestQueue = new AsyncRequestQueue(10000, AsyncTargetWrapperOverflowAction.Discard);
+#endif
             TimeToSleepBetweenBatches = 50;
             BatchSize = 200;
             FullBatchSizeWriteLimit = 5;
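The pfxteam post linked above explains why the pre-4.6 ConcurrentQueue<T> can hold on to references of already-dequeued log events. The new ConcurrentRequestQueue itself lives in one of the changed files not shown on this page; purely as an illustration of the technique, a lock-free bounded enqueue over ConcurrentQueue<T> with Discard overflow can be sketched as follows (class name and details are assumptions, not the committed code):

using System.Collections.Concurrent;
using System.Threading;
using NLog.Common;

// Illustrative sketch only; the real ConcurrentRequestQueue is not shown in this diff.
internal sealed class BoundedConcurrentQueueSketch
{
    private readonly ConcurrentQueue<AsyncLogEventInfo> _queue = new ConcurrentQueue<AsyncLogEventInfo>();
    private int _count;  // tracked with Interlocked; ConcurrentQueue<T>.Count is comparatively expensive

    public int RequestLimit { get; set; } = 10000;

    /// <summary>Returns true when the queue was empty before this enqueue.</summary>
    public bool Enqueue(AsyncLogEventInfo logEvent)
    {
        int count = Interlocked.Increment(ref _count);
        if (count > RequestLimit)
        {
            // Discard overflow action: drop the oldest item to stay within the limit
            if (_queue.TryDequeue(out _))
                Interlocked.Decrement(ref _count);
        }
        _queue.Enqueue(logEvent);
        return count == 1;  // approximate: without a lock this is a best-effort answer
    }
}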
@@ -150,8 +158,8 @@ public AsyncTargetWrapper(Target wrappedTarget, int queueLimit, AsyncTargetWrapperOverflowAction overflowAction)
         [DefaultValue("Discard")]
         public AsyncTargetWrapperOverflowAction OverflowAction
         {
-            get => RequestQueue.OnOverflow;
-            set => RequestQueue.OnOverflow = value;
+            get => _requestQueue.OnOverflow;
+            set => _requestQueue.OnOverflow = value;
         }
 
         /// <summary>
@@ -161,8 +169,8 @@ public AsyncTargetWrapperOverflowAction OverflowAction
         [DefaultValue(10000)]
         public int QueueLimit
         {
-            get => RequestQueue.RequestLimit;
-            set => RequestQueue.RequestLimit = value;
+            get => _requestQueue.RequestLimit;
+            set => _requestQueue.RequestLimit = value;
         }
 
         /// <summary>
@@ -173,10 +181,18 @@ public int QueueLimit
         [DefaultValue(5)]
         public int FullBatchSizeWriteLimit { get; set; }
 
+#if NET4_5 || NET4_0
+        /// <summary>
+        /// Gets or sets whether to use the locking queue, instead of a lock-free concurrent queue.
+        /// The locking queue scales less well with many concurrent logger threads, but allocates less memory.
+        /// </summary>
+        public bool ForceLockingQueue { get; set; }
+#endif
+
         /// <summary>
         /// Gets the queue of lazy writer thread requests.
         /// </summary>
-        internal AsyncRequestQueue RequestQueue { get; private set; }
+        IAsyncRequestQueue _requestQueue;
 
         /// <summary>
         /// Schedules a flush of pending events in the queue (if any), followed by flushing the WrappedTarget.
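For context, wiring the wrapper up from code looks roughly like the sketch below. AsyncTargetWrapper, its three-argument constructor, and the new ForceLockingQueue property are taken from this diff; the file target, rule helper, and values are illustrative assumptions:

using NLog;
using NLog.Config;
using NLog.Targets;
using NLog.Targets.Wrappers;

internal static class AsyncWrapperSetupExample
{
    internal static void Configure()
    {
        var fileTarget = new FileTarget("file") { FileName = "${basedir}/app.log" };

        var asyncWrapper = new AsyncTargetWrapper(fileTarget,
            queueLimit: 10000, overflowAction: AsyncTargetWrapperOverflowAction.Discard)
        {
            // Only present in NET4_0/NET4_5 builds: opt back into the classic
            // locking queue, trading peak concurrency for fewer allocations.
            ForceLockingQueue = true,
        };

        var config = new LoggingConfiguration();
        config.AddTarget("async", asyncWrapper);
        config.AddRuleForAllLevels(asyncWrapper);
        LogManager.Configuration = config;
    }
}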
@@ -196,10 +212,36 @@ protected override void FlushAsync(AsyncContinuation asyncContinuation)
         protected override void InitializeTarget()
         {
             base.InitializeTarget();
+
             if (!OptimizeBufferReuse && WrappedTarget != null && WrappedTarget.OptimizeBufferReuse)
             {
                 OptimizeBufferReuse = GetType() == typeof(AsyncTargetWrapper); // Class not sealed, reduce breaking changes
+#if NET4_5 || NET4_0
+                if (!OptimizeBufferReuse && !ForceLockingQueue)
+                {
+                    ForceLockingQueue = true;   // Avoid too much allocation, when wrapping a legacy target
+                }
+#endif
             }
 
+#if NET4_5 || NET4_0
+            if (!ForceLockingQueue && OverflowAction == AsyncTargetWrapperOverflowAction.Block && BatchSize * 1.5m > QueueLimit)
+            {
+                ForceLockingQueue = true;   // ConcurrentQueue does not perform well if constantly hitting QueueLimit
+            }
+
-            RequestQueue.Clear();
+            if (ForceLockingQueue != (_requestQueue is AsyncRequestQueue))
+            {
+                _requestQueue = ForceLockingQueue ? (IAsyncRequestQueue)new AsyncRequestQueue(QueueLimit, OverflowAction) : new ConcurrentRequestQueue(QueueLimit, OverflowAction);
+            }
+#endif
+
+            if (BatchSize > QueueLimit && TimeToSleepBetweenBatches <= 0)
+            {
+                BatchSize = QueueLimit;     // Avoid too much throttling
+            }
+
+            _requestQueue.Clear();
             InternalLogger.Trace("AsyncWrapper(Name={0}): Start Timer", Name);
             _lazyWriterTimer = new Timer(ProcessPendingEvents, null, Timeout.Infinite, Timeout.Infinite);
             StartLazyWriterTimer();
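To make the new Block-mode guard concrete with this file's own defaults: BatchSize = 200 gives 200 * 1.5m = 300, far below the default QueueLimit of 10000, so the lock-free queue is kept. A hypothetical configuration with OverflowAction = Block and QueueLimit = 250 yields 300 > 250, and the wrapper falls back to the locking AsyncRequestQueue, because a ConcurrentQueue that constantly bounces off its limit performs poorly.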
@@ -313,7 +355,7 @@ protected virtual void StopLazyWriterThread()
         protected override void Write(AsyncLogEventInfo logEvent)
         {
             PrecalculateVolatileLayouts(logEvent.LogEvent);
-            bool queueWasEmpty = RequestQueue.Enqueue(logEvent);
+            bool queueWasEmpty = _requestQueue.Enqueue(logEvent);
             if (queueWasEmpty && TimeToSleepBetweenBatches <= 0)
                 StartInstantWriterTimer();
         }
@@ -373,10 +415,15 @@ private void ProcessPendingEvents(object state)
             {
                 if (TimeToSleepBetweenBatches <= 0)
                 {
-                    // If queue was not empty, then more might have arrived while writing the first batch
-                    // Uses throttled timer here, so we can process in batches (faster)
-                    if (!wroteFullBatchSize && RequestQueue.RequestCount > 0)
-                        StartLazyWriterTimer(); // Queue was checked as empty, but now we have more
+                    if (!wroteFullBatchSize && !_requestQueue.IsEmpty)
+                    {
+                        // If queue was not empty, then more might have arrived while writing the first batch
+                        // Uses throttled timer here, so we can process in batches (faster)
+                        lock (_writeLockObject)
+                        {
+                            StartLazyWriterTimer(); // Queue was checked as empty, but now we have more
+                        }
+                    }
                 }
                 else
                 {
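Worth noting in the hunk above: the check RequestQueue.RequestCount > 0 became !_requestQueue.IsEmpty. For ConcurrentQueue<T>, Count must take a snapshot of the internal segments and is markedly more expensive than IsEmpty, so the new interface member presumably exists to let the hot path ask the cheap question on both queue implementations.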
@@ -396,7 +443,7 @@ private void FlushEventsInQueue(object state)
                 if (asyncContinuation != null)
                     base.FlushAsync(asyncContinuation);
             }
-            if (TimeToSleepBetweenBatches <= 0 && RequestQueue.RequestCount > 0)
+            if (TimeToSleepBetweenBatches <= 0 && !_requestQueue.IsEmpty)
                 StartLazyWriterTimer(); // Queue was checked as empty, but now we have more
         }
         catch (Exception exception)
@@ -423,7 +470,7 @@ private int WriteEventsInQueue(int batchSize, string reason)
         {
             if (!OptimizeBufferReuse || batchSize == int.MaxValue)
             {
-                var logEvents = RequestQueue.DequeueBatch(batchSize);
+                var logEvents = _requestQueue.DequeueBatch(batchSize);
                 if (logEvents.Length > 0)
                 {
                     if (reason != null)
@@ -437,7 +484,7 @@ private int WriteEventsInQueue(int batchSize, string reason)
                 using (var targetList = _reusableAsyncLogEventList.Allocate())
                 {
                     var logEvents = targetList.Result;
-                    RequestQueue.DequeueBatch(batchSize, logEvents);
+                    _requestQueue.DequeueBatch(batchSize, logEvents);
                     if (logEvents.Count > 0)
                     {
                         if (reason != null)
