diff --git a/Hazelcast.Net/Hazelcast.Client.Protocol.Codec/PNCounterAddCodec.cs b/Hazelcast.Net/Hazelcast.Client.Protocol.Codec/PNCounterAddCodec.cs new file mode 100644 index 0000000000..53214744b2 --- /dev/null +++ b/Hazelcast.Net/Hazelcast.Client.Protocol.Codec/PNCounterAddCodec.cs @@ -0,0 +1,108 @@ +// Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using Hazelcast.Client.Protocol; +using Hazelcast.Client.Protocol.Util; +using Hazelcast.IO; +using Hazelcast.Logging; +using Hazelcast.IO.Serialization; + +// Client Protocol version, Since:1.6 - Update:1.6 +namespace Hazelcast.Client.Protocol.Codec +{ + internal static class PNCounterAddCodec + { + private static int CalculateRequestDataSize(string name, long delta, bool getBeforeUpdate, IList> replicaTimestamps, Address targetReplica) + { + var dataSize = ClientMessage.HeaderSize; + + dataSize += ParameterUtil.CalculateDataSize(name); + dataSize += Bits.LongSizeInBytes; + dataSize += Bits.BooleanSizeInBytes; + dataSize += Bits.IntSizeInBytes; + + foreach (var replicaTimestampsItem in replicaTimestamps ) + { + var replicaTimestampsItemKey = replicaTimestampsItem.Key; + var replicaTimestampsItemVal = replicaTimestampsItem.Value; + + dataSize += ParameterUtil.CalculateDataSize(replicaTimestampsItemKey); + dataSize += ParameterUtil.CalculateDataSize(replicaTimestampsItemVal); + } + + dataSize += AddressCodec.CalculateDataSize(targetReplica); + return dataSize; + } + + internal static ClientMessage EncodeRequest(string name, long delta, bool getBeforeUpdate, IList> replicaTimestamps, Address targetReplica) + { + var requiredDataSize = CalculateRequestDataSize(name, delta, getBeforeUpdate, replicaTimestamps, targetReplica); + var clientMessage = ClientMessage.CreateForEncode(requiredDataSize); + + clientMessage.SetMessageType((int)PNCounterMessageType.PNCounterAdd); + clientMessage.SetRetryable(false); + clientMessage.Set(name); + clientMessage.Set(delta); + clientMessage.Set(getBeforeUpdate); + clientMessage.Set(replicaTimestamps.Count); + + foreach (var replicaTimestampsItem in replicaTimestamps) + { + var replicaTimestampsItemKey = replicaTimestampsItem.Key; + var replicaTimestampsItemVal = replicaTimestampsItem.Value; + + clientMessage.Set(replicaTimestampsItemKey); + clientMessage.Set(replicaTimestampsItemVal); + } + + AddressCodec.Encode(targetReplica, clientMessage); + clientMessage.UpdateFrameLength(); + + return clientMessage; + } + + internal class ResponseParameters + { + public long value; + public IList> replicaTimestamps; + public int replicaCount; + } + + internal static ResponseParameters DecodeResponse(IClientMessage clientMessage) + { + var parameters = new ResponseParameters(); + var value = clientMessage.GetLong(); + parameters.value = value; + + var replicaTimestampsSize = clientMessage.GetInt(); + var replicaTimestamps = new List>(replicaTimestampsSize); + + for (var replicaTimestampsIndex = 0; replicaTimestampsIndex(replicaTimestampsItemKey, replicaTimestampsItemVal); + + replicaTimestamps.Add(replicaTimestampsItem); + } + + parameters.replicaTimestamps = replicaTimestamps; + var replicaCount = clientMessage.GetInt(); + parameters.replicaCount = replicaCount; + return parameters; + } + } +} diff --git a/Hazelcast.Net/Hazelcast.Client.Protocol.Codec/PNCounterGetCodec.cs b/Hazelcast.Net/Hazelcast.Client.Protocol.Codec/PNCounterGetCodec.cs new file mode 100644 index 0000000000..7ed4c96cb7 --- /dev/null +++ b/Hazelcast.Net/Hazelcast.Client.Protocol.Codec/PNCounterGetCodec.cs @@ -0,0 +1,92 @@ +// Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using Hazelcast.Client.Protocol; +using Hazelcast.Client.Protocol.Util; +using Hazelcast.IO; +using Hazelcast.Logging; +using Hazelcast.IO.Serialization; + +// Client Protocol version, Since:1.6 - Update:1.6 +namespace Hazelcast.Client.Protocol.Codec +{ + internal static class PNCounterGetCodec + { + private static int CalculateRequestDataSize(string name, IList> replicaTimestamps, Address targetReplica) + { + var dataSize = ClientMessage.HeaderSize; + dataSize += ParameterUtil.CalculateDataSize(name); + dataSize += Bits.IntSizeInBytes; + foreach (var replicaTimestampsItem in replicaTimestamps ) + { + var replicaTimestampsItemKey = replicaTimestampsItem.Key; + var replicaTimestampsItemVal = replicaTimestampsItem.Value; + dataSize += ParameterUtil.CalculateDataSize(replicaTimestampsItemKey); + dataSize += ParameterUtil.CalculateDataSize(replicaTimestampsItemVal); + } + dataSize += AddressCodec.CalculateDataSize(targetReplica); + return dataSize; + } + + internal static ClientMessage EncodeRequest(string name, IList> replicaTimestamps, Address targetReplica) + { + var requiredDataSize = CalculateRequestDataSize(name, replicaTimestamps, targetReplica); + var clientMessage = ClientMessage.CreateForEncode(requiredDataSize); + clientMessage.SetMessageType((int)PNCounterMessageType.PNCounterGet); + clientMessage.SetRetryable(true); + clientMessage.Set(name); + clientMessage.Set(replicaTimestamps.Count); + foreach (var replicaTimestampsItem in replicaTimestamps) + { + var replicaTimestampsItemKey = replicaTimestampsItem.Key; + var replicaTimestampsItemVal = replicaTimestampsItem.Value; + clientMessage.Set(replicaTimestampsItemKey); + clientMessage.Set(replicaTimestampsItemVal); + } + AddressCodec.Encode(targetReplica, clientMessage); + clientMessage.UpdateFrameLength(); + return clientMessage; + } + + internal class ResponseParameters + { + public long value; + public IList> replicaTimestamps; + public int replicaCount; + } + + internal static ResponseParameters DecodeResponse(IClientMessage clientMessage) + { + var parameters = new ResponseParameters(); + var value = clientMessage.GetLong(); + parameters.value = value; + var replicaTimestampsSize = clientMessage.GetInt(); + var replicaTimestamps = new List>(replicaTimestampsSize); + for (var replicaTimestampsIndex = 0; replicaTimestampsIndex(replicaTimestampsItemKey, replicaTimestampsItemVal); + replicaTimestamps.Add(replicaTimestampsItem); + } + parameters.replicaTimestamps = replicaTimestamps; + var replicaCount = clientMessage.GetInt(); + parameters.replicaCount = replicaCount; + return parameters; + } + + } +} diff --git a/Hazelcast.Net/Hazelcast.Client.Protocol.Codec/PNCounterGetConfiguredReplicaCountCodec.cs b/Hazelcast.Net/Hazelcast.Client.Protocol.Codec/PNCounterGetConfiguredReplicaCountCodec.cs new file mode 100644 index 0000000000..f6ea1a9d08 --- /dev/null +++ b/Hazelcast.Net/Hazelcast.Client.Protocol.Codec/PNCounterGetConfiguredReplicaCountCodec.cs @@ -0,0 +1,60 @@ +// Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using Hazelcast.Client.Protocol; +using Hazelcast.Client.Protocol.Util; +using Hazelcast.IO; +using Hazelcast.Logging; +using Hazelcast.IO.Serialization; + +// Client Protocol version, Since:1.6 - Update:1.6 +namespace Hazelcast.Client.Protocol.Codec +{ + internal static class PNCounterGetConfiguredReplicaCountCodec + { + private static int CalculateRequestDataSize(string name) + { + var dataSize = ClientMessage.HeaderSize; + dataSize += ParameterUtil.CalculateDataSize(name); + return dataSize; + } + + internal static ClientMessage EncodeRequest(string name) + { + var requiredDataSize = CalculateRequestDataSize(name); + var clientMessage = ClientMessage.CreateForEncode(requiredDataSize); + clientMessage.SetMessageType((int)PNCounterMessageType.PNCounterGetConfiguredReplicaCount); + clientMessage.SetRetryable(true); + clientMessage.Set(name); + clientMessage.UpdateFrameLength(); + return clientMessage; + } + + internal class ResponseParameters + { + public int response; + } + + internal static ResponseParameters DecodeResponse(IClientMessage clientMessage) + { + var parameters = new ResponseParameters(); + var response = clientMessage.GetInt(); + parameters.response = response; + return parameters; + } + + } +} diff --git a/Hazelcast.Net/Hazelcast.Client.Protocol.Codec/PNCounterMessageType.cs b/Hazelcast.Net/Hazelcast.Client.Protocol.Codec/PNCounterMessageType.cs new file mode 100644 index 0000000000..3d76011c68 --- /dev/null +++ b/Hazelcast.Net/Hazelcast.Client.Protocol.Codec/PNCounterMessageType.cs @@ -0,0 +1,28 @@ +// Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Hazelcast.Client.Protocol.Codec +{ + internal enum PNCounterMessageType + { + + PNCounterGet = 0x2001, + PNCounterAdd = 0x2002, + PNCounterGetConfiguredReplicaCount = 0x2003 + + } + +} + + diff --git a/Hazelcast.Net/Hazelcast.Client.Protocol.Util/ParameterUtil.cs b/Hazelcast.Net/Hazelcast.Client.Protocol.Util/ParameterUtil.cs index b785321b0a..b649239b82 100644 --- a/Hazelcast.Net/Hazelcast.Client.Protocol.Util/ParameterUtil.cs +++ b/Hazelcast.Net/Hazelcast.Client.Protocol.Util/ParameterUtil.cs @@ -51,6 +51,11 @@ public static int CalculateDataSize(int data) return Bits.IntSizeInBytes; } + public static int CalculateDataSize(long data) + { + return Bits.LongSizeInBytes; + } + public static int CalculateDataSize(bool data) { return Bits.BooleanSizeInBytes; diff --git a/Hazelcast.Net/Hazelcast.Client.Protocol/ClientProtocolErrorCodes.cs b/Hazelcast.Net/Hazelcast.Client.Protocol/ClientProtocolErrorCodes.cs index 11609280b6..b5f8a2af7b 100644 --- a/Hazelcast.Net/Hazelcast.Client.Protocol/ClientProtocolErrorCodes.cs +++ b/Hazelcast.Net/Hazelcast.Client.Protocol/ClientProtocolErrorCodes.cs @@ -91,6 +91,24 @@ internal enum ClientProtocolErrorCodes Xa = 67, AccessControl = 68, Login = 69, - UnsupportedCallback = 70 + UnsupportedCallback = 70, + NoDataMemeber = 71, + ReplicatedMapCantBeCreated = 72, + MaxMessageSizeExceeded = 73, + WANReplicationQueueFull = 74, + AssertionError = 75, + OutOfMemory = 76, + StackOverflowError = 77, + NativeOutOfMemoryError = 78, + ServiceNotFound = 79, + StaleTaskId = 80, + DuplicateTask = 81, + StaleTask = 82, + LocalMemberReset = 83, + IndeterminateOperationState = 84, + FlakeIdNodeIdOutOfRangeException = 85, + TargetNotReplicaException = 86, + MutationDisallowedException = 87, + ConsistencyLostException = 88 } } \ No newline at end of file diff --git a/Hazelcast.Net/Hazelcast.Client.Proxy/ClientPNCounterProxy.cs b/Hazelcast.Net/Hazelcast.Client.Proxy/ClientPNCounterProxy.cs new file mode 100644 index 0000000000..907277d72d --- /dev/null +++ b/Hazelcast.Net/Hazelcast.Client.Proxy/ClientPNCounterProxy.cs @@ -0,0 +1,371 @@ +/* + * Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using Hazelcast.Client.Protocol.Codec; +using Hazelcast.Client.Spi; +using Hazelcast.Core; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using Hazelcast.IO; +using Hazelcast.Client.Protocol; +using Hazelcast.Logging; + +using TimeStampIList = System.Collections.Generic.IList>; + +namespace Hazelcast.Client.Proxy +{ + /// + /// Client proxy implementation for a . + /// + internal class ClientPNCounterProxy : ClientProxy, IPNCounter + { + internal static readonly HashSet
_emptyAddressList = new HashSet
(); + internal volatile Address _currentTargetReplicaAddress; + private volatile int _maxConfiguredReplicaCount; + + // Sync object to protect _currentTargetReplicaAddress against race conditions + private readonly object _targetAddressGuard = new object(); + + // The last vector clock observed by this proxy. It is used for maintaining + // session consistency guarantees when reading from different replicas. + internal volatile VectorClock _observedClock; + + /// + /// Creates a client proxy + /// + /// the service name + /// the PNCounter name + public ClientPNCounterProxy(string serviceName, string objectId) : base(serviceName, objectId) + { + _observedClock = new VectorClock(); + } + + public override string ToString() + { + return "PNCounter{name='" + GetName() + "\'}"; + } + + public long Get() + { + var targetAddress = GetCRDTOperationTarget(_emptyAddressList); + var response = InvokeGetInternal(_emptyAddressList, null, targetAddress); + var decodedResponse = PNCounterGetCodec.DecodeResponse(response); + + UpdateObservedReplicaTimestamps(decodedResponse.replicaTimestamps); + + return decodedResponse.value; + } + + public long GetAndAdd(long delta) + { + var targetAddress = GetCRDTOperationTarget(_emptyAddressList); + var response = InvokeAddInternal(delta, true, _emptyAddressList, null, targetAddress); + var decodedResponse = PNCounterAddCodec.DecodeResponse(response); + + UpdateObservedReplicaTimestamps(decodedResponse.replicaTimestamps); + + return decodedResponse.value; + } + + public long AddAndGet(long delta) + { + var targetAddress = GetCRDTOperationTarget(_emptyAddressList); + var response = InvokeAddInternal(delta, false, _emptyAddressList, null, targetAddress); + var decodedResponse = PNCounterAddCodec.DecodeResponse(response); + + UpdateObservedReplicaTimestamps(decodedResponse.replicaTimestamps); + + return decodedResponse.value; + } + + public long GetAndSubtract(long delta) + { + var targetAddress = GetCRDTOperationTarget(_emptyAddressList); + var response = InvokeAddInternal(-delta, true, _emptyAddressList, null, targetAddress); + var decodedResponse = PNCounterAddCodec.DecodeResponse(response); + + UpdateObservedReplicaTimestamps(decodedResponse.replicaTimestamps); + + return decodedResponse.value; + } + + public long SubtractAndGet(long delta) + { + var targetAddress = GetCRDTOperationTarget(_emptyAddressList); + var response = InvokeAddInternal(-delta, false, _emptyAddressList, null, targetAddress); + var decodedResponse = PNCounterAddCodec.DecodeResponse(response); + + UpdateObservedReplicaTimestamps(decodedResponse.replicaTimestamps); + + return decodedResponse.value; + } + + public long DecrementAndGet() + { + var targetAddress = GetCRDTOperationTarget(_emptyAddressList); + var response = InvokeAddInternal(-1, false, _emptyAddressList, null, targetAddress); + var decodedResponse = PNCounterAddCodec.DecodeResponse(response); + + UpdateObservedReplicaTimestamps(decodedResponse.replicaTimestamps); + + return decodedResponse.value; + } + + public long IncrementAndGet() + { + var targetAddress = GetCRDTOperationTarget(_emptyAddressList); + var response = InvokeAddInternal(1, false, _emptyAddressList, null, targetAddress); + var decodedResponse = PNCounterAddCodec.DecodeResponse(response); + + UpdateObservedReplicaTimestamps(decodedResponse.replicaTimestamps); + + return decodedResponse.value; + } + + public long GetAndDecrement() + { + var targetAddress = GetCRDTOperationTarget(_emptyAddressList); + var response = InvokeAddInternal(-1, true, _emptyAddressList, null, targetAddress); + var decodedResponse = PNCounterAddCodec.DecodeResponse(response); + + UpdateObservedReplicaTimestamps(decodedResponse.replicaTimestamps); + + return decodedResponse.value; + } + + public long GetAndIncrement() + { + var targetAddress = GetCRDTOperationTarget(_emptyAddressList); + var response = InvokeAddInternal(1, true, _emptyAddressList, null, targetAddress); + var decodedResponse = PNCounterAddCodec.DecodeResponse(response); + + UpdateObservedReplicaTimestamps(decodedResponse.replicaTimestamps); + + return decodedResponse.value; + } + + public void Reset() + { + _observedClock = new VectorClock(); + } + + /// + /// Adds the delta and returns the value of the counter before the update + /// if getBeforeUpdate" is true or the value after the update if it is false. + /// It will invoke client messages recursively on viable replica addresses + /// until successful or the list of viable replicas is exhausted. + /// Replicas with addresses contained in the excludedAddresses are skipped. + /// If there are no viable replicas, this method will throw the lastException if not null + /// or a NoDataMemberInClusterException if the lastException is null. + /// + /// the delta to add to the counter value, can be negative + /// true if the operation should return the counter value before the addition, + /// false if it should return the value after the addition + /// the addresses to exclude when choosing a replica address, must not be null + /// the exception thrown from the last invocation of the request on a replica, may be null + /// the target address + /// the result of the request invocation on a replica + /// if there are no replicas and the lastException is null + internal IClientMessage InvokeAddInternal(long delta, bool getBeforeUpdate, HashSet
excludedAddresses, Exception lastException, Address targetAddress) + { + if (targetAddress == null) + { + if (lastException != null) + throw lastException; + + throw new NoDataMemberInClusterException("Cannot invoke operations on a CRDT because the cluster does not contain any data members"); + } + + try + { + var request = PNCounterAddCodec.EncodeRequest(GetName(), delta, getBeforeUpdate, _observedClock.EntrySet(), targetAddress); + return InvokeOnTarget(request, targetAddress); + } + catch (Exception ex) + { + Logger.GetLogger(GetType()).Finest("Unable to provide session guarantees when sending operations to " + + targetAddress.ToString() + ", choosing different target. Cause: " + + ex.ToString()); + + // Make sure that this only affects the local variable of the method + if (excludedAddresses == _emptyAddressList) + excludedAddresses = new HashSet
(); + + // Add current/failed address to exclusion list + excludedAddresses.Add(targetAddress); + + // Look for the new target address (taking into account exclusion list) + var newTarget = GetCRDTOperationTarget(excludedAddresses); + + // Send null target address in case it's uninitialized instance + return InvokeAddInternal(delta, getBeforeUpdate, excludedAddresses, ex, newTarget); + } + } + + /// + /// Returns the current value of the counter. + /// It will invoke client messages recursively on viable replica addresses + /// until successful or the list of viable replicas is exhausted. + /// Replicas with addresses contained in the excludedAddresses are skipped. + /// If there are no viable replicas, this method will throw the lastException + /// if not null or a NoDataMemberInClusterException if the lastException is null. + /// + /// the addresses to exclude when choosing a replica address, must not be null + /// the exception thrown from the last invocation of the request on a replica, may be null + /// the target address + /// the result of the request invocation on a replica + /// if there are no replicas and the lastException is null + internal IClientMessage InvokeGetInternal(HashSet
excludedAddresses, Exception lastException, Address targetAddress) + { + if (targetAddress == null) + { + if (lastException != null) + throw lastException; + + throw new NoDataMemberInClusterException("Cannot invoke operations on a CRDT because the cluster does not contain any data members"); + } + + try + { + var request = PNCounterGetCodec.EncodeRequest(GetName(), _observedClock.EntrySet(), targetAddress); + return InvokeOnTarget(request, targetAddress); + } + catch (Exception ex) + { + Logger.GetLogger(GetType()).Finest("Unable to provide session guarantees when sending operations to " + + targetAddress.ToString() + ", choosing different target. Cause: " + + ex.ToString()); + + // Make sure that this only affects the local variable of the method + if (excludedAddresses == _emptyAddressList) + excludedAddresses = new HashSet
(); + + // Add current/failed address to exclusion list + excludedAddresses.Add(targetAddress); + + // Look for the new target address (taking into account exclusion list) + var newTarget = GetCRDTOperationTarget(excludedAddresses); + + // Send null target address in case it's uninitialized instance + return InvokeGetInternal(excludedAddresses, ex, newTarget); + } + } + + /// + /// Returns the target on which this proxy should invoke a CRDT operation. On first invocation of this method, + /// the method will choose a target address and return that address on future invocations. + /// Replicas with addresses contained in the excludedAddresses list are excluded and if the chosen replica is in this list, + /// a new replica is chosen and returned on future invocations. + /// The method may return null if there are no viable target addresses. + /// + /// the addresses to exclude when choosing a replica address, must not be null + /// a CRDT replica address or null if there are no viable addresses + private Address GetCRDTOperationTarget(HashSet
excludedAddresses) + { + // Ensure the current address is not on excluded addresses list + if (_currentTargetReplicaAddress != null && !excludedAddresses.Contains(_currentTargetReplicaAddress)) + return _currentTargetReplicaAddress; + + // If address has not been provided or is on exclusion list + lock (_targetAddressGuard) + { + if (_currentTargetReplicaAddress == null || excludedAddresses.Contains(_currentTargetReplicaAddress)) + _currentTargetReplicaAddress = ChooseTargetReplica(excludedAddresses); + } + + return _currentTargetReplicaAddress; + } + + /// + /// Chooses and returns a CRDT replica address. Replicas with addresses contained in the excludedAddresses list are excluded + /// and the method chooses randomly between the collection of viable target addresses. he method may return null if there are no viable addresses. + /// + /// the addresses to exclude when choosing a replica address, must not be null + /// a CRDT replica address or {@code null} if there are no viable addresses + private Address ChooseTargetReplica(HashSet
excludedAddresses) + { + var replicaAddresses = GetReplicaAddresses(excludedAddresses); + if (replicaAddresses.Count == 0) + return null; + + // Choose random replica + int randomReplicaIndex = new Random().Next(replicaAddresses.Count); + return replicaAddresses[randomReplicaIndex]; + } + + /// + /// Returns the addresses of the CRDT replicas from the current state of the local membership list. + /// Addresses contained in the excludedAddresses collection are excluded. + /// + /// the addresses to exclude when choosing a replica address, must not be null + /// list of possible CRDT replica addresses + private List
GetReplicaAddresses(HashSet
excludedAddresses) + { + var dataMembers = GetContext().GetClusterService().GetMemberList().Where(x => !x.IsLiteMember).ToList(); + var maxConfiguredReplicaCount = GetMaxConfiguredReplicaCount(); + int currentReplicaCount = Math.Min(maxConfiguredReplicaCount, dataMembers.Count); + var replicaAddresses = dataMembers + .Select(x => x.GetAddress()) + .Where(x => excludedAddresses.Contains(x) == false) + .Take(currentReplicaCount) + .ToList(); + + return replicaAddresses; + } + + /// + /// Returns the max configured replica count. When invoked for the first time, + /// this method will fetch the configuration from a cluster member. + /// + /// the maximum configured replica count + private int GetMaxConfiguredReplicaCount() + { + if (_maxConfiguredReplicaCount > 0) + return _maxConfiguredReplicaCount; + + var request = PNCounterGetConfiguredReplicaCountCodec.EncodeRequest(GetName()); + var response = Invoke(request); + var decodedResult = PNCounterGetConfiguredReplicaCountCodec.DecodeResponse(response); + + _maxConfiguredReplicaCount = decodedResult.response; + return _maxConfiguredReplicaCount; + } + + /// + /// Updates the locally observed CRDT vector clock atomically. This method is thread safe and can be called concurrently. + /// The method will only update the clock if the receivedLogicalTimestamps is higher than the currently observed vector clock. + /// + /// logical timestamps received from a replica state read + public void UpdateObservedReplicaTimestamps(TimeStampIList timeStamps) + { + var newVectorClock = new VectorClock(timeStamps); + + while (true) + { + // Store the original value just to avoid issue with data capture order + var originalValue = _observedClock; + + if (originalValue.IsAfter(newVectorClock)) + break; + + if (Interlocked.CompareExchange(ref _observedClock, newVectorClock, originalValue) == originalValue) + break; + } + } + } +} diff --git a/Hazelcast.Net/Hazelcast.Client.Spi/ClientInvocationService.cs b/Hazelcast.Net/Hazelcast.Client.Spi/ClientInvocationService.cs index b783d03e2e..fd04ca3a18 100644 --- a/Hazelcast.Net/Hazelcast.Client.Spi/ClientInvocationService.cs +++ b/Hazelcast.Net/Hazelcast.Client.Spi/ClientInvocationService.cs @@ -159,6 +159,11 @@ private void InvokeInternal(ClientInvocation invocation, Address address = null, connection = GetConnection(address); if (connection == null) { + if (address != null && _client.GetClientClusterService().GetMember(address) == null) + { + throw new TargetNotMemberException(string.Format("Target {0} is not a member.", address)); + } + //Create an async connection and send the invocation afterward. _clientConnectionManager.GetOrConnectAsync(address).ContinueWith(t => { @@ -261,6 +266,15 @@ private bool ShouldRetryInvocation(ClientInvocation invocation, Exception except return false; } + if (invocation.Address != null && exception is TargetNotMemberException && + _client.GetClientClusterService().GetMember(invocation.Address) == null) + { + //when invocation send over address + //if exception is target not member and + //address is not available in member list , don't retry + return false; + } + //validate exception return exception is IOException || exception is SocketException diff --git a/Hazelcast.Net/Hazelcast.Client.Spi/ClientProxy.cs b/Hazelcast.Net/Hazelcast.Client.Spi/ClientProxy.cs index 1264fa6b6b..94d8cccd15 100644 --- a/Hazelcast.Net/Hazelcast.Client.Spi/ClientProxy.cs +++ b/Hazelcast.Net/Hazelcast.Client.Spi/ClientProxy.cs @@ -19,6 +19,7 @@ using Hazelcast.Client.Protocol; using Hazelcast.Client.Protocol.Codec; using Hazelcast.Core; +using Hazelcast.IO; using Hazelcast.IO.Serialization; using Hazelcast.Partition.Strategy; using Hazelcast.Util; @@ -175,6 +176,19 @@ protected ISet ToDataSet(ICollection c) return valueSet; } + protected IClientMessage InvokeOnTarget(IClientMessage request, Address target) + { + try + { + var task = GetContext().GetInvocationService().InvokeOnTarget(request, target); + return ThreadUtil.GetResult(task); + } + catch (Exception e) + { + throw ExceptionUtil.Rethrow(e); + } + } + protected virtual IClientMessage Invoke(IClientMessage request, object key) { try diff --git a/Hazelcast.Net/Hazelcast.Client.Spi/Exception.cs b/Hazelcast.Net/Hazelcast.Client.Spi/Exception.cs index 0711caf301..3c449d5de2 100644 --- a/Hazelcast.Net/Hazelcast.Client.Spi/Exception.cs +++ b/Hazelcast.Net/Hazelcast.Client.Spi/Exception.cs @@ -56,12 +56,19 @@ public TargetNotMemberException(string message) : base(message) [Serializable] public class TargetDisconnectedException : RetryableHazelcastException { - /// + /// + /// Constructor version with an Address instance to assign to + /// + /// is an Adress instance to assign to public TargetDisconnectedException(Address address) : base("Target[" + address + "] disconnected.") { } - /// + /// + /// Constructor version with an Address instance and message to assign to + /// + /// is an Adress instance to assign to + /// is a message to assign to public TargetDisconnectedException(Address address, string message) : base("Target[" + address + "] disconnected, " + message) { diff --git a/Hazelcast.Net/Hazelcast.Client.Spi/ProxyManager.cs b/Hazelcast.Net/Hazelcast.Client.Spi/ProxyManager.cs index f7a67debab..c87ccbd425 100644 --- a/Hazelcast.Net/Hazelcast.Client.Spi/ProxyManager.cs +++ b/Hazelcast.Net/Hazelcast.Client.Spi/ProxyManager.cs @@ -215,6 +215,8 @@ public void Init(ClientConfig config) Register(ServiceNames.Lock, (type, id) => ProxyFactory(typeof(ClientLockProxy), type, ServiceNames.Lock, id)); Register(ServiceNames.CountDownLatch, (type, id) => ProxyFactory(typeof(ClientCountDownLatchProxy), type, ServiceNames.CountDownLatch, id)); + Register(ServiceNames.PNCounter, + (type, id) => ProxyFactory(typeof(ClientPNCounterProxy), type, ServiceNames.PNCounter, id)); Register(ServiceNames.Semaphore, (type, id) => ProxyFactory(typeof(ClientSemaphoreProxy), type, ServiceNames.Semaphore, id)); Register(ServiceNames.Ringbuffer, diff --git a/Hazelcast.Net/Hazelcast.Client/HazelcastClient.cs b/Hazelcast.Net/Hazelcast.Client/HazelcastClient.cs index 0a7b923c1f..fa299f7b41 100644 --- a/Hazelcast.Net/Hazelcast.Client/HazelcastClient.cs +++ b/Hazelcast.Net/Hazelcast.Client/HazelcastClient.cs @@ -214,6 +214,12 @@ public ICountDownLatch GetCountDownLatch(string name) return GetDistributedObject(ServiceNames.CountDownLatch, name); } + /// + public IPNCounter GetPNCounter(string name) + { + return GetDistributedObject(ServiceNames.PNCounter, name); + } + /// public ISemaphore GetSemaphore(string name) { diff --git a/Hazelcast.Net/Hazelcast.Client/HazelcastClientProxy.cs b/Hazelcast.Net/Hazelcast.Client/HazelcastClientProxy.cs index c33381b179..994c8a8077 100644 --- a/Hazelcast.Net/Hazelcast.Client/HazelcastClientProxy.cs +++ b/Hazelcast.Net/Hazelcast.Client/HazelcastClientProxy.cs @@ -115,6 +115,11 @@ public ICountDownLatch GetCountDownLatch(string name) return GetClient().GetCountDownLatch(name); } + public IPNCounter GetPNCounter(string name) + { + return GetClient().GetPNCounter(name); + } + public ISemaphore GetSemaphore(string name) { return GetClient().GetSemaphore(name); diff --git a/Hazelcast.Net/Hazelcast.Core/HazelcastException.cs b/Hazelcast.Net/Hazelcast.Core/HazelcastException.cs index d58130ce04..fc5e9b5744 100644 --- a/Hazelcast.Net/Hazelcast.Core/HazelcastException.cs +++ b/Hazelcast.Net/Hazelcast.Core/HazelcastException.cs @@ -22,18 +22,34 @@ namespace Hazelcast.Core [Serializable] public class HazelcastException : SystemException { + /// + /// Default constructor + /// public HazelcastException() { } + /// + /// Constructor version with a message to assign to + /// + /// is message to assign to public HazelcastException(string message) : base(message) { } + /// + /// Constructor version with a message and Exception instance to assign to + /// + /// is a message to assign to + /// is an exception instance to assign to public HazelcastException(string message, Exception cause) : base(message, cause) { } + /// + /// Constructor version with an exception instance to assign to + /// + /// is an exception instance to assign to public HazelcastException(Exception cause) : base(cause.Message) { } @@ -102,4 +118,32 @@ public StaleSequenceException(string message) : base(message) { } } + + /// + /// An exception that is thrown when the session guarantees have been lost + /// + [Serializable] + public class ConsistencyLostException : HazelcastException + { + public ConsistencyLostException() + { + } + + public ConsistencyLostException(string message) : base(message) + { + } + } + + /// Thrown when invoke operations on a CRDT failed because the cluster does not contain any data members. + /// Thrown when invoke operations on a CRDT failed because the cluster does not contain any data members. + [Serializable] + public class NoDataMemberInClusterException : HazelcastException + { + public NoDataMemberInClusterException() + { + } + public NoDataMemberInClusterException(string message) : base(message) + { + } + } } \ No newline at end of file diff --git a/Hazelcast.Net/Hazelcast.Core/IHazelcastInstance.cs b/Hazelcast.Net/Hazelcast.Core/IHazelcastInstance.cs index 3aff298793..90c1a8c9f8 100644 --- a/Hazelcast.Net/Hazelcast.Core/IHazelcastInstance.cs +++ b/Hazelcast.Net/Hazelcast.Core/IHazelcastInstance.cs @@ -73,6 +73,13 @@ public interface IHazelcastInstance /// ICountDownLatch proxy for the given name ICountDownLatch GetCountDownLatch(string name); + /// + /// PN (Positive-Negative) CRDT counter. + /// + /// name of the IPNCounter proxy + /// IPNCounter proxy for the given name + IPNCounter GetPNCounter(string name); + /// name of the service /// name of the object /// IDistributedObject created by the service diff --git a/Hazelcast.Net/Hazelcast.Core/IPNCounter.cs b/Hazelcast.Net/Hazelcast.Core/IPNCounter.cs new file mode 100644 index 0000000000..a439884c6d --- /dev/null +++ b/Hazelcast.Net/Hazelcast.Core/IPNCounter.cs @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using Hazelcast.Client.Spi; + +namespace Hazelcast.Core +{ + /// + /// PN (Positive-Negative) CRDT counter. + /// + /// + /// The counter supports adding and subtracting values as well as retrieving the current counter value. + /// Each replica of this counter can perform operations locally without coordination with the other replicas, + /// thus increasing availability. + /// The counter guarantees that whenever two nodes have received the same set of updates, + /// possibly in a different order, their state is identical, and any conflicting updates are merged automatically. + /// If no new updates are made to the shared state, all nodes that can communicate will eventually have the same data. + /// + /// The updates to this counter are applied locally when invoked on a CRDT replica. + /// A replica can be any hazelcast instance which is not a client or a lite member. + /// The number of replicas in the cluster is determined by the PNCounterConfig configuration value. + /// + /// When invoking updates from non-replica instance, the invocation is remote. + /// This may lead to indeterminate state - the update may be applied but the response has not been received. + /// In this case, the caller will be notified with a when invoking from a client. + /// + /// The read and write methods provide monotonic read and RYW(read - your - write) guarantees. + /// These guarantees are session guarantees which means that if no replica with the previously observed state is reachable, + /// the session guarantees are lost and the method invocation will throw a . + /// This does not mean that an update is lost.All of the updates are part of some replica + /// and will be eventually reflected in the state of all other replicas. This exception just means + /// that you cannot observe your own writes because all replicas that contain your updates are currently unreachable. + /// After you have received a , you can either + /// wait for a sufficiently up - to - date replica to become reachable in which + /// case the session can be continued or you can reset the session by calling the method. + /// If you have called the method, a new session is started with the next invocation + /// to a CRDT replica. + /// NOTE: + /// The CRDT state is kept entirely on non-lite(data) members. + /// If there aren't any and the methods here are invoked on a lite member, they will fail with an + /// + public interface IPNCounter : IDistributedObject + { + /// + /// Returns the current value of the counter. + /// + /// The current value + /// if the cluster does not contain any data members + /// if the cluster version is less than 3.10 + /// if the session guarantees have been lost + long Get(); + + /// + /// Adds the given value to the current value. + /// + /// the value to add + /// The previous value + /// if the cluster does not contain any data members + /// if the cluster version is less than 3.10 + /// if the session guarantees have been lost + long GetAndAdd(long delta); + + /// + /// Adds the given value to the current value + /// + /// the value to add + /// the updated value + /// if the cluster does not contain any data members + /// if the cluster version is less than 3.10 + /// if the session guarantees have been lost + long AddAndGet(long delta); + + /// + /// Subtracts the given value from the current value. + /// + /// the value to add + /// the previous value + /// if the cluster does not contain any data members + /// if the cluster version is less than 3.10 + /// if the session guarantees have been lost + long GetAndSubtract(long delta); + + /// + /// Subtracts the given value from the current value. + /// + /// the value to subtract + /// the updated value + /// if the cluster does not contain any data members + /// if the cluster version is less than 3.10 + /// if the session guarantees have been lost + long SubtractAndGet(long delta); + + /// + /// Decrements by one the current value. + /// + /// the updated value + /// if the cluster does not contain any data members + /// if the cluster version is less than 3.10 + /// if the session guarantees have been lost + long DecrementAndGet(); + + /// + /// Increments by one the current value. + /// + /// the updated value + /// if the cluster does not contain any data members + /// if the cluster version is less than 3.10 + /// if the session guarantees have been lost + long IncrementAndGet(); + + /// + /// Decrements by one the current value. + /// + /// the previous value + /// if the cluster does not contain any data members + /// if the cluster version is less than 3.10 + /// if the session guarantees have been lost + long GetAndDecrement(); + + /// + /// Increments by one the current value. + /// + /// the previous value + /// if the cluster does not contain any data members + /// if the cluster version is less than 3.10 + /// if the session guarantees have been lost + long GetAndIncrement(); + + /// + /// Resets the observed state by this PN counter. + /// This method may be used after a method invocation has thrown a + /// to reset the proxy and to be able to start a new session. + /// + void Reset(); + } +} + diff --git a/Hazelcast.Net/Hazelcast.Core/ServiceNames.cs b/Hazelcast.Net/Hazelcast.Core/ServiceNames.cs index d9fb88b2ee..c39ad73c68 100644 --- a/Hazelcast.Net/Hazelcast.Core/ServiceNames.cs +++ b/Hazelcast.Net/Hazelcast.Core/ServiceNames.cs @@ -34,6 +34,8 @@ internal class ServiceNames public const string CountDownLatch = "hz:impl:countDownLatchService"; + public const string PNCounter = "hz:impl:PNCounterService"; + public const string Semaphore = "hz:impl:semaphoreService"; public const string Cluster = "hz:impl:clusterService"; diff --git a/Hazelcast.Net/Hazelcast.Core/VectorClock.cs b/Hazelcast.Net/Hazelcast.Core/VectorClock.cs new file mode 100644 index 0000000000..df03ceb239 --- /dev/null +++ b/Hazelcast.Net/Hazelcast.Core/VectorClock.cs @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System.Collections.Generic; +using TimeStampIList = System.Collections.Generic.IList>; + +namespace Hazelcast.Core +{ + internal class VectorClock + { + private readonly Dictionary _timeStampDictionary = new Dictionary(); + private readonly TimeStampIList _timeStampList = new List>(); + + public VectorClock() + { + } + + public VectorClock(TimeStampIList timeStampList) + { + _timeStampList = timeStampList; + foreach (var pair in timeStampList) + { + _timeStampDictionary.Add(pair.Key, pair.Value); + } + } + + internal bool IsAfter(VectorClock other) + { + var anyTimestampGreater = false; + foreach (var otherEntry in other._timeStampDictionary) + { + var replicaId = otherEntry.Key; + var otherReplicaTimestamp = otherEntry.Value; + long localReplicaTimestamp; + if (!_timeStampDictionary.TryGetValue(replicaId, out localReplicaTimestamp) || localReplicaTimestamp < otherReplicaTimestamp) + { + return false; + } + if (localReplicaTimestamp > otherReplicaTimestamp) + { + anyTimestampGreater = true; + } + } + // there is at least one local timestamp greater or local vector clock has additional timestamps + return anyTimestampGreater || other._timeStampDictionary.Count < _timeStampDictionary.Count; + } + + // Returns a set of replica logical timestamps for this vector clock. + public IList> EntrySet() + { + return _timeStampList; + } + } +} diff --git a/Hazelcast.Net/Hazelcast.Util/ClientExceptionFactory.cs b/Hazelcast.Net/Hazelcast.Util/ClientExceptionFactory.cs index 2e160e6a49..5b25a05645 100644 --- a/Hazelcast.Net/Hazelcast.Util/ClientExceptionFactory.cs +++ b/Hazelcast.Net/Hazelcast.Util/ClientExceptionFactory.cs @@ -79,7 +79,8 @@ internal class ClientExceptionFactory {ClientProtocolErrorCodes.TransactionTimedOut, (m, c) => new TransactionTimedOutException(m)}, {ClientProtocolErrorCodes.UriSyntax, (m, c) => new UriFormatException(m)}, {ClientProtocolErrorCodes.UtfDataFormat, (m, c) => new InvalidDataException(m)}, - {ClientProtocolErrorCodes.UnsupportedOperation, (m, c) => new NotSupportedException(m)} + {ClientProtocolErrorCodes.UnsupportedOperation, (m, c) => new NotSupportedException(m)}, + {ClientProtocolErrorCodes.ConsistencyLostException, (m, c) => new ConsistencyLostException(m)} }; public Exception CreateException(Error error) diff --git a/Hazelcast.Test/Hazelcast.Client.Test/ClientPNCounterBasicIntegrationTest.cs b/Hazelcast.Test/Hazelcast.Client.Test/ClientPNCounterBasicIntegrationTest.cs new file mode 100644 index 0000000000..bda3d7712f --- /dev/null +++ b/Hazelcast.Test/Hazelcast.Client.Test/ClientPNCounterBasicIntegrationTest.cs @@ -0,0 +1,42 @@ +// Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Hazelcast.Core; +using NUnit.Framework; + +namespace Hazelcast.Client.Test +{ + [TestFixture] + [Category("3.10")] + public class ClientPNCounterBasicIntegrationTest : SingleMemberBaseTest + { + [Test] + public void SimpleReplicationTest() + { + var counterName = "counter"; + var counter1 = Client.GetPNCounter(counterName); + var counter2 = Client.GetPNCounter(counterName); + + Assert.AreEqual(5L, counter1.AddAndGet(5L)); + + AssertCounterValueEventually(5L, counter1); + AssertCounterValueEventually(5L, counter2); + } + + private void AssertCounterValueEventually(long expectedValue, IPNCounter counter) + { + TestSupport.AssertTrueEventually(() => { Assert.AreEqual(expectedValue, counter.Get()); }); + } + } +} diff --git a/Hazelcast.Test/Hazelcast.Client.Test/ClientPNCounterConsistencyLossTest.cs b/Hazelcast.Test/Hazelcast.Client.Test/ClientPNCounterConsistencyLossTest.cs new file mode 100644 index 0000000000..e4dda6430e --- /dev/null +++ b/Hazelcast.Test/Hazelcast.Client.Test/ClientPNCounterConsistencyLossTest.cs @@ -0,0 +1,124 @@ +// Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.Linq; +using System.Threading; +using Hazelcast.Client.Proxy; +using Hazelcast.Config; +using Hazelcast.Core; +using Hazelcast.Test; +using NUnit.Framework; + +namespace Hazelcast.Client.Test +{ + [TestFixture("AddAndGet")] + [TestFixture("Get")] + [Category("3.10")] + public class ClientPNCounterConsistencyLossTest : MultiMemberBaseNoSetupTest + { + private readonly string _type; + private ClientPNCounterProxy _pnCounter; + + public ClientPNCounterConsistencyLossTest(string type) + { + _type = type; + } + + [OneTimeTearDown] + public void Cleanup() + { + StopRemoteController(RemoteController); + } + + [SetUp] + public void Setup() + { + SetupCluster(); + _pnCounter = Client.GetPNCounter(TestSupport.RandomString()) as ClientPNCounterProxy; + } + + [TearDown] + public void TearDown() + { + RemoteController.shutdownCluster(HzCluster.Id); + } + + protected override void InitMembers() + { + //Init 2 members + MemberList.Add(RemoteController.startMember(HzCluster.Id)); + MemberList.Add(RemoteController.startMember(HzCluster.Id)); + } + + protected override void ConfigureClient(ClientConfig config) + { + config.GetNetworkConfig().SetConnectionAttemptLimit(1); + config.GetNetworkConfig().SetConnectionAttemptPeriod(2000); + } + + protected override string GetServerConfig() + { + return Resources.hazelcast_crdt_replication; + } + + [Test] + public void DriverCanContinueSessionByCallingReset() + { + _pnCounter.AddAndGet(5); + Assert.AreEqual(5, _pnCounter.Get()); + + TerminateTargetReplicaMember(); + Thread.Sleep(1000); + + _pnCounter.Reset(); + Mutation(); + } + + [Test] + public void ConsistencyLostExceptionIsThrownWhenTargetReplicaDisappears() + { + _pnCounter.AddAndGet(5); + Assert.AreEqual(5, _pnCounter.Get()); + + TerminateTargetReplicaMember(); + Thread.Sleep(1000); + + Assert.Throws(Mutation); + } + + private void Mutation() + { + switch (_type) + { + case "AddAndGet": + _pnCounter.AddAndGet(5); + break; + + case "Get": + _pnCounter.Get(); + break; + } + } + + private void TerminateTargetReplicaMember() + { + // Shutdown "primary" member + var allMembers = Client.GetCluster().GetMembers(); + var currentTarget = _pnCounter._currentTargetReplicaAddress; + var primaryMember = allMembers.First(x => x.GetAddress().Equals(currentTarget)); + + RemoteController.terminateMember(HzCluster.Id, primaryMember.GetUuid()); + } + } +} \ No newline at end of file diff --git a/Hazelcast.Test/Hazelcast.Client.Test/ClientPNCounterNoDataMemberTest.cs b/Hazelcast.Test/Hazelcast.Client.Test/ClientPNCounterNoDataMemberTest.cs new file mode 100644 index 0000000000..1833cc4910 --- /dev/null +++ b/Hazelcast.Test/Hazelcast.Client.Test/ClientPNCounterNoDataMemberTest.cs @@ -0,0 +1,62 @@ +// Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Hazelcast.Client.Proxy; +using Hazelcast.Config; +using Hazelcast.Core; +using Hazelcast.Test; +using NUnit.Framework; + +namespace Hazelcast.Client.Test +{ + [TestFixture] + [Category("3.10")] + public class ClientPNCounterNoDataMemberTest : MultiMemberBaseTest + { + private ClientPNCounterProxy _pnCounter; + + [SetUp] + public void Setup() + { + _pnCounter = Client.GetPNCounter(TestSupport.RandomString()) as ClientPNCounterProxy; + } + + protected override void ConfigureGroup(ClientConfig config) + { + config.GetGroupConfig().SetName(HzCluster.Id).SetPassword(HzCluster.Id); + } + + protected override void ConfigureClient(ClientConfig config) + { + config.GetNetworkConfig().SetConnectionAttemptLimit(1); + config.GetNetworkConfig().SetConnectionAttemptPeriod(2000); + } + + protected override string GetServerConfig() + { + return Resources.hazelcast_lite_member; + } + + [Test] + public void NoDataMemberExceptionIsThrown() + { + Assert.Throws(Mutate); + } + + private void Mutate() + { + _pnCounter.AddAndGet(5); + } + } +} diff --git a/Hazelcast.Test/Hazelcast.Client.Test/ClientPNCounterTest.cs b/Hazelcast.Test/Hazelcast.Client.Test/ClientPNCounterTest.cs new file mode 100644 index 0000000000..cb1731c69e --- /dev/null +++ b/Hazelcast.Test/Hazelcast.Client.Test/ClientPNCounterTest.cs @@ -0,0 +1,225 @@ +// Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using Hazelcast.Client.Proxy; +using Hazelcast.Core; +using Hazelcast.IO; +using NUnit.Framework; + +namespace Hazelcast.Client.Test +{ + [TestFixture] + [Category("3.10")] + public class ClientPNCounterTest : SingleMemberBaseTest + { + private ClientPNCounterProxy _pnCounter; + + [SetUp] + public void Setup() + { + _pnCounter = Client.GetPNCounter(TestSupport.RandomString()) as ClientPNCounterProxy; + } + + [Test] + public void AddAndGet_Succeeded() + { + var result = _pnCounter.AddAndGet(10); + Assert.AreEqual(10, result); + } + + [Test] + public void DecrementAndGet_Succeeded() + { + _pnCounter.AddAndGet(10); + var result = _pnCounter.DecrementAndGet(); + + Assert.AreEqual(9, result); + } + + [Test] + public void Get_Succeeded() + { + _pnCounter.AddAndGet(10); + var result = _pnCounter.Get(); + + Assert.AreEqual(10, result); + } + + [Test] + public void GetAndAdd_Succeeded() + { + _pnCounter.AddAndGet(10); + + var result1 = _pnCounter.GetAndAdd(10); + var result2 = _pnCounter.Get(); + + Assert.AreEqual(result1+10, result2); + } + + [Test] + public void GetAndDecrement_Succeeded() + { + _pnCounter.AddAndGet(10); + + var result1 = _pnCounter.GetAndDecrement(); + var result2 = _pnCounter.Get(); + + Assert.AreEqual(result1, result2 + 1); + } + + [Test] + public void GetAndIncrement_Succeeded() + { + _pnCounter.AddAndGet(10); + + var result1 = _pnCounter.GetAndIncrement(); + var result2 = _pnCounter.Get(); + + Assert.AreEqual(result1 + 1, result2); + } + + [Test] + public void GetAndSubtract_Succeeded() + { + _pnCounter.AddAndGet(10); + + var result1 = _pnCounter.GetAndSubtract(5); + var result2 = _pnCounter.Get(); + + Assert.AreEqual(result1, result2 + 5); + } + + [Test] + public void IncrementAndGet_Succeeded() + { + _pnCounter.AddAndGet(10); + + var result1 = _pnCounter.IncrementAndGet(); + var result2 = _pnCounter.Get(); + + Assert.AreEqual(result1, result2); + Assert.AreEqual(11, result2); + } + + [Test] + public void SubtractAndGet_Succeeded() + { + _pnCounter.AddAndGet(10); + + var result1 = _pnCounter.SubtractAndGet(5); + var result2 = _pnCounter.Get(); + + Assert.AreEqual(result1, result2); + Assert.AreEqual(5, result2); + } + + [Test] + public void UpdateObservedReplicaTimestamps_Later_Succeeded() + { + var initList = new List>() + { + new KeyValuePair("node-1", 10), + new KeyValuePair("node-2", 20), + new KeyValuePair("node-3", 30), + new KeyValuePair("node-4", 40), + new KeyValuePair("node-5", 50) + }; + + _pnCounter.UpdateObservedReplicaTimestamps(initList); + + var testList = new List>() + { + new KeyValuePair("node-1", 10), + new KeyValuePair("node-2", 50), + new KeyValuePair("node-3", 30), + new KeyValuePair("node-4", 40), + new KeyValuePair("node-5", 50) + }; + + _pnCounter.UpdateObservedReplicaTimestamps(testList); + + Assert.AreEqual(testList, _pnCounter._observedClock.EntrySet()); + } + + [Test] + public void UpdateObservedReplicaTimestamps_Earlier_Succeeded() + { + var initList = new List>() + { + new KeyValuePair("node-1", 10), + new KeyValuePair("node-2", 20), + new KeyValuePair("node-3", 30), + new KeyValuePair("node-4", 40), + new KeyValuePair("node-5", 50) + }; + + _pnCounter.UpdateObservedReplicaTimestamps(initList); + + var testList = new List>() + { + new KeyValuePair("node-1", 10), + new KeyValuePair("node-2", 10), + new KeyValuePair("node-3", 30), + new KeyValuePair("node-4", 40), + new KeyValuePair("node-5", 50) + }; + + _pnCounter.UpdateObservedReplicaTimestamps(testList); + + Assert.AreEqual(initList, _pnCounter._observedClock.EntrySet()); + } + + [Test] + public void InvokeAdd_NoAddressNoLastException_ThrowsDefaultException() + { + var excludedAddresses=new HashSet
(); + Exception lastException = null; + Address targetAddress = null; + + Assert.Throws(() => _pnCounter.InvokeAddInternal(10, true, excludedAddresses, lastException, targetAddress)); + } + + [Test] + public void InvokeAdd_NoAddressHasLastException_ThrowsLastException() + { + var excludedAddresses = new HashSet
(); + Exception lastException = new OutOfMemoryException(); + Address targetAddress = null; + + Assert.Throws(() => _pnCounter.InvokeAddInternal(10, true, excludedAddresses, lastException, targetAddress)); + } + + [Test] + public void InvokeGet_NoAddressNoLastException_ThrowsDefaultException() + { + var excludedAddresses = new HashSet
(); + Exception lastException = null; + Address targetAddress = null; + + Assert.Throws(() => _pnCounter.InvokeGetInternal(excludedAddresses, lastException, targetAddress)); + } + + [Test] + public void InvokeGet_NoAddressHasLastException_ThrowsLastException() + { + var excludedAddresses = new HashSet
(); + Exception lastException = new OutOfMemoryException(); + Address targetAddress = null; + + Assert.Throws(() => _pnCounter.InvokeGetInternal(excludedAddresses, lastException, targetAddress)); + } + } +} diff --git a/Hazelcast.Test/Hazelcast.Client.Test/MultiMemberBaseNoSetupTest.cs b/Hazelcast.Test/Hazelcast.Client.Test/MultiMemberBaseNoSetupTest.cs new file mode 100644 index 0000000000..4062900884 --- /dev/null +++ b/Hazelcast.Test/Hazelcast.Client.Test/MultiMemberBaseNoSetupTest.cs @@ -0,0 +1,60 @@ +// Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Hazelcast.Core; +using Hazelcast.Remote; +using Hazelcast.Config; +using System.Collections.Generic; + +namespace Hazelcast.Client.Test +{ + public class MultiMemberBaseNoSetupTest : HazelcastTestSupport + { + protected IHazelcastInstance Client { get; private set; } + protected HazelcastClient ClientInternal { get; private set; } + protected ThreadSafeRemoteController RemoteController { get; private set; } + protected Cluster HzCluster { get; private set; } + protected readonly List MemberList = new List(); + + public virtual void SetupCluster() + { + RemoteController = (ThreadSafeRemoteController)CreateRemoteController(); + HzCluster = CreateCluster(RemoteController, GetServerConfig()); + InitMembers(); + Client = CreateClient(); + ClientInternal = ((HazelcastClientProxy)Client).GetClient(); + } + + public virtual void ShutdownRemoteController() + { + HazelcastClient.ShutdownAll(); + StopRemoteController(RemoteController); + } + + protected virtual void InitMembers() + { + MemberList.Add(RemoteController.startMember(HzCluster.Id)); + } + + protected virtual string GetServerConfig() + { + return Hazelcast.Test.Resources.hazelcast; + } + + protected override void ConfigureGroup(ClientConfig config) + { + config.GetGroupConfig().SetName(HzCluster.Id).SetPassword(HzCluster.Id); + } + } +} diff --git a/Hazelcast.Test/Hazelcast.Client.Test/MultiMemberBaseTest.cs b/Hazelcast.Test/Hazelcast.Client.Test/MultiMemberBaseTest.cs index f9e9bf14d2..17d0647763 100644 --- a/Hazelcast.Test/Hazelcast.Client.Test/MultiMemberBaseTest.cs +++ b/Hazelcast.Test/Hazelcast.Client.Test/MultiMemberBaseTest.cs @@ -12,54 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -using System.Collections.Generic; -using Hazelcast.Config; -using Hazelcast.Core; -using Hazelcast.Remote; -using Hazelcast.Test; using NUnit.Framework; -using Member = Hazelcast.Remote.Member; namespace Hazelcast.Client.Test { - public class MultiMemberBaseTest : HazelcastTestSupport + public class MultiMemberBaseTest : MultiMemberBaseNoSetupTest { - protected IHazelcastInstance Client { get; private set; } - protected HazelcastClient ClientInternal { get; private set; } - protected ThreadSafeRemoteController RemoteController { get; private set; } - protected Cluster HzCluster { get; private set; } - protected readonly List MemberList = new List(); - [OneTimeSetUp] - public virtual void SetupCluster() + public void OneTimeSetUp() { - RemoteController = (ThreadSafeRemoteController) CreateRemoteController(); - HzCluster = CreateCluster(RemoteController, GetServerConfig()); - InitMembers(); - Client = CreateClient(); - ClientInternal = ((HazelcastClientProxy) Client).GetClient(); + SetupCluster(); } [OneTimeTearDown] - public virtual void ShutdownRemoteController() - { - HazelcastClient.ShutdownAll(); - StopRemoteController(RemoteController); - } - - protected virtual void InitMembers() - { - MemberList.Add(RemoteController.startMember(HzCluster.Id)); - } - - protected virtual string GetServerConfig() - { - return Resources.hazelcast; - } - - protected override void ConfigureGroup(ClientConfig config) + public void OneTimeTearDown() { - config.GetGroupConfig().SetName(HzCluster.Id).SetPassword(HzCluster.Id); + ShutdownRemoteController(); } } } \ No newline at end of file diff --git a/Hazelcast.Test/Hazelcast.Client.Test/VectorClockTest.cs b/Hazelcast.Test/Hazelcast.Client.Test/VectorClockTest.cs new file mode 100644 index 0000000000..033cf31183 --- /dev/null +++ b/Hazelcast.Test/Hazelcast.Client.Test/VectorClockTest.cs @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using Hazelcast.Core; +using NUnit.Framework; +using System.Collections.Generic; + +namespace Hazelcast.Client.Test +{ + [TestFixture] + [Category("3.10")] + public class VectorClockTest + { + private VectorClock _inst; + + [SetUp] + public void Init() + { + var initList = new List>() + { + new KeyValuePair("node-1", 10), + new KeyValuePair("node-2", 20), + new KeyValuePair("node-3", 30), + new KeyValuePair("node-4", 40), + new KeyValuePair("node-5", 50) + }; + + _inst = new VectorClock(initList); + } + + [TearDown] + public void Destroy() + { + _inst = null; + } + + [Test] + public void NewerTSDetectedOnNewSet() + { + // Arrange + var newList = new List>() + { + new KeyValuePair("node-1", 100), + new KeyValuePair("node-2", 20), + new KeyValuePair("node-3", 30), + new KeyValuePair("node-4", 40), + new KeyValuePair("node-5", 50) + }; + + var newVector = new VectorClock(newList); + + // Act + var result = _inst.IsAfter(newVector); + + // Assert + Assert.IsFalse(result); + } + + [Test] + public void SmallerListOnNewSet() + { + // Arrange + var newList = new List>() + { + new KeyValuePair("node-1", 10), + new KeyValuePair("node-2", 20) + }; + + var newVector = new VectorClock(newList); + + // Act + var result = _inst.IsAfter(newVector); + + // Assert + Assert.IsTrue(result); + } + + [Test] + public void SmallerListWithNewerItemOnNewSet() + { + // Arrange + var newList = new List>() + { + new KeyValuePair("node-1", 100), + new KeyValuePair("node-2", 20) + }; + + var newVector = new VectorClock(newList); + + // Act + var result = _inst.IsAfter(newVector); + + // Assert + Assert.IsFalse(result); + } + } +} diff --git a/Hazelcast.Test/Hazelcast.Remote/ThreadSafeRemoteController.cs b/Hazelcast.Test/Hazelcast.Remote/ThreadSafeRemoteController.cs index 2822f036e4..31183136c3 100644 --- a/Hazelcast.Test/Hazelcast.Remote/ThreadSafeRemoteController.cs +++ b/Hazelcast.Test/Hazelcast.Remote/ThreadSafeRemoteController.cs @@ -1,4 +1,4 @@ -// Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved. +// Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -65,7 +65,7 @@ public override bool shutdownMember(string clusterId, string memberId) public override bool terminateMember(string clusterId, string memberId) { - return ThreadSafeCall(() => base.shutdownMember(clusterId, memberId)); + return ThreadSafeCall(() => base.terminateMember(clusterId, memberId)); } public override bool suspendMember(string clusterId, string memberId) @@ -85,7 +85,7 @@ public override bool shutdownCluster(string clusterId) public override bool terminateCluster(string clusterId) { - return ThreadSafeCall(() => base.shutdownCluster(clusterId)); + return ThreadSafeCall(() => base.terminateCluster(clusterId)); } public override Cluster splitMemberFromCluster(string memberId) diff --git a/Hazelcast.Test/Properties/Resources.cs b/Hazelcast.Test/Properties/Resources.cs index 104b742b2b..6dc17ff3d6 100644 --- a/Hazelcast.Test/Properties/Resources.cs +++ b/Hazelcast.Test/Properties/Resources.cs @@ -22,6 +22,8 @@ internal static class Resources public static string hazelcast_ssl_signed {get { return GetXmlResourceContent("hazelcast-ssl-signed"); }} public static string hazelcast_ssl {get { return GetXmlResourceContent("hazelcast-ssl"); }} public static string hazelcast_stat {get { return GetXmlResourceContent("hazelcast-stat"); }} + public static string hazelcast_crdt_replication { get { return GetXmlResourceContent("hazelcast-crdt-replication"); } } + public static string hazelcast_lite_member { get { return GetXmlResourceContent("hazelcast-lite-member"); } } private static byte[] GetBytes(string name) { diff --git a/Hazelcast.Test/Resources/hazelcast-crdt-replication.xml b/Hazelcast.Test/Resources/hazelcast-crdt-replication.xml new file mode 100644 index 0000000000..a7a5998792 --- /dev/null +++ b/Hazelcast.Test/Resources/hazelcast-crdt-replication.xml @@ -0,0 +1,55 @@ + + + + + false + 10 + 5 + + + + dev + dev-pass + + + http://localhost:8080/mancenter + + + 5701 + + + 0 + + + + 224.7.7.7 + 54327 + + + 127.0.0.1 + + + 127.0.0.1 + + + + + + + + com.hazelcast.client.test.IdentifiedFactory + + + + + + 2147483647 + 2147483647 + + + \ No newline at end of file diff --git a/Hazelcast.Test/Resources/hazelcast-lite-member.xml b/Hazelcast.Test/Resources/hazelcast-lite-member.xml new file mode 100644 index 0000000000..a5137b47e4 --- /dev/null +++ b/Hazelcast.Test/Resources/hazelcast-lite-member.xml @@ -0,0 +1,56 @@ + + + + + false + 10 + 5 + + + + dev + dev-pass + + + + http://localhost:8080/mancenter + + + 5701 + + + 0 + + + + 224.7.7.7 + 54327 + + + 127.0.0.1 + + + 127.0.0.1 + + + + + + + + com.hazelcast.client.test.IdentifiedFactory + + + + + + 2147483647 + 2147483647 + + + \ No newline at end of file