Skip to content
This repository has been archived by the owner on Dec 13, 2018. It is now read-only.

Commit

Permalink
Use BlockingCollection<T>
Browse files Browse the repository at this point in the history
  • Loading branch information
BrennanConroy committed Feb 17, 2017
1 parent c321853 commit db73396
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 578 deletions.
21 changes: 17 additions & 4 deletions src/Microsoft.Extensions.Logging.Console/ConsoleLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class ConsoleLogger : ILogger
// ConsoleColor does not have a value to specify the 'Default' color
private readonly ConsoleColor? DefaultConsoleColor = null;

private readonly ConsoleLoggerProcessor _queueProcessor = new ConsoleLoggerProcessor();
private readonly ConsoleLoggerProcessor _queueProcessor;
private Func<string, LogLevel, bool> _filter;

[ThreadStatic]
Expand All @@ -31,6 +31,11 @@ static ConsoleLogger()
}

public ConsoleLogger(string name, Func<string, LogLevel, bool> filter, bool includeScopes)
: this(name, filter, includeScopes, new ConsoleLoggerProcessor())
{
}

internal ConsoleLogger(string name, Func<string, LogLevel, bool> filter, bool includeScopes, ConsoleLoggerProcessor loggerProcessor)
{
if (name == null)
{
Expand All @@ -41,6 +46,8 @@ public ConsoleLogger(string name, Func<string, LogLevel, bool> filter, bool incl
Filter = filter ?? ((category, logLevel) => true);
IncludeScopes = includeScopes;

_queueProcessor = loggerProcessor;

if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
Console = new WindowsLogConsole();
Expand All @@ -54,7 +61,15 @@ public ConsoleLogger(string name, Func<string, LogLevel, bool> filter, bool incl
public IConsole Console
{
get { return _queueProcessor.Console; }
set { _queueProcessor.Console = value; }
set
{
if (value == null)
{
throw new ArgumentNullException(nameof(value));
}

_queueProcessor.Console = value;
}
}

public Func<string, LogLevel, bool> Filter
Expand All @@ -75,8 +90,6 @@ public Func<string, LogLevel, bool> Filter

public string Name { get; }

public bool HasQueuedMessages => _queueProcessor.HasQueuedMessages;

public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter)
{
if (!IsEnabled(logLevel))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Microsoft.Extensions.Logging.Console.Internal;

namespace Microsoft.Extensions.Logging.Console
{
Expand All @@ -13,6 +14,7 @@ public class ConsoleLoggerProvider : ILoggerProvider

private readonly Func<string, LogLevel, bool> _filter;
private IConsoleLoggerSettings _settings;
private readonly ConsoleLoggerProcessor _messageQueue = new ConsoleLoggerProcessor();

public ConsoleLoggerProvider(Func<string, LogLevel, bool> filter, bool includeScopes)
{
Expand Down Expand Up @@ -69,7 +71,7 @@ public ILogger CreateLogger(string name)

private ConsoleLogger CreateLoggerImplementation(string name)
{
return new ConsoleLogger(name, GetFilter(name, _settings), _settings.IncludeScopes);
return new ConsoleLogger(name, GetFilter(name, _settings), _settings.IncludeScopes, _messageQueue);
}

private Func<string, LogLevel, bool> GetFilter(string name, IConsoleLoggerSettings settings)
Expand Down Expand Up @@ -111,6 +113,7 @@ private IEnumerable<string> GetKeyPrefixes(string name)

public void Dispose()
{
_messageQueue.Dispose();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,158 +3,50 @@

using System;
using System.Collections.Concurrent;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Extensions.Logging.Console.Internal
{
public class ConsoleLoggerProcessor
public class ConsoleLoggerProcessor : IDisposable
{
private const int _maxQueuedMessages = 1024;

private IConsole _console;

private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0);
private readonly ConcurrentQueue<LogMessageEntry> _messageQueue = new ConcurrentQueue<LogMessageEntry>();
private readonly BlockingCollection<LogMessageEntry> _messageQueue = new BlockingCollection<LogMessageEntry>(_maxQueuedMessages);
private readonly Task _outputTask;

private readonly ManualResetEventSlim _backpressure = new ManualResetEventSlim(true);
private readonly object _countLock = new object();

private int _queuedMessageCount;
private bool _isShuttingDown = false;
public IConsole Console;

public ConsoleLoggerProcessor()
{
RegisterForExit();

// Start Console message queue processor
_outputTask = Task.Factory.StartNew(
ProcessLogQueue,
this,
TaskCreationOptions.LongRunning);
}

public IConsole Console
{
get { return _console; }
set
{
if (value == null)
{
throw new ArgumentNullException(nameof(value));
}

_console = value;
}
}

public bool HasQueuedMessages => !_messageQueue.IsEmpty;

public void EnqueueMessage(LogMessageEntry message)
{
ApplyBackpressure();

_messageQueue.Enqueue(message);

WakeupProcessor();
}

private void ProcessLogQueue()
{
bool isShuttingDown;
do
{
isShuttingDown = WaitForNewMessages();

OutputQueuedMessages();

} while (!isShuttingDown);
}

private bool WaitForNewMessages()
public virtual void EnqueueMessage(LogMessageEntry message)
{
if (_messageQueue.IsEmpty && !Volatile.Read(ref _isShuttingDown))
{
// No messages; wait for new messages
_semaphore.Wait();
}

return Volatile.Read(ref _isShuttingDown);
}

private void OutputQueuedMessages()
{
var messagesOutput = 0;
LogMessageEntry message;
while (_messageQueue.TryDequeue(out message))
{
if (message.LevelString != null)
{
Console.Write(message.LevelString, message.LevelBackground, message.LevelForeground);
}

Console.Write(message.Message, message.MessageColor, message.MessageColor);
messagesOutput++;
}

if (messagesOutput > 0)
{
// In case of AnsiLogConsole, the messages are not yet written to the console, flush them
Console.Flush();

ReleaseBackpressure(messagesOutput);
}
_messageQueue.Add(message);
}

private void WakeupProcessor()
// for testing
internal virtual void WriteMessage(LogMessageEntry message)
{
if (_semaphore.CurrentCount == 0)
if (message.LevelString != null)
{
// Console output Task may be asleep, wake it up
_semaphore.Release();
Console.Write(message.LevelString, message.LevelBackground, message.LevelForeground);
}
}

private void ApplyBackpressure()
{
do
{
// Check if back pressure applied
_backpressure.Wait();

lock (_countLock)
{
var messageCount = _queuedMessageCount + 1;
if (messageCount <= _maxQueuedMessages)
{
_queuedMessageCount = messageCount;
if (messageCount == _maxQueuedMessages)
{
// Next message would put the queue over max, set blocking
_backpressure.Reset();
}

// Exit and queue message
break;
}
}

} while (true);
Console.Write(message.Message, message.MessageColor, message.MessageColor);
Console.Flush();
}

private void ReleaseBackpressure(int messagesOutput)
private void ProcessLogQueue()
{
lock (_countLock)
foreach (var message in _messageQueue.GetConsumingEnumerable())
{
if (_queuedMessageCount >= _maxQueuedMessages &&
_queuedMessageCount - messagesOutput < _maxQueuedMessages)
{
// Was blocked, unblock
_backpressure.Set();
}
_queuedMessageCount -= messagesOutput;
WriteMessage(message);
}
}

Expand All @@ -165,27 +57,10 @@ private static void ProcessLogQueue(object state)
consoleLogger.ProcessLogQueue();
}

private void RegisterForExit()
public void Dispose()
{
// Hooks to detect Process exit, and allow the Console to complete output
#if NET451
AppDomain.CurrentDomain.ProcessExit += InitiateShutdown;
#elif NETSTANDARD1_5
var currentAssembly = typeof(ConsoleLogger).GetTypeInfo().Assembly;
System.Runtime.Loader.AssemblyLoadContext.GetLoadContext(currentAssembly).Unloading += InitiateShutdown;
#endif
}
_messageQueue.CompleteAdding();

#if NET451
private void InitiateShutdown(object sender, EventArgs e)
#elif NETSTANDARD1_5
private void InitiateShutdown(System.Runtime.Loader.AssemblyLoadContext obj)
#else
private void InitiateShutdown()
#endif
{
_isShuttingDown = true;
_semaphore.Release(); // Fast wake up vs cts
try
{
_outputTask.Wait(1500); // with timeout in-case Console is locked by user input
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System.Runtime.CompilerServices;


[assembly: InternalsVisibleTo("Microsoft.Extensions.Logging.Test, PublicKey=0024000004800000940000000602000000240000525341310004000001000100f33a29044fa9d740c9b3213a93e57c84b472c84e0b8a0e1ae48e67a9f8f6de9d5f7f3d52ac23e48ac51801f1dc950abe901da34d2a9e3baadb141a17c77ef3c565dd5ee5054b91cf63bb3c6ab83f72ab3aafe93d0fc3c2348b764fafb0b1c0733de51459aeab46580384bf9d74c4e28164b7cde247f891ba07891c9d872ad2bb")]
43 changes: 0 additions & 43 deletions src/Microsoft.Extensions.Logging.Console/project.json

This file was deleted.

2 changes: 0 additions & 2 deletions test/Microsoft.Extensions.Logging.Test/Console/TestConsole.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,9 @@ public void WriteLine(string message, ConsoleColor? background, ConsoleColor? fo
{
Write(message + Environment.NewLine, background, foreground);
}
public Action OnFlush { get; set; }

public void Flush()
{
OnFlush?.Invoke();
}

private void ResetColor()
Expand Down
Loading

0 comments on commit db73396

Please sign in to comment.