diff --git a/FluentCassandra/CassandraContext.cs b/FluentCassandra/CassandraContext.cs index d7de2aa..162c2b8 100644 --- a/FluentCassandra/CassandraContext.cs +++ b/FluentCassandra/CassandraContext.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using FluentCassandra.Types; using FluentCassandra.Operations; +using FluentCassandra.Connections; namespace FluentCassandra { diff --git a/FluentCassandra/CassandraKeyspace.cs b/FluentCassandra/CassandraKeyspace.cs index 66ce160..85c4a55 100644 --- a/FluentCassandra/CassandraKeyspace.cs +++ b/FluentCassandra/CassandraKeyspace.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Linq; using Apache.Cassandra; +using FluentCassandra.Connections; namespace FluentCassandra { diff --git a/FluentCassandra/CassandraSession.cs b/FluentCassandra/CassandraSession.cs index a1fbdf3..8f650ca 100644 --- a/FluentCassandra/CassandraSession.cs +++ b/FluentCassandra/CassandraSession.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Linq; using Apache.Cassandra; +using FluentCassandra.Connections; namespace FluentCassandra { diff --git a/FluentCassandra/Connection/Connection.cs b/FluentCassandra/Connections/Connection.cs similarity index 83% rename from FluentCassandra/Connection/Connection.cs rename to FluentCassandra/Connections/Connection.cs index a7d4143..67fb562 100644 --- a/FluentCassandra/Connection/Connection.cs +++ b/FluentCassandra/Connections/Connection.cs @@ -3,7 +3,7 @@ using Thrift.Protocol; using Apache.Cassandra; -namespace FluentCassandra +namespace FluentCassandra.Connections { /// /// @@ -22,20 +22,12 @@ public class Connection : IConnection, IDisposable /// /// /// - internal Connection(Server server, int timeout = 0) + internal Connection(Server server) { Created = DateTime.Now; Server = server; - Timeout = timeout; - //TcpClient client = new TcpClient(server.Host, server.Port); - //client.NoDelay = true; - //client.SendBufferSize = timeout; - //client.ReceiveTimeout = timeout; - - //TTransport socket = new TSocket(client); - - TTransport socket = new TSocket(server.Host, server.Port, timeout); + TTransport socket = new TSocket(server.Host, server.Port, server.Timeout); _transport = new TFramedTransport(socket); _protocol = new TBinaryProtocol(_transport); @@ -51,15 +43,6 @@ public DateTime Created private set; } - /// - /// - /// - public int Timeout - { - get; - private set; - } - /// /// /// diff --git a/FluentCassandra/Connection/ConnectionBuilder.cs b/FluentCassandra/Connections/ConnectionBuilder.cs similarity index 96% rename from FluentCassandra/Connection/ConnectionBuilder.cs rename to FluentCassandra/Connections/ConnectionBuilder.cs index ac37aec..eb0c643 100644 --- a/FluentCassandra/Connection/ConnectionBuilder.cs +++ b/FluentCassandra/Connections/ConnectionBuilder.cs @@ -3,7 +3,7 @@ using System.Text; using Apache.Cassandra; -namespace FluentCassandra +namespace FluentCassandra.Connections { public class ConnectionBuilder { @@ -107,9 +107,9 @@ private void InitializeConnectionString(string connectionString) { int port; if (Int32.TryParse(serverParts[1], out port)) - Servers.Add(new Server(host, port)); + Servers.Add(new Server(host: host, port: port, timeout: Timeout)); else - Servers.Add(new Server(host)); + Servers.Add(new Server(host: host, timeout: Timeout)); } else Servers.Add(new Server(host)); diff --git a/FluentCassandra/Connection/ConnectionProvider.cs b/FluentCassandra/Connections/ConnectionProvider.cs similarity index 82% rename from FluentCassandra/Connection/ConnectionProvider.cs rename to FluentCassandra/Connections/ConnectionProvider.cs index 65f6bbe..649650a 100644 --- a/FluentCassandra/Connection/ConnectionProvider.cs +++ b/FluentCassandra/Connections/ConnectionProvider.cs @@ -1,6 +1,6 @@ using System; -namespace FluentCassandra +namespace FluentCassandra.Connections { public abstract class ConnectionProvider : IConnectionProvider { @@ -11,6 +11,7 @@ public abstract class ConnectionProvider : IConnectionProvider protected ConnectionProvider(ConnectionBuilder builder) { Builder = builder; + Servers = new RoundRobinServerManager(builder); } /// @@ -18,6 +19,11 @@ protected ConnectionProvider(ConnectionBuilder builder) /// public ConnectionBuilder Builder { get; private set; } + /// + /// + /// + public IServerManager Servers { get; private set; } + /// /// /// diff --git a/FluentCassandra/Connection/ConnectionProviderFactory.cs b/FluentCassandra/Connections/ConnectionProviderFactory.cs similarity index 95% rename from FluentCassandra/Connection/ConnectionProviderFactory.cs rename to FluentCassandra/Connections/ConnectionProviderFactory.cs index f2166ec..6f9e1ed 100644 --- a/FluentCassandra/Connection/ConnectionProviderFactory.cs +++ b/FluentCassandra/Connections/ConnectionProviderFactory.cs @@ -1,7 +1,7 @@ using System; using System.Collections.Generic; -namespace FluentCassandra +namespace FluentCassandra.Connections { public static class ConnectionProviderFactory { diff --git a/FluentCassandra/Connection/IConnection.cs b/FluentCassandra/Connections/IConnection.cs similarity index 87% rename from FluentCassandra/Connection/IConnection.cs rename to FluentCassandra/Connections/IConnection.cs index 676e227..07df912 100644 --- a/FluentCassandra/Connection/IConnection.cs +++ b/FluentCassandra/Connections/IConnection.cs @@ -1,7 +1,7 @@ using System; using Apache.Cassandra; -namespace FluentCassandra +namespace FluentCassandra.Connections { public interface IConnection : IDisposable { diff --git a/FluentCassandra/Connection/IConnectionProvider.cs b/FluentCassandra/Connections/IConnectionProvider.cs similarity index 84% rename from FluentCassandra/Connection/IConnectionProvider.cs rename to FluentCassandra/Connections/IConnectionProvider.cs index d55c387..dda796e 100644 --- a/FluentCassandra/Connection/IConnectionProvider.cs +++ b/FluentCassandra/Connections/IConnectionProvider.cs @@ -1,6 +1,6 @@ using System; -namespace FluentCassandra +namespace FluentCassandra.Connections { public interface IConnectionProvider { diff --git a/FluentCassandra/Connections/IServerManager.cs b/FluentCassandra/Connections/IServerManager.cs new file mode 100644 index 0000000..f9a4a95 --- /dev/null +++ b/FluentCassandra/Connections/IServerManager.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; + +namespace FluentCassandra.Connections +{ + public interface IServerManager : IEnumerable + { + bool HasNext { get; } + Server Next(); + + void Add(Server server); + void Remove(Server server); + } +} diff --git a/FluentCassandra/Connection/NormalConnectionProvider.cs b/FluentCassandra/Connections/NormalConnectionProvider.cs similarity index 63% rename from FluentCassandra/Connection/NormalConnectionProvider.cs rename to FluentCassandra/Connections/NormalConnectionProvider.cs index 9efd938..2e472ed 100644 --- a/FluentCassandra/Connection/NormalConnectionProvider.cs +++ b/FluentCassandra/Connections/NormalConnectionProvider.cs @@ -2,12 +2,10 @@ using System.Collections.Generic; using System.Net.Sockets; -namespace FluentCassandra +namespace FluentCassandra.Connections { public class NormalConnectionProvider : ConnectionProvider { - private Random _random = new Random(); - /// /// /// @@ -19,7 +17,6 @@ public NormalConnectionProvider(ConnectionBuilder builder) throw new CassandraException("You must specify a timeout when using multiple servers."); Timeout = builder.Timeout; - ActiveServers = builder.Servers; } /// @@ -27,19 +24,6 @@ public NormalConnectionProvider(ConnectionBuilder builder) /// public int Timeout { get; private set; } - /// - /// - /// - public IList ActiveServers { get; private set; } - - /// - /// Gets if there are any more connections left to try. - /// - public bool HasNext - { - get { return ActiveServers.Count > 0; } - } - /// /// /// @@ -48,7 +32,7 @@ public override IConnection Open() { IConnection conn = null; - while (HasNext) + while (Servers.HasNext) { try { @@ -58,12 +42,9 @@ public override IConnection Open() } catch (SocketException exc) { - using (TimedLock.Lock(ActiveServers)) - { - Close(conn); - ActiveServers.Remove(conn.Server); - conn = null; - } + Close(conn); + Servers.Remove(conn.Server); + conn = null; } } @@ -79,11 +60,11 @@ public override IConnection Open() /// public override IConnection CreateConnection() { - if (ActiveServers.Count == 0) + if (!Servers.HasNext) return null; - var server = Builder.Servers[_random.Next(ActiveServers.Count)]; - var conn = new Connection(server, Timeout); + var server = Servers.Next(); + var conn = new Connection(server); return conn; } diff --git a/FluentCassandra/Connection/PooledConnectionProvider.cs b/FluentCassandra/Connections/PooledConnectionProvider.cs similarity index 98% rename from FluentCassandra/Connection/PooledConnectionProvider.cs rename to FluentCassandra/Connections/PooledConnectionProvider.cs index 75d3d40..41b3866 100644 --- a/FluentCassandra/Connection/PooledConnectionProvider.cs +++ b/FluentCassandra/Connections/PooledConnectionProvider.cs @@ -2,7 +2,7 @@ using System.Collections.Generic; using System.Threading; -namespace FluentCassandra +namespace FluentCassandra.Connections { public class PooledConnectionProvider : NormalConnectionProvider { diff --git a/FluentCassandra/Connections/RoundRobinServerManager.cs b/FluentCassandra/Connections/RoundRobinServerManager.cs new file mode 100644 index 0000000..a26e033 --- /dev/null +++ b/FluentCassandra/Connections/RoundRobinServerManager.cs @@ -0,0 +1,80 @@ +using System; +using System.Collections.Generic; + +namespace FluentCassandra.Connections +{ + public class RoundRobinServerManager : IServerManager + { + private readonly object _lock = new object(); + + private List _servers; + private Queue _serverQueue; + + public RoundRobinServerManager(ConnectionBuilder builder) + { + _servers = new List(builder.Servers); + _serverQueue = new Queue(_servers); + } + + #region IServerManager Members + + /// + /// Gets if there are any more connections left to try. + /// + public bool HasNext + { + get { return _servers.Count > 0; } + } + + public Server Next() + { + Server server; + + using (TimedLock.Lock(_lock)) + { + server = _serverQueue.Dequeue(); + _serverQueue.Enqueue(server); + } + + return server; + } + + public void Add(Server server) + { + using (TimedLock.Lock(_lock)) + { + _servers.Add(server); + _serverQueue.Enqueue(server); + } + } + + public void Remove(Server server) + { + using (TimedLock.Lock(_lock)) + { + _servers.Remove(server); + _serverQueue = new Queue(_servers); + } + } + + #endregion + + #region IEnumerable Members + + public IEnumerator GetEnumerator() + { + return _servers.GetEnumerator(); + } + + #endregion + + #region IEnumerable Members + + System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + + #endregion + } +} diff --git a/FluentCassandra/Connection/Server.cs b/FluentCassandra/Connections/Server.cs similarity index 58% rename from FluentCassandra/Connection/Server.cs rename to FluentCassandra/Connections/Server.cs index a45c9fd..9a50890 100644 --- a/FluentCassandra/Connection/Server.cs +++ b/FluentCassandra/Connections/Server.cs @@ -1,24 +1,28 @@ using System; -namespace FluentCassandra +namespace FluentCassandra.Connections { public class Server { public const int DefaultPort = 9160; + public const int DefaultTimeout = 0; - public Server(string host = "127.0.0.1", int port = DefaultPort) + public Server(string host = "127.0.0.1", int port = DefaultPort, int timeout = DefaultTimeout) { Host = host; Port = port; + Timeout = timeout; } public int Port { get; private set; } public string Host { get; private set; } + public int Timeout { get; private set; } + public override string ToString() { - return String.Concat(Host, ":", Port); + return String.Concat(Host, ":", Port, "-", Timeout); } } } diff --git a/FluentCassandra/Connection/TimedLock.cs b/FluentCassandra/Connections/TimedLock.cs similarity index 99% rename from FluentCassandra/Connection/TimedLock.cs rename to FluentCassandra/Connections/TimedLock.cs index bf0294f..7f6d135 100644 --- a/FluentCassandra/Connection/TimedLock.cs +++ b/FluentCassandra/Connections/TimedLock.cs @@ -2,7 +2,7 @@ using System.Threading; using System.Runtime.Serialization; -namespace FluentCassandra +namespace FluentCassandra.Connections { /// /// Thanks to Eric Gunnerson and Phil Haack diff --git a/FluentCassandra/FluentCassandra.csproj b/FluentCassandra/FluentCassandra.csproj index f7c1d87..e1042b1 100644 --- a/FluentCassandra/FluentCassandra.csproj +++ b/FluentCassandra/FluentCassandra.csproj @@ -109,16 +109,18 @@ - - - - - - - - - - + + + + + + + + + + + + Code