Skip to content

Commit

Permalink
ConcurrentRequestQueue depends on .NET ConcurrentQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
snakefoot committed Apr 9, 2018
1 parent d5b2c00 commit 3ab6284
Show file tree
Hide file tree
Showing 5 changed files with 304 additions and 17 deletions.
4 changes: 3 additions & 1 deletion src/NLog/Targets/Wrappers/AsyncRequestQueue-T.cs
Expand Up @@ -40,7 +40,7 @@ namespace NLog.Targets.Wrappers
/// <summary>
/// Asynchronous request queue.
/// </summary>
internal class AsyncRequestQueue
internal class AsyncRequestQueue : IAsyncRequestQueue
{
private readonly Queue<AsyncLogEventInfo> _logEventInfoQueue = new Queue<AsyncLogEventInfo>();

Expand Down Expand Up @@ -80,6 +80,8 @@ public int RequestCount
}
}

public bool IsEmpty => RequestCount == 0;

/// <summary>
/// Enqueues another item. If the queue is overflown the appropriate
/// action is taken as specified by <see cref="OnOverflow"/>.
Expand Down
55 changes: 40 additions & 15 deletions src/NLog/Targets/Wrappers/AsyncTargetWrapper.cs
Expand Up @@ -118,7 +118,12 @@ public AsyncTargetWrapper(Target wrappedTarget)
/// <param name="overflowAction">The action to be taken when the queue overflows.</param>
public AsyncTargetWrapper(Target wrappedTarget, int queueLimit, AsyncTargetWrapperOverflowAction overflowAction)
{
RequestQueue = new AsyncRequestQueue(10000, AsyncTargetWrapperOverflowAction.Discard);
#if NET4_5
// Net40 ConcurrencyQueue can seem to leak, because it doesn't clear properly on dequeue
_requestQueue = new ConcurrentRequestQueue(10000, AsyncTargetWrapperOverflowAction.Discard);
#else
_requestQueue = new AsyncRequestQueue(10000, AsyncTargetWrapperOverflowAction.Discard);
#endif
TimeToSleepBetweenBatches = 50;
BatchSize = 200;
FullBatchSizeWriteLimit = 5;
Expand Down Expand Up @@ -150,8 +155,8 @@ public AsyncTargetWrapper(Target wrappedTarget, int queueLimit, AsyncTargetWrapp
[DefaultValue("Discard")]
public AsyncTargetWrapperOverflowAction OverflowAction
{
get => RequestQueue.OnOverflow;
set => RequestQueue.OnOverflow = value;
get => _requestQueue.OnOverflow;
set => _requestQueue.OnOverflow = value;
}

/// <summary>
Expand All @@ -161,8 +166,8 @@ public AsyncTargetWrapperOverflowAction OverflowAction
[DefaultValue(10000)]
public int QueueLimit
{
get => RequestQueue.RequestLimit;
set => RequestQueue.RequestLimit = value;
get => _requestQueue.RequestLimit;
set => _requestQueue.RequestLimit = value;
}

/// <summary>
Expand All @@ -173,10 +178,25 @@ public int QueueLimit
[DefaultValue(5)]
public int FullBatchSizeWriteLimit { get; set; }

#if NET4_5 || NET4_0
/// <summary>
/// Gets or sets whether to use the locking queue, instead of a lock-free concurrent queue
/// </summary>
public bool ForceLockingQueue
{
get { return _requestQueue is AsyncRequestQueue; }
set
{
if (value != _requestQueue is AsyncRequestQueue)
_requestQueue = value ? (IAsyncRequestQueue)new AsyncRequestQueue(QueueLimit, OverflowAction) : new ConcurrentRequestQueue(QueueLimit, OverflowAction);
}
}
#endif

/// <summary>
/// Gets the queue of lazy writer thread requests.
/// </summary>
internal AsyncRequestQueue RequestQueue { get; private set; }
IAsyncRequestQueue _requestQueue;

/// <summary>
/// Schedules a flush of pending events in the queue (if any), followed by flushing the WrappedTarget.
Expand All @@ -199,7 +219,7 @@ protected override void InitializeTarget()
if (!OptimizeBufferReuse && WrappedTarget != null && WrappedTarget.OptimizeBufferReuse)
OptimizeBufferReuse = GetType() == typeof(AsyncTargetWrapper); // Class not sealed, reduce breaking changes

RequestQueue.Clear();
_requestQueue.Clear();
InternalLogger.Trace("AsyncWrapper '{0}': start timer", Name);
_lazyWriterTimer = new Timer(ProcessPendingEvents, null, Timeout.Infinite, Timeout.Infinite);
StartLazyWriterTimer();
Expand Down Expand Up @@ -314,7 +334,7 @@ protected override void Write(AsyncLogEventInfo logEvent)
{
MergeEventProperties(logEvent.LogEvent);
PrecalculateVolatileLayouts(logEvent.LogEvent);
bool queueWasEmpty = RequestQueue.Enqueue(logEvent);
bool queueWasEmpty = _requestQueue.Enqueue(logEvent);
if (queueWasEmpty && TimeToSleepBetweenBatches <= 0)
StartInstantWriterTimer();
}
Expand Down Expand Up @@ -374,10 +394,15 @@ private void ProcessPendingEvents(object state)
{
if (TimeToSleepBetweenBatches <= 0)
{
// If queue was not empty, then more might have arrived while writing the first batch
// Uses throttled timer here, so we can process in batches (faster)
if (!wroteFullBatchSize && RequestQueue.RequestCount > 0)
StartLazyWriterTimer(); // Queue was checked as empty, but now we have more
if (!wroteFullBatchSize && !_requestQueue.IsEmpty)
{
// If queue was not empty, then more might have arrived while writing the first batch
// Uses throttled timer here, so we can process in batches (faster)
lock (_writeLockObject)
{
StartLazyWriterTimer(); // Queue was checked as empty, but now we have more
}
}
}
else
{
Expand All @@ -397,7 +422,7 @@ private void FlushEventsInQueue(object state)
if (asyncContinuation != null)
base.FlushAsync(asyncContinuation);
}
if (TimeToSleepBetweenBatches <= 0 && RequestQueue.RequestCount > 0)
if (TimeToSleepBetweenBatches <= 0 && !_requestQueue.IsEmpty)
StartLazyWriterTimer(); // Queue was checked as empty, but now we have more
}
catch (Exception exception)
Expand All @@ -424,7 +449,7 @@ private int WriteEventsInQueue(int batchSize, string reason)
{
if (!OptimizeBufferReuse || batchSize == int.MaxValue)
{
var logEvents = RequestQueue.DequeueBatch(batchSize);
var logEvents = _requestQueue.DequeueBatch(batchSize);
if (logEvents.Length > 0)
{
if (reason != null)
Expand All @@ -438,7 +463,7 @@ private int WriteEventsInQueue(int batchSize, string reason)
using (var targetList = _reusableAsyncLogEventList.Allocate())
{
var logEvents = targetList.Result;
RequestQueue.DequeueBatch(batchSize, logEvents);
_requestQueue.DequeueBatch(batchSize, logEvents);
if (logEvents.Count > 0)
{
if (reason != null)
Expand Down
193 changes: 193 additions & 0 deletions src/NLog/Targets/Wrappers/ConcurrentRequestQueue.cs
@@ -0,0 +1,193 @@
//
// Copyright (c) 2004-2018 Jaroslaw Kowalski <jaak@jkowalski.net>, Kim Christensen, Julian Verdurmen
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// * Neither the name of Jaroslaw Kowalski nor the names of its
// contributors may be used to endorse or promote products derived from this
// software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
// THE POSSIBILITY OF SUCH DAMAGE.
//

#if NET4_5 || NET4_0

namespace NLog.Targets.Wrappers
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using NLog.Common;

/// <summary>
/// Concurrent Asynchronous request queue based on <see cref="ConcurrentQueue{T}"/>
/// </summary>
class ConcurrentRequestQueue : IAsyncRequestQueue
{
private readonly ConcurrentQueue<AsyncLogEventInfo> _logEventInfoQueue = new ConcurrentQueue<AsyncLogEventInfo>();

/// <summary>
/// Initializes a new instance of the AsyncRequestQueue class.
/// </summary>
/// <param name="requestLimit">Request limit.</param>
/// <param name="overflowAction">The overflow action.</param>
public ConcurrentRequestQueue(int requestLimit, AsyncTargetWrapperOverflowAction overflowAction)
{
RequestLimit = requestLimit;
OnOverflow = overflowAction;
}

/// <summary>
/// Gets or sets the request limit.
/// </summary>
public int RequestLimit { get; set; }

/// <summary>
/// Gets or sets the action to be taken when there's no more room in
/// the queue and another request is enqueued.
/// </summary>
public AsyncTargetWrapperOverflowAction OnOverflow { get; set; }

public bool IsEmpty => _logEventInfoQueue.IsEmpty;

/// <summary>
/// Gets the number of requests currently in the queue.
/// </summary>
public int Count => (int)_count;
private long _count;

/// <summary>
/// Enqueues another item. If the queue is overflown the appropriate
/// action is taken as specified by <see cref="OnOverflow"/>.
/// </summary>
/// <param name="logEventInfo">The log event info.</param>
/// <returns>Queue was empty before enqueue</returns>
public bool Enqueue(AsyncLogEventInfo logEventInfo)
{
long currentCount = Interlocked.Increment(ref _count);
if (currentCount > RequestLimit)
{
InternalLogger.Debug("Async queue is full");
switch (OnOverflow)
{
case AsyncTargetWrapperOverflowAction.Discard:
{
InternalLogger.Debug("Discarding one element from queue");
do
{
if (_logEventInfoQueue.TryDequeue(out var _))
{
currentCount = Interlocked.Decrement(ref _count);
break;
}
currentCount = Interlocked.Read(ref _count);
} while (currentCount > RequestLimit);
}
break;
case AsyncTargetWrapperOverflowAction.Block:
{
bool firstYield = true;
SpinWait spinWait = new SpinWait();
do
{
spinWait.SpinOnce();
if (firstYield && spinWait.NextSpinWillYield)
{
firstYield = false;
InternalLogger.Debug("Blocking because the overflow action is Block...");
}
currentCount = Interlocked.Read(ref _count);
} while (currentCount > RequestLimit);
}
break;
}
}
_logEventInfoQueue.Enqueue(logEventInfo);
return currentCount == 1;
}

/// <summary>
/// Dequeues a maximum of <c>count</c> items from the queue
/// and adds returns the list containing them.
/// </summary>
/// <param name="count">Maximum number of items to be dequeued (-1 means everything).</param>
/// <returns>The array of log events.</returns>
public AsyncLogEventInfo[] DequeueBatch(int count)
{
if (_logEventInfoQueue.IsEmpty)
return Internal.ArrayHelper.Empty<AsyncLogEventInfo>();

if (_count < count)
count = (int)_count;

var resultEvents = new List<AsyncLogEventInfo>(count);
for (int i = 0; i < count; ++i)
{
if (_logEventInfoQueue.TryDequeue(out var item))
{
Interlocked.Decrement(ref _count);
resultEvents.Add(item);
}
else
break;
}

if (resultEvents.Count == 0)
return Internal.ArrayHelper.Empty<AsyncLogEventInfo>();
else
return resultEvents.ToArray();
}

/// <summary>
/// Dequeues into a preallocated array, instead of allocating a new one
/// </summary>
/// <param name="count">Maximum number of items to be dequeued</param>
/// <param name="result">Preallocated list</param>
public void DequeueBatch(int count, IList<AsyncLogEventInfo> result)
{
for (int i = 0; i < count; ++i)
{
if (_logEventInfoQueue.TryDequeue(out var item))
{
Interlocked.Decrement(ref _count);
result.Add(item);
}
else
break;
}
}

/// <summary>
/// Clears the queue.
/// </summary>
public void Clear()
{
while (!_logEventInfoQueue.IsEmpty)
_logEventInfoQueue.TryDequeue(out var _);
Interlocked.Exchange(ref _count, 0);
}
}
}
#endif
50 changes: 50 additions & 0 deletions src/NLog/Targets/Wrappers/IAsyncRequestQueue.cs
@@ -0,0 +1,50 @@
//
// Copyright (c) 2004-2018 Jaroslaw Kowalski <jaak@jkowalski.net>, Kim Christensen, Julian Verdurmen
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// * Neither the name of Jaroslaw Kowalski nor the names of its
// contributors may be used to endorse or promote products derived from this
// software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
// THE POSSIBILITY OF SUCH DAMAGE.
//

using System.Collections.Generic;
using NLog.Common;

namespace NLog.Targets.Wrappers
{
interface IAsyncRequestQueue
{
bool IsEmpty { get; }
int RequestLimit { get; set; }
AsyncTargetWrapperOverflowAction OnOverflow { get; set; }

bool Enqueue(AsyncLogEventInfo logEventInfo);
AsyncLogEventInfo[] DequeueBatch(int count);
void DequeueBatch(int count, IList<AsyncLogEventInfo> result);
void Clear();
}
}

0 comments on commit 3ab6284

Please sign in to comment.