Skip to content

integration redis pubsub

Brian Greco edited this page Mar 22, 2023 · 2 revisions

IMAGERedis: PubSub

Publishing a domain-event in-process, to RabbitMQ, Azure Service Bus, or to Redis is exactly the same from the perspective of the calling code. A domain-event may start out being delivered to subscribers using RabbitMQ but can be easily changed or extended to use Redis without having to change any of the calling code. Additionally, the same domain-event can be delivered using multiple implementation technologies.


IMAGEUnlike the RabbitMQ and Azure Service Bus plugins that support several different message patterns, Redis only supports publishing a domain-event to all subscribing microservices. RabbitMQ and Azure Service Bus mainly support publisher/consumer where a published message is sent to one of the subscribing microserivces. However, the same can be achieved within RabbitMQ and Azure ServiceBus by having each microservice create a dedicated queue bound to the exchange/topic.


Define Router

Similar to RabbitMQ and Azure Service Bus Plugins, information about how the domain-event is published and subscribed are defined using a router.

Add the following class to the example microservice: src/Components/Examples.Redis.Infra/Routers

using NetFusion.Integration.Redis;

namespace Examples.Redis.Infra.Routers;

public class RedisBusRouter : RedisRouter
{
    public RedisBusRouter() : base("testDb")
    {
    }

    protected override void OnDefineEntities()
    {
     
    }
}

Within the OnDefineEntities base method override, is where the routes are added specifying how a domain-event is delivered and routed to a local method handler. The string "testDb", passed to the base constructor, specifies the name of the Redis database connection specified within the appsettings.json configuration file.

Define Domain-Event

Define the domain-event within the following directory: src/Components/Examples.Redis.Domain/Events

using NetFusion.Messaging.Types;

namespace Examples.Redis.Domain.Events;

public class LogEntryCreated : DomainEvent
{
    public string LogLevel { get; }
    public int Severity { get; }
    public string Message { get; }

    public LogEntryCreated(string logLevel, int severity, string message)
    {
        LogLevel = logLevel;
        Severity = severity;
        Message = message;
    }
}

IMAGEWhile the publisher and subscriber are usually two different microservices, these examples will only use a single microservice solution acting as both the publisher and consumer. This allows the examples to focus on a single solution and only one process must be executed. However, in a production environment, the publisher and consumer would be separate microservices. The example code is separated into Publishing and Subscribing sections indicating which microservice the code would be placed.


Publishing Microservice

The publishing microservice will specify when a LogEntryCreated domain-event is published that it should be delivered to a channel named log-entries. Add the following code to the OnDefineEntities method:

DefineChannel<LogEntryCreated> (channel =>
{
    channel.ChannelName = "log-entries";
    channel.AppliesWhen(log => 2 < log.Severity);
    channel.SetEventData(log => $"{log.LogLevel}.{log.Severity}");
});

If the SetEventData method is called as in the above example, the returned string based on the domain-event's data will be appended to the channel name. In the above example, "log-entries.error.5" would be the channel name for a domain-event with a log level of "error" having a severity of "5". Optional is the call to the AppliesWhen method specifying a predicate indicating if the domain-event should be published to the channel.

Subscribing Microservice

Add the following handler class that will be bound to the channel within: Examples.Redis.App/Handlers

using System;
using Examples.Redis.Domain.Events;
using NetFusion.Common.Extensions;

namespace Examples.Redis.App.Handlers;

public class LogEntryHandler
{
    public void OnLogCreated(LogEntryCreated domainEvent)
    {
        Console.WriteLine($"{nameof(LogEntryHandler)}:{nameof(OnLogCreated)} Called");
        Console.WriteLine(domainEvent.ToIndentedJson());
    }
}

The subscribing microservice defines a route specifying the local method to which a domain-event should be dispatched when received on a channel. The specified channel name of:

log-entries.*.*

would receive all domain-events published to the channel. Likewise, the following channel name pattern:

log-entries.*.3

would receive all domain-events of severity 3 for all log levels.

Add the following code to the OnDefineEntities method:

SubscribeToChannel<LogEntryCreated>("log-entries.*.3", route =>
{
    route.ToConsumer<LogEntryHandler>(c => c.OnLogCreated);
});

Define Api Model

Define the following class within the Examples.Redis.WebApi/Models directory:

using System.ComponentModel.DataAnnotations;

namespace Examples.Redis.WebApi.Models;

public class LogEntryModel
{
    [Required] 
    public string LogLevel { get; set; } = string.Empty;
    public int Severity { get; set; }
    public string? Message { get; set; }
}

Define Api Controller

Add the following method to the ExamplesController within the Examples.Redis.WebApi/Controllers directory:

namespace Examples.Redis.WebApi.Controllers;

[ApiController, Route("api/[controller]")]
public class ExamplesController : ControllerBase
{
    private readonly IMessagingService _messaging;
    
    public ExamplesController(
        IMessagingService messaging)
    {
        _messaging = messaging;
    }

    [HttpPost("log/entries")]
    public async Task<IActionResult> PublishLogEntry([FromBody]LogEntryModel model)
    {
        if (!ModelState.IsValid)
        {
            return BadRequest(ModelState);
        }

        var domainEvent = new LogEntryCreated(model.LogLevel, model.Severity) { Message = model.Message };
        await _messaging.PublishAsync(domainEvent);

        return Ok();
    }
}

Execute Example

Complete the following to run the example microservice and send requests to the example controller:

Execute Microservice

cd ./Examples.Redis/src/Examples.Redis.WebApi/
dotnet run

Send Requests

Post the following requests to publish domain-events to the channel:

http://localhost:5005/api/examples/log/entries

{
    "logLevel": "Error",
    "severity": 3,
    "message": "Invalid input"
}

IMAGE

The following will be printed to the console:

IMAGE

{
    "logLevel": "Warning",
    "severity": 3,
    "message": "Invalid input"
}

IMAGE

IMAGE

Clone this wiki locally