Permalink
Browse files

merged changes from @eplowe and refactored part of the code to follow…

… the style used in this project
  • Loading branch information...
2 parents 22f1e4e + de1be97 commit 0a960d9a9b6d4881d783e8812c9e5d91a7c8b833 @nberardi nberardi committed Nov 20, 2012
View
@@ -1,6 +1,6 @@

Microsoft Visual Studio Solution File, Format Version 12.00
-# Visual Studio 2012
+# Visual Studio 2010
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "FluentCassandra", "src\FluentCassandra.csproj", "{EAA32600-3C2A-4B34-B9B2-5764F280FCE3}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "FluentCassandra.Sandbox", "test\FluentCassandra.Sandbox\FluentCassandra.Sandbox.csproj", "{DE6B46DE-C37A-49AF-8B9A-B9B6D4F03A55}"
View
@@ -117,6 +117,12 @@ internal CassandraClientWrapper GetClient(bool setKeyspace = true, bool? setCqlV
return new CassandraClientWrapper(_connection.Client);
}
+ internal void MarkCurrentConnectionAsUnhealthy(Exception exc)
+ {
+ ConnectionProvider.ErrorOccurred(_connection, exc);
+ _connection = null;
+ }
+
/// <summary>
///
/// </summary>
@@ -154,12 +160,12 @@ public void Login(string username, string password)
}
/// <summary>
- /// The last error that occured during the execution of an operation.
+ /// The last error that occurred during the execution of an operation.
/// </summary>
public CassandraException LastError { get; private set; }
/// <summary>
- /// Indicates if errors should be thrown when occuring on opperation.
+ /// Indicates if errors should be thrown when occurring on operation.
/// </summary>
public bool ThrowErrors { get; set; }
@@ -221,7 +227,7 @@ public void Dispose()
/// </param>
protected virtual void Dispose(bool disposing)
{
- if (!WasDisposed && disposing && _connection != null)
+ if (!WasDisposed && disposing && _connection != null)
ConnectionProvider.Close(_connection);
WasDisposed = true;
@@ -237,4 +243,4 @@ protected virtual void Dispose(bool disposing)
#endregion
}
-}
+}
@@ -16,6 +16,7 @@ public class Connection : IConnection, IDisposable
private Cassandra.Client _client;
private string _activeKeyspace;
private string _activeCqlVersion;
+ private bool _healthy;
private readonly object _lock = new object();
/// <summary>
@@ -38,7 +39,7 @@ public Connection(Server server, ConnectionType connectionType, int bufferSize)
Server = server;
ConnectionType = connectionType;
BufferSize = bufferSize;
-
+ _healthy = true;
InitTransportAndClient();
}
@@ -14,14 +14,15 @@ public class ConnectionBuilder : FluentCassandra.Connections.IConnectionBuilder
/// <param name="host"></param>
/// <param name="port"></param>
/// <param name="timeout"></param>
- public ConnectionBuilder(string keyspace, string host, int port = Server.DefaultPort, int connectionTimeout = Server.DefaultTimeout, bool pooling = false, int minPoolSize = 0, int maxPoolSize = 100, int connectionLifetime = 0, ConnectionType connectionType = ConnectionType.Framed, int bufferSize = 1024, ConsistencyLevel read = ConsistencyLevel.QUORUM, ConsistencyLevel write = ConsistencyLevel.QUORUM, string cqlVersion = FluentCassandra.Connections.CqlVersion.ServerDefault, bool compressCqlQueries = true, string username = null, string password = null)
+ public ConnectionBuilder(string keyspace, string host, int port = Server.DefaultPort, int connectionTimeout = Server.DefaultTimeout, bool pooling = false, int minPoolSize = 0, int maxPoolSize = 100, int connectionLifetime = 0, ConnectionType connectionType = ConnectionType.Framed, int bufferSize = 1024, ConsistencyLevel read = ConsistencyLevel.QUORUM, ConsistencyLevel write = ConsistencyLevel.QUORUM, string cqlVersion = FluentCassandra.Connections.CqlVersion.ServerDefault, bool compressCqlQueries = true, string username = null, string password = null, int maxRetries = 0)
{
Keyspace = keyspace;
Servers = new List<Server>() { new Server(host, port) };
ConnectionTimeout = TimeSpan.FromSeconds(connectionTimeout);
Pooling = pooling;
MinPoolSize = minPoolSize;
MaxPoolSize = maxPoolSize;
+ MaxRetries = maxRetries;
ConnectionLifetime = TimeSpan.FromSeconds(connectionLifetime);
ConnectionType = connectionType;
BufferSize = bufferSize;
@@ -35,14 +36,15 @@ public ConnectionBuilder(string keyspace, string host, int port = Server.Default
ConnectionString = GetConnectionString();
}
- public ConnectionBuilder(string keyspace, Server server, bool pooling = false, int minPoolSize = 0, int maxPoolSize = 100, int connectionLifetime = 0, ConnectionType connectionType = ConnectionType.Framed, int bufferSize = 1024, ConsistencyLevel read = ConsistencyLevel.QUORUM, ConsistencyLevel write = ConsistencyLevel.QUORUM, string cqlVersion = FluentCassandra.Connections.CqlVersion.ServerDefault, bool compressCqlQueries = true, string username = null, string password = null)
+ public ConnectionBuilder(string keyspace, Server server, bool pooling = false, int minPoolSize = 0, int maxPoolSize = 100, int connectionLifetime = 0, ConnectionType connectionType = ConnectionType.Framed, int bufferSize = 1024, ConsistencyLevel read = ConsistencyLevel.QUORUM, ConsistencyLevel write = ConsistencyLevel.QUORUM, string cqlVersion = FluentCassandra.Connections.CqlVersion.ServerDefault, bool compressCqlQueries = true, string username = null, string password = null, int maxRetries = 0)
{
Keyspace = keyspace;
Servers = new List<Server>() { server };
ConnectionTimeout = TimeSpan.FromSeconds(server.Timeout);
Pooling = pooling;
MinPoolSize = minPoolSize;
MaxPoolSize = maxPoolSize;
+ MaxRetries = maxRetries;
ConnectionLifetime = TimeSpan.FromSeconds(connectionLifetime);
ConnectionType = connectionType;
BufferSize = bufferSize;
@@ -82,7 +84,7 @@ private void InitializeConnectionString(string connectionString)
if (nameValue.Length != 2)
continue;
- pairs.Add(nameValue[0].Trim(), nameValue[1].Trim());
+ pairs.Add(nameValue[0].Trim(), nameValue[1].Trim());
}
#region Keyspace
@@ -154,6 +156,23 @@ private void InitializeConnectionString(string connectionString)
#endregion
+ #region MaxRetries
+
+ if (pairs.ContainsKey("Max Retries"))
+ {
+ int maxRetries;
+
+ if (!Int32.TryParse(pairs["Max Retries"], out maxRetries))
+ maxRetries = 0;
+
+ if (maxRetries < 0)
+ maxRetries = 0;
+
+ MaxRetries = maxRetries;
+ }
+
+ #endregion
+
#region ConnectionTimeout
if (!pairs.ContainsKey("Connection Timeout"))
@@ -330,18 +349,18 @@ private void InitializeConnectionString(string connectionString)
foreach (var server in servers)
{
string[] serverParts = server.Split(':');
- string host = serverParts[0].Trim();
+ string host = serverParts[0].Trim();
if (serverParts.Length == 2)
{
int port;
- if (Int32.TryParse(serverParts[1].Trim(), out port))
+ if (Int32.TryParse(serverParts[1].Trim(), out port))
Servers.Add(new Server(host: host, port: port, timeout: ConnectionTimeout.Seconds));
else
Servers.Add(new Server(host: host, timeout: ConnectionTimeout.Seconds));
}
else
- Servers.Add(new Server(host: host, timeout: ConnectionTimeout.Seconds));
+ Servers.Add(new Server(host: host, timeout: ConnectionTimeout.Seconds));
}
}
@@ -401,6 +420,11 @@ private string GetConnectionString()
/// </summary>
public int MaxPoolSize { get; private set; }
+ /// <summary>
+ /// The maximum number of execution retry attempts if there is an error during the execution of an operation and the exception is a type that can be retried.
+ /// </summary>
+ public int MaxRetries { get; private set; }
+
/// <summary>
/// When a connection is returned to the pool, its creation time is compared with the current time, and the connection is destroyed if that time span (in seconds) exceeds the value specified by Connection Lifetime. This is useful in clustered configurations to force load balancing between a running server and a server just brought online. A value of zero (0) causes pooled connections to have the maximum connection timeout.
/// </summary>
@@ -42,6 +42,8 @@ public virtual IConnection Open()
return conn;
}
+ public abstract void ErrorOccurred(IConnection connection, Exception exc = null);
+
/// <summary>
///
/// </summary>
@@ -12,6 +12,7 @@ public interface IConnectionBuilder
bool Pooling { get; }
int MinPoolSize { get; }
int MaxPoolSize { get; }
+ int MaxRetries { get; }
TimeSpan ConnectionTimeout { get; }
TimeSpan ConnectionLifetime { get; }
ConnectionType ConnectionType { get; }
@@ -9,9 +9,10 @@ public interface IConnectionProvider
IServerManager Servers { get; }
IConnection CreateConnection();
-
IConnection Open();
+ void ErrorOccurred(IConnection connection, Exception exc = null);
+
bool Close(IConnection connection);
}
}
@@ -9,7 +9,6 @@ public interface IServerManager : IEnumerable<Server>
Server Next();
void ErrorOccurred(Server server, Exception exc = null);
- void BlackList(Server server);
void Add(Server server);
void Remove(Server server);
@@ -53,6 +53,12 @@ public override IConnection Open()
return conn;
}
+ public override void ErrorOccurred(IConnection connection, Exception exc = null)
+ {
+ try { Close(connection); } catch { }
+ Servers.ErrorOccurred(connection.Server, exc);
+ }
+
/// <summary>
///
/// </summary>
@@ -68,4 +74,4 @@ public override IConnection CreateConnection()
return conn;
}
}
-}
+}
@@ -9,7 +9,7 @@ public class PooledConnectionProvider : NormalConnectionProvider
private readonly object _lock = new object();
private readonly Queue<IConnection> _freeConnections = new Queue<IConnection>();
- private readonly List<IConnection> _usedConnections = new List<IConnection>();
+ private readonly HashSet<IConnection> _usedConnections = new HashSet<IConnection>();
private readonly Timer _maintenanceTimer;
/// <summary>
@@ -59,7 +59,7 @@ public override IConnection CreateConnection()
else if (_freeConnections.Count + _usedConnections.Count >= MaxPoolSize)
{
if (!Monitor.Wait(_lock, TimeSpan.FromSeconds(30)))
- throw new CassandraException("No connection could be made, timed out trying to aquire a connection from the connection pool.");
+ throw new TimeoutException("No connection could be made, timed out trying to acquirer a connection from the connection pool.");
return CreateConnection();
}
@@ -73,6 +73,22 @@ public override IConnection CreateConnection()
return conn;
}
+ public override void ErrorOccurred(IConnection connection, Exception exc = null)
+ {
+ lock (_lock)
+ {
+ _usedConnections.RemoveWhere(x => x.Server == connection.Server);
+ Servers.ErrorOccurred(connection.Server, exc);
+
+ var currentFreeConnections = _freeConnections.ToArray();
+ _freeConnections.Clear();
+
+ foreach (var conn in currentFreeConnections)
+ if (conn.Server != connection.Server)
+ _freeConnections.Enqueue(conn);
+ }
+ }
+
/// <summary>
///
/// </summary>
@@ -132,4 +148,4 @@ private void CheckFreeConnectionsAlive()
}
}
}
-}
+}
Oops, something went wrong.

0 comments on commit 0a960d9

Please sign in to comment.