From e22c339a739100254fdb8b65d2da7a5cfe93b293 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?As=C4=B1m=20Arslan?= Date: Wed, 1 Aug 2018 13:11:44 +0300 Subject: [PATCH] fix connection address shuffling --- .../ClientClusterService.cs | 154 +++++++++++------- .../IClientClusterService.cs | 1 - Hazelcast.Net/Hazelcast.Util/AddressHelper.cs | 12 +- 3 files changed, 101 insertions(+), 66 deletions(-) diff --git a/Hazelcast.Net/Hazelcast.Client.Spi/ClientClusterService.cs b/Hazelcast.Net/Hazelcast.Client.Spi/ClientClusterService.cs index dcbba8fb43..df54717a82 100644 --- a/Hazelcast.Net/Hazelcast.Client.Spi/ClientClusterService.cs +++ b/Hazelcast.Net/Hazelcast.Client.Spi/ClientClusterService.cs @@ -31,6 +31,7 @@ using Hazelcast.Security; using Hazelcast.Util; using ICredentials = Hazelcast.Security.ICredentials; + #pragma warning disable CS1591 namespace Hazelcast.Client.Spi { @@ -41,7 +42,7 @@ namespace Hazelcast.Client.Spi /// internal class ClientClusterService : IClientClusterService, IConnectionListener, IConnectionHeartbeatListener { - private static readonly ILogger Logger = Logging.Logger.GetLogger(typeof (ClientClusterService)); + private static readonly ILogger Logger = Logging.Logger.GetLogger(typeof(ClientClusterService)); private readonly HazelcastClient _client; private readonly ConcurrentDictionary _listeners = @@ -54,12 +55,22 @@ internal class ClientClusterService : IClientClusterService, IConnectionListener private ClientConnectionManager _connectionManager; private ICredentials _credentials; private Address _ownerConnectionAddress; + private Address _prevOwnerConnectionAddress; private ClientPrincipal _principal; + private readonly int _connectionAttemptPeriod; + private readonly int _connectionAttemptLimit; + private readonly bool _shuffleMemberList; public ClientClusterService(HazelcastClient client) { _client = client; + var networkConfig = GetClientConfig().GetNetworkConfig(); + var connAttemptLimit = networkConfig.GetConnectionAttemptLimit(); + _connectionAttemptPeriod = networkConfig.GetConnectionAttemptPeriod(); + _connectionAttemptLimit = connAttemptLimit == 0 ? int.MaxValue : connAttemptLimit; + _shuffleMemberList = EnvironmentUtil.ReadBool("hazelcast.client.shuffle.member.list") ?? false; + var listenerConfigs = client.GetClientConfig().GetListenerConfigs(); foreach (var listenerConfig in listenerConfigs) { @@ -88,6 +99,16 @@ public ClientClusterService(HazelcastClient client) } } + private Address OwnerConnectionAddress + { + get { return _ownerConnectionAddress; } + set + { + _prevOwnerConnectionAddress = _ownerConnectionAddress; + _ownerConnectionAddress = value; + } + } + public IMember GetMember(Address address) { var members = _membersRef.Get(); @@ -127,7 +148,7 @@ public IClient GetLocalClient() { var cm = _client.GetConnectionManager(); var cp = GetPrincipal(); - var ownerConnection = cm.GetConnection(_ownerConnectionAddress); + var ownerConnection = cm.GetConnection(OwnerConnectionAddress); var socketAddress = ownerConnection != null ? ownerConnection.GetLocalSocketAddress() : null; var uuid = cp != null ? cp.GetUuid() : null; @@ -136,7 +157,7 @@ public IClient GetLocalClient() public Address GetOwnerConnectionAddress() { - return _ownerConnectionAddress; + return OwnerConnectionAddress; } public string AddMembershipListener(IMembershipListener listener) @@ -172,9 +193,9 @@ public void HeartBeatResumed(ClientConnection connection) public void HeartBeatStopped(ClientConnection connection) { - if (connection.GetAddress().Equals(_ownerConnectionAddress)) + if (connection.GetAddress().Equals(OwnerConnectionAddress)) { - _connectionManager.DestroyConnection(connection, new TargetDisconnectedException(_ownerConnectionAddress)); + _connectionManager.DestroyConnection(connection, new TargetDisconnectedException(OwnerConnectionAddress)); } } @@ -185,7 +206,7 @@ public void ConnectionAdded(ClientConnection connection) public void ConnectionRemoved(ClientConnection connection) { var executionService = (ClientExecutionService) _client.GetClientExecutionService(); - if (Equals(connection.GetAddress(), _ownerConnectionAddress)) + if (Equals(connection.GetAddress(), OwnerConnectionAddress)) { if (_client.GetLifecycleService().IsRunning()) { @@ -223,10 +244,10 @@ internal int ServerVersion { get { - if (_ownerConnectionAddress != null) + if (OwnerConnectionAddress != null) { var cm = _client.GetConnectionManager(); - var ownerConnection = cm.GetConnection(_ownerConnectionAddress); + var ownerConnection = cm.GetConnection(OwnerConnectionAddress); return ownerConnection != null ? ownerConnection.ConnectedServerVersionInt : -1; } return -1; @@ -246,21 +267,20 @@ internal virtual void FireMemberAttributeEvent(MemberAttributeEvent @event) internal virtual void FireMembershipEvent(MembershipEvent @event) { - _client.GetClientExecutionService().Submit( - (() => + _client.GetClientExecutionService().Submit((() => + { + foreach (var listener in _listeners.Values) { - foreach (var listener in _listeners.Values) + if (@event.GetEventType() == MembershipEvent.MemberAdded) { - if (@event.GetEventType() == MembershipEvent.MemberAdded) - { - listener.MemberAdded(@event); - } - else - { - listener.MemberRemoved(@event); - } + listener.MemberAdded(@event); } - })); + else + { + listener.MemberRemoved(@event); + } + } + })); } internal virtual IDictionary GetMembersRef() @@ -268,11 +288,6 @@ internal virtual IDictionary GetMembersRef() return _membersRef.Get(); } - internal virtual ISerializationService GetSerializationService() - { - return _client.GetSerializationService(); - } - internal virtual string MembersString() { var sb = new StringBuilder("\n\nMembers ["); @@ -304,13 +319,13 @@ private void AddMembershipListenerWithoutInit(IMembershipListener listener) /// private bool Connect(ICollection triedAddresses) { - ICollection socketAddresses = GetEndpoints(); - foreach (var inetSocketAddress in socketAddresses) + var addresses = GetPossibleMemberAddresses(); + foreach (var address in addresses) { + var inetSocketAddress = address.GetInetSocketAddress(); try { triedAddresses.Add(inetSocketAddress); - var address = new Address(inetSocketAddress); if (Logger.IsFinestEnabled()) { Logger.Finest("Trying to connect to " + address); @@ -323,7 +338,7 @@ private bool Connect(ICollection triedAddresses) ManagerAuthenticator(connection); } FireConnectionEvent(LifecycleEvent.LifecycleState.ClientConnected); - _ownerConnectionAddress = connection.GetAddress(); + OwnerConnectionAddress = connection.GetAddress(); return true; } catch (Exception e) @@ -338,21 +353,17 @@ private bool Connect(ICollection triedAddresses) private void ConnectToCluster() { ConnectToOne(); - _clientMembershipListener.ListenMembershipEvents(_ownerConnectionAddress); + _clientMembershipListener.ListenMembershipEvents(OwnerConnectionAddress); //_clientListenerService.TriggerFailedListeners(); //TODO: triggerfailedlisteners } private void ConnectToOne() { - _ownerConnectionAddress = null; - var networkConfig = GetClientConfig().GetNetworkConfig(); - var connAttemptLimit = networkConfig.GetConnectionAttemptLimit(); - var connectionAttemptPeriod = networkConfig.GetConnectionAttemptPeriod(); - var connectionAttemptLimit = connAttemptLimit == 0 ? int.MaxValue : connAttemptLimit; - var shuffleMemberList = EnvironmentUtil.ReadBool("hazelcast.client.shuffle.member.list") ?? false; + OwnerConnectionAddress = null; + var attempt = 0; ICollection triedAddresses = new HashSet(); - while (attempt < connectionAttemptLimit) + while (attempt < _connectionAttemptLimit) { if (!_client.GetLifecycleService().IsRunning()) { @@ -363,16 +374,15 @@ private void ConnectToOne() break; } attempt++; - var nextTry = Clock.CurrentTimeMillis() + connectionAttemptPeriod; + var nextTry = Clock.CurrentTimeMillis() + _connectionAttemptPeriod; var isConnected = Connect(triedAddresses); if (isConnected) { return; } var remainingTime = nextTry - Clock.CurrentTimeMillis(); - Logger.Warning( - string.Format("Unable to get alive cluster connection, try in {0} ms later, attempt {1} of {2}.", - Math.Max(0, remainingTime), attempt, connectionAttemptLimit)); + Logger.Warning(string.Format("Unable to get alive cluster connection, try in {0} ms later, attempt {1} of {2}.", + Math.Max(0, remainingTime), attempt, _connectionAttemptLimit)); if (remainingTime > 0) { try @@ -386,8 +396,7 @@ private void ConnectToOne() } } throw new InvalidOperationException("Unable to connect to any address in the config! " + - "The following addresses were tried:" + - string.Join(", ", triedAddresses)); + "The following addresses were tried:" + string.Join(", ", triedAddresses)); } private void FireConnectionEvent(LifecycleEvent.LifecycleState state) @@ -401,25 +410,52 @@ private ClientConfig GetClientConfig() return _client.GetClientConfig(); } - private ICollection GetConfigAddresses() + private IList
GetPossibleMemberAddresses() { - IEnumerable socketAddresses = new List(); - foreach (var address in _client.GetClientConfig().GetNetworkConfig().GetAddresses()) + var memberList = _client.GetClientClusterService().GetMemberList(); + var addresses = memberList.Select(member => member.GetAddress()).ToList(); + + if (_shuffleMemberList) + { + addresses = Shuffle(addresses); + } + + var configAddresses = GetConfigAddresses(); + if (_shuffleMemberList) { - var endPoints = AddressHelper.GetSocketAddresses(address); - socketAddresses = socketAddresses.Union(endPoints); + configAddresses = Shuffle(configAddresses); } - return socketAddresses.ToList(); + + addresses.AddRange(configAddresses); + if (_prevOwnerConnectionAddress != null) + { + /* + * Previous owner address is moved to last item in set so that client will not try to connect to same one immediately. + * It could be the case that address is removed because it is healthy(it not responding to heartbeat/pings) + * In that case, trying other addresses first to upgrade make more sense. + */ + addresses.Remove(_prevOwnerConnectionAddress); + addresses.Add(_prevOwnerConnectionAddress); + } + return addresses; } - private IList GetEndpoints() + private IList
GetConfigAddresses() { - var memberList = _client.GetClientClusterService().GetMemberList(); - var endpoints = memberList.Select(member => member.GetSocketAddress()); - endpoints = endpoints.Union(GetConfigAddresses()); + var configAddresses = _client.GetClientConfig().GetNetworkConfig().GetAddresses(); + var possibleAddresses = new List
(); + foreach (var cfgAddress in configAddresses) + { + possibleAddresses.AddRange(AddressHelper.GetSocketAddresses(cfgAddress)); + } + return possibleAddresses; + } + + private List
Shuffle(IList
list) + { var r = new Random(); - return endpoints.OrderBy(x => r.Next()).ToList(); + return list.OrderBy(x => r.Next()).ToList(); } private void Init() @@ -462,14 +498,14 @@ private void ManagerAuthenticator(ClientConnection connection) { var usernamePasswordCr = (UsernamePasswordCredentials) _credentials; request = ClientAuthenticationCodec.EncodeRequest(usernamePasswordCr.GetUsername(), - usernamePasswordCr.GetPassword(), uuid, ownerUuid, true, - ClientTypes.Csharp, _client.GetSerializationService().GetVersion(), VersionUtil.GetDllVersion()); + usernamePasswordCr.GetPassword(), uuid, ownerUuid, true, ClientTypes.Csharp, + _client.GetSerializationService().GetVersion(), VersionUtil.GetDllVersion()); } else { var data = ss.ToData(_credentials); - request = ClientAuthenticationCustomCodec.EncodeRequest(data, uuid, ownerUuid, false, - ClientTypes.Csharp, _client.GetSerializationService().GetVersion(), VersionUtil.GetDllVersion()); + request = ClientAuthenticationCustomCodec.EncodeRequest(data, uuid, ownerUuid, false, ClientTypes.Csharp, + _client.GetSerializationService().GetVersion(), VersionUtil.GetDllVersion()); } IClientMessage response; @@ -492,7 +528,7 @@ private void ManagerAuthenticator(ClientConnection connection) var member = new Member(result.address, result.ownerUuid); _principal = new ClientPrincipal(result.uuid, result.ownerUuid); - connection.Member =member; + connection.Member = member; connection.SetOwner(); connection.ConnectedServerVersionStr = result.serverHazelcastVersion; } diff --git a/Hazelcast.Net/Hazelcast.Client.Spi/IClientClusterService.cs b/Hazelcast.Net/Hazelcast.Client.Spi/IClientClusterService.cs index 0f9a90bcd3..7d4ccaa2c2 100644 --- a/Hazelcast.Net/Hazelcast.Client.Spi/IClientClusterService.cs +++ b/Hazelcast.Net/Hazelcast.Client.Spi/IClientClusterService.cs @@ -19,7 +19,6 @@ #pragma warning disable CS1591 namespace Hazelcast.Client.Spi { - /// mdogan 5/16/13 public interface IClientClusterService { /// The listener to be registered. diff --git a/Hazelcast.Net/Hazelcast.Util/AddressHelper.cs b/Hazelcast.Net/Hazelcast.Util/AddressHelper.cs index eecf3bef75..6cbec85bcc 100644 --- a/Hazelcast.Net/Hazelcast.Util/AddressHelper.cs +++ b/Hazelcast.Net/Hazelcast.Util/AddressHelper.cs @@ -25,7 +25,7 @@ internal static class AddressHelper { private const int MaxPortTries = 3; - public static ICollection GetPossibleSocketAddresses(IPAddress ipAddress, int port, + public static ICollection
GetPossibleSocketAddresses(IPAddress ipAddress, int port, string scopedAddress) { var portTryCount = 1; @@ -34,7 +34,7 @@ public static ICollection GetPossibleSocketAddresses(IPAddress ipAdd portTryCount = MaxPortTries; port = 5701; } - ICollection socketAddresses = new List(); + ICollection
socketAddresses = new List
(); if (ipAddress == null) { for (var i = 0; i < portTryCount; i++) @@ -42,7 +42,7 @@ public static ICollection GetPossibleSocketAddresses(IPAddress ipAdd IPAddress addr; if (IPAddress.TryParse(scopedAddress, out addr)) { - socketAddresses.Add(new IPEndPoint(addr, port + i)); + socketAddresses.Add(new Address(addr, port + i)); } } } @@ -52,7 +52,7 @@ public static ICollection GetPossibleSocketAddresses(IPAddress ipAdd { for (var i = 0; i < portTryCount; i++) { - socketAddresses.Add(new IPEndPoint(ipAddress, port + i)); + socketAddresses.Add(new Address(ipAddress, port + i)); } } else @@ -62,7 +62,7 @@ public static ICollection GetPossibleSocketAddresses(IPAddress ipAdd { for (var i = 0; i < portTryCount; i++) { - socketAddresses.Add(new IPEndPoint(ipa, port + i)); + socketAddresses.Add(new Address(ipa, port + i)); } } } @@ -70,7 +70,7 @@ public static ICollection GetPossibleSocketAddresses(IPAddress ipAdd return socketAddresses; } - public static ICollection GetSocketAddresses(string address) + public static ICollection
GetSocketAddresses(string address) { var addressHolder = AddressUtil.GetAddressHolder(address, -1); var scopedAddress = addressHolder.ScopeId != null