Skip to content

Commit

Permalink
Implement server load balancing. Fixes mysql-net#226
Browse files Browse the repository at this point in the history
  • Loading branch information
bgrainger committed Nov 7, 2017
1 parent a06bf08 commit ded586c
Show file tree
Hide file tree
Showing 12 changed files with 258 additions and 16 deletions.
16 changes: 16 additions & 0 deletions docs/content/connection-options.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,22 @@ These are the other options that MySqlConnector supports. They are set to sensi
On Windows, a value greater than 0 is the idle connection time, measured in seconds, before the first keepalive packet is sent.
Due to limitations in .NET Core, Unix-based Operating Systems will always use the OS Default keepalive settings.</td>
</tr>
<tr>
<td>Load Balance, LoadBalance</td>
<td>RoundRobin</td>
<td><p>The load-balancing strategy to use when <code>Host</code> contains multiple, comma-delimited, host names.
The options include:</p>
<dl>
<dt>RoundRobin</dt>
<dd>Each new connection opened for this connection pool uses the next host name (sequentially with wraparound). Requires <code>Pooling=True</code>. This is the default if <code>Pooling=True</code>.</dd>
<dt>InOrder</dt>
<dd>Each new connection tries the hosts in order, starting with the first one. This is the default if <code>Pooling=False</code>.</dd>
<dt>Random</dt>
<dd>Servers are tried in a random order.</dd>
<dt>FewestConnections</dt>
<dd>Servers are tried in ascending order of number of currently-open connections in this connection pool. Requires <code>Pooling=True</code>.
</dl>
</tr>
<tr>
<td>Old Guids, OldGuids</td>
<td>false</td>
Expand Down
6 changes: 6 additions & 0 deletions docs/content/tutorials/migrating-from-connector-net.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ MySqlConnector has some different default connection string options:
<td>Default is <code>false</code></td>
<td>MySqlConnector takes an extra command to reset pooled connections by default so that the connection is always in a known state</td>
</tr>
<tr>
<td><code>LoadBalance</code></td>
<td>Default is <code>RoundRobin</code></td>
<td>(not configurable, effective default is <code>InOrder</code>)</td>
<td>Connector/NET currently has [a bug](https://bugs.mysql.com/bug.php?id=81650) that prevents multiple host names being used.</td>
</tr>
<tr>
<td><code>ServerRSAPublicKeyFile</code></td>
<td>(no default)</td>
Expand Down
48 changes: 46 additions & 2 deletions src/MySqlConnector/MySqlClient/ConnectionPool.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MySql.Data.Protocol.Serialization;
Expand Down Expand Up @@ -64,6 +65,7 @@ public async Task<MySqlSession> GetSessionAsync(MySqlConnection connection, IOBe
if (!reuseSession)
{
// session is either old or cannot communicate with the server
AdjustHostConnectionCount(session, -1);
await session.DisposeAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
}
else
Expand All @@ -78,7 +80,8 @@ public async Task<MySqlSession> GetSessionAsync(MySqlConnection connection, IOBe

// create a new session
session = new MySqlSession(this, m_generation, Interlocked.Increment(ref m_lastId));
await session.ConnectAsync(m_connectionSettings, ioBehavior, cancellationToken).ConfigureAwait(false);
await session.ConnectAsync(m_connectionSettings, m_loadBalancer, ioBehavior, cancellationToken).ConfigureAwait(false);
AdjustHostConnectionCount(session, 1);
session.OwningConnection = new WeakReference<MySqlConnection>(connection);
lock (m_leasedSessions)
m_leasedSessions.Add(session.Id, session);
Expand Down Expand Up @@ -114,10 +117,15 @@ public void Return(MySqlSession session)
m_leasedSessions.Remove(session.Id);
session.OwningConnection = null;
if (SessionIsHealthy(session))
{
lock (m_sessions)
m_sessions.AddFirst(session);
}
else
{
AdjustHostConnectionCount(session, -1);
session.DisposeAsync(IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
}
}
finally
{
Expand Down Expand Up @@ -260,7 +268,8 @@ private async Task CreateMinimumPooledSessions(IOBehavior ioBehavior, Cancellati
try
{
var session = new MySqlSession(this, m_generation, Interlocked.Increment(ref m_lastId));
await session.ConnectAsync(m_connectionSettings, ioBehavior, cancellationToken).ConfigureAwait(false);
await session.ConnectAsync(m_connectionSettings, m_loadBalancer, ioBehavior, cancellationToken).ConfigureAwait(false);
AdjustHostConnectionCount(session, 1);
lock (m_sessions)
m_sessions.AddFirst(session);
}
Expand Down Expand Up @@ -306,6 +315,39 @@ private ConnectionPool(ConnectionSettings cs)
m_sessionSemaphore = new SemaphoreSlim(cs.MaximumPoolSize);
m_sessions = new LinkedList<MySqlSession>();
m_leasedSessions = new Dictionary<int, MySqlSession>();
if (cs.LoadBalance == MySqlLoadBalance.FewestConnections)
{
m_hostSessions = new Dictionary<string, int>();
foreach (var hostName in cs.Hostnames)
m_hostSessions[hostName] = 0;
}
m_loadBalancer = cs.ConnectionType != ConnectionType.Tcp ? null :
cs.Hostnames.Count == 1 || cs.LoadBalance == MySqlLoadBalance.InOrder ? InOrderLoadBalancer.Instance :
cs.LoadBalance == MySqlLoadBalance.Random ? RandomLoadBalancer.Instance :
cs.LoadBalance == MySqlLoadBalance.FewestConnections ? new FewestConnectionsLoadBalancer(this) :
(ILoadBalancer) new RoundRobinLoadBalancer();
}

private void AdjustHostConnectionCount(MySqlSession session, int delta)
{
if (m_hostSessions != null)
{
lock (m_hostSessions)
m_hostSessions[session.HostName] += delta;
}
}

private sealed class FewestConnectionsLoadBalancer : ILoadBalancer
{
public FewestConnectionsLoadBalancer(ConnectionPool pool) => m_pool = pool;

public IEnumerable<string> LoadBalance(IReadOnlyList<string> hosts)
{
lock (m_pool.m_hostSessions)
return m_pool.m_hostSessions.OrderBy(x => x.Value).Select(x => x.Key).ToList();
}

readonly ConnectionPool m_pool;
}

static readonly ConcurrentDictionary<string, ConnectionPool> s_pools = new ConcurrentDictionary<string, ConnectionPool>();
Expand Down Expand Up @@ -336,6 +378,8 @@ private ConnectionPool(ConnectionSettings cs)
readonly LinkedList<MySqlSession> m_sessions;
readonly ConnectionSettings m_connectionSettings;
readonly Dictionary<int, MySqlSession> m_leasedSessions;
readonly ILoadBalancer m_loadBalancer;
readonly Dictionary<string, int> m_hostSessions;
int m_lastId;
uint m_lastRecoveryTime;
}
Expand Down
6 changes: 5 additions & 1 deletion src/MySqlConnector/MySqlClient/MySqlConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,12 @@ private async Task<MySqlSession> CreateSessionAsync(IOBehavior ioBehavior, Cance
}
else
{
// only "in order" and "random" load balancers supported without connection pooling
var loadBalancer = m_connectionSettings.LoadBalance == MySqlLoadBalance.Random && m_connectionSettings.Hostnames.Count > 1 ?
RandomLoadBalancer.Instance : InOrderLoadBalancer.Instance;

var session = new MySqlSession();
await session.ConnectAsync(m_connectionSettings, ioBehavior, linkedSource.Token).ConfigureAwait(false);
await session.ConnectAsync(m_connectionSettings, loadBalancer, ioBehavior, linkedSource.Token).ConfigureAwait(false);
return session;
}
}
Expand Down
21 changes: 21 additions & 0 deletions src/MySqlConnector/MySqlClient/MySqlConnectionStringBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ public string Database
set => MySqlConnectionStringOption.Database.SetValue(this, value);
}

public MySqlLoadBalance LoadBalance
{
get => MySqlConnectionStringOption.LoadBalance.GetValue(this);
set => MySqlConnectionStringOption.LoadBalance.SetValue(this, value);
}

// SSL/TLS Options
public MySqlSslMode SslMode
{
Expand Down Expand Up @@ -251,6 +257,7 @@ internal abstract class MySqlConnectionStringOption
public static readonly MySqlConnectionStringOption<string> UserID;
public static readonly MySqlConnectionStringOption<string> Password;
public static readonly MySqlConnectionStringOption<string> Database;
public static readonly MySqlConnectionStringOption<MySqlLoadBalance> LoadBalance;

// SSL/TLS Options
public static readonly MySqlConnectionStringOption<MySqlSslMode> SslMode;
Expand Down Expand Up @@ -330,6 +337,10 @@ static MySqlConnectionStringOption()
keys: new[] { "Database", "Initial Catalog" },
defaultValue: ""));

