Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- Adds `GET` on `SET` command support (present in Redis 6.2+ - #2003 via martinekvili)
- Improves concurrent load performance when backlogs are utilized (#2008 via NickCraver)
- Improves cluster connections when `CLUSTER` command is disabled (#2014 via tylerohlsen)
- Improves connection logging and adds overall timing to it (#2019 via NickCraver)

## 2.5.27 (prerelease)

Expand Down
92 changes: 56 additions & 36 deletions src/StackExchange.Redis/ConnectionMultiplexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ private static bool WaitAllIgnoreErrors(Task[] tasks, int timeout)
{
if (tasks == null) throw new ArgumentNullException(nameof(tasks));
if (tasks.Length == 0) return true;
var watch = Stopwatch.StartNew();
var watch = ValueStopwatch.StartNew();
try
{
// If no error, great
Expand All @@ -683,7 +683,7 @@ private static bool WaitAllIgnoreErrors(Task[] tasks, int timeout)
var task = tasks[i];
if (!task.IsCanceled && !task.IsCompleted && !task.IsFaulted)
{
var remaining = timeout - checked((int)watch.ElapsedMilliseconds);
var remaining = timeout - watch.ElapsedMilliseconds;
if (remaining <= 0) return false;
try
{
Expand Down Expand Up @@ -742,12 +742,12 @@ private static async Task<bool> WaitAllIgnoreErrorsAsync(string name, Task[] tas
return true;
}

var watch = Stopwatch.StartNew();
var watch = ValueStopwatch.StartNew();
LogWithThreadPoolStats(log, $"Awaiting {tasks.Length} {name} task completion(s) for {timeoutMilliseconds}ms", out _);
try
{
// if none error, great
var remaining = timeoutMilliseconds - checked((int)watch.ElapsedMilliseconds);
var remaining = timeoutMilliseconds - watch.ElapsedMilliseconds;
if (remaining <= 0)
{
LogWithThreadPoolStats(log, "Timeout before awaiting for tasks", out _);
Expand All @@ -769,7 +769,7 @@ private static async Task<bool> WaitAllIgnoreErrorsAsync(string name, Task[] tas
var task = tasks[i];
if (!task.IsCanceled && !task.IsCompleted && !task.IsFaulted)
{
var remaining = timeoutMilliseconds - checked((int)watch.ElapsedMilliseconds);
var remaining = timeoutMilliseconds - watch.ElapsedMilliseconds;
if (remaining <= 0)
{
LogWithThreadPoolStats(log, "Timeout awaiting tasks", out _);
Expand Down Expand Up @@ -891,7 +891,8 @@ private static async Task<ConnectionMultiplexer> ConnectImplAsync(ConfigurationO
{
try
{
log?.WriteLine($"Connecting (async) on {RuntimeInformation.FrameworkDescription}");
var sw = ValueStopwatch.StartNew();
logProxy?.WriteLine($"Connecting (async) on {RuntimeInformation.FrameworkDescription} (StackExchange.Redis: v{Utils.GetLibVersion()})");

muxer = CreateMultiplexer(configuration, logProxy, out connectHandler);
killMe = muxer;
Expand All @@ -912,6 +913,8 @@ private static async Task<ConnectionMultiplexer> ConnectImplAsync(ConfigurationO

await Maintenance.ServerMaintenanceEvent.AddListenersAsync(muxer, logProxy).ForAwait();

logProxy?.WriteLine($"Total connect time: {sw.ElapsedMilliseconds:n0} ms");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any value in knowing how long it took to fail to connect? If so, you could move these "Total connect time" lines into the `finally` block.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I debated that as well. I feel it might be confusing to give a total time when the connection just blew sky high for no reason; in those cases we care more about the log of what went sky high, or what was the last thing logged, than about the time... I think.


return muxer;
}
finally
Expand Down Expand Up @@ -998,6 +1001,16 @@ public void WriteLine(string message = null)
}
}
}
public void WriteLine(string prefix, string message)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider an int indent instead of string prefix?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could, but overall it'd be doing a string creation and allocation that way so kept it simple :)

{
if (_log != null) // note: double-checked
{
lock (SyncLock)
{
_log?.WriteLine($"{DateTime.UtcNow:HH:mm:ss.ffff}: {prefix}{message}");
}
}
}
public void Dispose()
{
if (_log != null) // note: double-checked
Expand All @@ -1020,7 +1033,7 @@ private static ConnectionMultiplexer CreateMultiplexer(ConfigurationOptions conf
lock (log.SyncLock) // Keep the outer and any inner errors contiguous
{
var ex = a.Exception;
log?.WriteLine($"connection failed: {Format.ToString(a.EndPoint)} ({a.ConnectionType}, {a.FailureType}): {ex?.Message ?? "(unknown)"}");
log?.WriteLine($"Connection failed: {Format.ToString(a.EndPoint)} ({a.ConnectionType}, {a.FailureType}): {ex?.Message ?? "(unknown)"}");
while ((ex = ex.InnerException) != null)
{
log?.WriteLine($"> {ex.Message}");
Expand Down Expand Up @@ -1178,7 +1191,8 @@ private static ConnectionMultiplexer ConnectImpl(ConfigurationOptions configurat
{
try
{
log?.WriteLine($"Connecting (sync) on {RuntimeInformation.FrameworkDescription}");
var sw = ValueStopwatch.StartNew();
logProxy?.WriteLine($"Connecting (sync) on {RuntimeInformation.FrameworkDescription} (StackExchange.Redis: v{Utils.GetLibVersion()})");

muxer = CreateMultiplexer(configuration, logProxy, out connectHandler);
killMe = muxer;
Expand Down Expand Up @@ -1211,6 +1225,8 @@ private static ConnectionMultiplexer ConnectImpl(ConfigurationOptions configurat

Maintenance.ServerMaintenanceEvent.AddListenersAsync(muxer, logProxy).Wait(muxer.SyncConnectTimeout(true));

logProxy?.WriteLine($"Total connect time: {sw.ElapsedMilliseconds:n0} ms");

return muxer;
}
finally
Expand Down Expand Up @@ -1640,11 +1656,12 @@ internal void GetStatus(LogProxy log)
if (log == null) return;

var tmp = GetServerSnapshot();
log?.WriteLine("Endpoint Summary:");
foreach (var server in tmp)
{
log?.WriteLine(server.Summary());
log?.WriteLine(server.GetCounters().ToString());
log?.WriteLine(server.GetProfile());
log?.WriteLine(prefix: " ", message: server.Summary());
log?.WriteLine(prefix: " ", message: server.GetCounters().ToString());
log?.WriteLine(prefix: " ", message: server.GetProfile());
}
log?.WriteLine($"Sync timeouts: {Interlocked.Read(ref syncTimeouts)}; async timeouts: {Interlocked.Read(ref asyncTimeouts)}; fire and forget: {Interlocked.Read(ref fireAndForgets)}; last heartbeat: {LastHeartbeatSecondsAgo}s ago");
}
Expand Down Expand Up @@ -1722,7 +1739,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP

ServerEndPoint[] servers = null;
bool encounteredConnectedClusterServer = false;
Stopwatch watch = null;
ValueStopwatch? watch = null;

int iterCount = first ? 2 : 1;
// This is fix for https://github.com/StackExchange/StackExchange.Redis/issues/300
Expand Down Expand Up @@ -1753,8 +1770,8 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP
available[i] = server.OnConnectedAsync(log, sendTracerIfConnected: true, autoConfigureIfConnected: reconfigureAll);
}

watch ??= Stopwatch.StartNew();
var remaining = RawConfig.ConnectTimeout - checked((int)watch.ElapsedMilliseconds);
watch ??= ValueStopwatch.StartNew();
var remaining = RawConfig.ConnectTimeout - watch.Value.ElapsedMilliseconds;
log?.WriteLine($"Allowing {available.Length} endpoint(s) {TimeSpan.FromMilliseconds(remaining)} to respond...");
Trace("Allowing endpoints " + TimeSpan.FromMilliseconds(remaining) + " to respond...");
var allConnected = await WaitAllIgnoreErrorsAsync("available", available, remaining, log).ForAwait();
Expand All @@ -1772,10 +1789,11 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP
}
}

log?.WriteLine($"Endpoint summary:");
// Log current state after await
foreach (var server in servers)
{
log?.WriteLine($"{Format.ToString(server.EndPoint)}: Endpoint is {server.ConnectionState}");
log?.WriteLine($" {Format.ToString(server.EndPoint)}: Endpoint is {server.ConnectionState}");
}

EndPointCollection updatedClusterEndpointCollection = null;
Expand All @@ -1790,21 +1808,21 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP
var aex = task.Exception;
foreach (var ex in aex.InnerExceptions)
{
log?.WriteLine($"{Format.ToString(server)}: Faulted: {ex.Message}");
log?.WriteLine($" {Format.ToString(server)}: Faulted: {ex.Message}");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these Format.ToString() calls necessary? It looks like ServerEndPoint already overrides ToString to do that

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question - I would like to change these everywhere but going consistent for this PR and want to do those all at once :)

failureMessage = ex.Message;
}
}
else if (task.IsCanceled)
{
server.SetUnselectable(UnselectableFlags.DidNotRespond);
log?.WriteLine($"{Format.ToString(server)}: Connect task canceled");
log?.WriteLine($" {Format.ToString(server)}: Connect task canceled");
}
else if (task.IsCompleted)
{
if (task.Result != "Disconnected")
{
server.ClearUnselectable(UnselectableFlags.DidNotRespond);
log?.WriteLine($"{Format.ToString(server)}: Returned with success as {server.ServerType} {(server.IsReplica ? "replica" : "primary")} (Source: {task.Result})");
log?.WriteLine($" {Format.ToString(server)}: Returned with success as {server.ServerType} {(server.IsReplica ? "replica" : "primary")} (Source: {task.Result})");

// Count the server types
switch (server.ServerType)
Expand Down Expand Up @@ -1857,13 +1875,13 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP
else
{
server.SetUnselectable(UnselectableFlags.DidNotRespond);
log?.WriteLine($"{Format.ToString(server)}: Returned, but incorrectly");
log?.WriteLine($" {Format.ToString(server)}: Returned, but incorrectly");
}
}
else
{
server.SetUnselectable(UnselectableFlags.DidNotRespond);
log?.WriteLine($"{Format.ToString(server)}: Did not respond");
log?.WriteLine($" {Format.ToString(server)}: Did not respond");
}
}

Expand Down Expand Up @@ -1951,9 +1969,9 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP
healthy = standaloneCount != 0 || clusterCount != 0 || sentinelCount != 0;
if (first && !healthy && attemptsLeft > 0)
{
log?.WriteLine("resetting failing connections to retry...");
log?.WriteLine("Resetting failing connections to retry...");
ResetAllNonConnected();
log?.WriteLine($"retrying; attempts left: {attemptsLeft}...");
log?.WriteLine($" Retrying - attempts left: {attemptsLeft}...");
}
//WTF("?: " + attempts);
} while (first && !healthy && attemptsLeft > 0);
Expand Down Expand Up @@ -2035,6 +2053,8 @@ private void ResetAllNonConnected()

private static ServerEndPoint NominatePreferredMaster(LogProxy log, ServerEndPoint[] servers, bool useTieBreakers, List<ServerEndPoint> masters)
{
log?.WriteLine("Election summary:");

Dictionary<string, int> uniques = null;
if (useTieBreakers)
{
Expand All @@ -2047,11 +2067,11 @@ private static ServerEndPoint NominatePreferredMaster(LogProxy log, ServerEndPoi

if (string.IsNullOrWhiteSpace(serverResult))
{
log?.WriteLine($"Election: {Format.ToString(server)} had no tiebreaker set");
log?.WriteLine($" Election: {Format.ToString(server)} had no tiebreaker set");
}
else
{
log?.WriteLine($"Election: {Format.ToString(server)} nominates: {serverResult}");
log?.WriteLine($" Election: {Format.ToString(server)} nominates: {serverResult}");
if (!uniques.TryGetValue(serverResult, out int count)) count = 0;
uniques[serverResult] = count + 1;
}
Expand All @@ -2061,37 +2081,37 @@ private static ServerEndPoint NominatePreferredMaster(LogProxy log, ServerEndPoi
switch (masters.Count)
{
case 0:
log?.WriteLine("Election: No masters detected");
log?.WriteLine(" Election: No masters detected");
return null;
case 1:
log?.WriteLine($"Election: Single master detected: {Format.ToString(masters[0].EndPoint)}");
log?.WriteLine($" Election: Single master detected: {Format.ToString(masters[0].EndPoint)}");
return masters[0];
default:
log?.WriteLine("Election: Multiple masters detected...");
log?.WriteLine(" Election: Multiple masters detected...");
if (useTieBreakers && uniques != null)
{
switch (uniques.Count)
{
case 0:
log?.WriteLine("Election: No nominations by tie-breaker");
log?.WriteLine(" Election: No nominations by tie-breaker");
break;
case 1:
string unanimous = uniques.Keys.Single();
log?.WriteLine($"Election: Tie-breaker unanimous: {unanimous}");
log?.WriteLine($" Election: Tie-breaker unanimous: {unanimous}");
var found = SelectServerByElection(servers, unanimous, log);
if (found != null)
{
log?.WriteLine($"Election: Elected: {Format.ToString(found.EndPoint)}");
log?.WriteLine($" Election: Elected: {Format.ToString(found.EndPoint)}");
return found;
}
break;
default:
log?.WriteLine("Election is contested:");
log?.WriteLine(" Election is contested:");
ServerEndPoint highest = null;
bool arbitrary = false;
foreach (var pair in uniques.OrderByDescending(x => x.Value))
{
log?.WriteLine($"Election: {pair.Key} has {pair.Value} votes");
log?.WriteLine($" Election: {pair.Key} has {pair.Value} votes");
if (highest == null)
{
highest = SelectServerByElection(servers, pair.Key, log);
Expand All @@ -2106,11 +2126,11 @@ private static ServerEndPoint NominatePreferredMaster(LogProxy log, ServerEndPoi
{
if (arbitrary)
{
log?.WriteLine($"Election: Choosing master arbitrarily: {Format.ToString(highest.EndPoint)}");
log?.WriteLine($" Election: Choosing master arbitrarily: {Format.ToString(highest.EndPoint)}");
}
else
{
log?.WriteLine($"Election: Elected: {Format.ToString(highest.EndPoint)}");
log?.WriteLine($" Election: Elected: {Format.ToString(highest.EndPoint)}");
}
return highest;
}
Expand All @@ -2120,7 +2140,7 @@ private static ServerEndPoint NominatePreferredMaster(LogProxy log, ServerEndPoi
break;
}

log?.WriteLine($"Election: Choosing master arbitrarily: {Format.ToString(masters[0].EndPoint)}");
log?.WriteLine($" Election: Choosing master arbitrarily: {Format.ToString(masters[0].EndPoint)}");
return masters[0];
}

Expand Down Expand Up @@ -2413,7 +2433,7 @@ public ConnectionMultiplexer GetSentinelMasterConnection(ConfigurationOptions co
bool success = false;
ConnectionMultiplexer connection = null;

var sw = Stopwatch.StartNew();
var sw = ValueStopwatch.StartNew();
do
{
// Get an initial endpoint - try twice
Expand Down
32 changes: 32 additions & 0 deletions src/StackExchange.Redis/ValueStopwatch.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System;
using System.Diagnostics;

namespace StackExchange.Redis;

/// <summary>
/// Optimization over <see cref="Stopwatch"/>, from https://github.com/dotnet/aspnetcore/blob/main/src/Shared/ValueStopwatch/ValueStopwatch.cs
/// </summary>
/// <remarks>
/// A value type that stores only the <see cref="Stopwatch.GetTimestamp"/> reading captured by
/// <see cref="StartNew"/>; elapsed time is computed on demand from a fresh timestamp reading.
/// A <c>default</c> instance has never been started and cannot report elapsed time.
/// </remarks>
internal readonly struct ValueStopwatch
{
    // Conversion factor from raw Stopwatch timestamp units to TimeSpan ticks.
    private static readonly double TimestampToTicks = TimeSpan.TicksPerSecond / (double)Stopwatch.Frequency;

    // Timestamp captured at start; zero marks an uninitialized ('default') instance.
    private readonly long _startTimestamp;

    /// <summary>
    /// Whether this instance holds a non-zero start timestamp, i.e. it is not <c>default</c>.
    /// </summary>
    public bool IsActive => _startTimestamp != 0;

    private ValueStopwatch(long startTimestamp) => _startTimestamp = startTimestamp;

    /// <summary>
    /// Creates a stopwatch whose start point is the current <see cref="Stopwatch.GetTimestamp"/> value.
    /// </summary>
    /// <returns>A started <see cref="ValueStopwatch"/>.</returns>
    public static ValueStopwatch StartNew() => new ValueStopwatch(Stopwatch.GetTimestamp());

    /// <summary>
    /// Whole milliseconds elapsed since the stopwatch was started, truncated to an <see cref="int"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">This instance is uninitialized (<c>default</c>).</exception>
    /// <exception cref="OverflowException">The elapsed milliseconds do not fit in an <see cref="int"/>.</exception>
    public int ElapsedMilliseconds => checked((int)GetElapsedTime().TotalMilliseconds);

    /// <summary>
    /// Gets the time elapsed between the start timestamp and now.
    /// </summary>
    /// <returns>The elapsed time as a <see cref="TimeSpan"/>.</returns>
    /// <exception cref="InvalidOperationException">This instance is uninitialized (<c>default</c>).</exception>
    public TimeSpan GetElapsedTime()
    {
        if (!IsActive)
        {
            throw new InvalidOperationException("An uninitialized, or 'default', ValueStopwatch cannot be used to get elapsed time.");
        }

        var end = Stopwatch.GetTimestamp();
        var timestampDelta = end - _startTimestamp;
        // Scale raw timestamp units to TimeSpan ticks before constructing the result.
        var ticks = (long)(TimestampToTicks * timestampDelta);
        return new TimeSpan(ticks);
    }
}