Blacklist Refactoring #89

Merged
merged 3 commits into from

2 participants

Eric Plowe Nick Berardi
Eric Plowe

Some bug fixes and small tweaks to the blacklisting enhancements.

Bug Fixes

  • When a server was removed from the blacklist, it was not being added back to the server queue.
  • Added some additional exception types to handle in Operation::TryExecute().
  • Added an input parameter, bool markClientAsUnHealthy, to Operation::ExceptionOccuredRetryExecution so that we can selectively blacklist a server when retrying.
  • In Operation::TryExecute(), after Execute() is called we must set HasError = false and Error = null. Otherwise, if an exception is thrown on the first pass but the next pass executes successfully, the exception was still being thrown.
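
To illustrate the last fix, here is a minimal, self-contained sketch of the retry pattern. It is not the FluentCassandra code: `RetryingOperation`, its hard-coded failure on the first attempt, and the uncapped retry are all assumptions made for the example. It only shows why the error state has to be cleared after a successful `Execute()`.

```csharp
using System;

// Hypothetical stand-in for an operation whose first attempt fails
// and whose second attempt succeeds.
public class RetryingOperation
{
    private int _attempts;

    public bool HasError { get; private set; }
    public Exception Error { get; private set; }

    private int Execute()
    {
        _attempts++;
        if (_attempts == 1)
            throw new TimeoutException("first attempt timed out");
        return 42;
    }

    public bool TryExecute(out int result)
    {
        try
        {
            result = Execute();
            // Clear state left over from an earlier failed attempt.
            // Without these two lines the method would still report
            // failure after a successful retry.
            HasError = false;
            Error = null;
        }
        catch (TimeoutException exc)
        {
            HasError = true;
            Error = exc;
            // Retry; a real implementation would cap the number of retries.
            TryExecute(out result);
        }

        return !HasError;
    }
}

public static class Program
{
    public static void Main()
    {
        var op = new RetryingOperation();
        int value;
        bool ok = op.TryExecute(out value);
        Console.WriteLine("{0} {1}", ok, value);
    }
}
```

If the two reset lines are removed, the retry still produces a value, but `TryExecute` returns false because `HasError` keeps the value set by the first attempt.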

Additions

  • Added ServerRecoveryInterval to ConnectionString/Builder.
  • Turned the recovery timer into a "one shot" timer that calls Timer::Change() at the end of the callback to avoid potential overlap. We could consider using Timers.Timer, but then we'd need to think about how to handle overlap there as well, potentially with a Monitor.TryEnter? I am open to suggestions.
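
For reference, the one-shot pattern described above can be sketched in isolation as follows. `OneShotPoller` is a hypothetical helper written for this example, not a class in this pull request; it uses only `System.Threading.Timer` with a period of `Timeout.Infinite` and re-arms itself in a `finally` block once the work has finished.

```csharp
using System;
using System.Threading;

// Hypothetical helper (not part of FluentCassandra): runs `work` repeatedly,
// re-arming a one-shot timer only after the previous run has finished.
public sealed class OneShotPoller : IDisposable
{
    private readonly Timer _timer;
    private readonly int _intervalMs;
    private readonly Action _work;

    public OneShotPoller(int intervalMs, Action work)
    {
        _intervalMs = intervalMs;
        _work = work;
        // Create the timer disarmed, then arm it, so the callback can never
        // run before _timer has been assigned.
        _timer = new Timer(_ => Tick(), null, Timeout.Infinite, Timeout.Infinite);
        _timer.Change(_intervalMs, Timeout.Infinite);
    }

    private void Tick()
    {
        try
        {
            _work();
        }
        finally
        {
            try
            {
                // Period stays Timeout.Infinite: one callback per arming.
                _timer.Change(_intervalMs, Timeout.Infinite);
            }
            catch (ObjectDisposedException)
            {
                // Disposed while the callback was running; stop polling.
            }
        }
    }

    public void Dispose()
    {
        _timer.Dispose();
    }
}

public static class Program
{
    public static void Main()
    {
        var gate = new object();
        int running = 0, maxRunning = 0, runs = 0;

        Action work = () =>
        {
            lock (gate) { running++; runs++; maxRunning = Math.Max(maxRunning, running); }
            Thread.Sleep(120); // deliberately longer than the 50 ms interval
            lock (gate) { running--; }
        };

        using (new OneShotPoller(50, work))
        {
            Thread.Sleep(500);
        }
        Thread.Sleep(200); // let any in-flight run finish

        lock (gate)
        {
            Console.WriteLine("runs={0}, max concurrent={1}", runs, maxRunning);
        }
    }
}
```

Because the timer is only re-armed after the work completes, a slow callback delays the next run instead of overlapping with it. A periodic timer would need an explicit guard such as Monitor.TryEnter to get the same guarantee.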
eplowe added some commits
Eric Plowe eplowe Merge pull request #1 from managedfusion/master
Updating My Fork.
92bc9b8
Eric Plowe eplowe Some bug tweaks to blacklisting enhancements
	*When removing a server from the blacklist it was not being sent back to the serverqueue
	*Added some additional exception types to handle in Operation::TryExecute()
	*Added an additional input parameter on Operation::ExceptionOccuredRetryExecution called: bool markClientAsUnhealthy
	so that we can selectively blacklist a server when retrying.
	*In Operation::TryExecute() after Execute() is called must mark HasError = false and Error = null. If not and there is
	an exception thrown on the first pass but the next pass results in a valid execution the exception was still being thrown.

Added ServerRecoveryInterval to ConnectionString/Builder.
	Turned the recovery timer into a "one shot" timer and calling the Timer::Change() method at the end of the callback
	to avoid potential overlap. We could consider using Timers.Timer but then we'd need to think about how to handle overlap there as well.
	Potentially with a Monitor.TryEnter? I am open to suggestions.
d47e026
Eric Plowe eplowe Update to remove some stuff I added to the solution file. f147197
Nick Berardi nberardi merged commit 895edb5 into from
Nick Berardi nberardi referenced this pull request from a commit
Nick Berardi nberardi minor changes for issue #89 b82a932
Nick Berardi nberardi referenced this pull request from a commit
Nick Berardi nberardi changed connection name from Server Recovery Interval to Server Polling Interval in case we want to preemptively check for bad servers in the future, issue #89
875ae92
Nick Berardi
Owner

The only real change I made was to the constant in RoundRobinServerManager. The default value really shouldn't live on something configurable like a manager, since the manager can easily change and the server polling interval can be used in other, non-round-robin implementations.

Nick Berardi
Owner

Overall a good commit, thanks.

Eric Plowe
Nick Berardi nberardi was assigned
Commits on Nov 20, 2012
  1. Eric Plowe

    Merge pull request #1 from managedfusion/master

    eplowe authored
    Updating My Fork.
Commits on Nov 21, 2012
  1. Eric Plowe

    Some bug tweaks to blacklisting enhancements

    eplowe authored
  2. Eric Plowe
2  src/Connections/Connection.cs
@@ -16,7 +16,6 @@ public class Connection : IConnection, IDisposable
private Cassandra.Client _client;
private string _activeKeyspace;
private string _activeCqlVersion;
- private bool _healthy;
private readonly object _lock = new object();
/// <summary>
@@ -39,7 +38,6 @@ public Connection(Server server, ConnectionType connectionType, int bufferSize)
Server = server;
ConnectionType = connectionType;
BufferSize = bufferSize;
- _healthy = true;
InitTransportAndClient();
}
32 src/Connections/ConnectionBuilder.cs
@@ -14,7 +14,7 @@ public class ConnectionBuilder : FluentCassandra.Connections.IConnectionBuilder
/// <param name="host"></param>
/// <param name="port"></param>
/// <param name="timeout"></param>
- public ConnectionBuilder(string keyspace, string host, int port = Server.DefaultPort, int connectionTimeout = Server.DefaultTimeout, bool pooling = false, int minPoolSize = 0, int maxPoolSize = 100, int connectionLifetime = 0, ConnectionType connectionType = ConnectionType.Framed, int bufferSize = 1024, ConsistencyLevel read = ConsistencyLevel.QUORUM, ConsistencyLevel write = ConsistencyLevel.QUORUM, string cqlVersion = FluentCassandra.Connections.CqlVersion.ServerDefault, bool compressCqlQueries = true, string username = null, string password = null, int maxRetries = 0)
+ public ConnectionBuilder(string keyspace, string host, int port = Server.DefaultPort, int connectionTimeout = Server.DefaultTimeout, bool pooling = false, int minPoolSize = 0, int maxPoolSize = 100, int connectionLifetime = 0, ConnectionType connectionType = ConnectionType.Framed, int bufferSize = 1024, ConsistencyLevel read = ConsistencyLevel.QUORUM, ConsistencyLevel write = ConsistencyLevel.QUORUM, string cqlVersion = FluentCassandra.Connections.CqlVersion.ServerDefault, bool compressCqlQueries = true, string username = null, string password = null, int maxRetries = 0, int serverRecoveryInterval = RoundRobinServerManager.DefaultServerRecoveryInterval)
{
Keyspace = keyspace;
Servers = new List<Server>() { new Server(host, port) };
@@ -23,6 +23,7 @@ public ConnectionBuilder(string keyspace, string host, int port = Server.Default
MinPoolSize = minPoolSize;
MaxPoolSize = maxPoolSize;
MaxRetries = maxRetries;
+ ServerRecoveryInterval = TimeSpan.FromSeconds(serverRecoveryInterval);
ConnectionLifetime = TimeSpan.FromSeconds(connectionLifetime);
ConnectionType = connectionType;
BufferSize = bufferSize;
@@ -36,7 +37,7 @@ public ConnectionBuilder(string keyspace, string host, int port = Server.Default
ConnectionString = GetConnectionString();
}
- public ConnectionBuilder(string keyspace, Server server, bool pooling = false, int minPoolSize = 0, int maxPoolSize = 100, int connectionLifetime = 0, ConnectionType connectionType = ConnectionType.Framed, int bufferSize = 1024, ConsistencyLevel read = ConsistencyLevel.QUORUM, ConsistencyLevel write = ConsistencyLevel.QUORUM, string cqlVersion = FluentCassandra.Connections.CqlVersion.ServerDefault, bool compressCqlQueries = true, string username = null, string password = null, int maxRetries = 0)
+ public ConnectionBuilder(string keyspace, Server server, bool pooling = false, int minPoolSize = 0, int maxPoolSize = 100, int connectionLifetime = 0, ConnectionType connectionType = ConnectionType.Framed, int bufferSize = 1024, ConsistencyLevel read = ConsistencyLevel.QUORUM, ConsistencyLevel write = ConsistencyLevel.QUORUM, string cqlVersion = FluentCassandra.Connections.CqlVersion.ServerDefault, bool compressCqlQueries = true, string username = null, string password = null, int maxRetries = 0, int serverRecoveryInterval = RoundRobinServerManager.DefaultServerRecoveryInterval)
{
Keyspace = keyspace;
Servers = new List<Server>() { server };
@@ -46,6 +47,7 @@ public ConnectionBuilder(string keyspace, Server server, bool pooling = false, i
MaxPoolSize = maxPoolSize;
MaxRetries = maxRetries;
ConnectionLifetime = TimeSpan.FromSeconds(connectionLifetime);
+ ServerRecoveryInterval = TimeSpan.FromSeconds(serverRecoveryInterval);
ConnectionType = connectionType;
BufferSize = bufferSize;
ReadConsistency = read;
@@ -230,9 +232,25 @@ private void InitializeConnectionString(string connectionString)
#endregion
- #region BufferSize
+ #region ServerRecoveryInterval
+ if (!pairs.ContainsKey("Server Recovery Interval"))
+ {
+ ServerRecoveryInterval = TimeSpan.FromSeconds(RoundRobinServerManager.DefaultServerRecoveryInterval);
+ }
+ else
+ {
+ int serverRecoveryInterval;
- if (!pairs.ContainsKey("Buffer Size"))
+ if (!Int32.TryParse(pairs["Server Recovery Interval"], out serverRecoveryInterval))
+ serverRecoveryInterval = 0;
+
+ ServerRecoveryInterval = TimeSpan.FromSeconds(serverRecoveryInterval);
+ }
+ #endregion
+
+ #region BufferSize
+
+ if (!pairs.ContainsKey("Buffer Size"))
{
BufferSize = 1024;
}
@@ -378,8 +396,10 @@ private string GetConnectionString()
b.AppendFormat(format, "Pooling", Pooling);
b.AppendFormat(format, "Min Pool Size", MinPoolSize);
b.AppendFormat(format, "Max Pool Size", MaxPoolSize);
+ b.AppendFormat(format, "Max Retries", MaxRetries);
b.AppendFormat(format, "Connection Timeout", Convert.ToInt32(ConnectionTimeout.TotalSeconds));
b.AppendFormat(format, "Connection Lifetime", Convert.ToInt32(ConnectionLifetime.TotalSeconds));
+ b.AppendFormat(format, "Server Recovery Interval", Convert.ToInt32(ServerRecoveryInterval.TotalSeconds));
b.AppendFormat(format, "Connection Type", ConnectionType);
b.AppendFormat(format, "Buffer Size", BufferSize);
@@ -435,6 +455,10 @@ private string GetConnectionString()
/// </summary>
public ConnectionType ConnectionType { get; private set; }
+ /// <summary>
+ ///
+ /// </summary>
+ public TimeSpan ServerRecoveryInterval { get; private set; }
/// <summary>
///
/// </summary>
4 src/Connections/IConnectionBuilder.cs
@@ -15,9 +15,9 @@ public interface IConnectionBuilder
int MaxRetries { get; }
TimeSpan ConnectionTimeout { get; }
TimeSpan ConnectionLifetime { get; }
+ TimeSpan ServerRecoveryInterval { get; }
ConnectionType ConnectionType { get; }
-
- int BufferSize { get; }
+ int BufferSize { get; }
ConsistencyLevel ReadConsistency { get; }
ConsistencyLevel WriteConsistency { get; }
71 src/Connections/RoundRobinServerManager.cs
@@ -7,49 +7,62 @@ namespace FluentCassandra.Connections
{
public class RoundRobinServerManager : IServerManager
{
- private readonly object _lock = new object();
+ public const int DefaultServerRecoveryInterval = 30;
+ private readonly object _lock = new object();
private List<Server> _servers;
private Queue<Server> _serverQueue;
private HashSet<Server> _blackListed;
private Timer _recoveryTimer;
+ private long _recoveryTimerInterval;
public RoundRobinServerManager(IConnectionBuilder builder)
{
_servers = new List<Server>(builder.Servers);
_serverQueue = new Queue<Server>(_servers);
_blackListed = new HashSet<Server>();
-
- _recoveryTimer = new Timer(o => ServerRecover(), null, 30000L, 30000L);
+ _recoveryTimerInterval = (long)builder.ServerRecoveryInterval.TotalMilliseconds;
+ _recoveryTimer = new Timer(o => ServerRecover(), null, _recoveryTimerInterval, Timeout.Infinite);
}
private void ServerRecover()
{
- if (_blackListed.Count == 0)
- return;
-
- var clonedBlackList = (HashSet<Server>)null;
-
- lock (_lock)
- clonedBlackList = new HashSet<Server>(this._blackListed);
-
- foreach (var server in clonedBlackList)
- {
- var connection = new Connection(server, ConnectionType.Simple, 1024);
-
- try
- {
- connection.Open();
-
- lock (_lock)
- _blackListed.Remove(server);
- }
- catch { }
- finally
- {
- connection.Close();
- }
- }
- }
+ try
+ {
+ if (_blackListed.Count > 0)
+ {
+ var clonedBlackList = (HashSet<Server>)null;
+
+ lock (_lock)
+ clonedBlackList = new HashSet<Server>(_blackListed);
+
+ foreach (var server in clonedBlackList)
+ {
+ var connection = new Connection(server, ConnectionType.Simple, 1024);
+
+ try
+ {
+ connection.Open();
+ lock(_lock)
+ {
+ _blackListed.Remove(server);
+ _serverQueue.Enqueue(server);
+ }
+ }
+ catch { }
+ finally
+ {
+ connection.Close();
+ }
+ }
+ clonedBlackList.Clear();
+ }
+ }
+ finally
+ {
+ _recoveryTimer.Change(_recoveryTimerInterval, Timeout.Infinite);
+ }
+ }
+
#region IServerManager Members
106 src/Operations/Operation.cs
@@ -1,5 +1,6 @@
using System;
using System.Diagnostics;
+using System.IO;
using FluentCassandra.Apache.Cassandra;
using FluentCassandra.Thrift.Transport;
@@ -12,7 +13,6 @@ public abstract class Operation<TResult>
public Operation()
{
_executionCount = 0;
-
HasError = false;
}
@@ -32,54 +32,74 @@ public virtual bool TryExecute(out TResult result)
_executionCount++;
- try
- {
- result = Execute();
- }
- catch (AuthenticationException exc)
- {
- ExceptionOccurred(new CassandraOperationException(exc));
- result = default(TResult);
- }
- catch (AuthorizationException exc)
- {
- ExceptionOccurred(new CassandraOperationException(exc));
- result = default(TResult);
- }
- catch (InvalidRequestException exc)
- {
- ExceptionOccurred(new CassandraOperationException(exc));
- result = default(TResult);
- }
- catch (UnavailableException exc)
- {
- ExceptionOccuredRetryExecution(new CassandraOperationException(exc), out result);
- }
- catch (TimeoutException exc)
- {
- ExceptionOccuredRetryExecution(new CassandraOperationException(exc), out result);
- }
- catch (TimedOutException exc)
- {
- ExceptionOccuredRetryExecution(new CassandraOperationException(exc), out result);
- }
- catch (TTransportException exc)
- {
- ExceptionOccuredRetryExecution(new CassandraOperationException(exc), out result);
- }
- catch (Exception exc)
- {
- ExceptionOccurred(new CassandraOperationException(exc));
- result = default(TResult);
- }
+ try
+ {
+ result = Execute();
+ // Reset the error state after a successful execution.
+ // On a retry, HasError is still true from the initial
+ // exception. Without this reset, TryExecute would return
+ // false (because HasError == true) and the exception would
+ // be thrown even though the retry succeeded.
+ HasError = false;
+ Error = null;
+ }
+ catch (AuthenticationException exc)
+ {
+ ExceptionOccurred(new CassandraOperationException(exc));
+ result = default(TResult);
+ }
+ catch (AuthorizationException exc)
+ {
+ ExceptionOccurred(new CassandraOperationException(exc));
+ result = default(TResult);
+ }
+ catch (InvalidRequestException exc)
+ {
+ ExceptionOccurred(new CassandraOperationException(exc));
+ result = default(TResult);
+ }
+ catch (UnavailableException exc)
+ {
+ ExceptionOccuredRetryExecution(new CassandraOperationException(exc), true, out result);
+ }
+ catch (TimeoutException exc)
+ {
+ ExceptionOccuredRetryExecution(new CassandraOperationException(exc), false, out result);
+ }
+ catch (TimedOutException exc)
+ {
+ ExceptionOccuredRetryExecution(new CassandraOperationException(exc), false, out result);
+ }
+ catch (TTransportException exc)
+ {
+ ExceptionOccuredRetryExecution(new CassandraOperationException(exc), true, out result);
+ }
+ catch (IOException exc)
+ {
+ ExceptionOccuredRetryExecution(new CassandraOperationException(exc), true, out result);
+ }
+ catch (NotFoundException exc)
+ {
+ ExceptionOccuredRetryExecution(new CassandraOperationException(exc), true, out result);
+ }
+ catch (Exception exc)
+ {
+ ExceptionOccurred(new CassandraOperationException(exc));
+ result = default(TResult);
+ }
return !HasError;
}
- private void ExceptionOccuredRetryExecution(CassandraOperationException exc, out TResult result)
+ private void ExceptionOccuredRetryExecution(CassandraOperationException exc, bool markClientAsUnHealthy, out TResult result)
{
ExceptionOccurred(exc);
- Session.MarkCurrentConnectionAsUnhealthy(exc);
+
+ if (markClientAsUnHealthy)
+ {
+ Session.MarkCurrentConnectionAsUnhealthy(exc);
+ }
TryExecute(out result);
}