Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
5810a68
Initial impl.
emreyigit May 3, 2023
32c732f
Implemented the part of message executor.
emreyigit May 5, 2023
fa38d8a
Testing
emreyigit May 8, 2023
8d88953
Test added.
emreyigit May 11, 2023
8190996
Improved the implementation, added diposed and exception events.
emreyigit Jun 1, 2023
26833b9
Blocking pulish improved.
emreyigit Jun 2, 2023
68493cc
Merge branch 'master' into reliable-topic
emreyigit Jun 5, 2023
8fbedee
Add keep cluster name to RC.
emreyigit Jun 6, 2023
1b3dbdf
Add member config for reliable topic.
emreyigit Jun 6, 2023
d256bf0
Add store sequence option.
emreyigit Jun 6, 2023
dfc1ee0
Add store sequence option(fix)
emreyigit Jun 6, 2023
b844d3b
Add cluser restart tests.
emreyigit Jun 6, 2023
1cc8224
Merge branch 'reliable-topic' of https://github.com/emreyigit/hazelca…
emreyigit Jun 6, 2023
892a25e
Merge branch 'master' into reliable-topic
emreyigit Jun 6, 2023
dc0b5e6
Add more tests
emreyigit Jun 7, 2023
15990de
Add docs, example and tests.
emreyigit Jun 8, 2023
5f4e285
Don't kill members for each test, save time.
emreyigit Jun 9, 2023
2c2e0e0
Let listener be ready before test.
emreyigit Jun 9, 2023
379b68a
Review changes.
emreyigit Jun 12, 2023
010cc15
Merge branch 'master' into reliable-topic
emreyigit Jun 13, 2023
dc04a2a
Review changes.
emreyigit Jun 13, 2023
01fa141
Improve tests.
emreyigit Jun 13, 2023
03202f5
Optimize tests
emreyigit Jun 13, 2023
b326b5a
Optimize tests 2.
emreyigit Jun 13, 2023
d606f1d
Optimize tests 3.
emreyigit Jun 13, 2023
4f46ea4
Optimize Tests 4.
emreyigit Jun 13, 2023
d728d00
Merge branch 'master' into reliable-topic
emreyigit Jun 15, 2023
29bfc61
Review changes.
emreyigit Jun 15, 2023
b2aafb0
Review changes.
emreyigit Jun 15, 2023
a14fa7f
Fix documentation.
emreyigit Jun 15, 2023
af59734
Review changes.
emreyigit Jun 16, 2023
8c5b3cb
Review changes.
emreyigit Jun 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions doc/dev/doc/distributed-objects/hreliabletopic.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# HReliableTopic

A `HReliableTopic` topic is the durable version of [HTopic](/htopic.md) backed with a [HRingBuffer](/hringbuffer.md).

