Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

140 lines (113 sloc) 2.669 kB
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
namespace FluentCassandra.Connections
{
/// <summary>
/// An <see cref="IServerManager"/> that hands out servers in round-robin order and
/// keeps servers that reported an error ("blacklisted" servers) out of the rotation.
/// </summary>
/// <remarks>
/// All mutable state is guarded by one private lock object. A timer periodically
/// probes blacklisted servers and puts those that accept a connection back into
/// the rotation.
/// </remarks>
public class RoundRobinServerManager : IServerManager
{
    /// <summary>
    /// Delay before the first recovery pass and the interval between passes,
    /// in milliseconds (30 seconds).
    /// </summary>
    private const long RecoveryIntervalMilliseconds = 30000L;

    // Guards _servers, _serverQueue and _blackListed.
    private readonly object _lock = new object();

    // Every server known to this manager, blacklisted or not.
    private readonly List<Server> _servers;

    // Rotation of the servers that are currently not blacklisted.
    private readonly Queue<Server> _serverQueue;

    // Servers that reported an error and are waiting for a successful probe.
    private readonly HashSet<Server> _blackListed;

    // Held in a field so the timer stays referenced for the manager's lifetime.
    private readonly Timer _recoveryTimer;

    /// <summary>
    /// Creates a manager seeded with the servers exposed by <paramref name="builder"/>
    /// and starts the periodic recovery timer.
    /// </summary>
    /// <param name="builder">Connection builder whose <c>Servers</c> seed the rotation.</param>
    /// <exception cref="ArgumentNullException"><paramref name="builder"/> is null.</exception>
    public RoundRobinServerManager(IConnectionBuilder builder)
    {
        if (builder == null)
        {
            throw new ArgumentNullException("builder");
        }

        _servers = new List<Server>(builder.Servers);
        _serverQueue = new Queue<Server>(_servers);
        _blackListed = new HashSet<Server>();
        _recoveryTimer = new Timer(
            o => ServerRecover(),
            null,
            RecoveryIntervalMilliseconds,
            RecoveryIntervalMilliseconds);
    }

    /// <summary>
    /// Timer callback: tries to open a connection to every blacklisted server and
    /// returns the ones that respond to the rotation.
    /// </summary>
    private void ServerRecover()
    {
        List<Server> candidates;

        lock (_lock)
        {
            // The emptiness check lives inside the lock so it cannot race with
            // ErrorOccurred / Remove mutating the set.
            if (_blackListed.Count == 0)
            {
                return;
            }

            // Probe a snapshot so no lock is held while connections are opened.
            candidates = new List<Server>(_blackListed);
        }

        foreach (var server in candidates)
        {
            var connection = new Connection(server, ConnectionType.Simple, 1024);
            try
            {
                connection.Open();

                lock (_lock)
                {
                    // Remove returns false when the server left the blacklist while
                    // it was being probed (e.g. via Remove); nothing to do then.
                    // Otherwise rebuild the queue so the recovered server is
                    // actually handed out again by Next().
                    if (_blackListed.Remove(server))
                    {
                        RefreshServerQueue();
                    }
                }
            }
            catch (Exception exc)
            {
                // Best effort: a failed probe just leaves the server blacklisted
                // until the next pass. Record why instead of swallowing silently.
                Debug.WriteLine(server + " is still unavailable: " + exc.Message, "connection");
            }
            finally
            {
                connection.Close();
            }
        }
    }

    #region IServerManager Members

    /// <summary>
    /// Gets whether at least one server is currently in the rotation.
    /// </summary>
    public bool HasNext
    {
        get
        {
            lock (_lock)
            {
                return _serverQueue.Count > 0;
            }
        }
    }

    /// <summary>
    /// Returns the server at the head of the rotation and moves it to the tail.
    /// </summary>
    /// <returns>The next server, or <c>null</c> when the rotation is empty.</returns>
    public Server Next()
    {
        lock (_lock)
        {
            if (_serverQueue.Count == 0)
            {
                return null;
            }

            var server = _serverQueue.Dequeue();
            _serverQueue.Enqueue(server);
            return server;
        }
    }

    /// <summary>
    /// Registers <paramref name="server"/> and appends it to the rotation.
    /// </summary>
    /// <param name="server">The server to add.</param>
    public void Add(Server server)
    {
        lock (_lock)
        {
            _servers.Add(server);
            _serverQueue.Enqueue(server);
        }
    }

    /// <summary>
    /// Blacklists <paramref name="server"/> so that it is no longer handed out
    /// until a recovery probe succeeds.
    /// </summary>
    /// <param name="server">The server that failed.</param>
    /// <param name="exc">Optional exception describing the failure; written to the debug output.</param>
    public void ErrorOccurred(Server server, Exception exc = null)
    {
        Debug.WriteLineIf(exc != null, exc, "connection");
        Debug.WriteLine(server + " has been blacklisted", "connection");

        lock (_lock)
        {
            // Add returns false when the server is already blacklisted, in which
            // case the queue is already up to date.
            if (_blackListed.Add(server))
            {
                RefreshServerQueue();
            }
        }
    }

    /// <summary>
    /// Removes <paramref name="server"/> from the manager, including the blacklist
    /// and the rotation.
    /// </summary>
    /// <param name="server">The server to remove.</param>
    public void Remove(Server server)
    {
        Debug.WriteLine(server + " has been removed", "connection");

        lock (_lock)
        {
            _servers.Remove(server);
            // Use the set's own equality lookup (same notion of equality as
            // _servers.Remove above) instead of scanning every entry with ==.
            _blackListed.Remove(server);
            RefreshServerQueue();
        }
    }

    /// <summary>
    /// Rebuilds the rotation from the known servers, skipping blacklisted ones.
    /// Callers must hold <c>_lock</c>.
    /// </summary>
    private void RefreshServerQueue()
    {
        _serverQueue.Clear();

        foreach (var s in _servers)
        {
            if (!_blackListed.Contains(s))
            {
                _serverQueue.Enqueue(s);
            }
        }
    }

    #endregion

    #region IEnumerable<Server> Members

    /// <summary>
    /// Enumerates all known servers, blacklisted or not.
    /// </summary>
    /// <returns>An enumerator over a snapshot of the server list.</returns>
    public IEnumerator<Server> GetEnumerator()
    {
        List<Server> snapshot;

        lock (_lock)
        {
            // Enumerate a copy: the live list may be changed by Add/Remove on
            // another thread, which would invalidate an enumerator over it.
            snapshot = new List<Server>(_servers);
        }

        return snapshot.GetEnumerator();
    }

    #endregion

    #region IEnumerable Members

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    #endregion
}
}
Jump to Line
Something went wrong with that request. Please try again.