AddOption(LoadBalance = new MySqlConnectionStringOption<MySqlLoadBalance>(
keys: new[] { "LoadBalance", "Load Balance" },
defaultValue: MySqlLoadBalance.RoundRobin));

// SSL/TLS Options
AddOption(SslMode = new MySqlConnectionStringOption<MySqlSslMode>(
keys: new[] { "SSL Mode", "SslMode" },
Expand Down Expand Up @@ -471,6 +482,16 @@ private static T ChangeType(object objectValue)
return (T) (object) false;
}

if (typeof(T) == typeof(MySqlLoadBalance) && objectValue is string loadBalanceString)
{
foreach (var val in Enum.GetValues(typeof(T)))
{
if (string.Equals(loadBalanceString, val.ToString(), StringComparison.OrdinalIgnoreCase))
return (T) val;
}
throw new InvalidOperationException("Value '{0}' not supported for option '{1}'.".FormatInvariant(objectValue, typeof(T).Name));
}

if (typeof(T) == typeof(MySqlSslMode) && objectValue is string sslModeString)
{
foreach (var val in Enum.GetValues(typeof(T)))
Expand Down
25 changes: 25 additions & 0 deletions src/MySqlConnector/MySqlClient/MySqlLoadBalance.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
namespace MySql.Data.MySqlClient
{
public enum MySqlLoadBalance
{
/// <summary>
/// Servers are tried sequentially, across multiple calls to <see cref="MySqlConnection.Open"/>.
/// </summary>
RoundRobin,

/// <summary>
/// Servers are tried in order, starting with the first one, for each call to <see cref="MySqlConnection.Open"/>.
/// </summary>
InOrder,

/// <summary>
/// Servers are tried in random order.
/// </summary>
Random,

/// <summary>
/// Servers are tried in ascending order of number of currently-open connections.
/// </summary>
FewestConnections,
}
}
5 changes: 3 additions & 2 deletions src/MySqlConnector/Serialization/ConnectionSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public ConnectionSettings(MySqlConnectionStringBuilder csb)
{
ConnectionType = ConnectionType.Tcp;
Hostnames = csb.Server.Split(',');
LoadBalance = csb.LoadBalance;
Port = (int) csb.Port;
}
UserID = csb.UserID;
Expand Down Expand Up @@ -65,7 +66,8 @@ public ConnectionSettings(MySqlConnectionStringBuilder csb)
// Base Options
public string ConnectionString { get; }
public ConnectionType ConnectionType { get; }
public IEnumerable<string> Hostnames { get; }
public IReadOnlyList<string> Hostnames { get; }
public MySqlLoadBalance LoadBalance { get; }
public int Port { get; }
public string UnixSocket { get; }
public string UserID { get; }
Expand Down Expand Up @@ -125,6 +127,5 @@ public int ConnectionTimeoutMilliseconds
return m_connectionTimeoutMilliseconds.Value;
}
}

}
}
75 changes: 75 additions & 0 deletions src/MySqlConnector/Serialization/ILoadBalancer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
using System;
using System.Collections.Generic;

