Skip to content
  •  
  •  
  •  
3 changes: 2 additions & 1 deletion doc/dev/doc/cpsubsystem.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ The CP SubSystem is a component of a Hazelcast cluster that builds a strongly co
Currently, the C# client CP SubSystem implements the following services:
* [AtomicLong](distributed-objects/atomiclong.md)
* [AtomicRef](distributed-objects/atomicref.md)
* [FencedLock](distributed-objects/fencedlock.md)
* [FencedLock](distributed-objects/fencedlock.md)
* [CPMap](distributed-objects/cpmap.md)
52 changes: 52 additions & 0 deletions doc/dev/doc/distributed-objects/cpmap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# CPMap

> [!NOTE]
> CPMap is a member of CP Subsystem API. For detailed information, see the [CP SubSystem documentation](../cpsubsystem.md).
> [!WARNING]
> It is only available in Hazelcast Enterprise.

Hazelcast CPMap is a strongly consistent mapping structure under CP Subsystem. It offers linearizability, and supports atomic `CompareAndSetAsync`, `DeleteAsync`, `GetAsync`, `PutAsync`, `RemoveAsync` and `SetAsync` operations. Hazelcast .Net Client also supports these atomic operations, for implementation details, please, see [CPMap documentation](https://docs.hazelcast.com/hazelcast/latest/data-structures/cpmap).

In order to use `CPMap`, CP Subsystem must be enabled on the server side. Moreover, if no Raft group specified while creating the structure, `CPMap` will be created under `default` Raft group.

## Example

The following simple example creates a `CPMap` under `myGroup` Raft group. If the group has not been initialized yet,
first; the raft group will be created. Then, the structure will be initialized.

```csharp

// create an Hazelcast client and connect to a enterprise server running on localhost
// note that that server should be properly configured for CP with at least 3 members
await using var client = await HazelcastClientFactory.StartNewClientAsync(options);

// Get a CPMap under "myGroup" Raft group.
var map = await client.CPSubsystem.GetMapAsync<int, string>("myMap@myGroup");

var (key, val) = (1, "my-value");

// Set a value
// Note: Set does not return back the old value that is associated with the key. If you require the previous value,
// consider using PutAsync.
await map.SetAsync(key, val);

// Get value that is map to the key.
// If key does not exist, the return value will be null. However, we know that the key-value pair exists, and
// ignore the possible null warning.
var currentVal = await map.GetAsync(key)!;

Console.WriteLine($"Key: {key}, Expected Value: {val}, Actual Value:{currentVal}");

// Let's change the value of the key by using CompareAndSetAsync
// The expected value will be compared to current value which is associated to given key.
// If they are equal, new value will be set.
var newValue = "my-new-value";
var isSet = await map.CompareAndSetAsync(key, currentVal, newValue);

Console.WriteLine($"Key: {key}, Expected Value: {currentVal}, New Value:{newValue}, Is set successfully done:{isSet}");


// Assume that we do not need the map anymore. So, it is better to destroy the map on the cluster and release the resources.
// Note that Hazelcast does NOT do garbage collection on CPMap.
await map.DestroyAsync();
```
1 change: 1 addition & 0 deletions doc/dev/doc/toc.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
## [AtomicRef](distributed-objects/atomicref.md)
## [FlakeIdGenerator](distributed-objects/flakeidgenerator.md)
## [FencedLock](distributed-objects/fencedlock.md)
## [CPMap](distributed-objects/cpmap.md)
# [Distributed Computing](distributedComputing.md)
# [Distributed Query](distributedQuery.md)
# [CP SubSystem](cpsubsystem.md)
Expand Down
63 changes: 63 additions & 0 deletions src/Hazelcast.Net.Examples/CP/CPMapExample.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Threading.Tasks;

namespace Hazelcast.Examples.CP;

public class CPMapExample
{
public static async Task Main(string[] args)
{
var options = new HazelcastOptionsBuilder()
.With(args)
.WithConsoleLogger()
.Build();

// create an Hazelcast client and connect to a enterprise server running on localhost
// note that that server should be properly configured for CP with at least 3 members
await using var client = await HazelcastClientFactory.StartNewClientAsync(options);

// Get a CPMap under "myGroup" Raft group.
var map = await client.CPSubsystem.GetMapAsync<int, string>("myMap@myGroup");

var (key, val) = (1, "my-value");

// Set a value
// Note: Set does not return back the old value that is associated with the key. If you require the previous value,
// consider using PutAsync.
await map.SetAsync(key, val);

// Get value that is map to the key.
// If key does not exist, the return value will be null. However, we know that the key-value pair exists, and
// ignore the possible null warning.
var currentVal = await map.GetAsync(key)!;

Console.WriteLine($"Key: {key}, Expected Value: {val}, Actual Value:{currentVal}");

// Let's change the value of the key by using CompareAndSetAsync
// The expected value will be compared to current value which is associated to given key.
// If they are equal, new value will be set.
var newValue = "my-new-value";
var isSet = await map.CompareAndSetAsync(key, currentVal, newValue);

Console.WriteLine($"Key: {key}, Expected Value: {currentVal}, New Value:{newValue}, Is set successfully done:{isSet}");


// Assume that we do not need the map anymore. So, it is better to destroy the map on the cluster and release the resources.
// Note that Hazelcast does NOT do garbage collection on CPMap.
await map.DestroyAsync();
}
}
189 changes: 189 additions & 0 deletions src/Hazelcast.Net.Tests/CP/CPMapTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Hazelcast.Core;
using Hazelcast.CP;
using Hazelcast.DistributedObjects;
using Hazelcast.Protocol;
using Hazelcast.Protocol.Models;
using Hazelcast.Testing;
using Hazelcast.Testing.Remote;
using NUnit.Framework;

namespace Hazelcast.Tests.CP;

[Category("enterprise")]
public class CPMapTests : MultiMembersRemoteTestBase
{
private string _defaultMapName = "myMap";
private string _groupName = "group1";

private List<Member> _members = new();
public IHazelcastClient Client { get; private set; }
protected override string RcClusterConfiguration => TestFiles.ReadAllText(this, "Cluster/cp.xml");


[OneTimeSetUp]
public async Task TestOneTimeSetUp()
{
// CP-subsystem wants at least 3 members
for (var i = 0; i < 3; i++) await AddMember().CfAwait();
Client = await CreateAndStartClientAsync().CfAwait();
}


[Test]
public async Task TestGetCPMapWithNullName()
{
Assert.ThrowsAsync<ArgumentException>(() => Client.CPSubsystem.GetMapAsync<int, int>(null));
}

[Test]
public async Task TestGetCPMap()
{
var map = await Client.CPSubsystem.GetMapAsync<int, int>(_defaultMapName);
Assert.NotNull(map);
Assert.IsInstanceOf<CPMap<int, int>>(map);
Assert.IsInstanceOf<ICPDistributedObject>(map);

var doBase = (CPDistributedObjectBase) map;
Assert.AreEqual(ServiceNames.CPMap, doBase.ServiceName);
Assert.AreEqual(_defaultMapName, doBase.Name);
Assert.IsNull(doBase.PartitionKey);

Assert.AreEqual(CPSubsystem.DefaultGroupName, map.GroupId.Name);
}

[Test]
public async Task TestPut()
{
var map = await Client.CPSubsystem.GetMapAsync<int, int>(_defaultMapName);
var (key, val) = (101, 1001);

// Attention: Neither key nor value can be null.

var prevVal = await map.PutAsync(key, val)!;
Assert.AreEqual(default(int), prevVal);
Assert.That(prevVal, Is.Zero);
prevVal = await map.PutAsync(key, val)!;
Assert.AreEqual(val, prevVal);
}

[Test]
public async Task TestSet()
{
var map = await Client.CPSubsystem.GetMapAsync<int, int>(_defaultMapName);
var (key, val) = (102, 1002);

// Attention: Neither key nor value can be null. It's ok to suppress.
await map.SetAsync(key, val);
var setVal = await map.GetAsync(key)!;
Assert.AreEqual(val, setVal);
}

[Test]
public async Task TestGet()
{
var map = await Client.CPSubsystem.GetMapAsync<int, int>(_defaultMapName);
var (key, val) = (103, 1003);

var setVal = await map.GetAsync(key)!;
Assert.AreEqual(default(int), setVal);

await map.SetAsync(key, val);

setVal = await map.GetAsync(key)!;
Assert.AreEqual(val, setVal);
}

[Test]
public async Task TestRemove()
{
var map = await Client.CPSubsystem.GetMapAsync<int, int>(_defaultMapName);
var (key, val) = (104, 1004);

var setVal = await map.RemoveAsync(key)!;
Assert.AreEqual(default(int), setVal);

await map.SetAsync(key, val);

setVal = await map.RemoveAsync(key)!;
Assert.AreEqual(val, setVal);
}

[Test]
public async Task TestDelete()
{
var map = await Client.CPSubsystem.GetMapAsync<int, int>(_defaultMapName);
var (key, val) = (105, 1005);

// Nothing happens.
await map.DeleteAsync(key);

await map.SetAsync(key, val);
await map.DeleteAsync(key);
var getVal = await map.GetAsync(key)!;
Assert.AreEqual(default(int), getVal);
}

[Test]
public async Task TestCompareAndSet()
{
var map = await Client.CPSubsystem.GetMapAsync<string, string>("stringMap");

var key = "myKey";
var val1 = "val1";
var val2 = "val2";

await map.SetAsync(key, val1);

var isSet = await map.CompareAndSetAsync(key, "randomKey", "randomValue");
Assert.False(isSet);
var actual = await map.GetAsync(key)!;
Assert.AreEqual(val1, actual);

isSet = await map.CompareAndSetAsync(key, val1, val2);
Assert.True(isSet);
actual = await map.GetAsync(key)!;
Assert.AreEqual(val2, actual);
}

[Test]
public async Task TestTryUseAfterDestroy()
{
var mapName = "myMap1";
var map = await Client.CPSubsystem.GetMapAsync<int, int>(mapName);
await map.DestroyAsync();
var ex = Assert.ThrowsAsync<RemoteException>(() => map.SetAsync(1, 1));
Assert.AreEqual(RemoteError.DistributedObjectDestroyed, ex!.Error);
}

[Test]
public async Task TestTryCreateAfterDestroy()
{
var mapName = "myMap2";
var map = await Client.CPSubsystem.GetMapAsync<int, int>(mapName);
await map.DestroyAsync();
var ex = Assert.ThrowsAsync<RemoteException>(async () =>
{
var mapNew = await Client.CPSubsystem.GetMapAsync<int, int>(mapName);
await mapNew.SetAsync(1, 1);
});

Assert.AreEqual(RemoteError.DistributedObjectDestroyed, ex!.Error);
}
}
2 changes: 1 addition & 1 deletion src/Hazelcast.Net.Tests/Clustering/FailoverTests2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ private async ValueTask ServerHandler(ClientRequest<ServerState> request)
var authRequest = ClientAuthenticationServerCodec.DecodeRequest(request.Message);
var authResponse = ClientAuthenticationServerCodec.EncodeResponse(
0, address, memberId, SerializationService.SerializerVersion,
"4.0", partitionsCount, request.Server.ClusterId, true);
"4.0", partitionsCount, request.Server.ClusterId, true,Array.Empty<int>(), Array.Empty<byte>());
await request.RespondAsync(authResponse).CfAwait();
break;
}
Expand Down
3 changes: 2 additions & 1 deletion src/Hazelcast.Net.Tests/Clustering/MemberConnectionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,8 @@ private async ValueTask ServerHandler(ClientRequest<ServerState> request)
var authRequest = ClientAuthenticationServerCodec.DecodeRequest(request.Message);
var authResponse = ClientAuthenticationServerCodec.EncodeResponse(
0, request.State.Address, request.State.MemberId, SerializationService.SerializerVersion,
"4.0", partitionsCount, request.Server.ClusterId, false);
"4.0", partitionsCount, request.Server.ClusterId, false,
Array.Empty<int>(), Array.Empty<byte>());
await request.RespondAsync(authResponse).CfAwait();
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Hazelcast.Core;
Expand Down Expand Up @@ -192,7 +193,8 @@ private async ValueTask ServerHandler(ClientRequest<ServerState> request)
var authRequest = ClientAuthenticationServerCodec.DecodeRequest(request.Message);
var authResponse = ClientAuthenticationServerCodec.EncodeResponse(
0, request.State.Address, request.State.MemberId, SerializationService.SerializerVersion,
"4.0", partitionsCount, request.Server.ClusterId, false);
"4.0", partitionsCount, request.Server.ClusterId, false,
Enumerable.Empty<int>().ToList(),Array.Empty<byte>());
await request.RespondAsync(authResponse).CfAwait();
break;
}
Expand Down
6 changes: 4 additions & 2 deletions src/Hazelcast.Net.Tests/Networking/NetworkingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ private async Task HandleAsync(Server server, ClientMessageConnection connection
var request = ClientAuthenticationServerCodec.DecodeRequest(requestMessage);
var responseMessage = ClientAuthenticationServerCodec.EncodeResponse(
0, server.Address, server.MemberId, SerializationService.SerializerVersion,
"4.0", 1, server.ClusterId, false);
"4.0", 1, server.ClusterId, false,
Enumerable.Empty<int>().ToList(),Array.Empty<byte>());
await connection.SendResponseAsync(requestMessage, responseMessage).CfAwait();
break;
}
Expand Down Expand Up @@ -272,7 +273,8 @@ async Task EventAsync(ClientMessage eventMessage)
var authRequest = ClientAuthenticationServerCodec.DecodeRequest(msg);
var authResponse = ClientAuthenticationServerCodec.EncodeResponse(
0, address, Guid.NewGuid(), SerializationService.SerializerVersion,
"4.0", 1, Guid.NewGuid(), false);
"4.0", 1, Guid.NewGuid(), false,
Enumerable.Empty<int>().ToList(),Array.Empty<byte>());
await ResponseAsync(authResponse).CfAwait();
break;

Expand Down
Loading