Permalink
Browse files

Updates to support reclaiming a server if an exception happens that d…

…eems that server is no longer healthy. Also added support

Max Retries which is how many times an operation will try to execute if an exception happens and that exception is deemed ShouldRetry=true
  • Loading branch information...
1 parent 4423a23 commit b8ed8e16c9b33e293d04cc177d41a90ad11654ef @eplowe eplowe committed Nov 15, 2012
View
@@ -1,6 +1,6 @@

Microsoft Visual Studio Solution File, Format Version 12.00
-# Visual Studio 2012
+# Visual Studio 2010
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "FluentCassandra", "src\FluentCassandra.csproj", "{EAA32600-3C2A-4B34-B9B2-5764F280FCE3}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "FluentCassandra.Sandbox", "test\FluentCassandra.Sandbox\FluentCassandra.Sandbox.csproj", "{DE6B46DE-C37A-49AF-8B9A-B9B6D4F03A55}"
@@ -4,10 +4,27 @@ namespace FluentCassandra
{
public class CassandraException : Exception
{
+ public bool IsClientHealthy { get; set; }
+ public bool ShouldRetry { get; set; }
+
public CassandraException(string message)
: base(message) { }
public CassandraException(string message, Exception innerException)
: base(message, innerException) { }
+
+ public CassandraException(string message, bool isHealthy, bool shouldRetry)
+ : base(message)
+ {
+ IsClientHealthy = isHealthy;
+ ShouldRetry = shouldRetry;
+ }
+
+ public CassandraException(string message, Exception innerException, bool isHealthy, bool shouldRetry)
+ : base(message, innerException)
+ {
+ IsClientHealthy = isHealthy;
+ ShouldRetry = shouldRetry;
+ }
}
}
@@ -182,13 +182,46 @@ public TResult ExecuteOperation<TResult>(Operation<TResult> action, bool? throwO
action.Session = this;
TResult result;
- bool success = action.TryExecute(out result);
- if (!success)
- LastError = action.Error;
+ int counter = 0;
+ bool noException = false;
- if (!success && (throwOnError ?? ThrowErrors))
- throw action.Error;
+ do
+ {
+ noException = false;
+ bool success = action.TryExecute(out result);
+
+ if (success)
+ {
+ noException = true;
+ LastError = null;
+ }
+ else
+ {
+ LastError = action.Error;
+
+ if (!LastError.IsClientHealthy && _connection != null)
+ {
+ _connection.IsHealthy = false;
+ ConnectionProvider.Servers.BlackList(_connection.Server);
+ ConnectionProvider.Close(_connection);
+ _connection = null;
+ }
+
+ if (!LastError.ShouldRetry)
+ {
+ break;
+ }
+ }
+
+ counter++;
+ }
+ while (counter < ConnectionProvider.ConnectionBuilder.MaxRetries && !noException);
+
+ if (!noException && (throwOnError ?? ThrowErrors))
+ {
+ throw LastError;
+ }
return result;
}
@@ -16,6 +16,7 @@ public class Connection : IConnection, IDisposable
private Cassandra.Client _client;
private string _activeKeyspace;
private string _activeCqlVersion;
+ private bool _healthy;
private readonly object _lock = new object();
/// <summary>
@@ -38,7 +39,7 @@ public Connection(Server server, ConnectionType connectionType, int bufferSize)
Server = server;
ConnectionType = connectionType;
BufferSize = bufferSize;
-
+ _healthy = true;
InitTransportAndClient();
}
@@ -205,6 +206,23 @@ public Cassandra.Client Client
}
}
+ /// <summary>
+ ///
+ /// </summary>
+ public bool IsHealthy
+ {
+ get
+ {
+ lock (_lock)
+ return _healthy;
+ }
+ set
+ {
+ lock (_lock)
+ _healthy = value;
+ }
+ }
+
/// <summary>
///
/// </summary>
@@ -14,14 +14,15 @@ public class ConnectionBuilder : FluentCassandra.Connections.IConnectionBuilder
/// <param name="host"></param>
/// <param name="port"></param>
/// <param name="timeout"></param>
- public ConnectionBuilder(string keyspace, string host, int port = Server.DefaultPort, int connectionTimeout = Server.DefaultTimeout, bool pooling = false, int minPoolSize = 0, int maxPoolSize = 100, int connectionLifetime = 0, ConnectionType connectionType = ConnectionType.Framed, int bufferSize = 1024, ConsistencyLevel read = ConsistencyLevel.QUORUM, ConsistencyLevel write = ConsistencyLevel.QUORUM, string cqlVersion = FluentCassandra.Connections.CqlVersion.ServerDefault, bool compressCqlQueries = true, string username = null, string password = null)
+ public ConnectionBuilder(string keyspace, string host, int port = Server.DefaultPort, int connectionTimeout = Server.DefaultTimeout, bool pooling = false, int minPoolSize = 0, int maxPoolSize = 100, int connectionLifetime = 0, ConnectionType connectionType = ConnectionType.Framed, int bufferSize = 1024, ConsistencyLevel read = ConsistencyLevel.QUORUM, ConsistencyLevel write = ConsistencyLevel.QUORUM, string cqlVersion = FluentCassandra.Connections.CqlVersion.ServerDefault, bool compressCqlQueries = true, string username = null, string password = null, int maxRetries = 0)
{
Keyspace = keyspace;
Servers = new List<Server>() { new Server(host, port) };
ConnectionTimeout = TimeSpan.FromSeconds(connectionTimeout);
Pooling = pooling;
MinPoolSize = minPoolSize;
MaxPoolSize = maxPoolSize;
+ MaxRetries = maxRetries;
ConnectionLifetime = TimeSpan.FromSeconds(connectionLifetime);
ConnectionType = connectionType;
BufferSize = bufferSize;
@@ -35,14 +36,15 @@ public ConnectionBuilder(string keyspace, string host, int port = Server.Default
ConnectionString = GetConnectionString();
}
- public ConnectionBuilder(string keyspace, Server server, bool pooling = false, int minPoolSize = 0, int maxPoolSize = 100, int connectionLifetime = 0, ConnectionType connectionType = ConnectionType.Framed, int bufferSize = 1024, ConsistencyLevel read = ConsistencyLevel.QUORUM, ConsistencyLevel write = ConsistencyLevel.QUORUM, string cqlVersion = FluentCassandra.Connections.CqlVersion.ServerDefault, bool compressCqlQueries = true, string username = null, string password = null)
+ public ConnectionBuilder(string keyspace, Server server, bool pooling = false, int minPoolSize = 0, int maxPoolSize = 100, int connectionLifetime = 0, ConnectionType connectionType = ConnectionType.Framed, int bufferSize = 1024, ConsistencyLevel read = ConsistencyLevel.QUORUM, ConsistencyLevel write = ConsistencyLevel.QUORUM, string cqlVersion = FluentCassandra.Connections.CqlVersion.ServerDefault, bool compressCqlQueries = true, string username = null, string password = null, int maxRetries = 0)
{
Keyspace = keyspace;
Servers = new List<Server>() { server };
ConnectionTimeout = TimeSpan.FromSeconds(server.Timeout);
Pooling = pooling;
MinPoolSize = minPoolSize;
MaxPoolSize = maxPoolSize;
+ MaxRetries = maxRetries;
ConnectionLifetime = TimeSpan.FromSeconds(connectionLifetime);
ConnectionType = connectionType;
BufferSize = bufferSize;
@@ -154,9 +156,26 @@ private void InitializeConnectionString(string connectionString)
#endregion
- #region ConnectionTimeout
+ #region MaxRetries
- if (!pairs.ContainsKey("Connection Timeout"))
+ if (pairs.ContainsKey("Max Retries"))
+ {
+ int maxRetries;
+
+ if (!Int32.TryParse(pairs["Max Retries"], out maxRetries))
+ maxRetries = 0;
+
+ if (maxRetries < 0)
+ maxRetries = 0;
+
+ MaxRetries = maxRetries;
+ }
+
+ #endregion
+
+ #region ConnectionTimeout
+
+ if (!pairs.ContainsKey("Connection Timeout"))
{
ConnectionTimeout = TimeSpan.Zero;
}
@@ -401,6 +420,13 @@ private string GetConnectionString()
/// </summary>
public int MaxPoolSize { get; private set; }
+ /// <summary>
+ /// The maximum number of execution retry attempts if there is an error
+ /// during the execution of an operation and the exception thrown is marked as
+ /// ShouldRetry = true
+ /// </summary>
+ public int MaxRetries { get; private set; }
+
/// <summary>
/// When a connection is returned to the pool, its creation time is compared with the current time, and the connection is destroyed if that time span (in seconds) exceeds the value specified by Connection Lifetime. This is useful in clustered configurations to force load balancing between a running server and a server just brought online. A value of zero (0) causes pooled connections to have the maximum connection timeout.
/// </summary>
@@ -7,6 +7,7 @@ public interface IConnection : IDisposable
{
DateTime Created { get; }
bool IsOpen { get; }
+ bool IsHealthy { get; set; }
Server Server { get; }
Cassandra.Client Client { get; }
@@ -12,6 +12,7 @@ public interface IConnectionBuilder
bool Pooling { get; }
int MinPoolSize { get; }
int MaxPoolSize { get; }
+ int MaxRetries { get; }
TimeSpan ConnectionTimeout { get; }
TimeSpan ConnectionLifetime { get; }
ConnectionType ConnectionType { get; }
@@ -41,14 +41,15 @@ public override IConnection Open()
}
catch (SocketException exc)
{
+ conn.IsHealthy = false;
Servers.ErrorOccurred(conn.Server, exc);
Close(conn);
conn = null;
}
}
if (conn == null)
- throw new CassandraException("No connection could be made because all servers have failed.");
+ throw new CassandraException("No connection could be made because all servers have failed.", false, false);
return conn;
}
@@ -59,7 +59,7 @@ public override IConnection CreateConnection()
else if (_freeConnections.Count + _usedConnections.Count >= MaxPoolSize)
{
if (!Monitor.Wait(_lock, TimeSpan.FromSeconds(30)))
- throw new CassandraException("No connection could be made, timed out trying to aquire a connection from the connection pool.");
+ throw new CassandraException("No connection could be made, timed out trying to aquire a connection from the connection pool.", false, false);
return CreateConnection();
}
@@ -106,7 +106,7 @@ public void Cleanup()
/// <returns>True if alive; otherwise false.</returns>
private bool IsAlive(IConnection connection)
{
- if (ConnectionLifetime > TimeSpan.Zero && connection.Created.Add(ConnectionLifetime) < DateTime.UtcNow)
+ if (!connection.IsHealthy || (ConnectionLifetime > TimeSpan.Zero && connection.Created.Add(ConnectionLifetime) < DateTime.UtcNow))
return false;
return connection.IsOpen;
Oops, something went wrong. Retry.

0 comments on commit b8ed8e1

Please sign in to comment.