Skip to content

Commit

Permalink
fix #1108 - introduce LogProxy as an intermediary between the TextWri…
Browse files Browse the repository at this point in the history
…ter; move the sync to there - allows safe detach from the logging (#1116)
  • Loading branch information
mgravell committed Apr 8, 2019
1 parent 93ee0fb commit bb98152
Show file tree
Hide file tree
Showing 9 changed files with 241 additions and 205 deletions.
9 changes: 5 additions & 4 deletions src/StackExchange.Redis/ConfigurationOptions.cs
Expand Up @@ -9,6 +9,7 @@
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading.Tasks;
using static StackExchange.Redis.ConnectionMultiplexer;

namespace StackExchange.Redis
{
Expand Down Expand Up @@ -523,7 +524,7 @@ internal bool HasDnsEndPoints()
return false;
}

internal async Task ResolveEndPointsAsync(ConnectionMultiplexer multiplexer, TextWriter log)
internal async Task ResolveEndPointsAsync(ConnectionMultiplexer multiplexer, LogProxy log)
{
var cache = new Dictionary<string, IPAddress>(StringComparer.OrdinalIgnoreCase);
for (int i = 0; i < EndPoints.Count; i++)
Expand All @@ -542,12 +543,12 @@ internal async Task ResolveEndPointsAsync(ConnectionMultiplexer multiplexer, Tex
}
else
{
multiplexer.LogLocked(log, "Using DNS to resolve '{0}'...", dns.Host);
log?.WriteLine($"Using DNS to resolve '{dns.Host}'...");
var ips = await Dns.GetHostAddressesAsync(dns.Host).ObserveErrors().ForAwait();
if (ips.Length == 1)
{
ip = ips[0];
multiplexer.LogLocked(log, "'{0}' => {1}", dns.Host, ip);
log?.WriteLine($"'{dns.Host}' => {ip}");
cache[dns.Host] = ip;
EndPoints[i] = new IPEndPoint(ip, dns.Port);
}
Expand All @@ -556,7 +557,7 @@ internal async Task ResolveEndPointsAsync(ConnectionMultiplexer multiplexer, Tex
catch (Exception ex)
{
multiplexer.OnInternalError(ex);
multiplexer.LogLocked(log, ex.Message);
log?.WriteLine(ex.Message);
}
}
}
Expand Down
363 changes: 195 additions & 168 deletions src/StackExchange.Redis/ConnectionMultiplexer.cs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/StackExchange.Redis/DebuggingAids.cs
Expand Up @@ -20,7 +20,7 @@ partial class ConnectionMultiplexer
Debug.WriteLine(message, Environment.CurrentManagedThreadId + " ~ " + category);
}

partial void OnTraceLog(TextWriter log, string caller)
partial void OnTraceLog(LogProxy log, string caller)
{
lock (UniqueId)
{
Expand Down
11 changes: 6 additions & 5 deletions src/StackExchange.Redis/Message.cs
Expand Up @@ -8,20 +8,21 @@
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis.Profiling;
using static StackExchange.Redis.ConnectionMultiplexer;

namespace StackExchange.Redis
{
internal sealed class LoggingMessage : Message
{
public readonly TextWriter log;
public readonly LogProxy log;
private readonly Message tail;

public static Message Create(TextWriter log, Message tail)
public static Message Create(LogProxy log, Message tail)
{
return log == null ? tail : new LoggingMessage(log, tail);
}

private LoggingMessage(TextWriter log, Message tail) : base(tail.Db, tail.Flags, tail.Command)
private LoggingMessage(LogProxy log, Message tail) : base(tail.Db, tail.Flags, tail.Command)
{
this.log = log;
this.tail = tail;
Expand All @@ -39,14 +40,14 @@ protected override void WriteImpl(PhysicalConnection physical)
try
{
var bridge = physical.BridgeCouldBeNull;
bridge?.Multiplexer?.LogLocked(log, "Writing to {0}: {1}", bridge, tail.CommandAndKey);
log?.WriteLine($"Writing to {bridge}: {tail.CommandAndKey}");
}
catch { }
tail.WriteTo(physical);
}
public override int ArgCount => tail.ArgCount;

public TextWriter Log => log;
public LogProxy Log => log;
}

internal abstract class Message : ICompletable
Expand Down
11 changes: 6 additions & 5 deletions src/StackExchange.Redis/PhysicalBridge.cs
Expand Up @@ -10,6 +10,7 @@
using Pipelines.Sockets.Unofficial;
using Pipelines.Sockets.Unofficial.Threading;
using static Pipelines.Sockets.Unofficial.Threading.MutexSlim;
using static StackExchange.Redis.ConnectionMultiplexer;
using PendingSubscriptionState = global::StackExchange.Redis.ConnectionMultiplexer.Subscription.PendingSubscriptionState;

namespace StackExchange.Redis
Expand Down Expand Up @@ -129,7 +130,7 @@ public void ReportNextFailure()

public override string ToString() => ConnectionType + "/" + Format.ToString(ServerEndPoint.EndPoint);

public void TryConnect(TextWriter log) => GetConnection(log);
public void TryConnect(LogProxy log) => GetConnection(log);

private WriteResult QueueOrFailMessage(Message message)
{
Expand Down Expand Up @@ -380,7 +381,7 @@ internal void KeepAlive()
}
}

internal async Task OnConnectedAsync(PhysicalConnection connection, TextWriter log)
internal async Task OnConnectedAsync(PhysicalConnection connection, LogProxy log)
{
Trace("OnConnected");
if (physical == connection && !isDisposed && ChangeState(State.Connecting, State.ConnectedEstablishing))
Expand Down Expand Up @@ -1097,15 +1098,15 @@ private bool ChangeState(State oldState, State newState)
return result;
}

private PhysicalConnection GetConnection(TextWriter log)
private PhysicalConnection GetConnection(LogProxy log)
{
if (state == (int)State.Disconnected)
{
try
{
if (!Multiplexer.IsDisposed)
{
Multiplexer.LogLocked(log, "Connecting {0}...", Name);
log?.WriteLine($"Connecting {Name}...");
Multiplexer.Trace("Connecting...", Name);
if (ChangeState(State.Disconnected, State.Connecting))
{
Expand All @@ -1122,7 +1123,7 @@ private PhysicalConnection GetConnection(TextWriter log)
}
catch (Exception ex)
{
Multiplexer.LogLocked(log, "Connect {0} failed: {1}", Name, ex.Message);
log?.WriteLine($"Connect {Name} failed: {ex.Message}");
Multiplexer.Trace("Connect failed: " + ex.Message, Name);
ChangeState(State.Disconnected);
OnInternalError(ex);
Expand Down
17 changes: 9 additions & 8 deletions src/StackExchange.Redis/PhysicalConnection.cs
Expand Up @@ -18,6 +18,7 @@
using System.Threading.Tasks;
using Pipelines.Sockets.Unofficial;
using Pipelines.Sockets.Unofficial.Arenas;
using static StackExchange.Redis.ConnectionMultiplexer;

namespace StackExchange.Redis
{
Expand Down Expand Up @@ -85,7 +86,7 @@ public PhysicalConnection(PhysicalBridge bridge)
OnCreateEcho();
}

internal async Task BeginConnectAsync(TextWriter log)
internal async Task BeginConnectAsync(LogProxy log)
{
var bridge = BridgeCouldBeNull;
var endpoint = bridge?.ServerEndPoint?.EndPoint;
Expand All @@ -97,7 +98,7 @@ internal async Task BeginConnectAsync(TextWriter log)
Trace("Connecting...");
_socket = SocketManager.CreateSocket(endpoint);
bridge.Multiplexer.OnConnecting(endpoint, bridge.ConnectionType);
bridge.Multiplexer.LogLocked(log, "BeginConnect: {0}", Format.ToString(endpoint));
log?.WriteLine($"BeginConnect: {Format.ToString(endpoint)}");

CancellationTokenSource timeoutSource = null;
try
Expand Down Expand Up @@ -141,7 +142,7 @@ internal async Task BeginConnectAsync(TextWriter log)
}
else if (await ConnectedAsync(x, log, bridge.Multiplexer.SocketManager).ForAwait())
{
bridge.Multiplexer.LogLocked(log, "Starting read");
log?.WriteLine("Starting read");
try
{
StartReading();
Expand All @@ -161,7 +162,7 @@ internal async Task BeginConnectAsync(TextWriter log)
}
catch (ObjectDisposedException)
{
bridge.Multiplexer.LogLocked(log, "(socket shutdown)");
log?.WriteLine("(socket shutdown)");
try { RecordConnectionFailed(ConnectionFailureType.UnableToConnect, isInitialConnect: true); }
catch (Exception inner)
{
Expand Down Expand Up @@ -1251,7 +1252,7 @@ private static LocalCertificateSelectionCallback GetAmbientClientCertificateCall
return null;
}

internal async ValueTask<bool> ConnectedAsync(Socket socket, TextWriter log, SocketManager manager)
internal async ValueTask<bool> ConnectedAsync(Socket socket, LogProxy log, SocketManager manager)
{
var bridge = BridgeCouldBeNull;
if (bridge == null) return false;
Expand All @@ -1270,7 +1271,7 @@ internal async ValueTask<bool> ConnectedAsync(Socket socket, TextWriter log, Soc

if (config.Ssl)
{
bridge.Multiplexer.LogLocked(log, "Configuring SSL");
log?.WriteLine("Configuring TLS");
var host = config.SslHost;
if (string.IsNullOrWhiteSpace(host)) host = Format.ToStringHostOnly(bridge.ServerEndPoint.EndPoint);

Expand All @@ -1290,7 +1291,7 @@ internal async ValueTask<bool> ConnectedAsync(Socket socket, TextWriter log, Soc
bridge.Multiplexer?.SetAuthSuspect();
throw;
}
bridge.Multiplexer.LogLocked(log, $"SSL connection established successfully using protocol: {ssl.SslProtocol}");
log?.WriteLine($"TLS connection established successfully using protocol: {ssl.SslProtocol}");
}
catch (AuthenticationException authexception)
{
Expand All @@ -1308,7 +1309,7 @@ internal async ValueTask<bool> ConnectedAsync(Socket socket, TextWriter log, Soc

_ioPipe = pipe;

bridge.Multiplexer.LogLocked(log, "Connected {0}", bridge);
log?.WriteLine($"Connected {bridge}");

await bridge.OnConnectedAsync(this, log).ForAwait();
return true;
Expand Down
6 changes: 5 additions & 1 deletion src/StackExchange.Redis/RedisServer.cs
Expand Up @@ -6,6 +6,7 @@
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using static StackExchange.Redis.ConnectionMultiplexer;

#pragma warning disable RCS1231 // Make parameter ref read-only.

Expand Down Expand Up @@ -320,7 +321,10 @@ public Task<DateTime> LastSaveAsync(CommandFlags flags = CommandFlags.None)

public void MakeMaster(ReplicationChangeOptions options, TextWriter log = null)
{
multiplexer.MakeMaster(server, options, log);
using (var proxy = LogProxy.TryCreate(log))
{
multiplexer.MakeMaster(server, options, proxy);
}
}

public void Save(SaveType type, CommandFlags flags = CommandFlags.None)
Expand Down
2 changes: 1 addition & 1 deletion src/StackExchange.Redis/ResultProcessor.cs
Expand Up @@ -175,7 +175,7 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, in
{
try
{
bridge?.Multiplexer?.LogLocked(logging.Log, "Response from {0} / {1}: {2}", bridge, message.CommandAndKey, result);
logging.Log?.WriteLine($"Response from {bridge} / {message.CommandAndKey}: {result}");
}
catch { }
}
Expand Down
25 changes: 13 additions & 12 deletions src/StackExchange.Redis/ServerEndPoint.cs
Expand Up @@ -9,6 +9,7 @@
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using static StackExchange.Redis.ConnectionMultiplexer;
using static StackExchange.Redis.PhysicalBridge;

namespace StackExchange.Redis
Expand Down Expand Up @@ -155,7 +156,7 @@ public void Dispose()
tmp?.Dispose();
}

public PhysicalBridge GetBridge(ConnectionType type, bool create = true, TextWriter log = null)
public PhysicalBridge GetBridge(ConnectionType type, bool create = true, LogProxy log = null)
{
if (isDisposed) return null;
switch (type)
Expand Down Expand Up @@ -237,7 +238,7 @@ public void SetUnselectable(UnselectableFlags flags)

public ValueTask<WriteResult> TryWriteAsync(Message message) => GetBridge(message.Command)?.TryWriteAsync(message, isSlave) ?? new ValueTask<WriteResult>(WriteResult.NoConnectionAvailable);

internal void Activate(ConnectionType type, TextWriter log)
internal void Activate(ConnectionType type, LogProxy log)
{
GetBridge(type, true, log);
}
Expand Down Expand Up @@ -467,7 +468,7 @@ internal bool IsSelectable(RedisCommand command, bool allowDisconnected = false)
return bridge != null && (allowDisconnected || bridge.IsConnected);
}

internal Task OnEstablishingAsync(PhysicalConnection connection, TextWriter log)
internal Task OnEstablishingAsync(PhysicalConnection connection, LogProxy log)
{
try
{
Expand Down Expand Up @@ -624,7 +625,7 @@ internal void ReportNextFailure()
subscription?.ReportNextFailure();
}

internal Task<bool> SendTracer(TextWriter log = null)
internal Task<bool> SendTracer(LogProxy log = null)
{
var msg = GetTracerMessage(false);
msg = LoggingMessage.Create(log, msg);
Expand Down Expand Up @@ -727,7 +728,7 @@ internal void WriteDirectOrQueueFireAndForgetSync<T>(PhysicalConnection connecti
}
}

private PhysicalBridge CreateBridge(ConnectionType type, TextWriter log)
private PhysicalBridge CreateBridge(ConnectionType type, LogProxy log)
{
if (Multiplexer.IsDisposed) return null;
Multiplexer.Trace(type.ToString());
Expand All @@ -736,9 +737,9 @@ private PhysicalBridge CreateBridge(ConnectionType type, TextWriter log)
return bridge;
}

private async Task HandshakeAsync(PhysicalConnection connection, TextWriter log)
private async Task HandshakeAsync(PhysicalConnection connection, LogProxy log)
{
Multiplexer.LogLocked(log, "Server handshake");
log?.WriteLine("Server handshake");
if (connection == null)
{
Multiplexer.Trace("No connection!?");
Expand All @@ -748,7 +749,7 @@ private async Task HandshakeAsync(PhysicalConnection connection, TextWriter log)
string password = Multiplexer.RawConfig.Password;
if (!string.IsNullOrWhiteSpace(password))
{
Multiplexer.LogLocked(log, "Authenticating (password)");
log?.WriteLine("Authenticating (password)");
msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.AUTH, (RedisValue)password);
msg.SetInternalCall();
await WriteDirectOrQueueFireAndForgetAsync(connection, msg, ResultProcessor.DemandOK).ForAwait();
Expand All @@ -762,7 +763,7 @@ private async Task HandshakeAsync(PhysicalConnection connection, TextWriter log)
name = nameSanitizer.Replace(name, "");
if (!string.IsNullOrWhiteSpace(name))
{
Multiplexer.LogLocked(log, "Setting client name: {0}", name);
log?.WriteLine($"Setting client name: {name}");
msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.CLIENT, RedisLiterals.SETNAME, (RedisValue)name);
msg.SetInternalCall();
await WriteDirectOrQueueFireAndForgetAsync(connection, msg, ResultProcessor.DemandOK).ForAwait();
Expand All @@ -779,10 +780,10 @@ private async Task HandshakeAsync(PhysicalConnection connection, TextWriter log)

if (connType == ConnectionType.Interactive)
{
Multiplexer.LogLocked(log, "Auto-configure...");
log?.WriteLine("Auto-configure...");
AutoConfigure(connection);
}
Multiplexer.LogLocked(log, "Sending critical tracer: {0}", bridge);
log?.WriteLine($"Sending critical tracer: {bridge}");
var tracer = GetTracerMessage(true);
tracer = LoggingMessage.Create(log, tracer);
await WriteDirectOrQueueFireAndForgetAsync(connection, tracer, ResultProcessor.EstablishConnection).ForAwait();
Expand All @@ -798,7 +799,7 @@ private async Task HandshakeAsync(PhysicalConnection connection, TextWriter log)
await WriteDirectOrQueueFireAndForgetAsync(connection, msg, ResultProcessor.TrackSubscriptions).ForAwait();
}
}
Multiplexer.LogLocked(log, "Flushing outbound buffer");
log?.WriteLine("Flushing outbound buffer");
await connection.FlushAsync().ForAwait();
}

Expand Down

0 comments on commit bb98152

Please sign in to comment.