Skip to content
This repository has been archived by the owner on May 25, 2021. It is now read-only.

Commit

Permalink
added round robin server manager to connection provider
Browse files Browse the repository at this point in the history
  • Loading branch information
Nick Berardi committed Nov 15, 2010
1 parent 4b8a27e commit b6643b0
Show file tree
Hide file tree
Showing 16 changed files with 142 additions and 69 deletions.
1 change: 1 addition & 0 deletions FluentCassandra/CassandraContext.cs
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using FluentCassandra.Types;
using FluentCassandra.Operations;
using FluentCassandra.Connections;

namespace FluentCassandra
{
Expand Down
1 change: 1 addition & 0 deletions FluentCassandra/CassandraKeyspace.cs
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using Apache.Cassandra;
using FluentCassandra.Connections;

namespace FluentCassandra
{
Expand Down
1 change: 1 addition & 0 deletions FluentCassandra/CassandraSession.cs
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using Apache.Cassandra;
using FluentCassandra.Connections;

namespace FluentCassandra
{
Expand Down
Expand Up @@ -3,7 +3,7 @@
using Thrift.Protocol;
using Apache.Cassandra;

namespace FluentCassandra
namespace FluentCassandra.Connections
{
/// <summary>
///
Expand All @@ -22,20 +22,12 @@ public class Connection : IConnection, IDisposable
///
/// </summary>
/// <param name="builder"></param>
internal Connection(Server server, int timeout = 0)
internal Connection(Server server)
{
Created = DateTime.Now;
Server = server;
Timeout = timeout;

//TcpClient client = new TcpClient(server.Host, server.Port);
//client.NoDelay = true;
//client.SendBufferSize = timeout;
//client.ReceiveTimeout = timeout;

//TTransport socket = new TSocket(client);

TTransport socket = new TSocket(server.Host, server.Port, timeout);
TTransport socket = new TSocket(server.Host, server.Port, server.Timeout);

_transport = new TFramedTransport(socket);
_protocol = new TBinaryProtocol(_transport);
Expand All @@ -51,15 +43,6 @@ public DateTime Created
private set;
}

/// <summary>
///
/// </summary>
public int Timeout
{
get;
private set;
}

/// <summary>
///
/// </summary>
Expand Down
Expand Up @@ -3,7 +3,7 @@
using System.Text;
using Apache.Cassandra;

namespace FluentCassandra
namespace FluentCassandra.Connections
{
public class ConnectionBuilder
{
Expand Down Expand Up @@ -107,9 +107,9 @@ private void InitializeConnectionString(string connectionString)
{
int port;
if (Int32.TryParse(serverParts[1], out port))
Servers.Add(new Server(host, port));
Servers.Add(new Server(host: host, port: port, timeout: Timeout));
else
Servers.Add(new Server(host));
Servers.Add(new Server(host: host, timeout: Timeout));
}
else
Servers.Add(new Server(host));
Expand Down
@@ -1,6 +1,6 @@
using System;

namespace FluentCassandra
namespace FluentCassandra.Connections
{
public abstract class ConnectionProvider : IConnectionProvider
{
Expand All @@ -11,13 +11,19 @@ public abstract class ConnectionProvider : IConnectionProvider
protected ConnectionProvider(ConnectionBuilder builder)
{
Builder = builder;
Servers = new RoundRobinServerManager(builder);
}

/// <summary>
///
/// </summary>
public ConnectionBuilder Builder { get; private set; }

/// <summary>
///
/// </summary>
public IServerManager Servers { get; private set; }

/// <summary>
///
/// </summary>
Expand Down
@@ -1,7 +1,7 @@
using System;
using System.Collections.Generic;

namespace FluentCassandra
namespace FluentCassandra.Connections
{
public static class ConnectionProviderFactory
{
Expand Down
@@ -1,7 +1,7 @@
using System;
using Apache.Cassandra;

namespace FluentCassandra
namespace FluentCassandra.Connections
{
public interface IConnection : IDisposable
{
Expand Down
@@ -1,6 +1,6 @@
using System;

namespace FluentCassandra
namespace FluentCassandra.Connections
{
public interface IConnectionProvider
{
Expand Down
14 changes: 14 additions & 0 deletions FluentCassandra/Connections/IServerManager.cs
@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;

namespace FluentCassandra.Connections
{
public interface IServerManager : IEnumerable<Server>
{
bool HasNext { get; }
Server Next();

void Add(Server server);
void Remove(Server server);
}
}
Expand Up @@ -2,12 +2,10 @@
using System.Collections.Generic;
using System.Net.Sockets;

namespace FluentCassandra
namespace FluentCassandra.Connections
{
public class NormalConnectionProvider : ConnectionProvider
{
private Random _random = new Random();

/// <summary>
///
/// </summary>
Expand All @@ -19,27 +17,13 @@ public NormalConnectionProvider(ConnectionBuilder builder)
throw new CassandraException("You must specify a timeout when using multiple servers.");

Timeout = builder.Timeout;
ActiveServers = builder.Servers;
}

/// <summary>
///
/// </summary>
public int Timeout { get; private set; }

/// <summary>
///
/// </summary>
public IList<Server> ActiveServers { get; private set; }

/// <summary>
/// Gets if there are any more connections left to try.
/// </summary>
public bool HasNext
{
get { return ActiveServers.Count > 0; }
}

/// <summary>
///
/// </summary>
Expand All @@ -48,7 +32,7 @@ public override IConnection Open()
{
IConnection conn = null;

while (HasNext)
while (Servers.HasNext)
{
try
{
Expand All @@ -58,12 +42,9 @@ public override IConnection Open()
}
catch (SocketException exc)
{
using (TimedLock.Lock(ActiveServers))
{
Close(conn);
ActiveServers.Remove(conn.Server);
conn = null;
}
Close(conn);
Servers.Remove(conn.Server);
conn = null;
}
}

Expand All @@ -79,11 +60,11 @@ public override IConnection Open()
/// <returns></returns>
public override IConnection CreateConnection()
{
if (ActiveServers.Count == 0)
if (!Servers.HasNext)
return null;

var server = Builder.Servers[_random.Next(ActiveServers.Count)];
var conn = new Connection(server, Timeout);
var server = Servers.Next();
var conn = new Connection(server);

return conn;
}
Expand Down
Expand Up @@ -2,7 +2,7 @@
using System.Collections.Generic;
using System.Threading;

namespace FluentCassandra
namespace FluentCassandra.Connections
{
public class PooledConnectionProvider : NormalConnectionProvider
{
Expand Down
80 changes: 80 additions & 0 deletions FluentCassandra/Connections/RoundRobinServerManager.cs
@@ -0,0 +1,80 @@
using System;
using System.Collections.Generic;

namespace FluentCassandra.Connections
{
public class RoundRobinServerManager : IServerManager
{
private readonly object _lock = new object();

private List<Server> _servers;
private Queue<Server> _serverQueue;

public RoundRobinServerManager(ConnectionBuilder builder)
{
_servers = new List<Server>(builder.Servers);
_serverQueue = new Queue<Server>(_servers);
}

#region IServerManager Members

/// <summary>
/// Gets if there are any more connections left to try.
/// </summary>
public bool HasNext
{
get { return _servers.Count > 0; }
}

public Server Next()
{
Server server;

using (TimedLock.Lock(_lock))
{
server = _serverQueue.Dequeue();
_serverQueue.Enqueue(server);
}

return server;
}

public void Add(Server server)
{
using (TimedLock.Lock(_lock))
{
_servers.Add(server);
_serverQueue.Enqueue(server);
}
}

public void Remove(Server server)
{
using (TimedLock.Lock(_lock))
{
_servers.Remove(server);
_serverQueue = new Queue<Server>(_servers);
}
}

#endregion

#region IEnumerable<Server> Members

public IEnumerator<Server> GetEnumerator()
{
return _servers.GetEnumerator();
}

#endregion

#region IEnumerable Members

System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
{
return GetEnumerator();
}

#endregion
}
}
@@ -1,24 +1,28 @@
using System;

namespace FluentCassandra
namespace FluentCassandra.Connections
{
public class Server
{
public const int DefaultPort = 9160;
public const int DefaultTimeout = 0;

public Server(string host = "127.0.0.1", int port = DefaultPort)
public Server(string host = "127.0.0.1", int port = DefaultPort, int timeout = DefaultTimeout)
{
Host = host;
Port = port;
Timeout = timeout;
}

public int Port { get; private set; }

public string Host { get; private set; }

public int Timeout { get; private set; }

public override string ToString()
{
return String.Concat(Host, ":", Port);
return String.Concat(Host, ":", Port, "-", Timeout);
}
}
}
Expand Up @@ -2,7 +2,7 @@
using System.Threading;
using System.Runtime.Serialization;

namespace FluentCassandra
namespace FluentCassandra.Connections
{
/// <summary>
/// Thanks to Eric Gunnerson and Phil Haack
Expand Down

0 comments on commit b6643b0

Please sign in to comment.