# CosmosDB change feed publishing to Azure Service Bus processed with MediatR

This sample shows how to use the CosmosDB change feed with an Azure Function App (dotnet-isolated) worker to publish events to Azure Service Bus, and then receive and process them via MediatR to build a reliable event-driven system.

In [None]:
#!pwsh
# Delete any EventinatR packages from the user's NuGet package cache
# (presumably so the packages packed below are restored instead of a stale cached copy).
Remove-Item -Path ~\.nuget\packages\eventinatr* -Recurse -Force
# Rebuild the whole solution without incremental build.
dotnet build ..\..\EventinatR.sln --no-incremental
# Pack the solution into the local folder that the next cell registers as a NuGet source via #i.
dotnet pack ..\..\EventinatR.sln --output C:\Windows\Temp\EventinatR

In [None]:
#i "nuget:C:\Windows\Temp\EventinatR"
#r "nuget:EventinatR"
#r "nuget:EventinatR.Cosmos"
#r "nuget:EventinatR.DependencyInjection"
#r "nuget:EventinatR.Publishers.ServiceBus"
#r "nuget:Azure.Messaging.ServiceBus"
#r "nuget:MediatR"
#r "nuget:MediatR.Extensions.Microsoft.DependencyInjection"
#r "nuget:Microsoft.Extensions.Caching.Memory"
#r "nuget:Microsoft.Azure.Cosmos"
#r "nuget:Microsoft.Azure.Functions.Worker"
#r "nuget:Microsoft.Azure.Functions.Worker.Sdk"
#r "nuget:Microsoft.Azure.Functions.Worker.Extensions.CosmosDB"
#r "nuget:Microsoft.Azure.Functions.Worker.Extensions.Http"
#r "nuget:Microsoft.Azure.Functions.Worker.Extensions.ServiceBus"
#r "nuget:System.Memory.Data"
#r "nuget:System.Linq.Async"

using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Net;
using System.Text.Json;
using System.Threading;
using EventinatR;
using EventinatR.Cosmos;
using EventinatR.Publishers.ServiceBus;
using Azure.Messaging.ServiceBus;
using MediatR;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.Extensions.Caching.Distributed;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;

# Domain model

We are modeling a simple group with members, with the ability to add and remove members and the events that change the state of the group.

In [None]:
/// <summary>A member of a group, identified by its name.</summary>
public record GroupMember(string Name);
/// <summary>Identifier of a group; wraps the group's name.</summary>
public record GroupId(string Name);

/// <summary>
/// Base type for the domain events of this sample. Implements MediatR's
/// <see cref="INotification"/> so events can be published through the mediator.
/// </summary>
public abstract record Event : INotification;

/// <summary>
/// Base type for the events that change a <see cref="Group"/>.
/// Each nested event carries the id of the group it applies to.
/// </summary>
public abstract record GroupEvent : Event
{
    /// <summary>A group with the given id was created.</summary>
    public record Created(GroupId Id) : GroupEvent;
    /// <summary>A member was added to the group.</summary>
    public record AddedMember(GroupId Id, GroupMember Member) : GroupEvent;
    /// <summary>A member was removed from the group.</summary>
    public record RemovedMember(GroupId Id, GroupMember Member) : GroupEvent;
}

/// <summary>
/// Snapshot of a group's id and members. <see cref="GroupRepository"/> stores it together
/// with appended events and reads it back when loading a group.
/// </summary>
public record GroupState(GroupId Id, IEnumerable<GroupMember> Members);

/// <summary>
/// Event-sourced group aggregate. Every state change is expressed as a
/// <see cref="GroupEvent"/> that is applied to the in-memory state and recorded in
/// <see cref="UncommittedEvents"/> until it has been persisted.
/// </summary>
public class Group
{
    /// <summary>Identifier of the group; set when a <see cref="GroupEvent.Created"/> event or a snapshot is applied.</summary>
    public GroupId Id { get; private set; }

