diff --git a/doc/dev/doc/cpsubsystem.md b/doc/dev/doc/cpsubsystem.md index 8dadf6a67..6e545a90a 100644 --- a/doc/dev/doc/cpsubsystem.md +++ b/doc/dev/doc/cpsubsystem.md @@ -12,4 +12,5 @@ Currently, the C# client CP SubSystem implements the following services: * [AtomicLong](distributed-objects/atomiclong.md) * [AtomicRef](distributed-objects/atomicref.md) * [FencedLock](distributed-objects/fencedlock.md) -* [CPMap](distributed-objects/cpmap.md) \ No newline at end of file +* [CPMap](distributed-objects/cpmap.md) +* [CountDownLatch](distributed-objects/countdownlatch.md) \ No newline at end of file diff --git a/doc/dev/doc/distributed-objects/countdownlatch.md b/doc/dev/doc/distributed-objects/countdownlatch.md new file mode 100644 index 000000000..86bc74e7a --- /dev/null +++ b/doc/dev/doc/distributed-objects/countdownlatch.md @@ -0,0 +1,29 @@ +# AtomicLong + +> [!NOTE] +> ICountDownLatch is a member of CP Subsystem API. For detailed information, see the [CP SubSystem documentation](../cpsubsystem.md). + +Hazelcast @Hazelcast.CP.ICountDownLatch is the distributed implementation of `java.util.concurrent.CountDownLatch`. It is a +cluster-wide synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes. + +The following example code creates a latch, and waits on it: + +```csharp +await using var client = await HazelcastClientFactory.StartNewClientAsync(); +await using var latch = await client.CPSubSystem.GetCountDownLatchAsync("latch-unique-name"); + +await latch.TrySetCountAsync(4); + +var waiting = latch.AwaitAsync(TimeSpan.FromSeconds(30)); +// waiting is NOT completed +latch.CountDownAsync(); +// latch.GetCountAsync() would be 3 +latch.CountDownAsync(); +// latch.GetCountAsync() would be 2 +latch.CountDownAsync(); +// latch.GetCountAsync() would be 1 +latch.CountDownAsync(); +// latch.GetCountAsync() is now zero + +await waiting; // waiting is completed +``` diff --git a/doc/dev/doc/toc.md b/doc/dev/doc/toc.md index 8fad7762e..d1aa71bc6 100644 --- a/doc/dev/doc/toc.md +++ b/doc/dev/doc/toc.md @@ -30,6 +30,7 @@ ## [FlakeIdGenerator](distributed-objects/flakeidgenerator.md) ## [FencedLock](distributed-objects/fencedlock.md) ## [CPMap](distributed-objects/cpmap.md) +## [CountDownLatch](distributed-objects/countdownlatch.md) # [Distributed Computing](distributedComputing.md) # [Distributed Query](distributedQuery.md) # [CP SubSystem](cpsubsystem.md) diff --git a/src/Hazelcast.Net.Tests/CP/CountDownLatchTests.cs b/src/Hazelcast.Net.Tests/CP/CountDownLatchTests.cs new file mode 100644 index 000000000..d4c3fec86 --- /dev/null +++ b/src/Hazelcast.Net.Tests/CP/CountDownLatchTests.cs @@ -0,0 +1,68 @@ +// Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Threading.Tasks; +using Hazelcast.Testing; +using NUnit.Framework; + +namespace Hazelcast.Tests.CP; + +[TestFixture] +public class CountDownLatchTests : SingleMemberClientRemoteTestBase +{ + [Test] + public async Task NormalTest() + { + var cdl = await Client.CPSubsystem.GetCountDownLatchAsync(CreateUniqueName()); + Assert.That(await cdl.TrySetCountAsync(4)); + Assert.That(await cdl.TrySetCountAsync(4), Is.False); + var waiting = cdl.AwaitAsync(TimeSpan.FromSeconds(30)); + for (var i = 4; i > 0; i--) + { + Assert.That(waiting.IsCompleted, Is.False); + Assert.That(await cdl.GetCountAsync(), Is.EqualTo(i)); + await cdl.CountDownAsync(); + } + + await AssertEx.SucceedsEventually(() => + { + Assert.That(waiting.IsCompleted); + }, 4_000, 500); + + Assert.That(await cdl.GetCountAsync(), Is.EqualTo(0)); + await cdl.CountDownAsync(); + Assert.That(await cdl.GetCountAsync(), Is.EqualTo(0)); + } + + [Test] + public async Task TimeoutTest() + { + var cdl = await Client.CPSubsystem.GetCountDownLatchAsync(CreateUniqueName()); + Assert.That(await cdl.TrySetCountAsync(4)); + var waiting = cdl.AwaitAsync(TimeSpan.FromSeconds(1)); + + await AssertEx.SucceedsEventually(() => + { + Assert.That(waiting.IsCompleted && waiting.Result == false); + }, 4_000, 500); + } + + [Test] + public async Task DestroyTest() + { + var cdl = await Client.CPSubsystem.GetCountDownLatchAsync(CreateUniqueName()); + await cdl.DestroyAsync(); + } +} \ No newline at end of file diff --git a/src/Hazelcast.Net/CP/CPSubsystem.cs b/src/Hazelcast.Net/CP/CPSubsystem.cs index 32213a5bd..906562d04 100644 --- a/src/Hazelcast.Net/CP/CPSubsystem.cs +++ b/src/Hazelcast.Net/CP/CPSubsystem.cs @@ -142,6 +142,14 @@ public async Task> GetMapAsync([NotNull] stri _serializationService, groupId); } + public async Task GetCountDownLatchAsync(string name) + { + var (groupName, objectName, _) = ParseName(name); + var groupId = await GetGroupIdAsync(groupName).CfAwait(); + + return new CountDownLatch(objectName, groupId, _cluster, _serializationService); + } + // see: ClientRaftProxyFactory.java private async Task GetGroupIdAsync(string proxyName) diff --git a/src/Hazelcast.Net/CP/CountDownLatch.cs b/src/Hazelcast.Net/CP/CountDownLatch.cs new file mode 100644 index 000000000..76277b1a4 --- /dev/null +++ b/src/Hazelcast.Net/CP/CountDownLatch.cs @@ -0,0 +1,89 @@ +// Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Threading.Tasks; +using Hazelcast.Clustering; +using Hazelcast.Core; +using Hazelcast.DistributedObjects; +using Hazelcast.Protocol; +using Hazelcast.Protocol.Codecs; +using Hazelcast.Protocol.Models; +using Hazelcast.Serialization; + +namespace Hazelcast.CP; + +internal class CountDownLatch : CPDistributedObjectBase, ICountDownLatch +{ + public CountDownLatch(string name, CPGroupId groupId, Cluster cluster, SerializationService serializationService) + : base(ServiceNames.CountDownLatch, name, groupId, cluster, serializationService) + { } + + public async Task AwaitAsync(TimeSpan timeout) + { + var timeoutMillis = (long) timeout.TotalMilliseconds; + if (timeoutMillis < 0) timeoutMillis = 0; + var requestMessage = CountDownLatchAwaitCodec.EncodeRequest(CPGroupId, Name, Guid.NewGuid(), timeoutMillis); + var responseMessage = await Cluster.Messaging.SendAsync(requestMessage).CfAwait(); + return CountDownLatchAwaitCodec.DecodeResponse(responseMessage).Response; + } + + public async Task CountDownAsync() + { + var round = await GetRoundAsync().CfAwait(); + var uuid = Guid.NewGuid(); + for (;;) + { + try + { + await CountDownAsync(round, uuid).CfAwait(); + return; + } + catch (RemoteException e) when (e.Error == RemoteError.OperationTimeout) + { + // ignore and retry + } + } + } + + private async Task GetRoundAsync() + { + var requestMessage = CountDownLatchGetRoundCodec.EncodeRequest(CPGroupId, Name); + var responseMessage = await Cluster.Messaging.SendAsync(requestMessage).CfAwait(); + return CountDownLatchGetRoundCodec.DecodeResponse(responseMessage).Response; + } + + private async Task CountDownAsync(int round, Guid uuid) + { + var requestMessage = CountDownLatchCountDownCodec.EncodeRequest(CPGroupId, Name, uuid, round); + var responseMessage = await Cluster.Messaging.SendAsync(requestMessage).CfAwait(); + CountDownLatchCountDownCodec.DecodeResponse(responseMessage); + } + + public async Task GetCountAsync() + { + var requestMessage = CountDownLatchGetCountCodec.EncodeRequest(CPGroupId, Name); + var responseMessage = await Cluster.Messaging.SendAsync(requestMessage).CfAwait(); + return CountDownLatchGetCountCodec.DecodeResponse(responseMessage).Response; + } + + public async Task TrySetCountAsync(int count) + { + if (count <= 0) throw new ArgumentException("Value must be greater than zero.", nameof(count)); + + var requestMessage = CountDownLatchTrySetCountCodec.EncodeRequest(CPGroupId, Name, count); + var responseMessage = await Cluster.Messaging.SendAsync(requestMessage).CfAwait(); + return CountDownLatchTrySetCountCodec.DecodeResponse(responseMessage).Response; + } +} diff --git a/src/Hazelcast.Net/CP/ICPSubsystem.cs b/src/Hazelcast.Net/CP/ICPSubsystem.cs index b5230d39e..e798a7408 100644 --- a/src/Hazelcast.Net/CP/ICPSubsystem.cs +++ b/src/Hazelcast.Net/CP/ICPSubsystem.cs @@ -13,7 +13,6 @@ // limitations under the License. using System.Threading.Tasks; -using Hazelcast.Protocol.Codecs; namespace Hazelcast.CP { @@ -66,5 +65,14 @@ public interface ICPSubsystem /// Type of the key. /// Type of the value. Task> GetMapAsync(string name); + + /// + /// Gets an distributed object. + /// + /// The unique name of the countdown latch. + /// If an object with the specified does not + /// exist already in the cluster, a new object is created. + /// The countdown latch. + Task GetCountDownLatchAsync(string name); } } diff --git a/src/Hazelcast.Net/CP/ICPSubsystem.cs.orig b/src/Hazelcast.Net/CP/ICPSubsystem.cs.orig new file mode 100644 index 000000000..faded4911 --- /dev/null +++ b/src/Hazelcast.Net/CP/ICPSubsystem.cs.orig @@ -0,0 +1,81 @@ +// Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.Threading.Tasks; +using Hazelcast.Protocol.Codecs; + +namespace Hazelcast.CP +{ + /// + /// Defines the CP subsystem. + /// + public interface ICPSubsystem + { + /// + /// Gets an distributed object. + /// + /// The unique name of the atomic long. + /// The atomic long that was retrieved or created. + /// + /// If an object with the specified does not + /// exist already in the cluster, a new object is created. + /// + Task GetAtomicLongAsync(string name); + + /// + /// Gets an distributed object. + /// + /// The unique name of the atomic reference. + /// The atomic reference that was retrieved or created. + /// + /// If an object with the specified does not + /// exist already in the cluster, a new object is created. + /// + Task> GetAtomicReferenceAsync(string name); + + /// + /// Gets an distributed object. + /// + /// The unique name of the fenced lock. + /// If an object with the specified does not + /// exist already in the cluster, a new object is created. + /// + Task GetLockAsync(string name); + + /// +<<<<<<< HEAD + /// Gets an distributed object. + /// + /// The unique name of the countdown latch. + /// If an object with the specified does not + /// exist already in the cluster, a new object is created. + /// The countdown latch. + Task GetCountDownLatchAsync(string name); + } +======= + /// Gets an distributed object. + /// CPMap is only available in enterprise cluster. + /// The map will be created in DEFAULT CP group if no group name provided within . + /// If a group name provided, first, the group will be initialized, + /// if does not exist. Then, instance will be created on this group. + /// + /// + /// + /// The unique name of the map. It can contain the group name like "myMap@group1" + /// Type of the key. + /// Type of the value. + Task> GetMapAsync(string name); + } +>>>>>>> master +} diff --git a/src/Hazelcast.Net/CP/ICountDownLatch.cs b/src/Hazelcast.Net/CP/ICountDownLatch.cs new file mode 100644 index 000000000..922af7c7f --- /dev/null +++ b/src/Hazelcast.Net/CP/ICountDownLatch.cs @@ -0,0 +1,58 @@ +// Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Threading.Tasks; + +namespace Hazelcast.CP; + +/// +/// Represents a countdown latch which is a backed-up distributed alternative to the +/// java.util.concurrent.CountDownLatch. It is a cluster-wide synchronization aid +/// that allows one or more threads to wait until a set of operations being +/// performed in other threads completes. +/// It works on top of the Raft consensus algorithm. It offers linearizability +/// during crash failures and network partitions. It is CP with respect to the CAP +/// principle. If a network partition occurs, it remains available on at most one +/// side of the partition. +/// +public interface ICountDownLatch : ICPDistributedObject +{ + /// + /// Waits until the latch has counted down to zero, or the specified timeout + /// waiting time has expired. + /// + /// The wait timeout. + /// Whether the count reached zero within the specified timeout + /// waiting time. + Task AwaitAsync(TimeSpan timeout); + + /// + /// Decrements the count of the latch. + /// + Task CountDownAsync(); + + /// + /// Gets the current count of the latch. + /// + /// The current count of the latch. + Task GetCountAsync(); + + /// + /// Sets the count to the specified value if it is zero. + /// + /// The new count. + /// Whether the count was set. + Task TrySetCountAsync(int count); +} \ No newline at end of file diff --git a/src/Hazelcast.Net/DistributedObjects/ServiceNames.cs b/src/Hazelcast.Net/DistributedObjects/ServiceNames.cs index 82f4b1633..d803b894f 100644 --- a/src/Hazelcast.Net/DistributedObjects/ServiceNames.cs +++ b/src/Hazelcast.Net/DistributedObjects/ServiceNames.cs @@ -93,5 +93,10 @@ internal static class ServiceNames /// The name of the CPMap service. /// public const string CPMap = "hz:raft:mapService"; + + /// + /// The name of the CountDown Latch service. + /// + public const string CountDownLatch = "hz:raft:countDownLatchService"; } } diff --git a/src/Hazelcast.Net/DistributedObjects/ServiceNames.cs.orig b/src/Hazelcast.Net/DistributedObjects/ServiceNames.cs.orig new file mode 100644 index 000000000..57c9d5556 --- /dev/null +++ b/src/Hazelcast.Net/DistributedObjects/ServiceNames.cs.orig @@ -0,0 +1,103 @@ +// Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Hazelcast.DistributedObjects +{ + /// + /// Define the service names. + /// + internal static class ServiceNames + { + /// + /// The name of the map service. + /// + public const string Map = "hz:impl:mapService"; + + /// + /// The name of the topic service. + /// + public const string Topic = "hz:impl:topicService"; + + /// + /// The name of the reliable topic service. + /// + public const string ReliableTopic = "hz:impl:reliableTopicService"; + + /// + /// The name of the list service. + /// + public const string List = "hz:impl:listService"; + + /// + /// The name of the multi map service. + /// + public const string MultiMap = "hz:impl:multiMapService"; + + /// + /// The name of the queue service. + /// + public const string Queue = "hz:impl:queueService"; + + /// + /// The name of the replicated map service. + /// + public const string ReplicatedMap = "hz:impl:replicatedMapService"; + + /// + /// The name off the ring buffer service. + /// + public const string RingBuffer = "hz:impl:ringbufferService"; + + /// + /// The name of the set service. + /// + public const string Set = "hz:impl:setService"; + + /// + /// The name of the raft atomic long service. + /// + public const string AtomicLong = "hz:raft:atomicLongService"; + + /// + /// The name of the raft atomic ref service. + /// + public const string AtomicRef = "hz:raft:atomicRefService"; + + /// + /// The name of the Flake ID Generator service. + /// + public const string FlakeIdGenerator = "hz:impl:flakeIdGeneratorService"; + + /// + /// The name of the CP Session Manager service. + /// + public const string CPSession = "hz:core:raftSession"; + + /// + /// The name of the Fenced Lock service. + /// + public const string FencedLock = "hz:raft:lockService"; + + /// +<<<<<<< HEAD + /// The name of the CountDown Latch service. + /// + public const string CountDownLatch = "hz:raft:countDownLatchService"; +======= + /// The name of the CPMap service. + /// + public const string CPMap = "hz:raft:mapService"; +>>>>>>> master + } +} diff --git a/src/Hazelcast.Net/Protocol/Codecs/CountDownLatchAwaitCodec.cs b/src/Hazelcast.Net/Protocol/Codecs/CountDownLatchAwaitCodec.cs new file mode 100644 index 000000000..3ae3e7a7b --- /dev/null +++ b/src/Hazelcast.Net/Protocol/Codecs/CountDownLatchAwaitCodec.cs @@ -0,0 +1,163 @@ +// Copyright (c) 2008-2024, Hazelcast, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +// This code was generated by a tool. +// Hazelcast Client Protocol Code Generator @5655fe9be +// https://github.com/hazelcast/hazelcast-client-protocol +// Change to this file will be lost if the code is regenerated. +// + +#pragma warning disable IDE0051 // Remove unused private members +// ReSharper disable UnusedMember.Local +// ReSharper disable RedundantUsingDirective +// ReSharper disable CheckNamespace + +using System; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; +using Hazelcast.Protocol.BuiltInCodecs; +using Hazelcast.Protocol.CustomCodecs; +using Hazelcast.Core; +using Hazelcast.Messaging; +using Hazelcast.Clustering; +using Hazelcast.Serialization; +using Microsoft.Extensions.Logging; + +namespace Hazelcast.Protocol.Codecs +{ + /// + /// Causes the current thread to wait until the latch has counted down + /// to zero, or an exception is thrown, or the specified waiting time + /// elapses. If the current count is zero then this method returns + /// immediately with the value true. If the current count is greater than + /// zero, then the current thread becomes disabled for thread scheduling + /// purposes and lies dormant until one of five things happen: the count + /// reaches zero due to invocations of the {@code countDown} method, this + /// ICountDownLatch instance is destroyed, the countdown owner becomes + /// disconnected, some other thread Thread#interrupt interrupts the current + /// thread, or the specified waiting time elapses. If the count reaches zero + /// then the method returns with the value true. If the current thread has + /// its interrupted status set on entry to this method, or is interrupted + /// while waiting, then {@code InterruptedException} is thrown + /// and the current thread's interrupted status is cleared. If the specified + /// waiting time elapses then the value false is returned. If the time is + /// less than or equal to zero, the method will not wait at all. + /// +#if SERVER_CODEC + internal static class CountDownLatchAwaitServerCodec +#else + internal static class CountDownLatchAwaitCodec +#endif + { + public const int RequestMessageType = 721408; // 0x0B0200 + public const int ResponseMessageType = 721409; // 0x0B0201 + private const int RequestInvocationUidFieldOffset = Messaging.FrameFields.Offset.PartitionId + BytesExtensions.SizeOfInt; + private const int RequestTimeoutMsFieldOffset = RequestInvocationUidFieldOffset + BytesExtensions.SizeOfCodecGuid; + private const int RequestInitialFrameSize = RequestTimeoutMsFieldOffset + BytesExtensions.SizeOfLong; + private const int ResponseResponseFieldOffset = Messaging.FrameFields.Offset.ResponseBackupAcks + BytesExtensions.SizeOfByte; + private const int ResponseInitialFrameSize = ResponseResponseFieldOffset + BytesExtensions.SizeOfBool; + +#if SERVER_CODEC + public sealed class RequestParameters + { + + /// + /// CP group id of this CountDownLatch instance + /// + public Hazelcast.CP.CPGroupId GroupId { get; set; } + + /// + /// Name of this CountDownLatch instance + /// + public string Name { get; set; } + + /// + /// UID of this invocation + /// + public Guid InvocationUid { get; set; } + + /// + /// The maximum time in milliseconds to wait + /// + public long TimeoutMs { get; set; } + } +#endif + + public static ClientMessage EncodeRequest(Hazelcast.CP.CPGroupId groupId, string name, Guid invocationUid, long timeoutMs) + { + var clientMessage = new ClientMessage + { + IsRetryable = true, + OperationName = "CountDownLatch.Await" + }; + var initialFrame = new Frame(new byte[RequestInitialFrameSize], (FrameFlags) ClientMessageFlags.Unfragmented); + initialFrame.Bytes.WriteIntL(Messaging.FrameFields.Offset.MessageType, RequestMessageType); + initialFrame.Bytes.WriteIntL(Messaging.FrameFields.Offset.PartitionId, -1); + initialFrame.Bytes.WriteGuidL(RequestInvocationUidFieldOffset, invocationUid); + initialFrame.Bytes.WriteLongL(RequestTimeoutMsFieldOffset, timeoutMs); + clientMessage.Append(initialFrame); + RaftGroupIdCodec.Encode(clientMessage, groupId); + StringCodec.Encode(clientMessage, name); + return clientMessage; + } + +#if SERVER_CODEC + public static RequestParameters DecodeRequest(ClientMessage clientMessage) + { + using var iterator = clientMessage.GetEnumerator(); + var request = new RequestParameters(); + var initialFrame = iterator.Take(); + request.InvocationUid = initialFrame.Bytes.ReadGuidL(RequestInvocationUidFieldOffset); + request.TimeoutMs = initialFrame.Bytes.ReadLongL(RequestTimeoutMsFieldOffset); + request.GroupId = RaftGroupIdCodec.Decode(iterator); + request.Name = StringCodec.Decode(iterator); + return request; + } +#endif + + public sealed class ResponseParameters + { + + /// + /// true if the count reached zero, false if + /// the waiting time elapsed before the count reached 0 + /// + public bool Response { get; set; } + } + +#if SERVER_CODEC + public static ClientMessage EncodeResponse(bool response) + { + var clientMessage = new ClientMessage(); + var initialFrame = new Frame(new byte[ResponseInitialFrameSize], (FrameFlags) ClientMessageFlags.Unfragmented); + initialFrame.Bytes.WriteIntL(Messaging.FrameFields.Offset.MessageType, ResponseMessageType); + initialFrame.Bytes.WriteBoolL(ResponseResponseFieldOffset, response); + clientMessage.Append(initialFrame); + return clientMessage; + } +#endif + + public static ResponseParameters DecodeResponse(ClientMessage clientMessage) + { + using var iterator = clientMessage.GetEnumerator(); + var response = new ResponseParameters(); + var initialFrame = iterator.Take(); + response.Response = initialFrame.Bytes.ReadBoolL(ResponseResponseFieldOffset); + return response; + } + + } +} diff --git a/src/Hazelcast.Net/Protocol/Codecs/CountDownLatchCountDownCodec.cs b/src/Hazelcast.Net/Protocol/Codecs/CountDownLatchCountDownCodec.cs new file mode 100644 index 000000000..c1b4fb9c1 --- /dev/null +++ b/src/Hazelcast.Net/Protocol/Codecs/CountDownLatchCountDownCodec.cs @@ -0,0 +1,143 @@ +// Copyright (c) 2008-2024, Hazelcast, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +// This code was generated by a tool. +// Hazelcast Client Protocol Code Generator @5655fe9be +// https://github.com/hazelcast/hazelcast-client-protocol +// Change to this file will be lost if the code is regenerated. +// + +#pragma warning disable IDE0051 // Remove unused private members +// ReSharper disable UnusedMember.Local +// ReSharper disable RedundantUsingDirective +// ReSharper disable CheckNamespace + +using System; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; +using Hazelcast.Protocol.BuiltInCodecs; +using Hazelcast.Protocol.CustomCodecs; +using Hazelcast.Core; +using Hazelcast.Messaging; +using Hazelcast.Clustering; +using Hazelcast.Serialization; +using Microsoft.Extensions.Logging; + +namespace Hazelcast.Protocol.Codecs +{ + /// + /// Decrements the count of the latch, releasing all waiting threads if + /// the count reaches zero. If the current count is greater than zero, then + /// it is decremented. If the new count is zero: All waiting threads are + /// re-enabled for thread scheduling purposes, and Countdown owner is set to + /// null. If the current count equals zero, then nothing happens. + /// +#if SERVER_CODEC + internal static class CountDownLatchCountDownServerCodec +#else + internal static class CountDownLatchCountDownCodec +#endif + { + public const int RequestMessageType = 721664; // 0x0B0300 + public const int ResponseMessageType = 721665; // 0x0B0301 + private const int RequestInvocationUidFieldOffset = Messaging.FrameFields.Offset.PartitionId + BytesExtensions.SizeOfInt; + private const int RequestExpectedRoundFieldOffset = RequestInvocationUidFieldOffset + BytesExtensions.SizeOfCodecGuid; + private const int RequestInitialFrameSize = RequestExpectedRoundFieldOffset + BytesExtensions.SizeOfInt; + private const int ResponseInitialFrameSize = Messaging.FrameFields.Offset.ResponseBackupAcks + BytesExtensions.SizeOfByte; + +#if SERVER_CODEC + public sealed class RequestParameters + { + + /// + /// CP group id of this CountDownLatch instance + /// + public Hazelcast.CP.CPGroupId GroupId { get; set; } + + /// + /// Name of the CountDownLatch instance + /// + public string Name { get; set; } + + /// + /// UID of this invocation + /// + public Guid InvocationUid { get; set; } + + /// + /// The round this invocation will be performed on + /// + public int ExpectedRound { get; set; } + } +#endif + + public static ClientMessage EncodeRequest(Hazelcast.CP.CPGroupId groupId, string name, Guid invocationUid, int expectedRound) + { + var clientMessage = new ClientMessage + { + IsRetryable = true, + OperationName = "CountDownLatch.CountDown" + }; + var initialFrame = new Frame(new byte[RequestInitialFrameSize], (FrameFlags) ClientMessageFlags.Unfragmented); + initialFrame.Bytes.WriteIntL(Messaging.FrameFields.Offset.MessageType, RequestMessageType); + initialFrame.Bytes.WriteIntL(Messaging.FrameFields.Offset.PartitionId, -1); + initialFrame.Bytes.WriteGuidL(RequestInvocationUidFieldOffset, invocationUid); + initialFrame.Bytes.WriteIntL(RequestExpectedRoundFieldOffset, expectedRound); + clientMessage.Append(initialFrame); + RaftGroupIdCodec.Encode(clientMessage, groupId); + StringCodec.Encode(clientMessage, name); + return clientMessage; + } + +#if SERVER_CODEC + public static RequestParameters DecodeRequest(ClientMessage clientMessage) + { + using var iterator = clientMessage.GetEnumerator(); + var request = new RequestParameters(); + var initialFrame = iterator.Take(); + request.InvocationUid = initialFrame.Bytes.ReadGuidL(RequestInvocationUidFieldOffset); + request.ExpectedRound = initialFrame.Bytes.ReadIntL(RequestExpectedRoundFieldOffset); + request.GroupId = RaftGroupIdCodec.Decode(iterator); + request.Name = StringCodec.Decode(iterator); + return request; + } +#endif + + public sealed class ResponseParameters + { + } + +#if SERVER_CODEC + public static ClientMessage EncodeResponse() + { + var clientMessage = new ClientMessage(); + var initialFrame = new Frame(new byte[ResponseInitialFrameSize], (FrameFlags) ClientMessageFlags.Unfragmented); + initialFrame.Bytes.WriteIntL(Messaging.FrameFields.Offset.MessageType, ResponseMessageType); + clientMessage.Append(initialFrame); + return clientMessage; + } +#endif + + public static ResponseParameters DecodeResponse(ClientMessage clientMessage) + { + using var iterator = clientMessage.GetEnumerator(); + var response = new ResponseParameters(); + iterator.Take(); // empty initial frame + return response; + } + + } +} diff --git a/src/Hazelcast.Net/Protocol/Codecs/CountDownLatchGetCountCodec.cs b/src/Hazelcast.Net/Protocol/Codecs/CountDownLatchGetCountCodec.cs new file mode 100644 index 000000000..c2de002ab --- /dev/null +++ b/src/Hazelcast.Net/Protocol/Codecs/CountDownLatchGetCountCodec.cs @@ -0,0 +1,131 @@ +// Copyright (c) 2008-2024, Hazelcast, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +// This code was generated by a tool. +// Hazelcast Client Protocol Code Generator @5655fe9be +// https://github.com/hazelcast/hazelcast-client-protocol +// Change to this file will be lost if the code is regenerated. +// + +#pragma warning disable IDE0051 // Remove unused private members +// ReSharper disable UnusedMember.Local +// ReSharper disable RedundantUsingDirective +// ReSharper disable CheckNamespace + +using System; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; +using Hazelcast.Protocol.BuiltInCodecs; +using Hazelcast.Protocol.CustomCodecs; +using Hazelcast.Core; +using Hazelcast.Messaging; +using Hazelcast.Clustering; +using Hazelcast.Serialization; +using Microsoft.Extensions.Logging; + +namespace Hazelcast.Protocol.Codecs +{ + /// + /// Returns the current count. + /// +#if SERVER_CODEC + internal static class CountDownLatchGetCountServerCodec +#else + internal static class CountDownLatchGetCountCodec +#endif + { + public const int RequestMessageType = 721920; // 0x0B0400 + public const int ResponseMessageType = 721921; // 0x0B0401 + private const int RequestInitialFrameSize = Messaging.FrameFields.Offset.PartitionId + BytesExtensions.SizeOfInt; + private const int ResponseResponseFieldOffset = Messaging.FrameFields.Offset.ResponseBackupAcks + BytesExtensions.SizeOfByte; + private const int ResponseInitialFrameSize = ResponseResponseFieldOffset + BytesExtensions.SizeOfInt; + +#if SERVER_CODEC + public sealed class RequestParameters + { + + /// + /// CP group id of this CountDownLatch instance + /// + public Hazelcast.CP.CPGroupId GroupId { get; set; } + + /// + /// Name of the CountDownLatch instance + /// + public string Name { get; set; } + } +#endif + + public static ClientMessage EncodeRequest(Hazelcast.CP.CPGroupId groupId, string name) + { + var clientMessage = new ClientMessage + { + IsRetryable = true, + OperationName = "CountDownLatch.GetCount" + }; + var initialFrame = new Frame(new byte[RequestInitialFrameSize], (FrameFlags) ClientMessageFlags.Unfragmented); + initialFrame.Bytes.WriteIntL(Messaging.FrameFields.Offset.MessageType, RequestMessageType); + initialFrame.Bytes.WriteIntL(Messaging.FrameFields.Offset.PartitionId, -1); + clientMessage.Append(initialFrame); + RaftGroupIdCodec.Encode(clientMessage, groupId); + StringCodec.Encode(clientMessage, name); + return clientMessage; + } + +#if SERVER_CODEC + public static RequestParameters DecodeRequest(ClientMessage clientMessage) + { + using var iterator = clientMessage.GetEnumerator(); + var request = new RequestParameters(); + iterator.Take(); // empty initial frame + request.GroupId = RaftGroupIdCodec.Decode(iterator); + request.Name = StringCodec.Decode(iterator); + return request; + } +#endif + + public sealed class ResponseParameters + { + + /// + /// The current count of this CountDownLatch instance + /// + public int Response { get; set; } + } + +#if SERVER_CODEC + public static ClientMessage EncodeResponse(int response) + { + var clientMessage = new ClientMessage(); + var initialFrame = new Frame(new byte[ResponseInitialFrameSize], (FrameFlags) ClientMessageFlags.Unfragmented); + initialFrame.Bytes.WriteIntL(Messaging.FrameFields.Offset.MessageType, ResponseMessageType); + initialFrame.Bytes.WriteIntL(ResponseResponseFieldOffset, response); + clientMessage.Append(initialFrame); + return clientMessage; + } +#endif + + public static ResponseParameters DecodeResponse(ClientMessage clientMessage) + { + using var iterator = clientMessage.GetEnumerator(); + var response = new ResponseParameters(); + var initialFrame = iterator.Take(); + response.Response = initialFrame.Bytes.ReadIntL(ResponseResponseFieldOffset); + return response; + } + + } +} diff --git a/src/Hazelcast.Net/Protocol/Codecs/CountDownLatchGetRoundCodec.cs b/src/Hazelcast.Net/Protocol/Codecs/CountDownLatchGetRoundCodec.cs new file mode 100644 index 000000000..07e9bdfad --- /dev/null +++ b/src/Hazelcast.Net/Protocol/Codecs/CountDownLatchGetRoundCodec.cs @@ -0,0 +1,132 @@ +// Copyright (c) 2008-2024, Hazelcast, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +// This code was generated by a tool. +// Hazelcast Client Protocol Code Generator @5655fe9be +// https://github.com/hazelcast/hazelcast-client-protocol +// Change to this file will be lost if the code is regenerated. +// + +#pragma warning disable IDE0051 // Remove unused private members +// ReSharper disable UnusedMember.Local +// ReSharper disable RedundantUsingDirective +// ReSharper disable CheckNamespace + +using System; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; +using Hazelcast.Protocol.BuiltInCodecs; +using Hazelcast.Protocol.CustomCodecs; +using Hazelcast.Core; +using Hazelcast.Messaging; +using Hazelcast.Clustering; +using Hazelcast.Serialization; +using Microsoft.Extensions.Logging; + +namespace Hazelcast.Protocol.Codecs +{ + /// + /// Returns the current round. A round completes when the count value + /// reaches to 0 and a new round starts afterwards. + /// +#if SERVER_CODEC + internal static class CountDownLatchGetRoundServerCodec +#else + internal static class CountDownLatchGetRoundCodec +#endif + { + public const int RequestMessageType = 722176; // 0x0B0500 + public const int ResponseMessageType = 722177; // 0x0B0501 + private const int RequestInitialFrameSize = Messaging.FrameFields.Offset.PartitionId + BytesExtensions.SizeOfInt; + private const int ResponseResponseFieldOffset = Messaging.FrameFields.Offset.ResponseBackupAcks + BytesExtensions.SizeOfByte; + private const int ResponseInitialFrameSize = ResponseResponseFieldOffset + BytesExtensions.SizeOfInt; + +#if SERVER_CODEC + public sealed class RequestParameters + { + + /// + /// CP group id of this CountDownLatch instance + /// + public Hazelcast.CP.CPGroupId GroupId { get; set; } + + /// + /// Name of the CountDownLatch instance + /// + public string Name { get; set; } + } +#endif + + public static ClientMessage EncodeRequest(Hazelcast.CP.CPGroupId groupId, string name) + { + var clientMessage = new ClientMessage + { + IsRetryable = true, + OperationName = "CountDownLatch.GetRound" + }; + var initialFrame = new Frame(new byte[RequestInitialFrameSize], (FrameFlags) ClientMessageFlags.Unfragmented); + initialFrame.Bytes.WriteIntL(Messaging.FrameFields.Offset.MessageType, RequestMessageType); + initialFrame.Bytes.WriteIntL(Messaging.FrameFields.Offset.PartitionId, -1); + clientMessage.Append(initialFrame); + RaftGroupIdCodec.Encode(clientMessage, groupId); + StringCodec.Encode(clientMessage, name); + return clientMessage; + } + +#if SERVER_CODEC + public static RequestParameters DecodeRequest(ClientMessage clientMessage) + { + using var iterator = clientMessage.GetEnumerator(); + var request = new RequestParameters(); + iterator.Take(); // empty initial frame + request.GroupId = RaftGroupIdCodec.Decode(iterator); + request.Name = StringCodec.Decode(iterator); + return request; + } +#endif + + public sealed class ResponseParameters + { + + /// + /// The current round of this CountDownLatch instance + /// + public int Response { get; set; } + } + +#if SERVER_CODEC + public static ClientMessage EncodeResponse(int response) + { + var clientMessage = new ClientMessage(); + var initialFrame = new Frame(new byte[ResponseInitialFrameSize], (FrameFlags) ClientMessageFlags.Unfragmented); + initialFrame.Bytes.WriteIntL(Messaging.FrameFields.Offset.MessageType, ResponseMessageType); + initialFrame.Bytes.WriteIntL(ResponseResponseFieldOffset, response); + clientMessage.Append(initialFrame); + return clientMessage; + } +#endif + + public static ResponseParameters DecodeResponse(ClientMessage clientMessage) + { + using var iterator = clientMessage.GetEnumerator(); + var response = new ResponseParameters(); + var initialFrame = iterator.Take(); + response.Response = initialFrame.Bytes.ReadIntL(ResponseResponseFieldOffset); + return response; + } + + } +} diff --git a/src/Hazelcast.Net/Protocol/Codecs/CountDownLatchTrySetCountCodec.cs b/src/Hazelcast.Net/Protocol/Codecs/CountDownLatchTrySetCountCodec.cs new file mode 100644 index 000000000..178d95872 --- /dev/null +++ b/src/Hazelcast.Net/Protocol/Codecs/CountDownLatchTrySetCountCodec.cs @@ -0,0 +1,143 @@ +// Copyright (c) 2008-2024, Hazelcast, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +// This code was generated by a tool. +// Hazelcast Client Protocol Code Generator @5655fe9be +// https://github.com/hazelcast/hazelcast-client-protocol +// Change to this file will be lost if the code is regenerated. +// + +#pragma warning disable IDE0051 // Remove unused private members +// ReSharper disable UnusedMember.Local +// ReSharper disable RedundantUsingDirective +// ReSharper disable CheckNamespace + +using System; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; +using Hazelcast.Protocol.BuiltInCodecs; +using Hazelcast.Protocol.CustomCodecs; +using Hazelcast.Core; +using Hazelcast.Messaging; +using Hazelcast.Clustering; +using Hazelcast.Serialization; +using Microsoft.Extensions.Logging; + +namespace Hazelcast.Protocol.Codecs +{ + /// + /// Sets the count to the given value if the current count is zero. + /// If the count is not zero, then this method does nothing + /// and returns false + /// +#if SERVER_CODEC + internal static class CountDownLatchTrySetCountServerCodec +#else + internal static class CountDownLatchTrySetCountCodec +#endif + { + public const int RequestMessageType = 721152; // 0x0B0100 + public const int ResponseMessageType = 721153; // 0x0B0101 + private const int RequestCountFieldOffset = Messaging.FrameFields.Offset.PartitionId + BytesExtensions.SizeOfInt; + private const int RequestInitialFrameSize = RequestCountFieldOffset + BytesExtensions.SizeOfInt; + private const int ResponseResponseFieldOffset = Messaging.FrameFields.Offset.ResponseBackupAcks + BytesExtensions.SizeOfByte; + private const int ResponseInitialFrameSize = ResponseResponseFieldOffset + BytesExtensions.SizeOfBool; + +#if SERVER_CODEC + public sealed class RequestParameters + { + + /// + /// CP group id of this CountDownLatch instance + /// + public Hazelcast.CP.CPGroupId GroupId { get; set; } + + /// + /// Name of the CountDownLatch instance + /// + public string Name { get; set; } + + /// + /// The number of times countDown must be invoked before + /// threads can pass through await + /// + public int Count { get; set; } + } +#endif + + public static ClientMessage EncodeRequest(Hazelcast.CP.CPGroupId groupId, string name, int count) + { + var clientMessage = new ClientMessage + { + IsRetryable = true, + OperationName = "CountDownLatch.TrySetCount" + }; + var initialFrame = new Frame(new byte[RequestInitialFrameSize], (FrameFlags) ClientMessageFlags.Unfragmented); + initialFrame.Bytes.WriteIntL(Messaging.FrameFields.Offset.MessageType, RequestMessageType); + initialFrame.Bytes.WriteIntL(Messaging.FrameFields.Offset.PartitionId, -1); + initialFrame.Bytes.WriteIntL(RequestCountFieldOffset, count); + clientMessage.Append(initialFrame); + RaftGroupIdCodec.Encode(clientMessage, groupId); + StringCodec.Encode(clientMessage, name); + return clientMessage; + } + +#if SERVER_CODEC + public static RequestParameters DecodeRequest(ClientMessage clientMessage) + { + using var iterator = clientMessage.GetEnumerator(); + var request = new RequestParameters(); + var initialFrame = iterator.Take(); + request.Count = initialFrame.Bytes.ReadIntL(RequestCountFieldOffset); + request.GroupId = RaftGroupIdCodec.Decode(iterator); + request.Name = StringCodec.Decode(iterator); + return request; + } +#endif + + public sealed class ResponseParameters + { + + /// + /// true if the new count was set, + /// false if the current count is not zero. + /// + public bool Response { get; set; } + } + +#if SERVER_CODEC + public static ClientMessage EncodeResponse(bool response) + { + var clientMessage = new ClientMessage(); + var initialFrame = new Frame(new byte[ResponseInitialFrameSize], (FrameFlags) ClientMessageFlags.Unfragmented); + initialFrame.Bytes.WriteIntL(Messaging.FrameFields.Offset.MessageType, ResponseMessageType); + initialFrame.Bytes.WriteBoolL(ResponseResponseFieldOffset, response); + clientMessage.Append(initialFrame); + return clientMessage; + } +#endif + + public static ResponseParameters DecodeResponse(ClientMessage clientMessage) + { + using var iterator = clientMessage.GetEnumerator(); + var response = new ResponseParameters(); + var initialFrame = iterator.Take(); + response.Response = initialFrame.Bytes.ReadBoolL(ResponseResponseFieldOffset); + return response; + } + + } +}