Browse files

minor changes for issue #89

  • Loading branch information...
1 parent 895edb5 commit b82a93237f78f0540668ae60701d479ba4138abb @nberardi nberardi committed Nov 21, 2012
View
90 src/Connections/ConnectionBuilder.cs
@@ -14,7 +14,7 @@ public class ConnectionBuilder : FluentCassandra.Connections.IConnectionBuilder
/// <param name="host"></param>
/// <param name="port"></param>
/// <param name="timeout"></param>
- public ConnectionBuilder(string keyspace, string host, int port = Server.DefaultPort, int connectionTimeout = Server.DefaultTimeout, bool pooling = false, int minPoolSize = 0, int maxPoolSize = 100, int connectionLifetime = 0, ConnectionType connectionType = ConnectionType.Framed, int bufferSize = 1024, ConsistencyLevel read = ConsistencyLevel.QUORUM, ConsistencyLevel write = ConsistencyLevel.QUORUM, string cqlVersion = FluentCassandra.Connections.CqlVersion.ServerDefault, bool compressCqlQueries = true, string username = null, string password = null, int maxRetries = 0, int serverRecoveryInterval = RoundRobinServerManager.DefaultServerRecoveryInterval)
+ public ConnectionBuilder(string keyspace, string host, int port = Server.DefaultPort, int connectionTimeout = Server.DefaultTimeout, bool pooling = false, int minPoolSize = 0, int maxPoolSize = 100, int maxRetries = 0, int serverPollingInterval = 30, int connectionLifetime = 0, ConnectionType connectionType = ConnectionType.Framed, int bufferSize = 1024, ConsistencyLevel read = ConsistencyLevel.QUORUM, ConsistencyLevel write = ConsistencyLevel.QUORUM, string cqlVersion = FluentCassandra.Connections.CqlVersion.ServerDefault, bool compressCqlQueries = true, string username = null, string password = null)
{
Keyspace = keyspace;
Servers = new List<Server>() { new Server(host, port) };
@@ -23,7 +23,7 @@ public ConnectionBuilder(string keyspace, string host, int port = Server.Default
MinPoolSize = minPoolSize;
MaxPoolSize = maxPoolSize;
MaxRetries = maxRetries;
- ServerRecoveryInterval = TimeSpan.FromSeconds(serverRecoveryInterval);
+ ServerPollingInterval = TimeSpan.FromSeconds(serverPollingInterval);
ConnectionLifetime = TimeSpan.FromSeconds(connectionLifetime);
ConnectionType = connectionType;
BufferSize = bufferSize;
@@ -37,7 +37,7 @@ public ConnectionBuilder(string keyspace, string host, int port = Server.Default
ConnectionString = GetConnectionString();
}
- public ConnectionBuilder(string keyspace, Server server, bool pooling = false, int minPoolSize = 0, int maxPoolSize = 100, int connectionLifetime = 0, ConnectionType connectionType = ConnectionType.Framed, int bufferSize = 1024, ConsistencyLevel read = ConsistencyLevel.QUORUM, ConsistencyLevel write = ConsistencyLevel.QUORUM, string cqlVersion = FluentCassandra.Connections.CqlVersion.ServerDefault, bool compressCqlQueries = true, string username = null, string password = null, int maxRetries = 0, int serverRecoveryInterval = RoundRobinServerManager.DefaultServerRecoveryInterval)
+ public ConnectionBuilder(string keyspace, Server server, bool pooling = false, int minPoolSize = 0, int maxPoolSize = 100, int maxRetries = 0, int serverPollingInterval = 30, int connectionLifetime = 0, ConnectionType connectionType = ConnectionType.Framed, int bufferSize = 1024, ConsistencyLevel read = ConsistencyLevel.QUORUM, ConsistencyLevel write = ConsistencyLevel.QUORUM, string cqlVersion = FluentCassandra.Connections.CqlVersion.ServerDefault, bool compressCqlQueries = true, string username = null, string password = null)
{
Keyspace = keyspace;
Servers = new List<Server>() { server };
@@ -46,8 +46,8 @@ public ConnectionBuilder(string keyspace, Server server, bool pooling = false, i
MinPoolSize = minPoolSize;
MaxPoolSize = maxPoolSize;
MaxRetries = maxRetries;
+ ServerPollingInterval = TimeSpan.FromSeconds(serverPollingInterval);
ConnectionLifetime = TimeSpan.FromSeconds(connectionLifetime);
- ServerRecoveryInterval = TimeSpan.FromSeconds(serverRecoveryInterval);
ConnectionType = connectionType;
BufferSize = bufferSize;
ReadConsistency = read;
@@ -98,6 +98,24 @@ private void InitializeConnectionString(string connectionString)
#endregion
+ #region ConnectionTimeout
+
+ if (!pairs.ContainsKey("Connection Timeout")) {
+ ConnectionTimeout = TimeSpan.Zero;
+ } else {
+ int connectionTimeout;
+
+ if (!Int32.TryParse(pairs["Connection Timeout"], out connectionTimeout))
+ throw new CassandraException("Connection Timeout is not valid.");
+
+ if (connectionTimeout < 0)
+ connectionTimeout = 0;
+
+ ConnectionTimeout = TimeSpan.FromSeconds(connectionTimeout);
+ }
+
+ #endregion
+
#region Pooling
if (!pairs.ContainsKey("Pooling"))
@@ -175,23 +193,23 @@ private void InitializeConnectionString(string connectionString)
#endregion
- #region ConnectionTimeout
+ #region ServerRecoveryInterval
- if (!pairs.ContainsKey("Connection Timeout"))
+ if (!pairs.ContainsKey("Server Recovery Interval"))
{
- ConnectionTimeout = TimeSpan.Zero;
- }
- else
+ ServerPollingInterval = TimeSpan.FromSeconds(30);
+ }
+ else
{
- int connectionTimeout;
+ int serverRecoveryInterval;
- if (!Int32.TryParse(pairs["Connection Timeout"], out connectionTimeout))
- throw new CassandraException("Connection Timeout is not valid.");
+ if (!Int32.TryParse(pairs["Server Recovery Interval"], out serverRecoveryInterval))
+ serverRecoveryInterval = 0;
- if (connectionTimeout < 0)
- connectionTimeout = 0;
+ if (serverRecoveryInterval < 0)
+ serverRecoveryInterval = 0;
- ConnectionTimeout = TimeSpan.FromSeconds(connectionTimeout);
+ ServerPollingInterval = TimeSpan.FromSeconds(serverRecoveryInterval);
}
#endregion
@@ -209,19 +227,19 @@ private void InitializeConnectionString(string connectionString)
if (!Int32.TryParse(pairs["Connection Lifetime"], out lifetime))
lifetime = 0;
+ if (lifetime < 0)
+ lifetime = 0;
+
ConnectionLifetime = TimeSpan.FromSeconds(lifetime);
}
#endregion
#region ConnectionType
- if (!pairs.ContainsKey("Connection Type"))
- {
+ if (!pairs.ContainsKey("Connection Type")) {
ConnectionType = ConnectionType.Framed;
- }
- else
- {
+ } else {
ConnectionType type;
if (!Enum.TryParse(pairs["Connection Type"], out type))
@@ -232,25 +250,9 @@ private void InitializeConnectionString(string connectionString)
#endregion
- #region ServerRecoveryInterval
- if (!pairs.ContainsKey("Server Recovery Interval"))
- {
- ServerRecoveryInterval = TimeSpan.FromSeconds(RoundRobinServerManager.DefaultServerRecoveryInterval);
- }
- else
- {
- int serverRecoveryInterval;
-
- if (!Int32.TryParse(pairs["Server Recovery Interval"], out serverRecoveryInterval))
- serverRecoveryInterval = 0;
-
- ServerRecoveryInterval = TimeSpan.FromSeconds(serverRecoveryInterval);
- }
- #endregion
+ #region BufferSize
- #region BufferSize
-
- if (!pairs.ContainsKey("Buffer Size"))
+ if (!pairs.ContainsKey("Buffer Size"))
{
BufferSize = 1024;
}
@@ -396,10 +398,10 @@ private string GetConnectionString()
b.AppendFormat(format, "Pooling", Pooling);
b.AppendFormat(format, "Min Pool Size", MinPoolSize);
b.AppendFormat(format, "Max Pool Size", MaxPoolSize);
- b.AppendFormat(format, "Max Retries", MaxRetries);
+ b.AppendFormat(format, "Max Retries", MaxRetries);
b.AppendFormat(format, "Connection Timeout", Convert.ToInt32(ConnectionTimeout.TotalSeconds));
b.AppendFormat(format, "Connection Lifetime", Convert.ToInt32(ConnectionLifetime.TotalSeconds));
- b.AppendFormat(format, "Server Recovery Interval", Convert.ToInt32(ServerRecoveryInterval.TotalSeconds));
+ b.AppendFormat(format, "Server Recovery Interval", Convert.ToInt32(ServerPollingInterval.TotalSeconds));
b.AppendFormat(format, "Connection Type", ConnectionType);
b.AppendFormat(format, "Buffer Size", BufferSize);
@@ -455,10 +457,10 @@ private string GetConnectionString()
/// </summary>
public ConnectionType ConnectionType { get; private set; }
- /// <summary>
- ///
- /// </summary>
- public TimeSpan ServerRecoveryInterval { get; private set; }
+ /// <summary>
+ ///
+ /// </summary>
+ public TimeSpan ServerPollingInterval { get; private set; }
/// <summary>
///
/// </summary>
View
5 src/Connections/IConnectionBuilder.cs
@@ -13,10 +13,11 @@ public interface IConnectionBuilder
int MinPoolSize { get; }
int MaxPoolSize { get; }
int MaxRetries { get; }
+ TimeSpan ServerPollingInterval { get; }
+
TimeSpan ConnectionTimeout { get; }
- TimeSpan ConnectionLifetime { get; }
- TimeSpan ServerRecoveryInterval { get; }
ConnectionType ConnectionType { get; }
+ TimeSpan ConnectionLifetime { get; }
int BufferSize { get; }
ConsistencyLevel ReadConsistency { get; }
ConsistencyLevel WriteConsistency { get; }
View
83 src/Connections/RoundRobinServerManager.cs
@@ -7,62 +7,61 @@ namespace FluentCassandra.Connections
{
public class RoundRobinServerManager : IServerManager
{
- public const int DefaultServerRecoveryInterval = 30;
- private readonly object _lock = new object();
+ private readonly object _lock = new object();
private List<Server> _servers;
private Queue<Server> _serverQueue;
private HashSet<Server> _blackListed;
private Timer _recoveryTimer;
- private long _recoveryTimerInterval;
+ private long _recoveryTimerInterval;
public RoundRobinServerManager(IConnectionBuilder builder)
{
_servers = new List<Server>(builder.Servers);
_serverQueue = new Queue<Server>(_servers);
_blackListed = new HashSet<Server>();
- _recoveryTimerInterval = (long)builder.ServerRecoveryInterval.TotalMilliseconds;
- _recoveryTimer = new Timer(o => ServerRecover(), null, _recoveryTimerInterval, Timeout.Infinite);
+ _recoveryTimerInterval = (long)builder.ServerPollingInterval.TotalMilliseconds;
+ _recoveryTimer = new Timer(o => ServerRecover(), null, _recoveryTimerInterval, Timeout.Infinite);
}
private void ServerRecover()
{
- try
- {
- if (_blackListed.Count > 0)
- {
- var clonedBlackList = (HashSet<Server>)null;
-
- lock (_lock)
- clonedBlackList = new HashSet<Server>(_blackListed);
-
- foreach (var server in clonedBlackList)
- {
- var connection = new Connection(server, ConnectionType.Simple, 1024);
-
- try
- {
- connection.Open();
- lock(_lock)
- {
- _blackListed.Remove(server);
- _serverQueue.Enqueue(server);
- }
- }
- catch { }
- finally
- {
- connection.Close();
- }
- }
- clonedBlackList.Clear();
- }
- }
- finally
- {
- _recoveryTimer.Change(_recoveryTimerInterval, Timeout.Infinite);
- }
- }
-
+ try
+ {
+ if (_blackListed.Count > 0)
+ {
+ var clonedBlackList = (HashSet<Server>)null;
+
+ lock (_lock)
+ clonedBlackList = new HashSet<Server>(_blackListed);
+
+ foreach (var server in clonedBlackList)
+ {
+ var connection = new Connection(server, ConnectionType.Simple, 1024);
@tjake
tjake added a note Nov 21, 2012

This should be ConnectionType.Framed

Why? Framed is for data transferring. We are only connecting to see if we can connect? I do see a problem in that the connection never gets disposed of though.

@tjake
tjake added a note Nov 21, 2012

That's true. But there isn't a reason not to use framed. it might be beneficial to run a health check query like describe_ring

That will be a good thing to tackle when the new manager for the ring is released. But we would probably use the client for that anyways, which goes through the default connection string, which allows the connection type to be configured.

The reason I choose simple, is because the only thing that Framed does is wrap a socket in a buffer, and since we aren't sending or receiving data, that overhead is unnecessary.

https://github.com/managedfusion/fluentcassandra/blob/master/src/Thrift/Transport/TFramedTransport.cs

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+
+ try
+ {
+ connection.Open();
+ lock(_lock)
+ {
+ _blackListed.Remove(server);
+ _serverQueue.Enqueue(server);
+ }
+ }
+ catch { }
+ finally
+ {
+ connection.Close();
+ }
+ }
+ clonedBlackList.Clear();
+ }
+ }
+ finally
+ {
+ _recoveryTimer.Change(_recoveryTimerInterval, Timeout.Infinite);
+ }
+ }
+
#region IServerManager Members
View
120 src/Operations/Operation.cs
@@ -32,75 +32,69 @@ public virtual bool TryExecute(out TResult result)
_executionCount++;
- try
- {
- result = Execute();
- //We have to set HasError to false after execution.
- //In the event that we retry HasError is set to true from
- //the initial exception. When we do retry and its good
- //We return false because HasError == true and the
- //exception is thrown even though we may have
- //retried successfully.
- HasError = false;
- Error = null;
- }
- catch (AuthenticationException exc)
- {
- ExceptionOccurred(new CassandraOperationException(exc));
- result = default(TResult);
- }
- catch (AuthorizationException exc)
- {
- ExceptionOccurred(new CassandraOperationException(exc));
- result = default(TResult);
- }
- catch (InvalidRequestException exc)
- {
- ExceptionOccurred(new CassandraOperationException(exc));
- result = default(TResult);
- }
- catch (UnavailableException exc)
- {
- ExceptionOccuredRetryExecution(new CassandraOperationException(exc), true, out result);
- }
- catch (TimeoutException exc)
- {
- ExceptionOccuredRetryExecution(new CassandraOperationException(exc), false, out result);
- }
- catch (TimedOutException exc)
- {
- ExceptionOccuredRetryExecution(new CassandraOperationException(exc), false, out result);
- }
- catch (TTransportException exc)
- {
- ExceptionOccuredRetryExecution(new CassandraOperationException(exc), true, out result);
- }
- catch (IOException exc)
- {
- ExceptionOccuredRetryExecution(new CassandraOperationException(exc), true, out result);
- }
- catch (NotFoundException exc)
- {
- ExceptionOccuredRetryExecution(new CassandraOperationException(exc), true, out result);
- }
- catch (Exception exc)
- {
- ExceptionOccurred(new CassandraOperationException(exc));
- result = default(TResult);
- }
+ try
+ {
+ result = Execute();
+
+ // HasError needs to be reset to false incase a retry happens
+ HasError = false;
+ Error = null;
+ }
+ catch (AuthenticationException exc)
+ {
+ ExceptionOccurred(new CassandraOperationException(exc));
+ result = default(TResult);
+ }
+ catch (AuthorizationException exc)
+ {
+ ExceptionOccurred(new CassandraOperationException(exc));
+ result = default(TResult);
+ }
+ catch (InvalidRequestException exc)
+ {
+ ExceptionOccurred(new CassandraOperationException(exc));
+ result = default(TResult);
+ }
+ catch (UnavailableException exc)
+ {
+ Session.MarkCurrentConnectionAsUnhealthy(exc);
+ ExceptionOccuredRetryExecution(new CassandraOperationException(exc), out result);
+ }
+ catch (TimeoutException exc)
+ {
+ ExceptionOccuredRetryExecution(new CassandraOperationException(exc), out result);
+ }
+ catch (TimedOutException exc)
+ {
+ ExceptionOccuredRetryExecution(new CassandraOperationException(exc), out result);
+ }
+ catch (TTransportException exc)
+ {
+ Session.MarkCurrentConnectionAsUnhealthy(exc);
+ ExceptionOccuredRetryExecution(new CassandraOperationException(exc), out result);
+ }
+ catch (IOException exc)
+ {
+ Session.MarkCurrentConnectionAsUnhealthy(exc);
+ ExceptionOccuredRetryExecution(new CassandraOperationException(exc), out result);
+ }
+ catch (NotFoundException exc)
+ {
+ Session.MarkCurrentConnectionAsUnhealthy(exc);
+ ExceptionOccuredRetryExecution(new CassandraOperationException(exc), out result);
+ }
+ catch (Exception exc)
+ {
+ ExceptionOccurred(new CassandraOperationException(exc));
+ result = default(TResult);
+ }
return !HasError;
}
- private void ExceptionOccuredRetryExecution(CassandraOperationException exc, bool markClientAsUnHealthy, out TResult result)
+ private void ExceptionOccuredRetryExecution(CassandraOperationException exc, out TResult result)
{
ExceptionOccurred(exc);
-
- if (markClientAsUnHealthy)
- {
- Session.MarkCurrentConnectionAsUnhealthy(exc);
- }
-
TryExecute(out result);
}

0 comments on commit b82a932

Please sign in to comment.