Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 95 additions & 59 deletions Hazelcast.Net/Hazelcast.Client.Spi/ClientClusterService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
using Hazelcast.Security;
using Hazelcast.Util;
using ICredentials = Hazelcast.Security.ICredentials;

#pragma warning disable CS1591
namespace Hazelcast.Client.Spi
{
Expand All @@ -41,7 +42,7 @@ namespace Hazelcast.Client.Spi
/// </summary>
internal class ClientClusterService : IClientClusterService, IConnectionListener, IConnectionHeartbeatListener
{
private static readonly ILogger Logger = Logging.Logger.GetLogger(typeof (ClientClusterService));
private static readonly ILogger Logger = Logging.Logger.GetLogger(typeof(ClientClusterService));
private readonly HazelcastClient _client;

private readonly ConcurrentDictionary<string, IMembershipListener> _listeners =
Expand All @@ -54,12 +55,22 @@ internal class ClientClusterService : IClientClusterService, IConnectionListener
private ClientConnectionManager _connectionManager;
private ICredentials _credentials;
private Address _ownerConnectionAddress;
private Address _prevOwnerConnectionAddress;
private ClientPrincipal _principal;
private readonly int _connectionAttemptPeriod;
private readonly int _connectionAttemptLimit;
private readonly bool _shuffleMemberList;

public ClientClusterService(HazelcastClient client)
{
_client = client;

var networkConfig = GetClientConfig().GetNetworkConfig();
var connAttemptLimit = networkConfig.GetConnectionAttemptLimit();
_connectionAttemptPeriod = networkConfig.GetConnectionAttemptPeriod();
_connectionAttemptLimit = connAttemptLimit == 0 ? int.MaxValue : connAttemptLimit;
_shuffleMemberList = EnvironmentUtil.ReadBool("hazelcast.client.shuffle.member.list") ?? false;

var listenerConfigs = client.GetClientConfig().GetListenerConfigs();
foreach (var listenerConfig in listenerConfigs)
{
Expand Down Expand Up @@ -88,6 +99,16 @@ public ClientClusterService(HazelcastClient client)
}
}

private Address OwnerConnectionAddress
{
get { return _ownerConnectionAddress; }
set
{
_prevOwnerConnectionAddress = _ownerConnectionAddress;
_ownerConnectionAddress = value;
}
}

public IMember GetMember(Address address)
{
var members = _membersRef.Get();
Expand Down Expand Up @@ -127,7 +148,7 @@ public IClient GetLocalClient()
{
var cm = _client.GetConnectionManager();
var cp = GetPrincipal();
var ownerConnection = cm.GetConnection(_ownerConnectionAddress);
var ownerConnection = cm.GetConnection(OwnerConnectionAddress);

var socketAddress = ownerConnection != null ? ownerConnection.GetLocalSocketAddress() : null;
var uuid = cp != null ? cp.GetUuid() : null;
Expand All @@ -136,7 +157,7 @@ public IClient GetLocalClient()

public Address GetOwnerConnectionAddress()
{
return _ownerConnectionAddress;
return OwnerConnectionAddress;
}

public string AddMembershipListener(IMembershipListener listener)
Expand Down Expand Up @@ -172,9 +193,9 @@ public void HeartBeatResumed(ClientConnection connection)

public void HeartBeatStopped(ClientConnection connection)
{
if (connection.GetAddress().Equals(_ownerConnectionAddress))
if (connection.GetAddress().Equals(OwnerConnectionAddress))
{
_connectionManager.DestroyConnection(connection, new TargetDisconnectedException(_ownerConnectionAddress));
_connectionManager.DestroyConnection(connection, new TargetDisconnectedException(OwnerConnectionAddress));
}
}

Expand All @@ -185,7 +206,7 @@ public void ConnectionAdded(ClientConnection connection)
public void ConnectionRemoved(ClientConnection connection)
{
var executionService = (ClientExecutionService) _client.GetClientExecutionService();
if (Equals(connection.GetAddress(), _ownerConnectionAddress))
if (Equals(connection.GetAddress(), OwnerConnectionAddress))
{
if (_client.GetLifecycleService().IsRunning())
{
Expand Down Expand Up @@ -223,10 +244,10 @@ internal int ServerVersion
{
get
{
if (_ownerConnectionAddress != null)
if (OwnerConnectionAddress != null)
{
var cm = _client.GetConnectionManager();
var ownerConnection = cm.GetConnection(_ownerConnectionAddress);
var ownerConnection = cm.GetConnection(OwnerConnectionAddress);
return ownerConnection != null ? ownerConnection.ConnectedServerVersionInt : -1;
}
return -1;
Expand All @@ -246,33 +267,27 @@ internal virtual void FireMemberAttributeEvent(MemberAttributeEvent @event)

internal virtual void FireMembershipEvent(MembershipEvent @event)
{
_client.GetClientExecutionService().Submit(
(() =>
_client.GetClientExecutionService().Submit((() =>
{
foreach (var listener in _listeners.Values)
{
foreach (var listener in _listeners.Values)
if (@event.GetEventType() == MembershipEvent.MemberAdded)
{
if (@event.GetEventType() == MembershipEvent.MemberAdded)
{
listener.MemberAdded(@event);
}
else
{
listener.MemberRemoved(@event);
}
listener.MemberAdded(@event);
}
}));
else
{
listener.MemberRemoved(@event);
}
}
}));
}

internal virtual IDictionary<Address, IMember> GetMembersRef()
{
return _membersRef.Get();
}

internal virtual ISerializationService GetSerializationService()
{
return _client.GetSerializationService();
}

internal virtual string MembersString()
{
var sb = new StringBuilder("\n\nMembers [");
Expand Down Expand Up @@ -304,13 +319,13 @@ private void AddMembershipListenerWithoutInit(IMembershipListener listener)
/// <exception cref="System.Exception" />
private bool Connect(ICollection<IPEndPoint> triedAddresses)
{
ICollection<IPEndPoint> socketAddresses = GetEndpoints();
foreach (var inetSocketAddress in socketAddresses)
var addresses = GetPossibleMemberAddresses();
foreach (var address in addresses)
{
var inetSocketAddress = address.GetInetSocketAddress();
try
{
triedAddresses.Add(inetSocketAddress);
var address = new Address(inetSocketAddress);
if (Logger.IsFinestEnabled())
{
Logger.Finest("Trying to connect to " + address);
Expand All @@ -323,7 +338,7 @@ private bool Connect(ICollection<IPEndPoint> triedAddresses)
ManagerAuthenticator(connection);
}
FireConnectionEvent(LifecycleEvent.LifecycleState.ClientConnected);
_ownerConnectionAddress = connection.GetAddress();
OwnerConnectionAddress = connection.GetAddress();
return true;
}
catch (Exception e)
Expand All @@ -338,21 +353,17 @@ private bool Connect(ICollection<IPEndPoint> triedAddresses)
private void ConnectToCluster()
{
ConnectToOne();
_clientMembershipListener.ListenMembershipEvents(_ownerConnectionAddress);
_clientMembershipListener.ListenMembershipEvents(OwnerConnectionAddress);
//_clientListenerService.TriggerFailedListeners(); //TODO: triggerfailedlisteners
}

private void ConnectToOne()
{
_ownerConnectionAddress = null;
var networkConfig = GetClientConfig().GetNetworkConfig();
var connAttemptLimit = networkConfig.GetConnectionAttemptLimit();
var connectionAttemptPeriod = networkConfig.GetConnectionAttemptPeriod();
var connectionAttemptLimit = connAttemptLimit == 0 ? int.MaxValue : connAttemptLimit;
var shuffleMemberList = EnvironmentUtil.ReadBool("hazelcast.client.shuffle.member.list") ?? false;
OwnerConnectionAddress = null;

var attempt = 0;
ICollection<IPEndPoint> triedAddresses = new HashSet<IPEndPoint>();
while (attempt < connectionAttemptLimit)
while (attempt < _connectionAttemptLimit)
{
if (!_client.GetLifecycleService().IsRunning())
{
Expand All @@ -363,16 +374,15 @@ private void ConnectToOne()
break;
}
attempt++;
var nextTry = Clock.CurrentTimeMillis() + connectionAttemptPeriod;
var nextTry = Clock.CurrentTimeMillis() + _connectionAttemptPeriod;
var isConnected = Connect(triedAddresses);
if (isConnected)
{
return;
}
var remainingTime = nextTry - Clock.CurrentTimeMillis();
Logger.Warning(
string.Format("Unable to get alive cluster connection, try in {0} ms later, attempt {1} of {2}.",
Math.Max(0, remainingTime), attempt, connectionAttemptLimit));
Logger.Warning(string.Format("Unable to get alive cluster connection, try in {0} ms later, attempt {1} of {2}.",
Math.Max(0, remainingTime), attempt, _connectionAttemptLimit));
if (remainingTime > 0)
{
try
Expand All @@ -386,8 +396,7 @@ private void ConnectToOne()
}
}
throw new InvalidOperationException("Unable to connect to any address in the config! " +
"The following addresses were tried:" +
string.Join(", ", triedAddresses));
"The following addresses were tried:" + string.Join(", ", triedAddresses));
}

private void FireConnectionEvent(LifecycleEvent.LifecycleState state)
Expand All @@ -401,25 +410,52 @@ private ClientConfig GetClientConfig()
return _client.GetClientConfig();
}

private ICollection<IPEndPoint> GetConfigAddresses()
private IList<Address> GetPossibleMemberAddresses()
{
IEnumerable<IPEndPoint> socketAddresses = new List<IPEndPoint>();
foreach (var address in _client.GetClientConfig().GetNetworkConfig().GetAddresses())
var memberList = _client.GetClientClusterService().GetMemberList();
var addresses = memberList.Select(member => member.GetAddress()).ToList();

if (_shuffleMemberList)
{
addresses = Shuffle(addresses);
}

var configAddresses = GetConfigAddresses();
if (_shuffleMemberList)
{
var endPoints = AddressHelper.GetSocketAddresses(address);
socketAddresses = socketAddresses.Union(endPoints);
configAddresses = Shuffle(configAddresses);
}
return socketAddresses.ToList();

addresses.AddRange(configAddresses);
if (_prevOwnerConnectionAddress != null)
{
/*
* Previous owner address is moved to last item in set so that client will not try to connect to same one immediately.
* It could be the case that address is removed because it is healthy(it not responding to heartbeat/pings)
* In that case, trying other addresses first to upgrade make more sense.
*/
addresses.Remove(_prevOwnerConnectionAddress);
addresses.Add(_prevOwnerConnectionAddress);
}
return addresses;
}

private IList<IPEndPoint> GetEndpoints()
private IList<Address> GetConfigAddresses()
{
var memberList = _client.GetClientClusterService().GetMemberList();
var endpoints = memberList.Select(member => member.GetSocketAddress());
endpoints = endpoints.Union(GetConfigAddresses());
var configAddresses = _client.GetClientConfig().GetNetworkConfig().GetAddresses();
var possibleAddresses = new List<Address>();

foreach (var cfgAddress in configAddresses)
{
possibleAddresses.AddRange(AddressHelper.GetSocketAddresses(cfgAddress));
}
return possibleAddresses;
}

private List<Address> Shuffle(IList<Address> list)
{
var r = new Random();
return endpoints.OrderBy(x => r.Next()).ToList();
return list.OrderBy(x => r.Next()).ToList();
}

private void Init()
Expand Down Expand Up @@ -462,14 +498,14 @@ private void ManagerAuthenticator(ClientConnection connection)
{
var usernamePasswordCr = (UsernamePasswordCredentials) _credentials;
request = ClientAuthenticationCodec.EncodeRequest(usernamePasswordCr.GetUsername(),
usernamePasswordCr.GetPassword(), uuid, ownerUuid, true,
ClientTypes.Csharp, _client.GetSerializationService().GetVersion(), VersionUtil.GetDllVersion());
usernamePasswordCr.GetPassword(), uuid, ownerUuid, true, ClientTypes.Csharp,
_client.GetSerializationService().GetVersion(), VersionUtil.GetDllVersion());
}
else
{
var data = ss.ToData(_credentials);
request = ClientAuthenticationCustomCodec.EncodeRequest(data, uuid, ownerUuid, false,
ClientTypes.Csharp, _client.GetSerializationService().GetVersion(), VersionUtil.GetDllVersion());
request = ClientAuthenticationCustomCodec.EncodeRequest(data, uuid, ownerUuid, false, ClientTypes.Csharp,
_client.GetSerializationService().GetVersion(), VersionUtil.GetDllVersion());
}

IClientMessage response;
Expand All @@ -492,7 +528,7 @@ private void ManagerAuthenticator(ClientConnection connection)
var member = new Member(result.address, result.ownerUuid);
_principal = new ClientPrincipal(result.uuid, result.ownerUuid);

connection.Member =member;
connection.Member = member;
connection.SetOwner();
connection.ConnectedServerVersionStr = result.serverHazelcastVersion;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#pragma warning disable CS1591
namespace Hazelcast.Client.Spi
{
/// <author>mdogan 5/16/13</author>
public interface IClientClusterService
{
/// <param name="listener">The listener to be registered.</param>
Expand Down
12 changes: 6 additions & 6 deletions Hazelcast.Net/Hazelcast.Util/AddressHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ internal static class AddressHelper
{
private const int MaxPortTries = 3;

public static ICollection<IPEndPoint> GetPossibleSocketAddresses(IPAddress ipAddress, int port,
public static ICollection<Address> GetPossibleSocketAddresses(IPAddress ipAddress, int port,
string scopedAddress)
{
var portTryCount = 1;
Expand All @@ -34,15 +34,15 @@ public static ICollection<IPEndPoint> GetPossibleSocketAddresses(IPAddress ipAdd
portTryCount = MaxPortTries;
port = 5701;
}
ICollection<IPEndPoint> socketAddresses = new List<IPEndPoint>();
ICollection<Address> socketAddresses = new List<Address>();
if (ipAddress == null)
{
for (var i = 0; i < portTryCount; i++)
{
IPAddress addr;
if (IPAddress.TryParse(scopedAddress, out addr))
{
socketAddresses.Add(new IPEndPoint(addr, port + i));
socketAddresses.Add(new Address(addr, port + i));
}
}
}
Expand All @@ -52,7 +52,7 @@ public static ICollection<IPEndPoint> GetPossibleSocketAddresses(IPAddress ipAdd
{
for (var i = 0; i < portTryCount; i++)
{
socketAddresses.Add(new IPEndPoint(ipAddress, port + i));
socketAddresses.Add(new Address(ipAddress, port + i));
}
}
else
Expand All @@ -62,15 +62,15 @@ public static ICollection<IPEndPoint> GetPossibleSocketAddresses(IPAddress ipAdd
{
for (var i = 0; i < portTryCount; i++)
{
socketAddresses.Add(new IPEndPoint(ipa, port + i));
socketAddresses.Add(new Address(ipa, port + i));
}
}
}
}
return socketAddresses;
}

public static ICollection<IPEndPoint> GetSocketAddresses(string address)
public static ICollection<Address> GetSocketAddresses(string address)
{
var addressHolder = AddressUtil.GetAddressHolder(address, -1);
var scopedAddress = addressHolder.ScopeId != null
Expand Down