Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
298 changes: 298 additions & 0 deletions StackExchange.Redis/StackExchange/Redis/ConnectionMultiplexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,12 @@ private static ConnectionMultiplexer ConnectImpl(Func<ConnectionMultiplexer> mul
}
if (!task.Result) throw ExceptionFactory.UnableToConnect(muxer.RawConfig.AbortOnConnectFail, muxer.failureMessage);
killMe = null;

if(muxer.ServerSelectionStrategy.ServerType == ServerType.Sentinel)
{
// Initialize the Sentinel handlers
muxer.InitializeSentinel(log);
}
return muxer;
}
finally
Expand Down Expand Up @@ -1884,6 +1890,298 @@ public bool IsConnected

internal ServerSelectionStrategy ServerSelectionStrategy => serverSelectionStrategy;

internal EndPoint currentSentinelMasterEndPoint = null;

internal System.Threading.Timer sentinelMasterReconnectTimer = null;

internal Dictionary<String, ConnectionMultiplexer> sentinelConnectionChildren = null;

/// <summary>
/// Initializes the connection as a Sentinel connection and adds
/// the necessary event handlers to track changes to the managed
/// masters.
/// </summary>
/// <param name="log"></param>
internal void InitializeSentinel(TextWriter log)
{
if(ServerSelectionStrategy.ServerType != ServerType.Sentinel)
return;

sentinelConnectionChildren = new Dictionary<string,ConnectionMultiplexer>();

// Subscribe to sentinel change events
ISubscriber sub = GetSubscriber();
if(sub.SubscribedEndpoint("+switch-master") == null)
{
sub.Subscribe("+switch-master", (channel, message) => {
string[] messageParts = ((string)message).Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
EndPoint switchBlame = Format.TryParseEndPoint(string.Format("{0}:{1}", messageParts[1], messageParts[2]));

lock(sentinelConnectionChildren)
{
// Switch the master if we have connections for that service
if(sentinelConnectionChildren.ContainsKey(messageParts[0]))
{
ConnectionMultiplexer child = sentinelConnectionChildren[messageParts[0]];

// Is the connection still valid?
if(child.IsDisposed)
{
child.ConnectionFailed -= OnManagedConnectionFailed;
child.ConnectionRestored -= OnManagedConnectionRestored;
sentinelConnectionChildren.Remove(messageParts[0]);
}
else
{
SwitchMaster(switchBlame, sentinelConnectionChildren[messageParts[0]]);
}
}
}
});
}

// If we lose connection to a sentinel server,
// We need to reconfigure to make sure we still have
// a subscription to the +switch-master channel.
this.ConnectionFailed += (sender, e) => {
// Reconfigure to get subscriptions back online
ReconfigureAsync(false, true, log, e.EndPoint, "Lost sentinel connection", false).Wait();
};

// Subscribe to new sentinels being added
if(sub.SubscribedEndpoint("+sentinel") == null)
{
sub.Subscribe("+sentinel", (channel, message) => {
string[] messageParts = ((string)message).Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
UpdateSentinelAddressList(messageParts[0]);
});
}
}

/// <summary>
/// Returns a managed connection to the master server indicated by
/// the ServiceName in the config.
/// </summary>
/// <param name="config">the configuration to be used when connecting to the master</param>
/// <param name="log"></param>
/// <returns></returns>
public ConnectionMultiplexer GetSentinelMasterConnection(ConfigurationOptions config, TextWriter log = null)
{
if(ServerSelectionStrategy.ServerType != ServerType.Sentinel)
throw new NotImplementedException("The ConnectionMultiplexer is not a Sentinel connection.");

if(String.IsNullOrEmpty(config.ServiceName))
throw new ArgumentException("A ServiceName must be specified.");

lock(sentinelConnectionChildren)
{
if(sentinelConnectionChildren.ContainsKey(config.ServiceName) && !sentinelConnectionChildren[config.ServiceName].IsDisposed)
return sentinelConnectionChildren[config.ServiceName];
}

// Clear out the endpoints
config.EndPoints.Clear();

// Get an initial endpoint
EndPoint initialMasterEndPoint = null;

do
{
initialMasterEndPoint = GetConfiguredMasterForService(config.ServiceName);
} while(initialMasterEndPoint == null);

config.EndPoints.Add(initialMasterEndPoint);

ConnectionMultiplexer connection = ConnectionMultiplexer.Connect(config, log);

// Attach to reconnect event to ensure proper connection to the new master
connection.ConnectionRestored += OnManagedConnectionRestored;

// If we lost the connection, run a switch to a least try and get updated info about the master
connection.ConnectionFailed += OnManagedConnectionFailed;

lock(sentinelConnectionChildren)
{
sentinelConnectionChildren[connection.RawConfig.ServiceName] = connection;
}

// Perform the initial switchover
SwitchMaster(configuration.EndPoints[0], connection, log);

return connection;
}

