Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion doc/dev/doc/cpsubsystem.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ Currently, the C# client CP SubSystem implements the following services:
* [AtomicLong](distributed-objects/atomiclong.md)
* [AtomicRef](distributed-objects/atomicref.md)
* [FencedLock](distributed-objects/fencedlock.md)
* [CPMap](distributed-objects/cpmap.md)
* [CPMap](distributed-objects/cpmap.md)
* [CountDownLatch](distributed-objects/countdownlatch.md)
29 changes: 29 additions & 0 deletions doc/dev/doc/distributed-objects/countdownlatch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# AtomicLong

> [!NOTE]
> ICountDownLatch is a member of CP Subsystem API. For detailed information, see the [CP SubSystem documentation](../cpsubsystem.md).

Hazelcast @Hazelcast.CP.ICountDownLatch is the distributed implementation of `java.util.concurrent.CountDownLatch`. It is a
cluster-wide synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

The following example code creates a latch, and waits on it:

```csharp
await using var client = await HazelcastClientFactory.StartNewClientAsync();
await using var latch = await client.CPSubSystem.GetCountDownLatchAsync("latch-unique-name");

await latch.TrySetCountAsync(4);

var waiting = latch.AwaitAsync(TimeSpan.FromSeconds(30));
// waiting is NOT completed
latch.CountDownAsync();
// latch.GetCountAsync() would be 3
latch.CountDownAsync();
// latch.GetCountAsync() would be 2
latch.CountDownAsync();
// latch.GetCountAsync() would be 1
latch.CountDownAsync();
// latch.GetCountAsync() is now zero

await waiting; // waiting is completed
```
1 change: 1 addition & 0 deletions doc/dev/doc/toc.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
## [FlakeIdGenerator](distributed-objects/flakeidgenerator.md)
## [FencedLock](distributed-objects/fencedlock.md)
## [CPMap](distributed-objects/cpmap.md)
## [CountDownLatch](distributed-objects/countdownlatch.md)
# [Distributed Computing](distributedComputing.md)
# [Distributed Query](distributedQuery.md)
# [CP SubSystem](cpsubsystem.md)
Expand Down
68 changes: 68 additions & 0 deletions src/Hazelcast.Net.Tests/CP/CountDownLatchTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Threading.Tasks;
using Hazelcast.Testing;
using NUnit.Framework;

namespace Hazelcast.Tests.CP;

[TestFixture]
public class CountDownLatchTests : SingleMemberClientRemoteTestBase
{
[Test]
public async Task NormalTest()
{
var cdl = await Client.CPSubsystem.GetCountDownLatchAsync(CreateUniqueName());
Assert.That(await cdl.TrySetCountAsync(4));
Assert.That(await cdl.TrySetCountAsync(4), Is.False);
var waiting = cdl.AwaitAsync(TimeSpan.FromSeconds(30));
for (var i = 4; i > 0; i--)
{
Assert.That(waiting.IsCompleted, Is.False);
Assert.That(await cdl.GetCountAsync(), Is.EqualTo(i));
await cdl.CountDownAsync();
}

await AssertEx.SucceedsEventually(() =>
{
Assert.That(waiting.IsCompleted);
}, 4_000, 500);

Assert.That(await cdl.GetCountAsync(), Is.EqualTo(0));
await cdl.CountDownAsync();
Assert.That(await cdl.GetCountAsync(), Is.EqualTo(0));
}

[Test]
public async Task TimeoutTest()
{
var cdl = await Client.CPSubsystem.GetCountDownLatchAsync(CreateUniqueName());
Assert.That(await cdl.TrySetCountAsync(4));
var waiting = cdl.AwaitAsync(TimeSpan.FromSeconds(1));

await AssertEx.SucceedsEventually(() =>
{
Assert.That(waiting.IsCompleted && waiting.Result == false);
}, 4_000, 500);
}

[Test]
public async Task DestroyTest()
{
var cdl = await Client.CPSubsystem.GetCountDownLatchAsync(CreateUniqueName());
await cdl.DestroyAsync();
}
}
8 changes: 8 additions & 0 deletions src/Hazelcast.Net/CP/CPSubsystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ public async Task<ICPMap<TKey, TValue>> GetMapAsync<TKey, TValue>([NotNull] stri
_serializationService, groupId);
}

public async Task<ICountDownLatch> GetCountDownLatchAsync(string name)
{
var (groupName, objectName, _) = ParseName(name);
var groupId = await GetGroupIdAsync(groupName).CfAwait();

return new CountDownLatch(objectName, groupId, _cluster, _serializationService);
}

