Permalink
Browse files

Some bug tweaks to blacklisting enhancements

	*When removing a server from the blacklist it was not being sent back to the serverqueue
	*Added some additional exception types to handle in Operation::TryExecute()
	*Added an additional input parameter on Operation::ExceptionOccuredRetryExecution called: bool markClientAsUnhealthy
	so that we can selectively blacklist a server when retrying.
	*In Operation::TryExecute() after Execute() is called must mark HasError = false and Error = null. If not and there is
	an exception thrown on the first pass but the next pass results in a valid execution the exception was still being thrown.

Added ServerRecoveryInterval to ConnectionString/Builder.
	Turned the recovery timer into a "one shot" timer and calling the Timer::Change() method at the end of the callback
	to avoid potential overlap. We could consider using Timers.Timer but then we'd need to think about to handle overlap there as well.
	Potentially with a Monitor.TryEnter? I am open to suggestions.
  • Loading branch information...
1 parent 92bc9b8 commit d47e0260b955f0c04800d108612fbaf267248e30 @eplowe eplowe committed Nov 21, 2012
View
@@ -20,6 +20,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{1A88B962
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "FluentCassandra.Tests", "test\FluentCassandra.Tests\FluentCassandra.Tests.csproj", "{9DAF7022-5820-4214-B13E-AC0A1B37691F}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TestPoolExhaustion", "..\..\dev\BTCassandra.Interop\ConsoleApplication2\TestPoolExhaustion.csproj", "{63108852-4D94-4726-8EE3-5DE5D50963E8}"
+EndProject
Global
GlobalSection(TestCaseManagementSettings) = postSolution
CategoryFile = FluentCassandra.vsmdi
@@ -63,6 +65,16 @@ Global
{9DAF7022-5820-4214-B13E-AC0A1B37691F}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
{9DAF7022-5820-4214-B13E-AC0A1B37691F}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{9DAF7022-5820-4214-B13E-AC0A1B37691F}.Release|x86.ActiveCfg = Release|Any CPU
+ {63108852-4D94-4726-8EE3-5DE5D50963E8}.Debug|Any CPU.ActiveCfg = Debug|x86
+ {63108852-4D94-4726-8EE3-5DE5D50963E8}.Debug|Mixed Platforms.ActiveCfg = Debug|x86
+ {63108852-4D94-4726-8EE3-5DE5D50963E8}.Debug|Mixed Platforms.Build.0 = Debug|x86
+ {63108852-4D94-4726-8EE3-5DE5D50963E8}.Debug|x86.ActiveCfg = Debug|x86
+ {63108852-4D94-4726-8EE3-5DE5D50963E8}.Debug|x86.Build.0 = Debug|x86
+ {63108852-4D94-4726-8EE3-5DE5D50963E8}.Release|Any CPU.ActiveCfg = Release|x86
+ {63108852-4D94-4726-8EE3-5DE5D50963E8}.Release|Mixed Platforms.ActiveCfg = Release|x86
+ {63108852-4D94-4726-8EE3-5DE5D50963E8}.Release|Mixed Platforms.Build.0 = Release|x86
+ {63108852-4D94-4726-8EE3-5DE5D50963E8}.Release|x86.ActiveCfg = Release|x86
+ {63108852-4D94-4726-8EE3-5DE5D50963E8}.Release|x86.Build.0 = Release|x86
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -16,7 +16,6 @@ public class Connection : IConnection, IDisposable
private Cassandra.Client _client;
private string _activeKeyspace;
private string _activeCqlVersion;
- private bool _healthy;
private readonly object _lock = new object();
/// <summary>
@@ -39,7 +38,6 @@ public Connection(Server server, ConnectionType connectionType, int bufferSize)
Server = server;
ConnectionType = connectionType;
BufferSize = bufferSize;
- _healthy = true;
InitTransportAndClient();
}
@@ -14,7 +14,7 @@ public class ConnectionBuilder : FluentCassandra.Connections.IConnectionBuilder
/// <param name="host"></param>
/// <param name="port"></param>
/// <param name="timeout"></param>
- public ConnectionBuilder(string keyspace, string host, int port = Server.DefaultPort, int connectionTimeout = Server.DefaultTimeout, bool pooling = false, int minPoolSize = 0, int maxPoolSize = 100, int connectionLifetime = 0, ConnectionType connectionType = ConnectionType.Framed, int bufferSize = 1024, ConsistencyLevel read = ConsistencyLevel.QUORUM, ConsistencyLevel write = ConsistencyLevel.QUORUM, string cqlVersion = FluentCassandra.Connections.CqlVersion.ServerDefault, bool compressCqlQueries = true, string username = null, string password = null, int maxRetries = 0)
+ public ConnectionBuilder(string keyspace, string host, int port = Server.DefaultPort, int connectionTimeout = Server.DefaultTimeout, bool pooling = false, int minPoolSize = 0, int maxPoolSize = 100, int connectionLifetime = 0, ConnectionType connectionType = ConnectionType.Framed, int bufferSize = 1024, ConsistencyLevel read = ConsistencyLevel.QUORUM, ConsistencyLevel write = ConsistencyLevel.QUORUM, string cqlVersion = FluentCassandra.Connections.CqlVersion.ServerDefault, bool compressCqlQueries = true, string username = null, string password = null, int maxRetries = 0, int serverRecoveryInterval = RoundRobinServerManager.DefaultServerRecoveryInterval)
{
Keyspace = keyspace;
Servers = new List<Server>() { new Server(host, port) };
@@ -23,6 +23,7 @@ public ConnectionBuilder(string keyspace, string host, int port = Server.Default
MinPoolSize = minPoolSize;
MaxPoolSize = maxPoolSize;
MaxRetries = maxRetries;
+ ServerRecoveryInterval = TimeSpan.FromSeconds(serverRecoveryInterval);
ConnectionLifetime = TimeSpan.FromSeconds(connectionLifetime);
ConnectionType = connectionType;
BufferSize = bufferSize;
@@ -36,7 +37,7 @@ public ConnectionBuilder(string keyspace, string host, int port = Server.Default
ConnectionString = GetConnectionString();
}
- public ConnectionBuilder(string keyspace, Server server, bool pooling = false, int minPoolSize = 0, int maxPoolSize = 100, int connectionLifetime = 0, ConnectionType connectionType = ConnectionType.Framed, int bufferSize = 1024, ConsistencyLevel read = ConsistencyLevel.QUORUM, ConsistencyLevel write = ConsistencyLevel.QUORUM, string cqlVersion = FluentCassandra.Connections.CqlVersion.ServerDefault, bool compressCqlQueries = true, string username = null, string password = null, int maxRetries = 0)
+ public ConnectionBuilder(string keyspace, Server server, bool pooling = false, int minPoolSize = 0, int maxPoolSize = 100, int connectionLifetime = 0, ConnectionType connectionType = ConnectionType.Framed, int bufferSize = 1024, ConsistencyLevel read = ConsistencyLevel.QUORUM, ConsistencyLevel write = ConsistencyLevel.QUORUM, string cqlVersion = FluentCassandra.Connections.CqlVersion.ServerDefault, bool compressCqlQueries = true, string username = null, string password = null, int maxRetries = 0, int serverRecoveryInterval = RoundRobinServerManager.DefaultServerRecoveryInterval)
{
Keyspace = keyspace;
Servers = new List<Server>() { server };
@@ -46,6 +47,7 @@ public ConnectionBuilder(string keyspace, Server server, bool pooling = false, i
MaxPoolSize = maxPoolSize;
MaxRetries = maxRetries;
ConnectionLifetime = TimeSpan.FromSeconds(connectionLifetime);
+ ServerRecoveryInterval = TimeSpan.FromSeconds(serverRecoveryInterval);
ConnectionType = connectionType;
BufferSize = bufferSize;
ReadConsistency = read;
@@ -230,9 +232,25 @@ private void InitializeConnectionString(string connectionString)
#endregion
- #region BufferSize
+ #region ServerRecoveryInterval
+ if (!pairs.ContainsKey("Server Recovery Interval"))
+ {
+ ServerRecoveryInterval = TimeSpan.FromSeconds(RoundRobinServerManager.DefaultServerRecoveryInterval);
+ }
+ else
+ {
+ int serverRecoveryInterval;
- if (!pairs.ContainsKey("Buffer Size"))
+ if (!Int32.TryParse(pairs["Server Recovery Interval"], out serverRecoveryInterval))
+ serverRecoveryInterval = 0;
+
+ ServerRecoveryInterval = TimeSpan.FromSeconds(serverRecoveryInterval);
+ }
+ #endregion
+
+ #region BufferSize
+
+ if (!pairs.ContainsKey("Buffer Size"))
{
BufferSize = 1024;
}
@@ -378,8 +396,10 @@ private string GetConnectionString()
b.AppendFormat(format, "Pooling", Pooling);
b.AppendFormat(format, "Min Pool Size", MinPoolSize);
b.AppendFormat(format, "Max Pool Size", MaxPoolSize);
+ b.AppendFormat(format, "Max Retries", MaxRetries);
b.AppendFormat(format, "Connection Timeout", Convert.ToInt32(ConnectionTimeout.TotalSeconds));
b.AppendFormat(format, "Connection Lifetime", Convert.ToInt32(ConnectionLifetime.TotalSeconds));
+ b.AppendFormat(format, "Server Recovery Interval", Convert.ToInt32(ServerRecoveryInterval.TotalSeconds));
b.AppendFormat(format, "Connection Type", ConnectionType);
b.AppendFormat(format, "Buffer Size", BufferSize);
@@ -435,6 +455,10 @@ private string GetConnectionString()
/// </summary>
public ConnectionType ConnectionType { get; private set; }
+ /// <summary>
+ ///
+ /// </summary>
+ public TimeSpan ServerRecoveryInterval { get; private set; }
/// <summary>
///
/// </summary>
@@ -15,9 +15,9 @@ public interface IConnectionBuilder
int MaxRetries { get; }
TimeSpan ConnectionTimeout { get; }
TimeSpan ConnectionLifetime { get; }
+ TimeSpan ServerRecoveryInterval { get; }
ConnectionType ConnectionType { get; }
-
- int BufferSize { get; }
+ int BufferSize { get; }
ConsistencyLevel ReadConsistency { get; }
ConsistencyLevel WriteConsistency { get; }
@@ -7,49 +7,62 @@ namespace FluentCassandra.Connections
{
public class RoundRobinServerManager : IServerManager
{
- private readonly object _lock = new object();
+ public const int DefaultServerRecoveryInterval = 30;
+ private readonly object _lock = new object();
private List<Server> _servers;
private Queue<Server> _serverQueue;
private HashSet<Server> _blackListed;
private Timer _recoveryTimer;
+ private long _recoveryTimerInterval;
public RoundRobinServerManager(IConnectionBuilder builder)
{
_servers = new List<Server>(builder.Servers);
_serverQueue = new Queue<Server>(_servers);
_blackListed = new HashSet<Server>();
-
- _recoveryTimer = new Timer(o => ServerRecover(), null, 30000L, 30000L);
+ _recoveryTimerInterval = (long)builder.ServerRecoveryInterval.TotalMilliseconds;
+ _recoveryTimer = new Timer(o => ServerRecover(), null, _recoveryTimerInterval, Timeout.Infinite);
}
private void ServerRecover()
{
- if (_blackListed.Count == 0)
- return;
-
- var clonedBlackList = (HashSet<Server>)null;
-
- lock (_lock)
- clonedBlackList = new HashSet<Server>(this._blackListed);
-
- foreach (var server in clonedBlackList)
- {
- var connection = new Connection(server, ConnectionType.Simple, 1024);
-
- try
- {
- connection.Open();
-
- lock (_lock)
- _blackListed.Remove(server);
- }
- catch { }
- finally
- {
- connection.Close();
- }
- }
- }
+ try
+ {
+ if (_blackListed.Count > 0)
+ {
+ var clonedBlackList = (HashSet<Server>)null;
+
+ lock (_lock)
+ clonedBlackList = new HashSet<Server>(_blackListed);
+
+ foreach (var server in clonedBlackList)
+ {
+ var connection = new Connection(server, ConnectionType.Simple, 1024);
+
+ try
+ {
+ connection.Open();
+ lock(_lock)
+ {
+ _blackListed.Remove(server);
+ _serverQueue.Enqueue(server);
+ }
+ }
+ catch { }
+ finally
+ {
+ connection.Close();
+ }
+ }
+ clonedBlackList.Clear();
+ }
+ }
+ finally
+ {
+ _recoveryTimer.Change(_recoveryTimerInterval, Timeout.Infinite);
+ }
+ }
+
#region IServerManager Members
View
@@ -1,5 +1,6 @@
using System;
using System.Diagnostics;
+using System.IO;
using FluentCassandra.Apache.Cassandra;
using FluentCassandra.Thrift.Transport;
@@ -12,7 +13,6 @@ public abstract class Operation<TResult>
public Operation()
{
_executionCount = 0;
-
HasError = false;
}
@@ -32,54 +32,74 @@ public virtual bool TryExecute(out TResult result)
_executionCount++;
- try
- {
- result = Execute();
- }
- catch (AuthenticationException exc)
- {
- ExceptionOccurred(new CassandraOperationException(exc));
- result = default(TResult);
- }
- catch (AuthorizationException exc)
- {
- ExceptionOccurred(new CassandraOperationException(exc));
- result = default(TResult);
- }
- catch (InvalidRequestException exc)
- {
- ExceptionOccurred(new CassandraOperationException(exc));
- result = default(TResult);
- }
- catch (UnavailableException exc)
- {
- ExceptionOccuredRetryExecution(new CassandraOperationException(exc), out result);
- }
- catch (TimeoutException exc)
- {
- ExceptionOccuredRetryExecution(new CassandraOperationException(exc), out result);
- }
- catch (TimedOutException exc)
- {
- ExceptionOccuredRetryExecution(new CassandraOperationException(exc), out result);
- }
- catch (TTransportException exc)
- {
- ExceptionOccuredRetryExecution(new CassandraOperationException(exc), out result);
- }
- catch (Exception exc)
- {
- ExceptionOccurred(new CassandraOperationException(exc));
- result = default(TResult);
- }
+ try
+ {
+ result = Execute();
+ //We have to set HasError to false after execution.
+ //In the event that we retry HasError is set to true from
+ //the initial exception. When we do retry and its good
+ //We return false because HasError == true and the
+ //exception is thrown even though we may have
+ //retried successfully.
+ HasError = false;
+ Error = null;
+ }
+ catch (AuthenticationException exc)
+ {
+ ExceptionOccurred(new CassandraOperationException(exc));
+ result = default(TResult);
+ }
+ catch (AuthorizationException exc)
+ {
+ ExceptionOccurred(new CassandraOperationException(exc));
+ result = default(TResult);
+ }
+ catch (InvalidRequestException exc)
+ {
+ ExceptionOccurred(new CassandraOperationException(exc));
+ result = default(TResult);
+ }
+ catch (UnavailableException exc)
+ {
+ ExceptionOccuredRetryExecution(new CassandraOperationException(exc), true, out result);
+ }
+ catch (TimeoutException exc)
+ {
+ ExceptionOccuredRetryExecution(new CassandraOperationException(exc), false, out result);
+ }
+ catch (TimedOutException exc)
+ {
+ ExceptionOccuredRetryExecution(new CassandraOperationException(exc), false, out result);
+ }
+ catch (TTransportException exc)
+ {
+ ExceptionOccuredRetryExecution(new CassandraOperationException(exc), true, out result);
+ }
+ catch (IOException exc)
+ {
+ ExceptionOccuredRetryExecution(new CassandraOperationException(exc), true, out result);
+ }
+ catch (NotFoundException exc)
+ {
+ ExceptionOccuredRetryExecution(new CassandraOperationException(exc), true, out result);
+ }
+ catch (Exception exc)
+ {
+ ExceptionOccurred(new CassandraOperationException(exc));
+ result = default(TResult);
+ }
return !HasError;
}
- private void ExceptionOccuredRetryExecution(CassandraOperationException exc, out TResult result)
+ private void ExceptionOccuredRetryExecution(CassandraOperationException exc, bool markClientAsUnHealthy, out TResult result)
{
ExceptionOccurred(exc);
- Session.MarkCurrentConnectionAsUnhealthy(exc);
+
+ if (markClientAsUnHealthy)
+ {
+ Session.MarkCurrentConnectionAsUnhealthy(exc);
+ }
TryExecute(out result);
}

0 comments on commit d47e026

Please sign in to comment.