    /// <summary>
    /// Current members of the group. Exposed through a read-only wrapper so callers cannot
    /// downcast the result to the backing list and mutate it without raising an event.
    /// </summary>
    public IEnumerable<GroupMember> Members => _members.AsReadOnly();

    /// <summary>Events raised since the group was created or loaded that have not been persisted yet.</summary>
    public ICollection<GroupEvent> UncommittedEvents { get; } = new List<GroupEvent>();

    /// <summary>
    /// Point-in-time snapshot of the group. The members are copied so the snapshot
    /// does not change when the group is modified afterwards.
    /// </summary>
    public GroupState State => new GroupState(Id, _members.ToArray());

    private readonly List<GroupMember> _members = new();

    /// <summary>Creates a brand-new group and records the corresponding <see cref="GroupEvent.Created"/> event.</summary>
    /// <param name="name">Name of the group; becomes its <see cref="GroupId"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="name"/> is <c>null</c>.</exception>
    public Group(string name)
    {
        ArgumentNullException.ThrowIfNull(name);

        AddEvent(new GroupEvent.Created(new GroupId(name)));
    }

    /// <summary>Rehydrates a group from an optional snapshot plus the events recorded after it.</summary>
    /// <param name="events">Events to replay on top of <paramref name="state"/>, in order.</param>
    /// <param name="state">Optional snapshot to start from.</param>
    /// <exception cref="ArgumentNullException"><paramref name="events"/> is <c>null</c>.</exception>
    /// <exception cref="ArgumentException">Neither an event nor a snapshot was provided.</exception>
    public Group(IEnumerable<GroupEvent> events, GroupState state = null)
    {
        ArgumentNullException.ThrowIfNull(events);

        // Materialize once: the sequence may be lazy, and it is both inspected and replayed below.
        var history = events.ToList();

        if (history.Count == 0 && state is null)
        {
            throw new ArgumentException("At least one event or state must be provided.");
        }

        if (state is not null)
        {
            Id = state.Id;
            _members.AddRange(state.Members);
        }

        foreach (var e in history)
        {
            ApplyEvent(e);
        }
    }

    /// <summary>Applies a new event to the state and records it as uncommitted.</summary>
    private void AddEvent(GroupEvent e)
    {
        ApplyEvent(e);
        UncommittedEvents.Add(e);
    }

    /// <summary>Dispatches an event to the matching <c>Apply</c> overload.</summary>
    /// <exception cref="InvalidOperationException">The event type is not handled by this aggregate.</exception>
    private void ApplyEvent(GroupEvent e)
    {
        switch (e)
        {
            case GroupEvent.Created created:
                Apply(created);
                break;
            case GroupEvent.AddedMember addedMember:
                Apply(addedMember);
                break;
            case GroupEvent.RemovedMember removedMember:
                Apply(removedMember);
                break;
            default:
                throw new InvalidOperationException($"Unsupported event: {e.GetType().FullName}");
        }
    }

    private void Apply(GroupEvent.Created e)
        => Id = e.Id;

    /// <summary>Adds a member with the given name; does nothing when a member with that name already exists.</summary>
    /// <param name="name">Name of the member to add.</param>
    /// <exception cref="ArgumentNullException"><paramref name="name"/> is <c>null</c>.</exception>
    public void AddMember(string name)
    {
        ArgumentNullException.ThrowIfNull(name);

        if (_members.Any(x => x.Name == name))
        {
            return;
        }

        AddEvent(new GroupEvent.AddedMember(Id, new GroupMember(name)));
    }

    private void Apply(GroupEvent.AddedMember e)
        => _members.Add(e.Member);

    /// <summary>Removes the member with the given name; does nothing when no such member exists.</summary>
    /// <param name="name">Name of the member to remove.</param>
    public void RemoveMember(string name)
    {
        var member = _members.FirstOrDefault(x => x.Name == name);

        if (member is not null)
        {
            AddEvent(new GroupEvent.RemovedMember(Id, member));
        }
    }

