Skip to content
Denys Zaliznyak edited this page Mar 7, 2023 · 2 revisions

How to use MetaPubSub

Hub creation

// hub creation
var hub = new MetaPubSub();

// subscribing to MyMessage
hub.Subscribe<MyMessage>(OnMyMessage);

// publishing a message
await hub.Publish(new MyMessage());

// unsubscribing
hub.Unsubscribe<MyMessage>(OnMyMessage);

Exceptions handling

var hub = new MetaPubSub();
var options = new PubSubOptions() { DeliverAtLeastOnce = true, WaitForSubscriberTimeout = 0 };
var message = new MyMessage();

try
{
    // publishing a message when no one subscribed - NoSubscribersException
    await hub.Publish(message, options);
}
catch (NoSubscribersException ex)
{
    // No one is subscribed to this message and (DeliverAtLeastOnce == true and WaitForSubscriberTimeout == 0)
    Console.WriteLine($"Exception {ex.GetType()}: {ex.Message}");
}


try
{
    // publishing a message when no one subscribed and WaitForSubscriberTimeout > 0 - TimeoutException
    options.WaitForSubscriberTimeout = 100;
    await hub.Publish(message, options);
}
catch (TimeoutException ex)
{
    // No one is subscribed to this message and (DeliverAtLeastOnce == true and WaitForSubscriberTimeout > 0)
    Console.WriteLine($"Exception {ex.GetType()}: {ex.Message}");
}


try
{
    hub.Subscribe<MyMessage>(OnMyMessageHandlerWithException);

    // publishing a message
    await hub.Publish(message, options);
}
catch (AggregateException ex)
{
    // When a message is processed by subscribers, any exceptions that are raised
    // can be caught by the publisher as an AggregateException.
    // Even if some subscribers throw an exception, the others
    // will still continue to process the message.
    Console.WriteLine($"Exception {ex.GetType()}: {ex.Message}");
    foreach (var innerEx in ex.InnerExceptions)
    {
        Console.WriteLine($"\tInner Exception {innerEx.GetType()}: {innerEx.Message}");
    }
}

hub.Unsubscribe<MyMessage>(OnMyMessageHandlerWithException);

At least once delivery check

var hub = new MetaPubSub();

// if this not set, NoSubscribersException will not be thrown
var options = new PubSubOptions() { DeliverAtLeastOnce = true };

var message = new MyMessage();

try
{
    // publishing a message when no one is subscribed
    await hub.Publish(message, options);
}
catch (NoSubscribersException ex)
{
    // no one is listening
    Console.WriteLine($"Exception {ex.GetType()}: {ex.Message}");
}

hub.Subscribe<MyMessage>(OnMyMessage);
await hub.Publish(message, options);
hub.Unsubscribe<MyMessage>(OnMyMessage);

Message filtering

You can define a predicate to subscribe only those messages you want to process.

var hub = new MetaPubSub();

// subscribing to MyMessage with a predicate
hub.Subscribe<MyMessage>(OnMyMessage, m => m.Version > new Version(1, 0));

// this message will be filtered and not handled
var message1 = new MyMessage
{
    Version = new Version(1, 0)
};
await hub.Publish(message1);

// this message will be handled
var message2 = new MyMessage
{
    Version = new Version(1, 1)
};

await hub.Publish(message2);

Timeout to wait for a subscriber

Your message can be added to a queue and will wait there until someone subscribes to it and processes it.

var hub = new MetaPubSub();

// a subscriber that will subscribe after the message has been published 
var t = Task.Run(async () =>
{
    await Task.Delay(1500);
    Console.WriteLine($"Subscribed to MyMessage at {DateTime.Now:HH:mm:ss.fff}");
    hub.Subscribe<MyMessage>(OnMyMessage);
});

Console.WriteLine($"Start publishing and awaiting at {DateTime.Now:HH:mm:ss.fff}");
// this method will wait until the subscriber receives the message or until timeout expired (10 seconds)
await hub.Publish(new MyMessage(), new PubSubOptions() { DeliverAtLeastOnce = true, WaitForSubscriberTimeout = 10_000 });
Console.WriteLine($"End awaiting at {DateTime.Now:HH:mm:ss.fff}");

hub.Unsubscribe<MyMessage>(OnMyMessage);

Scheduling a message

Your message can be queued and published after a time delay.

var hub = new MetaPubSub();

hub.Subscribe<MyMessage>(OnMyMessage);