The reliable topic behavior can be configured on the server: see the general [Documentation](https://docs.hazelcast.com/imdg/latest/data-structures/reliable-topic) for complete details about reliable topics.

## Defining Topics

Topics are fully identified by their type and unique name, regardless of the types specified for topic messages. In other words, an `HReliableTopic<string>` and an `HReliableTopic<int>` named with the same name are backed by the *same* cluster structure which is a [RingBuffer](/hringbuffer.md). Obviously, refering to a topic with types other than the expected types can have unspecified consequences (probably, serialization errors) and is not recommended.

The messages type can be just about any valid .NET type, provided that it can be (de)serialized by the Hazelcast .NET Client (see the [Serialization](../serialization.md) documentation). It does not necessarily need to be (de)serializable by the cluster, as long as the cluster does not need to handle them as objects, and can treat them as plain binary blobs. As soon as the cluster needs to handle the objects themselves, the types must also be (de)serializable by the cluster.

## Creating & Destroying Topics

A reliable topic is obtained from the Hazelcast .NET Client, and is created on-demand: if a reliable topic with the specified name already exists on the cluster, it is returned, otherwise it is created on the cluster. For instance:

```csharp
var rTopic = await client.GetReliableTopicAsync<string>("my-reliableTopic");
```

## Configuring the HReliableTopic

There are three different parts can be configured. One is server side configuration, size of the backed ring buffer, TTL, overflow policy etc. Second is the reliable topic behavior on the client side, such as `ReliableTopicOptions.BatchSize` and `ReliableTopicOptions.Policy`. The batch size sets the number of messages read by the listener at once. And, overflow policy defines the behavior during publishing a message over `HReliableTopic`. Third one is for listener. The subscription is made to a `HReliableTopic` results in a listener. Some of the behaviors of the listener can be configured. For example, Los tolerancy, initial sequnce to start from, storing the sequence of the last read message and whether terminate the listener in case of an exception.

> [!NOTE]
> To have a durable listener, set `IsLossTolerant` to `false` and `StoreSequence` to `true`.

Topics should be disposed after usage, in order to release their resources. Note that this only releases *client-side* resources, but the actual data remain available on the cluster for further usage. In order to wipe the topic and its data entirely from the cluster, it needs to be destroyed:

```csharp
await rTopic.DestroyAsync();
```

## Using Reliable Topics

The `HReliableTopic` structure is completely documented in the associated @Hazelcast.DistributedObjects.ReliableTopic`1 reference documentation. It provides a method to publish messages:

* `PublishAsync(message)` publishes a message

The `HReliableTopic` structure exposes events in a way similar to `HTopic`, but with some additions. When a subscription is made to a `HReliableTopic`, a background task is spawned. It listens to messages from the backing `HRingBuffer`, and triggers the corresponding `Message` topic events.

In addition,
* The `Exception` event is raised when a message can not be processed, either because it can not be deserialized, or because an exception is thrown by one of the `Message` event handlers and interrupts the handling of the message. In this situation, by default, the subscription terminates, because it is not supposed to "skip" messages. It is however possible to cancel that termination by setting the `Cancel` event arguments property to `true`, in which case the subscription will move on to the next messages.

* The `Terminated` event is raised when the subscription terminates, either because of a non-canceled exception (see above), or when anything goes wrong with the underlying buffer (overload, loss...), or when it is actively terminated by e.g. disposing the reliable topic instance. Finally, the behavior of the subscription can be configured via the `ReliableTopicEventHandlerOptions`.



```csharp
var id = await topic.SubscribeAsync(events => events
.Message((sender, args) => {
logger.LogInformation($"Got message {args.Payload} at {args.PublishTime}.");
})
.Terminated((sender, args) =>{
logger.LogInformation($"Listener disposed at sequence {args.Sequence}.");
}),
.Exception((sender, args) =>
{
// Terminate the subscription if client goes offline.
if (args.Exception is ClientOfflineException)
args.Cancel = true;
}),
// Setting StoreSequence=true and IsLossTolerant=false means listener is durable.
new ReliableTopicEventHandlerOptions() {InitialSequence = -1, StoreSequence = true, IsLossTolerant = false});

// ...

await rTopic.UnsubscribeAsync(id);
```

Note that, as with all events in the .NET client, the handler methods passed when subscribing can be asynchronous.
5 changes: 2 additions & 3 deletions doc/dev/doc/distributed-objects/htopic.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

A `HTopic` topic is a distributed topic corresponding to a cluster-side [List](https://docs.hazelcast.com/imdg/latest/data-structures/topic.html).

The topic behavior can be configured on the server: see the general [List documentation](https://docs.hazelcast.com/imdg/latest/data-structures/optic.html) for complete details about topics.
The topic behavior can be configured on the server: see the general [Documentation](https://docs.hazelcast.com/imdg/latest/data-structures/topic.html) for complete details about topics.

## Defining Topics

Expand Down Expand Up @@ -43,5 +43,4 @@ var id = await topic.SubscribeAsync(events => events
await topic.UnsubscribeAsync(id);
```

Note that the handler methods passed to e.g. `Message` can be asynchronous, too.

Note that, as with all events in the .NET client, the handler methods passed when subscribing can be asynchronous.
1 change: 1 addition & 0 deletions doc/dev/doc/toc.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
## [HRingBuffer](distributed-objects/hringbuffer.md)
## [HSet](distributed-objects/hset.md)
## [HTopic](distributed-objects/htopic.md)
## [HReliableTopic](distributed-objects/hreliabletopic.md)
## [AtomicLong](distributed-objects/atomiclong.md)
## [AtomicRef](distributed-objects/atomicref.md)
## [FlakeIdGenerator](distributed-objects/flakeidgenerator.md)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Threading;
using System.Threading.Tasks;
using Hazelcast.Exceptions;
using Hazelcast.Models;

namespace Hazelcast.Examples.DistributedObjects
{
// ReSharper disable once UnusedMember.Global
public class ReliableTopicExample
{
public static async Task Main(string[] args)
{
var options = new HazelcastOptionsBuilder()
.With(args)
.With((conf, opt) =>
{
// Publish messages when there is a space and read messages as batches size of 5.
opt.ReliableTopics["reliable-topic-example"] = new ReliableTopicOptions(TopicOverloadPolicy.Block, 5);
})
.WithConsoleLogger()
.Build();

// create an Hazelcast client and connect to a server running on localhost
await using var client = await HazelcastClientFactory.StartNewClientAsync(options);

// get distributed reliable topic from cluster
await using var topic = await client.GetReliableTopicAsync<string>("reliable-topic-example");

var count = 100;
var counted = new SemaphoreSlim(0);

// subscribe to event
await topic.SubscribeAsync(on => on
.Message((sender, args) =>
{
Console.WriteLine($"Got message {args.Payload}");
if (Interlocked.Decrement(ref count) == 0)
counted.Release();
})
.Terminated((sender, args) =>
{
Console.WriteLine("The listener is disposed, and the task at the background is canceled.");

}).Exception((sender, args) =>
{
// Terminate the subscription if client goes offline.
if (args.Exception is ClientOfflineException)
args.Cancel = true;
}),
// Setting StoreSequence=true and IsLossTolerant=false means listener is durable.
new ReliableTopicEventHandlerOptions() {InitialSequence = -1, StoreSequence = true, IsLossTolerant = false});

// publish messages
for (var i = 0; i < 100; i++)
{
await topic.PublishAsync($"Message {i}");
}

// wait for all events
await counted.WaitAsync();

// destroy the topic
await client.DestroyAsync(topic);
}
}
}
20 changes: 14 additions & 6 deletions src/Hazelcast.Net.Testing/ClusterRemoteTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using System.Threading.Tasks;
using Hazelcast.Clustering;
using Hazelcast.Core;
using Hazelcast.Testing.Conditions;
using Hazelcast.Testing.Remote;
using NUnit.Framework;

Expand All @@ -26,15 +27,18 @@ namespace Hazelcast.Testing
public abstract class ClusterRemoteTestBase : RemoteTestBase
{
[OneTimeSetUp]
public async Task ClusterOneTimeSetUp()
public virtual async Task ClusterOneTimeSetUp()
{
// create remote client and cluster
RcClient = await ConnectToRemoteControllerAsync().CfAwait();
RcCluster = await RcClient.CreateClusterAsync(RcClusterConfiguration).CfAwait();
RcCluster = KeepClusterName
? await RcClient.CreateClusterKeepClusterNameAsync(ServerVersion.DefaultVersion.Version.ToString(), RcClusterConfiguration)
: await RcClient.CreateClusterAsync(RcClusterConfiguration).CfAwait();
}


[OneTimeTearDown]
public async Task ClusterOneTimeTearDown()
public virtual async Task ClusterOneTimeTearDown()
{
// terminate & remove client and cluster
if (RcClient != null)
Expand Down Expand Up @@ -62,12 +66,16 @@ protected override HazelcastOptions CreateHazelcastOptions()
/// <summary>
/// Gets the remote controller client.
/// </summary>
protected IRemoteControllerClient RcClient { get; private set; }
protected IRemoteControllerClient RcClient { get; set; }

/// <summary>
/// Gets the remote controller cluster.
/// </summary>
protected Remote.Cluster RcCluster { get; private set; }

protected Remote.Cluster RcCluster { get; set; }

/// <summary>
/// Whether creates the cluster with name provided in the config.
/// </summary>
protected virtual bool KeepClusterName { get; set; }
}
}
9 changes: 9 additions & 0 deletions src/Hazelcast.Net.Testing/Remote/IRemoteControllerClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ public interface IRemoteControllerClient
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The new cluster.</returns>
Task<Cluster> CreateClusterAsync(string serverVersion, string serverConfiguration, CancellationToken cancellationToken = default);

/// <summary>
/// Creates a new cluster with name in the provided configuration.
/// </summary>
/// <param name="serverVersion">The Hazelcast server version.</param>
/// <param name="serverConfiguration">The server Xml configuration.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns></returns>
Task<Cluster> CreateClusterKeepClusterNameAsync(string serverVersion, string serverConfiguration, CancellationToken cancellationToken = default);

/// <summary>
/// Starts a new member.
Expand Down
4 changes: 4 additions & 0 deletions src/Hazelcast.Net.Testing/Remote/RemoteControllerClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ public async Task<bool> ExitAsync(CancellationToken cancellationToken = default)
public Task<Cluster> CreateClusterAsync(string hzVersion, string xmlconfig, CancellationToken cancellationToken = default)
=> WithLock(token => createCluster(hzVersion, xmlconfig, token), cancellationToken);

/// <inheritdoc />
public Task<Cluster> CreateClusterKeepClusterNameAsync(string serverVersion, string serverConfiguration, CancellationToken cancellationToken = default)
=> WithLock(token => createClusterKeepClusterName(serverVersion, serverConfiguration, token), cancellationToken);

/// <inheritdoc />
public Task<Member> StartMemberAsync(string clusterId, CancellationToken cancellationToken = default)
=> WithLock(token => startMember(clusterId, token), cancellationToken);
Expand Down
2 changes: 1 addition & 1 deletion src/Hazelcast.Net.Testing/SingleMemberRemoteTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,6 @@ public async Task MemberOneTimeTearDown()
/// <summary>
/// Gets the remote member.
/// </summary>
protected Remote.Member RcMember { get; private set; }
protected Remote.Member RcMember { get; set; }
}
}
Loading