    private void Apply(GroupEvent.RemovedMember e)
        => _members.Remove(e.Member);
}

/// <summary>
/// Loads and saves <see cref="Group"/> aggregates using one event stream per group.
/// </summary>
public class GroupRepository
{
    private readonly EventStore _store;

    /// <summary>Creates a repository on top of the given event store.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="store"/> is <c>null</c>.</exception>
    public GroupRepository(EventStore store)
        => _store = store ?? throw new ArgumentNullException(nameof(store));

    /// <summary>Gets the stream for a group; the stream id is the lower-cased "group:{name}".</summary>
    private Task<EventStream> GetStreamAsync(GroupId id)
        => _store.GetStreamAsync($"{nameof(Group)}:{id.Name}".ToLowerInvariant());

    /// <summary>Loads a group from its snapshot and the events read from that snapshot.</summary>
    /// <param name="id">Name of the group to load.</param>
    /// <returns>The rehydrated group, or <c>null</c> when the stream holds neither a snapshot state nor events.</returns>
    /// <exception cref="InvalidOperationException">The stream contains an event that cannot be converted to a <see cref="GroupEvent"/>.</exception>
    public async Task<Group?> GetAsync(string id)
    {
        var stream = await GetStreamAsync(new GroupId(id));
        var snapshot = await stream.ReadSnapshotAsync<GroupState>();
        var state = snapshot.State;
        var events = await snapshot.ReadAsync().Select(e =>
            {
                if (!e.TryConvert<GroupEvent>(out var groupEvent))
                {
                    throw new InvalidOperationException($"The event stream contains data that is not supported: {e.Data}");
                }

                return groupEvent;
            }).ToListAsync();

        // Nothing recorded for this id: the group does not exist. Without this guard the
        // Group constructor would throw, and callers could never observe the null result
        // they check for (e.g. to answer 404 or to allow creation).
        if (state is null && events.Count == 0)
        {
            return null;
        }

        return new Group(events, state);
    }

    /// <summary>Appends the group's uncommitted events, together with a state snapshot, to its stream.</summary>
    /// <param name="group">The group to persist.</param>
    /// <returns>
    /// The version returned by the append, or <see cref="EventStreamVersion.None"/>
    /// when there was nothing to save.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="group"/> is <c>null</c>.</exception>
    public async Task<EventStreamVersion> SaveAsync(Group group)
    {
        ArgumentNullException.ThrowIfNull(group);

        if (group.UncommittedEvents.Count == 0)
        {
            return EventStreamVersion.None;
        }

        var stream = await GetStreamAsync(group.Id);
        var state = new GroupState(group.Id, group.Members);
        var version = await stream.AppendAsync(group.UncommittedEvents, state);

        // Only forget the events once the append has completed.
        group.UncommittedEvents.Clear();

        return version;
    }
}

# Projections

The CosmosDB change feed will be used to generate our projections from the event stream, and MediatR is used to distribute the events to the appropriate handlers. This projection is only an example: using the snapshot capability of the event stream to store state is not recommended. (I was lazy for the example; be better than me and use EF Core or something else.)

In [None]:
/// <summary>
/// Stores the "all groups" projection as a single JSON document in an
/// <see cref="IDistributedCache"/> under a fixed key.
/// </summary>
public class GroupsProjectionDistributedCache
{
    /// <summary>Shape of the JSON document stored in the cache.</summary>
    private record State(IEnumerable<GroupId> Groups);

    private const string CacheKey = "groups";
    private readonly IDistributedCache _cache;

    /// <summary>Creates the projection store on top of the given cache.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="cache"/> is <c>null</c>.</exception>
    public GroupsProjectionDistributedCache(IDistributedCache cache)
        => _cache = cache ?? throw new ArgumentNullException(nameof(cache));

