Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

151 lines (129 sloc) 3.526 kb
using System;
using System.Collections.Generic;
using System.Threading;
namespace FluentCassandra.Connections
{
/// <summary>
/// Connection provider that hands out connections from a bounded pool.
/// Connections released through <see cref="Close"/> are kept in a free queue and
/// reused by later <see cref="CreateConnection"/> calls; a timer periodically
/// discards free connections that are no longer alive.
/// </summary>
public class PooledConnectionProvider : NormalConnectionProvider
{
    /// <summary>Delay before the first maintenance pass and between subsequent passes.</summary>
    private static readonly TimeSpan MaintenanceInterval = TimeSpan.FromSeconds(30);

    /// <summary>Maximum total time a caller waits for a pool slot before a timeout is raised.</summary>
    private static readonly TimeSpan AcquireTimeout = TimeSpan.FromSeconds(30);

    // Guards both collections below and is the monitor that waiters block on.
    private readonly object _lock = new object();

    // Idle connections available for reuse.
    private readonly Queue<IConnection> _freeConnections = new Queue<IConnection>();

    // Connections currently handed out to callers.
    private readonly HashSet<IConnection> _usedConnections = new HashSet<IConnection>();

    // Held in a field so the timer is not garbage collected while the provider is in use.
    // NOTE(review): the timer is never disposed, so it keeps firing (and keeps this
    // provider reachable) for the life of the process - consider a disposal hook.
    private readonly Timer _maintenanceTimer;

    /// <summary>
    /// Creates a pooled provider, copying the pool limits and connection lifetime
    /// from the builder and starting the periodic maintenance timer.
    /// </summary>
    /// <param name="builder">Source of the pool settings; also passed to the base provider.</param>
    public PooledConnectionProvider(IConnectionBuilder builder)
        : base(builder)
    {
        MinPoolSize = builder.MinPoolSize;
        MaxPoolSize = builder.MaxPoolSize;
        ConnectionLifetime = builder.ConnectionLifetime;
        _maintenanceTimer = new Timer(o => Cleanup(), null, MaintenanceInterval, MaintenanceInterval);
    }

    /// <summary>
    /// Minimum pool size taken from the builder. It is stored but not otherwise
    /// used by this class.
    /// </summary>
    public int MinPoolSize { get; private set; }

    /// <summary>
    /// Maximum number of connections that may be handed out at the same time.
    /// </summary>
    public int MaxPoolSize { get; private set; }

    /// <summary>
    /// Maximum age of a connection before it is considered dead.
    /// A value of <see cref="TimeSpan.Zero"/> or less disables the age check.
    /// </summary>
    public TimeSpan ConnectionLifetime { get; private set; }

    /// <summary>
    /// Gets a connection from the pool, creating a new one when no idle connection
    /// is available and the pool is below <see cref="MaxPoolSize"/>. When the pool
    /// is exhausted the call blocks until a connection is released.
    /// </summary>
    /// <returns>A connection that is now tracked as in use.</returns>
    /// <exception cref="TimeoutException">
    /// No connection became available within the acquire timeout.
    /// </exception>
    public override IConnection CreateConnection()
    {
        // One deadline for the whole call, so repeated wake-ups cannot extend the wait.
        var deadline = DateTime.UtcNow.Add(AcquireTimeout);

        lock (_lock)
        {
            while (true)
            {
                // Prefer an idle connection, skipping (and closing) any that expired
                // or were closed since they were returned to the pool.
                while (_freeConnections.Count > 0)
                {
                    var pooled = _freeConnections.Dequeue();
                    if (IsAlive(pooled))
                    {
                        _usedConnections.Add(pooled);
                        return pooled;
                    }

                    base.Close(pooled);
                }

                // The free queue is empty here, so the pool size is the used count.
                if (_usedConnections.Count < MaxPoolSize)
                {
                    var created = base.CreateConnection();
                    _usedConnections.Add(created);
                    return created;
                }

                // Pool exhausted: release the lock and sleep until Close/ErrorOccurred
                // pulses the monitor or the deadline passes.
                var remaining = deadline - DateTime.UtcNow;
                if (remaining <= TimeSpan.Zero || !Monitor.Wait(_lock, remaining))
                    throw new TimeoutException("No connection could be made, timed out trying to acquirer a connection from the connection pool.");
            }
        }
    }

    /// <summary>
    /// Reports a failure on a connection. Every connection to the same server is
    /// evicted from the pool and the failure is forwarded to the server manager.
    /// </summary>
    /// <param name="connection">The connection on which the error occurred.</param>
    /// <param name="exc">The error, if one is available.</param>
    public override void ErrorOccurred(IConnection connection, Exception exc = null)
    {
        lock (_lock)
        {
            var failedServer = connection.Server;

            // In-use connections are only untracked here; their owners still hold
            // them, and they are closed when handed back through Close.
            _usedConnections.RemoveWhere(x => x.Server == failedServer);
            Servers.ErrorOccurred(failedServer, exc);

            var idle = _freeConnections.ToArray();
            _freeConnections.Clear();
            foreach (var conn in idle)
            {
                if (conn.Server != failedServer)
                    _freeConnections.Enqueue(conn);
                else
                    base.Close(conn); // nobody owns an idle connection, so release it now
            }

            // Evicting in-use connections may have freed pool capacity.
            Monitor.PulseAll(_lock);
        }
    }

    /// <summary>
    /// Returns a connection to the pool. A connection that is still alive and still
    /// tracked as in use is queued for reuse; any other connection is closed through
    /// the base provider.
    /// </summary>
    /// <param name="connection">The connection being released.</param>
    /// <returns>Always <c>true</c>.</returns>
    public override bool Close(IConnection connection)
    {
        lock (_lock)
        {
            // False when the connection was evicted by ErrorOccurred or released twice;
            // re-pooling it then would bypass the MaxPoolSize accounting.
            var wasTracked = _usedConnections.Remove(connection);

            if (wasTracked && IsAlive(connection))
                _freeConnections.Enqueue(connection);
            else
                base.Close(connection);

            // A slot was released: wake one caller blocked in CreateConnection.
            if (wasTracked)
                Monitor.Pulse(_lock);
        }

        return true;
    }

    /// <summary>
    /// Runs one maintenance pass over the idle connections. Invoked by the
    /// maintenance timer and callable directly.
    /// </summary>
    public void Cleanup()
    {
        CheckFreeConnectionsAlive();
    }

    /// <summary>
    /// Determines whether the connection is alive: it must not be older than
    /// <see cref="ConnectionLifetime"/> (when that is positive) and must be open.
    /// </summary>
    /// <param name="connection">The connection.</param>
    /// <returns>True if alive; otherwise false.</returns>
    private bool IsAlive(IConnection connection)
    {
        if (ConnectionLifetime > TimeSpan.Zero && connection.Created.Add(ConnectionLifetime) < DateTime.UtcNow)
            return false;

        return connection.IsOpen;
    }

    /// <summary>
    /// Rebuilds the free queue, keeping connections that are still alive and closing
    /// the rest through the base provider.
    /// </summary>
    private void CheckFreeConnectionsAlive()
    {
        lock (_lock)
        {
            var idle = _freeConnections.ToArray();
            _freeConnections.Clear();

            foreach (var free in idle)
            {
                if (IsAlive(free))
                    _freeConnections.Enqueue(free);
                else
                    base.Close(free);
            }
        }
    }
}
}
Jump to Line
Something went wrong with that request. Please try again.