// see: ClientRaftProxyFactory.java

private async Task<CPGroupId> GetGroupIdAsync(string proxyName)
Expand Down
89 changes: 89 additions & 0 deletions src/Hazelcast.Net/CP/CountDownLatch.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Threading.Tasks;
using Hazelcast.Clustering;
using Hazelcast.Core;
using Hazelcast.DistributedObjects;
using Hazelcast.Protocol;
using Hazelcast.Protocol.Codecs;
using Hazelcast.Protocol.Models;
using Hazelcast.Serialization;

namespace Hazelcast.CP;

internal class CountDownLatch : CPDistributedObjectBase, ICountDownLatch
{
public CountDownLatch(string name, CPGroupId groupId, Cluster cluster, SerializationService serializationService)
: base(ServiceNames.CountDownLatch, name, groupId, cluster, serializationService)
{ }

public async Task<bool> AwaitAsync(TimeSpan timeout)
{
var timeoutMillis = (long) timeout.TotalMilliseconds;
if (timeoutMillis < 0) timeoutMillis = 0;
var requestMessage = CountDownLatchAwaitCodec.EncodeRequest(CPGroupId, Name, Guid.NewGuid(), timeoutMillis);
var responseMessage = await Cluster.Messaging.SendAsync(requestMessage).CfAwait();
return CountDownLatchAwaitCodec.DecodeResponse(responseMessage).Response;
}

public async Task CountDownAsync()
{
var round = await GetRoundAsync().CfAwait();
var uuid = Guid.NewGuid();
for (;;)
{
try
{
await CountDownAsync(round, uuid).CfAwait();
return;
}
catch (RemoteException e) when (e.Error == RemoteError.OperationTimeout)
{
// ignore and retry
}
}
}

private async Task<int> GetRoundAsync()
{
var requestMessage = CountDownLatchGetRoundCodec.EncodeRequest(CPGroupId, Name);
var responseMessage = await Cluster.Messaging.SendAsync(requestMessage).CfAwait();
return CountDownLatchGetRoundCodec.DecodeResponse(responseMessage).Response;
}

private async Task CountDownAsync(int round, Guid uuid)
{
var requestMessage = CountDownLatchCountDownCodec.EncodeRequest(CPGroupId, Name, uuid, round);
var responseMessage = await Cluster.Messaging.SendAsync(requestMessage).CfAwait();
CountDownLatchCountDownCodec.DecodeResponse(responseMessage);
}

public async Task<int> GetCountAsync()
{
var requestMessage = CountDownLatchGetCountCodec.EncodeRequest(CPGroupId, Name);
var responseMessage = await Cluster.Messaging.SendAsync(requestMessage).CfAwait();
return CountDownLatchGetCountCodec.DecodeResponse(responseMessage).Response;
}

public async Task<bool> TrySetCountAsync(int count)
{
if (count <= 0) throw new ArgumentException("Value must be greater than zero.", nameof(count));

var requestMessage = CountDownLatchTrySetCountCodec.EncodeRequest(CPGroupId, Name, count);
var responseMessage = await Cluster.Messaging.SendAsync(requestMessage).CfAwait();
return CountDownLatchTrySetCountCodec.DecodeResponse(responseMessage).Response;
}
}
10 changes: 9 additions & 1 deletion src/Hazelcast.Net/CP/ICPSubsystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

using System.Threading.Tasks;
using Hazelcast.Protocol.Codecs;

namespace Hazelcast.CP
{
Expand Down Expand Up @@ -66,5 +65,14 @@ public interface ICPSubsystem
/// <typeparam name="TKey">Type of the key.</typeparam>
/// <typeparam name="TValue">Type of the value.</typeparam>
Task<ICPMap<TKey, TValue>> GetMapAsync<TKey, TValue>(string name);

/// <summary>
/// Gets an <see cref="ICountDownLatch"/> distributed object.
/// </summary>
/// <param name="name">The unique name of the countdown latch.</param>
/// <para>If an object with the specified <paramref name="name"/> does not
/// exist already in the cluster, a new object is created.</para>
/// <returns>The countdown latch.</returns>
Task<ICountDownLatch> GetCountDownLatchAsync(string name);
}
}
81 changes: 81 additions & 0 deletions src/Hazelcast.Net/CP/ICPSubsystem.cs.orig
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Threading.Tasks;
using Hazelcast.Protocol.Codecs;