    /// <summary>Reads the projected group ids.</summary>
    /// <returns>The stored group ids; empty when nothing has been stored yet.</returns>
    public async IAsyncEnumerable<GroupId> GetAsync()
    {
        var json = await _cache.GetStringAsync(CacheKey);

        // A missing entry, a literal JSON null, and a document without a Groups value
        // all mean "no groups yet" rather than a NullReferenceException below.
        var groups = json is null
            ? null
            : JsonSerializer.Deserialize<State>(json)?.Groups;

        foreach (var group in groups ?? Array.Empty<GroupId>())
        {
            yield return group;
        }
    }

    /// <summary>Replaces the projection with the given group ids.</summary>
    /// <param name="groups">The complete set of group ids to store.</param>
    /// <exception cref="ArgumentNullException"><paramref name="groups"/> is <c>null</c>.</exception>
    public async Task SetAsync(IEnumerable<GroupId> groups)
    {
        ArgumentNullException.ThrowIfNull(groups);

        var json = JsonSerializer.Serialize(new State(groups));
        await _cache.SetStringAsync(CacheKey, json);
    }
}

/// <summary>
/// MediatR handler that adds newly created groups to the groups projection.
/// </summary>
public class UpdateGroupsProjection : INotificationHandler<GroupEvent.Created>
{
    private readonly GroupsProjectionDistributedCache _cache;

    /// <summary>Creates the handler on top of the projection store.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="cache"/> is <c>null</c>.</exception>
    public UpdateGroupsProjection(GroupsProjectionDistributedCache cache)
        => _cache = cache ?? throw new ArgumentNullException(nameof(cache));

    /// <summary>Adds the created group's id to the projection unless it is already present.</summary>
    /// <param name="notification">The event describing the created group.</param>
    /// <param name="cancellationToken">Token used to cancel reading the current projection.</param>
    /// <exception cref="ArgumentNullException"><paramref name="notification"/> is <c>null</c>.</exception>
    public async Task Handle(GroupEvent.Created notification, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(notification);

        var groups = await _cache.GetAsync().ToListAsync(cancellationToken);

        // The same event can be delivered more than once, so keep the handler idempotent.
        // GroupId is a record, so Contains compares by value.
        if (groups.Contains(notification.Id))
        {
            return;
        }

        groups.Add(notification.Id);
        await _cache.SetAsync(groups);
    }
}

# Dependency Injection

Use code like this to register the EventStore and MediatR.

In [None]:
// Composition root for the Function App worker: MediatR handlers, the projection cache,
// the Cosmos-backed event store, the Service Bus publisher, and the group repository.
var host = new HostBuilder()
    .ConfigureFunctionsWorkerDefaults()
    .ConfigureServices(services =>
    {
        // Scans the assembly that declares Group for notification handlers.
        services.AddMediatR(typeof(Group).Assembly);

        services.AddDistributedMemoryCache();

        services.AddEventinatR().UseCosmosEventStore();

        // Register the client itself so the container owns (and disposes) the underlying
        // connection instead of leaking a client that only the sender references.
        services.AddSingleton(serviceProvider =>
            {
                var configuration = serviceProvider.GetRequiredService<IConfiguration>();
                var connectionString = configuration.GetValue<string>("AzureWebJobsServiceBus");

                if (string.IsNullOrWhiteSpace(connectionString))
                {
                    throw new InvalidOperationException("The 'AzureWebJobsServiceBus' connection string is not configured.");
                }

                return new ServiceBusClient(connectionString);
            });

        // Sender for the "events" queue that the ServiceBusTrigger function listens on.
        services.AddSingleton(serviceProvider =>
            serviceProvider.GetRequiredService<ServiceBusClient>().CreateSender("events"));

        services.AddSingleton<GroupsProjectionDistributedCache>();
        services.AddSingleton<ServiceBusEventPublisher>();

        // Constructor dependency of the Api function class; it must be registered to be resolvable.
        // Transient so it is valid regardless of the lifetime the event store is registered with.
        services.AddTransient<GroupRepository>();
    })
    .Build();