namespace MySql.Data.Serialization
{
internal interface ILoadBalancer
{
/// <summary>
/// Returns an <see cref="IEnumerable{string}"/> containing <paramref name="hosts"/> in the order they
/// should be tried to satisfy the load balancing policy.
/// </summary>
IEnumerable<string> LoadBalance(IReadOnlyList<string> hosts);
}

internal sealed class InOrderLoadBalancer : ILoadBalancer
{
public static ILoadBalancer Instance { get; } = new InOrderLoadBalancer();

public IEnumerable<string> LoadBalance(IReadOnlyList<string> hosts) => hosts;

private InOrderLoadBalancer()
{
}
}

internal sealed class RandomLoadBalancer : ILoadBalancer
{
public static ILoadBalancer Instance { get; } = new RandomLoadBalancer();

public IEnumerable<string> LoadBalance(IReadOnlyList<string> hosts)
{
// from https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle#The_modern_algorithm
var shuffled = new List<string>(hosts);
for (var i = hosts.Count - 1; i >= 1; i--)
{
int j;
lock (m_random)
j = m_random.Next(i + 1);
if (i != j)
{
var temp = shuffled[i];
shuffled[i] = shuffled[j];
shuffled[j] = temp;
}
}
return shuffled;
}

private RandomLoadBalancer() => m_random = new Random();

readonly Random m_random;
}

internal sealed class RoundRobinLoadBalancer : ILoadBalancer
{
public RoundRobinLoadBalancer() => m_lock = new object();

public IEnumerable<string> LoadBalance(IReadOnlyList<string> hosts)
{
int start;
lock (m_lock)
start = (int) (m_counter++ % hosts.Count);

var shuffled = new List<string>(hosts.Count);
for (var i = start; i < hosts.Count; i++)
shuffled.Add(hosts[i]);
for (var i = 0; i < start; i++)
shuffled.Add(hosts[i]);
return shuffled;
}

readonly object m_lock;
uint m_counter;
}
}
Loading

0 comments on commit ded586c

Please sign in to comment.