internal void OnManagedConnectionRestored(Object sender, ConnectionFailedEventArgs e)
{
ConnectionMultiplexer connection = (ConnectionMultiplexer)sender;

if(connection.sentinelMasterReconnectTimer != null)
{
connection.sentinelMasterReconnectTimer.Dispose();
connection.sentinelMasterReconnectTimer = null;
}

// Run a switch to make sure we have update-to-date
// information about which master we should connect to
SwitchMaster(e.EndPoint, connection);

try
{
// Verify that the reconnected endpoint is a master,
// and the correct one otherwise we should reconnect
if(connection.GetServer(e.EndPoint).IsSlave || e.EndPoint != connection.currentSentinelMasterEndPoint)
{
// Wait for things to smooth out
Thread.Sleep(200);

// This isn't a master, so try connecting again
SwitchMaster(e.EndPoint, connection);
}
}
catch(Exception)
{
// If we get here it means that we tried to reconnect to a server that is no longer
// considered a master by Sentinel and was removed from the list of endpoints.

// Wait for things to smooth out
Thread.Sleep(200);

// If we caught an exception, we may have gotten a stale endpoint
// we are not aware of, so retry
SwitchMaster(e.EndPoint, connection);
}
}

internal void OnManagedConnectionFailed(Object sender, ConnectionFailedEventArgs e)
{
ConnectionMultiplexer connection = (ConnectionMultiplexer)sender;
// Periodically check to see if we can reconnect to the proper master.
// This is here in case we lost our subscription to a good sentinel instance
// or if we miss the published master change
if(connection.sentinelMasterReconnectTimer == null)
{
connection.sentinelMasterReconnectTimer = new System.Threading.Timer((o) => {
SwitchMaster(e.EndPoint, connection);
}, null, TimeSpan.FromSeconds(0), TimeSpan.FromSeconds(1));

//connection.sentinelMasterReconnectTimer.AutoReset = true;

//connection.sentinelMasterReconnectTimer.Start();
}
}

internal EndPoint GetConfiguredMasterForService(String serviceName, int timeoutmillis = -1)
{
Task<EndPoint>[] sentinelMasters = this.serverSnapshot
.Where(s => s.ServerType == ServerType.Sentinel)
.Select(s => this.GetServer(s.EndPoint).SentinelGetMasterAddressByNameAsync(serviceName))
.ToArray();

Task<Task<EndPoint>> firstCompleteRequest = WaitFirstNonNullIgnoreErrorsAsync(sentinelMasters);
if (!firstCompleteRequest.Wait(timeoutmillis))
throw new TimeoutException("Timeout resolving master for service");
if (firstCompleteRequest.Result.Result == null)
throw new Exception("Unable to determine master");

return firstCompleteRequest.Result.Result;
}

private static async Task<Task<T>> WaitFirstNonNullIgnoreErrorsAsync<T>(Task<T>[] tasks)
{
if (tasks == null) throw new ArgumentNullException("tasks");
if (tasks.Length == 0) return null;
var typeNullable = (Nullable.GetUnderlyingType(typeof(T)) != null);
var taskList = tasks.Cast<Task>().ToList();

try
{
while (taskList.Count() > 0)
{
#if NET40
var allTasksAwaitingAny = TaskEx.WhenAny(taskList).ObserveErrors();
#else
var allTasksAwaitingAny = Task.WhenAny(taskList).ObserveErrors();
#endif
var result = await allTasksAwaitingAny.ForAwait();
taskList.Remove((Task<T>)result);
if (((Task<T>)result).IsFaulted) continue;
if ((!typeNullable) || ((Task<T>)result).Result != null)
return (Task<T>)result;
}
}
catch
{ }

return null;
}