# CosmosDBTrigger

Using the Function App CosmosDBTrigger binding, changes to the event-store/events collection can be captured and forwarded to an Azure Service Bus queue. We do not use the Service Bus output binding because it provides no way to set the `SessionId` property, which is what allows the consumer to process the events of each stream in order.

> Note: If the messages aren't sent to Azure Service Bus because of an error, the change feed events will not be replayed.

In [None]:
/// <summary>
/// Function that forwards Cosmos DB change feed documents from the event store
/// container to Azure Service Bus through <see cref="ServiceBusEventPublisher"/>.
/// </summary>
public class QueueChangeFeedEvents
{
    private readonly ServiceBusEventPublisher _publisher;

    /// <summary>Creates the function with the publisher used to send the events.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="publisher"/> is <c>null</c>.</exception>
    public QueueChangeFeedEvents(ServiceBusEventPublisher publisher)
    {
        if (publisher is null)
        {
            throw new ArgumentNullException(nameof(publisher));
        }

        _publisher = publisher;
    }

    /// <summary>Parses the change feed payload into events and publishes them.</summary>
    /// <param name="json">The change feed payload delivered by the trigger.</param>
    /// <returns>The task returned by the publisher.</returns>
    [Function(nameof(QueueChangeFeedEvents))]
    public Task RunAsync([CosmosDBTrigger(
        databaseName: CosmosEventStoreOptions.DefaultDatabaseId,
        collectionName: CosmosEventStoreOptions.DefaultContainerId,
        ConnectionStringSetting = "CosmosEventStore",
        LeaseCollectionName = "leases",
        CreateLeaseCollectionIfNotExists = true)] string json)
        => _publisher.PublishAsync(CosmosEventStoreChangeFeed.ParseEvents(json));
}

# ServiceBusTrigger

Using the Function App ServiceBusTrigger binding, receive each event and process it with MediatR.

In [None]:
/// <summary>
/// Function that receives events from the session-enabled "events" Service Bus queue
/// and publishes them through MediatR.
/// </summary>
public class ProcessEvent
{
    private readonly IMediator _mediator;

    /// <summary>Creates the function with the mediator used to publish events.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="mediator"/> is <c>null</c>.</exception>
    public ProcessEvent(IMediator mediator)
    {
        if (mediator is null)
        {
            throw new ArgumentNullException(nameof(mediator));
        }

        _mediator = mediator;
    }

    /// <summary>
    /// Deserializes the message into an EventinatR event and, when it converts to a
    /// domain <see cref="Event"/>, publishes it. Messages that do not convert are ignored.
    /// </summary>
    /// <param name="json">The message body as JSON.</param>
    [Function(nameof(ProcessEvent))]
    public async Task RunAsync([ServiceBusTrigger("events", IsSessionsEnabled = true)] string json)
    {
        var envelope = BinaryData.FromString(json).ToObjectFromJson<EventinatR.Event>();
        var converted = envelope.TryConvert<Event>(out var domainEvent);

        if (!converted)
        {
            return;
        }

        await _mediator.Publish(domainEvent);
    }
}

# Groups API

A simple REST API for groups.

In [None]:
/// <summary>
/// HTTP-triggered functions exposing a small REST API for groups and their members.
/// The list of groups is read from the projection; single groups are loaded from the repository.
/// </summary>
public class Api
{
    private readonly GroupRepository _repository;
    private readonly GroupsProjectionDistributedCache _cache;

    /// <summary>Creates the API with its repository and projection dependencies.</summary>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public Api(GroupRepository repository, GroupsProjectionDistributedCache cache)
    {
        // Fail fast on a missing dependency, like the other classes in this sample do.
        _repository = repository ?? throw new ArgumentNullException(nameof(repository));
        _cache = cache ?? throw new ArgumentNullException(nameof(cache));
    }