namespace Hazelcast.CP
{
/// <summary>
/// Defines the CP subsystem.
/// </summary>
public interface ICPSubsystem
{
/// <summary>
/// Gets an <see cref="IAtomicLong"/> distributed object.
/// </summary>
/// <param name="name">The unique name of the atomic long.</param>
/// <returns>The atomic long that was retrieved or created.</returns>
/// <remarks>
/// <para>If an object with the specified <paramref name="name"/> does not
/// exist already in the cluster, a new object is created.</para>
/// </remarks>
Task<IAtomicLong> GetAtomicLongAsync(string name);

/// <summary>
/// Gets an <see cref="IAtomicReference{T}"/> distributed object.
/// </summary>
/// <param name="name">The unique name of the atomic reference.</param>
/// <returns>The atomic reference that was retrieved or created.</returns>
/// <remarks>
/// <para>If an object with the specified <paramref name="name"/> does not
/// exist already in the cluster, a new object is created.</para>
/// </remarks>
Task<IAtomicReference<T>> GetAtomicReferenceAsync<T>(string name);

/// <summary>
/// Gets an <see cref="IFencedLock"/> distributed object.
/// </summary>
/// <param name="name">The unique name of the fenced lock.</param>
/// <para>If an object with the specified <paramref name="name"/> does not
/// exist already in the cluster, a new object is created.</para>
/// <returns></returns>
Task<IFencedLock> GetLockAsync(string name);

/// <summary>
<<<<<<< HEAD
/// Gets an <see cref="ICountDownLatch"/> distributed object.
/// </summary>
/// <param name="name">The unique name of the countdown latch.</param>
/// <para>If an object with the specified <paramref name="name"/> does not
/// exist already in the cluster, a new object is created.</para>
/// <returns>The countdown latch.</returns>
Task<ICountDownLatch> GetCountDownLatchAsync(string name);
}
=======
/// Gets an <see cref="ICPMap{TKey,TValue}"/> distributed object.
/// <remarks><para>CPMap is only available in <b>enterprise</b> cluster.</para>
/// <para>The map will be created in <b>DEFAULT</b> CP group if no group name provided within <paramref name="name"/>.
/// If a group name provided, first, the group will be initialized,
/// if does not exist. Then, <see cref="ICPMap{TKey,TValue}"/> instance will be created on this group.
/// </para>
/// </remarks>
/// </summary>
/// <param name="name">The unique name of the map. It can contain the group name like <code>"myMap@group1"</code></param>
/// <typeparam name="TKey">Type of the key.</typeparam>
/// <typeparam name="TValue">Type of the value.</typeparam>
Task<ICPMap<TKey, TValue>> GetMapAsync<TKey, TValue>(string name);
}
>>>>>>> master
}
58 changes: 58 additions & 0 deletions src/Hazelcast.Net/CP/ICountDownLatch.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Threading.Tasks;

namespace Hazelcast.CP;

/// <summary>
/// Represents a countdown latch which is a backed-up distributed alternative to the
/// java.util.concurrent.CountDownLatch. It is a cluster-wide synchronization aid
/// that allows one or more threads to wait until a set of operations being
/// performed in other threads completes.
/// It works on top of the Raft consensus algorithm. It offers linearizability
/// during crash failures and network partitions. It is CP with respect to the CAP
/// principle. If a network partition occurs, it remains available on at most one
/// side of the partition.
/// </summary>
public interface ICountDownLatch : ICPDistributedObject
{
/// <summary>
/// Waits until the latch has counted down to zero, or the specified timeout
/// waiting time has expired.
/// </summary>
/// <param name="timeout">The wait timeout.</param>
/// <returns>Whether the count reached zero within the specified timeout
/// waiting time.</returns>
Task<bool> AwaitAsync(TimeSpan timeout);

/// <summary>
/// Decrements the count of the latch.
/// </summary>
Task CountDownAsync();

/// <summary>
/// Gets the current count of the latch.
/// </summary>
/// <returns>The current count of the latch.</returns>
Task<int> GetCountAsync();

/// <summary>
/// Sets the count to the specified value if it is zero.
/// </summary>
/// <param name="count">The new count.</param>
/// <returns>Whether the count was set.</returns>
Task<bool> TrySetCountAsync(int count);
}
5 changes: 5 additions & 0 deletions src/Hazelcast.Net/DistributedObjects/ServiceNames.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,10 @@ internal static class ServiceNames
/// The name of the CPMap service.
/// </summary>
public const string CPMap = "hz:raft:mapService";

/// <summary>
/// The name of the CountDown Latch service.
/// </summary>
public const string CountDownLatch = "hz:raft:countDownLatchService";
}
}
Loading