// The message will be published after 3 seconds delay and after that, it can wait another 500 ms for a subscriber.
// When using Schedule method there is no way to receive NoSubscriberException or AggregateException.
hub.Schedule(new MyMessage(), millisecondsDelay: 3000, new PubSubOptions() { DeliverAtLeastOnce = true, WaitForSubscriberTimeout = 1500 });
Console.WriteLine($"Message scheduled at {DateTime.Now:HH:mm:ss.fff}, delay - 3 sec");

// waiting before unsubscribing
await Task.Delay(3500);
hub.Unsubscribe<MyMessage>(OnMyMessage);

Asynchronous waiting for a specified message

var hub = new MetaPubSub();

// publishing a MyEvent with 500 ms delay
var t = Task.Run(async () =>
{
    await Task.Delay(500);
    await hub.Publish(new MyEvent());
});

try
{
    // This method will wait for MyEvent one second.
    // If the event will not arrive in a specified time period the TimeoutException will be thrown.
    MyEvent res = await hub.When<MyEvent>(millisecondsTimeout: 1000);
    Console.WriteLine($"Received MyEvent at {DateTime.Now:HH:mm:ss.fff}");
}
catch (TimeoutException ex)
{
    Console.WriteLine($"Exception {ex.GetType()}: {ex.Message}");
}

Request-response pattern

Send a message and wait for the response as a single awaitable method, without need to Subscribe/Unsubscribe to the response message.

var hub = new MetaPubSub();

// This handler should be placed somewhere in another module.
// It processes MyMessage and publishes a MyEvent as a result. 
Task Handler(MyMessage x)
{
    hub.Publish(new MyEvent());
    return Task.CompletedTask;
}
hub.Subscribe<MyMessage>(Handler);

try
{
    // This method will publish MyMessage and wait for MyEvent one second.
    // If the event will not arrive in a specified timeout the TimeoutException will be thrown.
    var message = new MyMessage();
    MyEvent res = await hub.Process<MyMessage, MyEvent>(message, responseTimeoutMs: 1000, new PubSubOptions() { WaitForSubscriberTimeout = 100 });
    Console.WriteLine($"Received MyEvent at {DateTime.Now:HH:mm:ss.fff}");
}
catch (NoSubscribersException ex)
{
    // no one is listening
    Console.WriteLine($"Exception {ex.GetType()}: {ex.Message}");
}
catch (TimeoutException ex)
{
    Console.WriteLine($"Exception {ex.GetType()}: {ex.Message}");
}

Cancellation token support

You can cancel scheduling or waiting for the message.

var hub = new MetaPubSub();
var cts = new CancellationTokenSource();

// publish an event after 100 ms
var t = Task.Run(async () =>
{
    await Task.Delay(100);
    await hub.Publish(new MyEvent());
});

// cancel waiting after 50 ms
var t2 = Task.Run(async () =>
{
    await Task.Delay(50);
    cts.Cancel();
});


try
{
    var res = await hub.When<MyEvent>(millisecondsTimeout: 200, match: null, cts.Token);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Waiting for MyEvent has been canceled");
}

Client-server mode

You can start one hub as a server and connect to it several clients hubs from different processes and even different computers. Interprocess communication made on named pipes. If client disconnected from the server for some reason it tries to reconnect automatically.

int count = 0;
Task Handler(MyMessage x)
{
    count++;
    return Task.CompletedTask;
}

// Creating the server hub.
// The server and the client hubs should be created in separate processes, 
// this example is for demo only.
var serverHub = new MetaPubSub();
using var pipeServer = new PubSubPipeServer("Meta", serverHub);

// Starting the hub as a server named 'Meta'.
pipeServer.StartServer();

// Client hub creation. There are can be several hubs connected to the same server.
var clientHub = new MetaPubSub();
using var pipeClient = new PubSubPipeClient("Meta", clientHub);

// Connecting to the remote server.
await pipeClient.ConnectToServer();

// Subscribing to MyMessage on the server and locally
await pipeClient.SubscribeOnServer<MyMessage>();
clientHub.Subscribe<MyMessage>(Handler);

// The server publishes a message.
await serverHub.Publish(new MyMessage());

// Client hub publishes a message and it will be received locally without being sent to the server.
await clientHub.Publish(new MyMessage());

// Client hub sends a message to the server where it will be published and sent back.
await pipeClient.PublishOnServer(new MyMessage());

// All three messages should be received.
Debug.Assert(count == 3);

// Unsubscribing both on the server-side and locally.
await pipeClient.UnsubscribeOnServer<MyMessage>();
clientHub.Unsubscribe<MyMessage>(Handler);