    /// <summary>GET groups: responds 200 with the names of the groups in the projection.</summary>
    [Function(nameof(GetGroups))]
    public async Task<HttpResponseData> GetGroups([HttpTrigger(AuthorizationLevel.Function, "get", Route = "groups")] HttpRequestData req)
    {
        var groups = await _cache.GetAsync().Select(x => x.Name).ToArrayAsync();

        var res = req.CreateResponse(HttpStatusCode.OK);

        await res.WriteAsJsonAsync(groups);

        return res;
    }

    /// <summary>GET groups/{id}: responds 200 with the group, or 404 when the repository returns no group.</summary>
    [Function(nameof(GetGroup))]
    public async Task<HttpResponseData> GetGroup(
        [HttpTrigger(AuthorizationLevel.Function, "get", Route = "groups/{id}")] HttpRequestData req,
        string id)
    {
        var group = await _repository.GetAsync(id);

        if (group is null)
        {
            return req.CreateResponse(HttpStatusCode.NotFound);
        }

        var res = req.CreateResponse(HttpStatusCode.OK);

        await res.WriteAsJsonAsync(group);

        return res;
    }

    /// <summary>POST groups/{id}: creates the group and responds 201, or 409 when the repository already returns a group for the id.</summary>
    [Function(nameof(CreateGroup))]
    public async Task<HttpResponseData> CreateGroup(
        [HttpTrigger(AuthorizationLevel.Function, "post", Route = "groups/{id}")] HttpRequestData req,
        string id)
    {
        var group = await _repository.GetAsync(id);

        if (group is not null)
        {
            return req.CreateResponse(HttpStatusCode.Conflict);
        }

        group = new Group(id);

        await _repository.SaveAsync(group);

        return req.CreateResponse(HttpStatusCode.Created);
    }

    /// <summary>GET groups/{id}/members: responds 200 with the group's members, or 404 when the repository returns no group.</summary>
    [Function(nameof(GetGroupMembers))]
    public async Task<HttpResponseData> GetGroupMembers(
        [HttpTrigger(AuthorizationLevel.Function, "get", Route = "groups/{id}/members")] HttpRequestData req,
        string id)
    {
        var group = await _repository.GetAsync(id);

        if (group is null)
        {
            return req.CreateResponse(HttpStatusCode.NotFound);
        }

        var res = req.CreateResponse(HttpStatusCode.OK);

        await res.WriteAsJsonAsync(group.Members);

        return res;
    }

    /// <summary>POST groups/{id}/members/{member}: adds the member and responds 200, or 404 when the repository returns no group.</summary>
    [Function(nameof(AddGroupMember))]
    public async Task<HttpResponseData> AddGroupMember(
        [HttpTrigger(AuthorizationLevel.Function, "post", Route = "groups/{id}/members/{member}")] HttpRequestData req,
        string id,
        string member)
    {
        var group = await _repository.GetAsync(id);

        if (group is null)
        {
            return req.CreateResponse(HttpStatusCode.NotFound);
        }

        // Adding an existing member raises no event, so the save below is then a no-op.
        group.AddMember(member);

        await _repository.SaveAsync(group);

        return req.CreateResponse(HttpStatusCode.OK);
    }

    /// <summary>DELETE groups/{id}/members/{member}: removes the member and responds 200, or 404 when the repository returns no group.</summary>
    [Function(nameof(RemoveGroupMember))]
    public async Task<HttpResponseData> RemoveGroupMember(
        [HttpTrigger(AuthorizationLevel.Function, "delete", Route = "groups/{id}/members/{member}")] HttpRequestData req,
        string id,
        string member)
    {
        var group = await _repository.GetAsync(id);

        if (group is null)
        {
            return req.CreateResponse(HttpStatusCode.NotFound);
        }

        // Removing an unknown member raises no event, so the save below is then a no-op.
        group.RemoveMember(member);

        await _repository.SaveAsync(group);

        return req.CreateResponse(HttpStatusCode.OK);
    }
}