/// <summary>
/// Switches the SentinelMasterConnection over to a new master.
/// </summary>
/// <param name="switchBlame">the endpoing responsible for the switch</param>
/// <param name="connection">the connection that should be switched over to a new master endpoint</param>
/// <param name="log">log output</param>
internal void SwitchMaster(EndPoint switchBlame, ConnectionMultiplexer connection, TextWriter log = null)
{
if(log == null) log = TextWriter.Null;

String serviceName = connection.RawConfig.ServiceName;

// Get new master
EndPoint masterEndPoint = null;

do
{
masterEndPoint = GetConfiguredMasterForService(serviceName);
} while(masterEndPoint == null);

connection.currentSentinelMasterEndPoint = masterEndPoint;

if (!connection.servers.Contains(masterEndPoint))
{
connection.configuration.EndPoints.Clear();
connection.servers.Clear();
connection.serverSnapshot = new ServerEndPoint[0];
connection.configuration.EndPoints.Add(masterEndPoint);
Trace(string.Format("Switching master to {0}", masterEndPoint));
// Trigger a reconfigure
connection.ReconfigureAsync(false, false, log, switchBlame, string.Format("master switch {0}", serviceName), false, CommandFlags.PreferMaster).Wait();
}

UpdateSentinelAddressList(serviceName);
}

internal void UpdateSentinelAddressList(String serviceName, int timeoutmillis = 500)
{
Task<EndPoint[]>[] sentinels = this.serverSnapshot
.Where(s => s.ServerType == ServerType.Sentinel)
.Select(s => this.GetServer(s.EndPoint).SentinelGetSentinelAddresses(serviceName))
.ToArray();

Task<Task<EndPoint[]>> firstCompleteRequest = WaitFirstNonNullIgnoreErrorsAsync(sentinels);

// Ignore errors, as having an updated sentinel list is
// not essential
if (firstCompleteRequest.Result?.Result == null)
return;
if (!firstCompleteRequest.Wait(timeoutmillis))
return;
if (firstCompleteRequest.Result.Result == null)
return;

bool hasNew = false;
foreach(EndPoint newSentinel in firstCompleteRequest.Result.Result.Where(x => !configuration.EndPoints.Contains(x)))
{
hasNew = true;
configuration.EndPoints.Add(newSentinel);
}

if(hasNew)
{
// Reconfigure the sentinel multiplexer if we added new endpoints
ReconfigureAsync(false, true, null, configuration.EndPoints[0], "Updating Sentinel List", false).Wait();
}
}

/// <summary>
/// Close all connections and release all resources associated with this object
Expand Down
9 changes: 9 additions & 0 deletions StackExchange.Redis/StackExchange/Redis/IServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,15 @@ public partial interface IServer : IRedis
/// <remarks>http://redis.io/topics/sentinel</remarks>
Task<EndPoint> SentinelGetMasterAddressByNameAsync(string serviceName, CommandFlags flags = CommandFlags.None);

/// <summary>
/// Returns the ip and port numbers of all known Sentinels
/// for the given service name.
/// </summary>
/// <param name="serviveName">the sentinel service name</param>
/// <param name="flags"></param>
/// <returns>a list of the sentinel ips and ports</returns>
Task<EndPoint[]> SentinelGetSentinelAddresses(string serviveName, CommandFlags flags = CommandFlags.None);

/// <summary>
/// Show the state and info of the specified master.
/// </summary>
Expand Down
1 change: 1 addition & 0 deletions StackExchange.Redis/StackExchange/Redis/RedisLiterals.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public static readonly RedisValue
GETMASTERADDRBYNAME = "GET-MASTER-ADDR-BY-NAME",
// RESET = "RESET",
FAILOVER = "FAILOVER",
SENTINELS = "SENTINELS",

// Sentinel Literals as of 2.8.4
MONITOR = "MONITOR",
Expand Down
6 changes: 6 additions & 0 deletions StackExchange.Redis/StackExchange/Redis/RedisServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,12 @@ public Task<EndPoint> SentinelGetMasterAddressByNameAsync(string serviceName, Co
return ExecuteAsync(msg, ResultProcessor.SentinelMasterEndpoint);
}

public Task<EndPoint[]> SentinelGetSentinelAddresses(string serviceName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(-1, flags, RedisCommand.SENTINEL, RedisLiterals.SENTINELS, (RedisValue)serviceName);
return ExecuteAsync(msg, ResultProcessor.SentinelAddressesEndPoints);
}

public KeyValuePair<string, string>[] SentinelMaster(string serviceName, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(-1, flags, RedisCommand.SENTINEL, RedisLiterals.MASTER, (RedisValue)serviceName);
Expand Down
Loading