# Outbox Implementation with Immutable Architecture

## Aim

Demonstrate how Transactional Outbox functionality can be achieved using append-only database records.

## Process

- Outbox Items should be inserted atomically by the app, inside the same SQL transaction as its other writes
- Outbox Items contain the messaging entity name, the type name and the JSON data
- A separate background process polls for and dispatches messages
- Dispatch status is tracked in a separate table, one appended row per status change
- Statuses: `Sent|TransientFailure|Scheduled|Cancelled`
- Retries with exponential backoff
- A separate background process can tidy up records later
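The "retries with exponential backoff" step is not implemented in this notebook, so here is a minimal sketch of how the `NotBefore` value for a `TransientFailure` status might be computed. `BackoffDelay`, its parameters and the 30 second / 10 minute values are assumptions made for illustration only.

```csharp
using System;

// Hypothetical helper (not part of the notebook's repository code).
// Returns the wait before retry number `attempt` (1-based):
// baseDelay doubled for each previous attempt, capped at maxDelay.
static TimeSpan BackoffDelay(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
{
    if (attempt < 1) throw new ArgumentOutOfRangeException(nameof(attempt));

    // Clamp the exponent so very large attempt counts cannot overflow.
    var exponent = Math.Min(attempt - 1, 30);
    var ticks = baseDelay.Ticks * Math.Pow(2, exponent);

    // Compare as double before casting, so the cast to long is always safe.
    return ticks >= maxDelay.Ticks ? maxDelay : TimeSpan.FromTicks((long)ticks);
}

// Assumed values, chosen only for the example.
var firstDelay = TimeSpan.FromSeconds(30);
var longestDelay = TimeSpan.FromMinutes(10);

for (var retry = 1; retry <= 6; retry++)
{
    var delay = BackoffDelay(retry, firstDelay, longestDelay);
    // The resulting offset is what would be stored as NotBefore on the status row.
    var notBefore = DateTimeOffset.UtcNow.Add(delay);
    Console.WriteLine($"retry {retry}: wait {delay}, not before {notBefore:O}");
}
```

The attempt number itself could be derived by counting the `TransientFailure` rows already appended for the item, which keeps the scheme append-only.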

## First to set some things up...

In [1]:
#r "nuget: Microsoft.DotNet.Interactive.SqlServer, *-*"

Loading extension script from `C:\Users\andre\.nuget\packages\microsoft.dotnet.interactive.sqlserver\1.0.0-beta.25177.1\interactive-extensions\dotnet\extension.dib`

In [2]:
#!connect mssql --kernel-name Outbox "Server=(localdb)\MSSQLLocalDB;Database=WeatherAppDb;"

Kernel added: #!sql-Outbox

## Now we can query the database:

In [3]:
SELECT * FROM OutboxItems
SELECT * FROM OutboxItemStatus

(0 rows affected)

(0 rows affected)

Info: No rows were returned for query 0 in batch 0.

Info: No rows were returned for query 1 in batch 0.

## Reset...

In [None]:
DELETE FROM [Outbox].[dbo].[OutboxItemStatus]
DELETE FROM [Outbox].[dbo].[OutboxItems]

## Insert an OutboxItem record via SQL...

In [4]:
INSERT INTO [Outbox].[dbo].[OutboxItems]
        ([TypeName]
        ,[SerialisedData]
        ,[MessagingEntityName]
        ,[Created])
    VALUES
        ('MyNamespace.MyEvent'
        ,'{"MyProperty":"MyValue1"}' -- T-SQL string literals do not use backslash escapes
        ,'MyQueue'
        ,GETUTCDATE()) -- UTC, to match the GETUTCDATE() comparisons in the batch query

SELECT * FROM OutboxItems

(1 row affected)

(0 rows affected)

Info: No rows were returned for query 0 in batch 0.

## Insert an `OutboxItemStatus` record via SQL...

In [5]:
declare @id int
set @id = (SELECT TOP(1) ID FROM [Outbox].[dbo].[OutboxItems] ORDER BY ID DESC) -- fetch the last OutboxItem ID

INSERT INTO [Outbox].[dbo].[OutboxItemStatus]
        ([OutboxItemId]
        ,[Status]
        ,[Created])
    VALUES
        (@id
        ,1 -- Sent
        ,GETUTCDATE()) -- UTC, to match the GETUTCDATE() comparisons in the batch query

SELECT * FROM OutboxItems
SELECT * FROM OutboxItemStatus

(1 row affected)

(0 rows affected)

(0 rows affected)

Info: No rows were returned for query 0 in batch 0.

Info: No rows were returned for query 1 in batch 0.

## FK constraint prevents deleting any `OutboxItem` that is referenced by an `OutboxItemStatus`...

In [6]:
DELETE FROM [Outbox].[dbo].[OutboxItems]

Error: Msg 547, Level 16, State 0, Line 1
The DELETE statement conflicted with the REFERENCE constraint "FK__OutboxIte__Outbo__3B75D760". The conflict occurred in database "Outbox", table "dbo.OutboxItemStatus", column 'OutboxItemId'.

## Now we need some C# models and a repository...

In [7]:
#r "nuget:Dapper, 2.0.123"

using System.Data;
using System.Threading.Tasks;
using System.Text.Json;
using Dapper;

public const int NoIdYet = 0;

public record OutboxItem(
    long Id,
    string TypeName,
    string SerialisedData,
    string MessagingEntityName,
    DateTimeOffset Created
)
{
    public static OutboxItem Create<T>(T messageObject, string messagingEntityName)
    {
        return new OutboxItem(
            NoIdYet, 
            typeof(T).FullName, 
            JsonSerializer.Serialize(messageObject), 
            messagingEntityName, 
            TimeProvider.System.GetUtcNow());
    }
}

public enum OutboxSentStatus
{
    Pending = 0,
    Sent = 1,
    TransientFailure = 2,
    Scheduled = 3,
    Cancelled = 4
}

public record OutboxSentStatusUpdate(
    long Id,
    long OutboxItemId,
    OutboxSentStatus Status,
    DateTimeOffset? NotBefore,
    DateTimeOffset Created
)
{
    public static OutboxSentStatusUpdate CreateSent(long outboxItemId) => 
        new(NoIdYet, outboxItemId, OutboxSentStatus.Sent, null, TimeProvider.System.GetUtcNow());
    
    public static OutboxSentStatusUpdate CreateTransientFailure(long outboxItemId, DateTimeOffset notBefore) =>
        new(NoIdYet, outboxItemId, OutboxSentStatus.TransientFailure, notBefore, TimeProvider.System.GetUtcNow());
    
    public static OutboxSentStatusUpdate CreateScheduled(long outboxItemId, DateTimeOffset notBefore) =>
        new(NoIdYet, outboxItemId, OutboxSentStatus.Scheduled, notBefore, TimeProvider.System.GetUtcNow());
    
    public static OutboxSentStatusUpdate CreateCancelled(long outboxItemId) => 
        new(NoIdYet, outboxItemId, OutboxSentStatus.Cancelled, null, TimeProvider.System.GetUtcNow());
    
}

public class OutboxRepository
{
    private readonly IDbConnection _dbConnection;

    public OutboxRepository(IDbConnection dbConnection)
    {
        _dbConnection = dbConnection;
    }

    public async Task<long> Add(OutboxItem outboxItem, IDbTransaction? transaction = null)
    {        
        const string sql = @"
            INSERT INTO [dbo].[OutboxItems] ([TypeName], [SerialisedData], [MessagingEntityName], [Created])
            VALUES (@TypeName, @SerialisedData, @MessagingEntityName, @Created);
            SELECT CAST(SCOPE_IDENTITY() as BIGINT);";

        return await _dbConnection.ExecuteScalarAsync<long>(sql, outboxItem, transaction);
    }

    public async Task<long> AddScheduled(OutboxItem outboxItem, DateTimeOffset retryAfter)
    {
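        // Open the connection only if the caller has not already opened it;
        // calling Open() on an open connection throws.
        if (_dbConnection.State != ConnectionState.Open) _dbConnection.Open();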
        using var transaction = _dbConnection.BeginTransaction();

        var id = await Add(outboxItem, transaction);
        await AddSentStatus(OutboxSentStatusUpdate.CreateScheduled(id, retryAfter), transaction);

        transaction.Commit();

        return id;
    }
    
    public async Task<long> AddSentStatus(OutboxSentStatusUpdate outboxSentStatusUpdate, IDbTransaction? transaction = null)
    {
        const string sql = @"
            INSERT INTO [dbo].[OutboxItemStatus] ([OutboxItemId], [Status], [NotBefore], [Created])
            VALUES (@OutboxItemId, @Status, @NotBefore, @Created);
            SELECT CAST(SCOPE_IDENTITY() as BIGINT);";

        return await _dbConnection.ExecuteScalarAsync<long>(sql, outboxSentStatusUpdate, transaction);
    }
}

### Add a dummy class to use as an Outbox Message...

And a method to generate random instances 🙂

In [8]:
public record PaymentRecievedEvent(string PaymentId, decimal Amount, string IsoCurrencyCode, string SortCode, string AccountNumber, string Reference)
{
    public static PaymentRecievedEvent Random()
    {
        return new PaymentRecievedEvent(
            Guid.NewGuid().ToString(),
            new Random().Next(1, 1000),
            "GBP",
            "12-34-56",
            "12345678",
            Guid.NewGuid().ToString()
        );
    }
}

### Now we can add some Outbox Items...

In [9]:
#r "nuget:Microsoft.Data.SqlClient, *-*"

using System;
using System.Data;
using Microsoft.Data.SqlClient;

var connectionString = "Server=(localdb)\\MSSQLLocalDB;Database=WeatherAppDb;";

using (var connection = new SqlConnection(connectionString))
{
    var outboxRepository = new OutboxRepository(connection);

    var outboxItemId1 = await outboxRepository.Add(OutboxItem.Create(PaymentRecievedEvent.Random(), "PaymentRecievedEventTopic"));
    await outboxRepository.AddSentStatus(OutboxSentStatusUpdate.CreateSent(outboxItemId1));
    Console.WriteLine($"{outboxItemId1} 1# Sent");

    var outboxItemId2 = await outboxRepository.Add(OutboxItem.Create(PaymentRecievedEvent.Random(), "PaymentRecievedEventTopic"));
    await outboxRepository.AddSentStatus(OutboxSentStatusUpdate.CreateTransientFailure(outboxItemId2, TimeProvider.System.GetUtcNow().AddMinutes(1)));
    Console.WriteLine($"{outboxItemId2} 2# Failed retry...");

    var outboxItemId3 = await outboxRepository.Add(OutboxItem.Create(PaymentRecievedEvent.Random(), "PaymentRecievedEventTopic"));
    await outboxRepository.AddSentStatus(OutboxSentStatusUpdate.CreateScheduled(outboxItemId3, TimeProvider.System.GetUtcNow().AddMinutes(1)));
    Console.WriteLine($"{outboxItemId3} 3# Scheduled wait...");

    var outboxItemId4 = await outboxRepository.Add(OutboxItem.Create(PaymentRecievedEvent.Random(), "PaymentRecievedEventTopic"));
    await outboxRepository.AddSentStatus(OutboxSentStatusUpdate.CreateCancelled(outboxItemId4));
    Console.WriteLine($"{outboxItemId4} 4# Cancelled");

    // var outboxItemId5 = await outboxRepository.Add(OutboxItem.Create(PaymentRecievedEvent.Random(), "PaymentRecievedEventTopic"));
    // var outboxItemId6 = await outboxRepository.Add(OutboxItem.Create(PaymentRecievedEvent.Random(), "PaymentRecievedEventTopic"));
    // var outboxItemId7 = await outboxRepository.Add(OutboxItem.Create(PaymentRecievedEvent.Random(), "PaymentRecievedEventTopic"));
    
    // var outboxItemId11 = await outboxRepository.AddScheduled(
    //     OutboxItem.Create(PaymentRecievedEvent.Random(), "PaymentRecievedEventTopic"), TimeProvider.System.GetUtcNow().AddMinutes(1));
};


1 1# Sent
2 2# Failed retry...
3 3# Scheduled wait...
4 4# Cancelled


### Querying for the next batch of `OutboxItems` to dispatch...

In [10]:
BEGIN TRAN; -- the transaction holds the UPDLOCK row locks until ROLLBACK; nothing is modified here.
    WITH LatestStatus AS
    (
        SELECT
            [OutboxItemId],
            [Status],
            [NotBefore],
            ROW_NUMBER() OVER (PARTITION BY [OutboxItemId] ORDER BY [Created] DESC) AS RowNum
        FROM [dbo].[OutboxItemStatus]
    )
    SELECT TOP (3) -- Batch size of 3 
        OI.*,
        CASE 
            WHEN LS.[Status] = 1 THEN 'Sent'
            WHEN LS.[Status] = 2 THEN 'TransientFailure'
            WHEN LS.[Status] = 3 THEN 'Scheduled'
            WHEN LS.[Status] = 4 THEN 'Cancelled'
            WHEN LS.[Status] IS NULL THEN 'Pending'
        END AS [Status],
        LS.[NotBefore]
    FROM [dbo].[OutboxItems] OI WITH (UPDLOCK, READPAST)
    LEFT JOIN LatestStatus LS
        ON OI.[Id] = LS.[OutboxItemId] AND LS.RowNum = 1
    WHERE
        LS.[OutboxItemId] IS NULL -- No status record exists yet, OutboxItem is pending
        OR (LS.[Status] = 2 AND LS.[NotBefore] < GETUTCDATE()) -- TransientFailure and NotBefore has passed
        OR (LS.[Status] = 3 AND LS.[NotBefore] < GETUTCDATE()) -- Scheduled and NotBefore has passed
    ORDER BY OI.[Created];  
ROLLBACK

(0 rows affected)

Info: No rows were returned for query 0 in batch 0.
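To make the selection rule in the query above easier to follow, here is a rough in-memory equivalent written with LINQ. It is only a sketch: `NextBatch`, the tuple shapes and the sample data are assumptions for illustration, and it does not model the `UPDLOCK`/`READPAST` locking.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for the SQL batch query (no locking is modelled).
static List<long> NextBatch(
    IEnumerable<(long Id, DateTimeOffset Created)> items,
    IEnumerable<(long OutboxItemId, int Status, DateTimeOffset? NotBefore, DateTimeOffset Created)> statuses,
    DateTimeOffset now,
    int batchSize)
{
    // Same numeric values as the OutboxSentStatus enum.
    const int TransientFailure = 2;
    const int Scheduled = 3;

    // Equivalent of the LatestStatus CTE: the newest status row per OutboxItemId.
    var latest = statuses
        .GroupBy(s => s.OutboxItemId)
        .ToDictionary(g => g.Key, g => g.OrderByDescending(s => s.Created).First());

    return items
        .Where(i =>
            !latest.TryGetValue(i.Id, out var ls) // no status row yet: pending
            || ((ls.Status == TransientFailure || ls.Status == Scheduled) && ls.NotBefore < now))
        .OrderBy(i => i.Created)
        .Take(batchSize)
        .Select(i => i.Id)
        .ToList();
}

var asOf = DateTimeOffset.UtcNow;
var sampleItems = new[]
{
    (Id: 1L, Created: asOf.AddMinutes(-5)),
    (Id: 2L, Created: asOf.AddMinutes(-4)),
    (Id: 3L, Created: asOf.AddMinutes(-3)),
    (Id: 4L, Created: asOf.AddMinutes(-2)),
    (Id: 5L, Created: asOf.AddMinutes(-1)),
};
var sampleStatuses = new (long OutboxItemId, int Status, DateTimeOffset? NotBefore, DateTimeOffset Created)[]
{
    (1, 1, null, asOf.AddMinutes(-4)),                 // Sent: never selected again
    (2, 2, asOf.AddMinutes(-1), asOf.AddMinutes(-3)),  // TransientFailure, NotBefore has passed: due
    (3, 3, asOf.AddMinutes(10), asOf.AddMinutes(-2)),  // Scheduled for the future: not due yet
    (4, 4, null, asOf.AddMinutes(-1)),                 // Cancelled: never selected
    // Item 5 has no status row, so it is pending.
};

// Prints "2, 5": the due retry and the pending item, oldest first.
Console.WriteLine(string.Join(", ", NextBatch(sampleItems, sampleStatuses, asOf, batchSize: 3)));
```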

In [11]:
SELECT * FROM OutboxItems

SELECT *,
    CASE 
        WHEN [Status] = 1 THEN 'Sent'
        WHEN [Status] = 2 THEN 'TransientFailure'
        WHEN [Status] = 3 THEN 'Scheduled'
        WHEN [Status] = 4 THEN 'Cancelled'
        ELSE 'Unknown Status'
    END AS StatusText
 FROM OutboxItemStatus

(4 rows affected)

(4 rows affected)

Id,TypeName,SerialisedData,MessagingEntityName,Created
1,Submission#3+PaymentRecievedEvent,"{""PaymentId"":""1446b106-06fe-4144-b0ad-7e547e6384a3"",""Amount"":939,""IsoCurrencyCode"":""GBP"",""SortCode"":""12-34-56"",""AccountNumber"":""12345678"",""Reference"":""32195cef-b2d6-4b8c-a95e-02768d813289""}",PaymentRecievedEventTopic,2025-04-02 15:47:20Z
2,Submission#3+PaymentRecievedEvent,"{""PaymentId"":""d89d2e4b-d9e7-4a87-93e9-b7ee695c5012"",""Amount"":958,""IsoCurrencyCode"":""GBP"",""SortCode"":""12-34-56"",""AccountNumber"":""12345678"",""Reference"":""8c89390f-270b-4248-b0ed-278531ccec84""}",PaymentRecievedEventTopic,2025-04-02 15:47:20Z
3,Submission#3+PaymentRecievedEvent,"{""PaymentId"":""68729cae-b2c3-4f83-bb43-cbbd01a1b2bb"",""Amount"":341,""IsoCurrencyCode"":""GBP"",""SortCode"":""12-34-56"",""AccountNumber"":""12345678"",""Reference"":""01304e96-b110-4e79-952a-a8d1977c5a36""}",PaymentRecievedEventTopic,2025-04-02 15:47:20Z
4,Submission#3+PaymentRecievedEvent,"{""PaymentId"":""b56a0056-071d-4be9-b2a4-2a515e8312a5"",""Amount"":620,""IsoCurrencyCode"":""GBP"",""SortCode"":""12-34-56"",""AccountNumber"":""12345678"",""Reference"":""8b19ec1a-24eb-4c2e-9256-3960829c3c15""}",PaymentRecievedEventTopic,2025-04-02 15:47:20Z


Id,OutboxItemId,Status,NotBefore,Created,StatusText
1,1,1,<null>,2025-04-02 15:47:20Z,Sent
2,2,2,2025-04-02 15:48:20Z,2025-04-02 15:47:20Z,TransientFailure
3,3,3,2025-04-02 15:48:20Z,2025-04-02 15:47:20Z,Scheduled
4,4,4,<null>,2025-04-02 15:47:20Z,Cancelled


### Now we can create an OutboxBatchRepository...

In [12]:
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using System.Text.Json.Serialization;

public record OutboxBatchItem
(
    long Id,
    string TypeName,
    string SerialisedData,
    string MessagingEntityName,
    DateTimeOffset Created,
    OutboxSentStatus Status,
    DateTimeOffset? NotBefore
)
{
    [JsonConstructor]
    public OutboxBatchItem(long id, string typeName, string serialisedData, string messagingEntityName, DateTimeOffset created, byte status, DateTimeOffset? notBefore)
        : this(id, typeName, serialisedData, messagingEntityName, created, (OutboxSentStatus)status, notBefore)
    {
    }
}

public class OutboxBatchRepository
{
    private readonly IDbConnection _dbConnection;

    public OutboxBatchRepository(IDbConnection dbConnection)
    {
        _dbConnection = dbConnection;
    }

    public async Task<IEnumerable<OutboxBatchItem>> GetNextBatchAsync(int batchSize)
    {
        const string sql = @"
            WITH LatestStatus AS
            (
                SELECT
                    OIS.[OutboxItemId],
                    OIS.[Status],
                    OIS.[NotBefore],
                    ROW_NUMBER() OVER (PARTITION BY OIS.[OutboxItemId] ORDER BY OIS.[Created] DESC) AS RowNum
                FROM [dbo].[OutboxItemStatus] OIS
            )
            SELECT TOP (@BatchSize) 
                OI.[Id],
                OI.[TypeName],
                OI.[SerialisedData],
                OI.[MessagingEntityName],
                OI.[Created],
                CAST(LS.[Status] AS INT) AS Status,                
                LS.[NotBefore]
            FROM [dbo].[OutboxItems] OI WITH (UPDLOCK, READPAST)
            LEFT JOIN LatestStatus LS
                ON OI.[Id] = LS.[OutboxItemId] AND LS.RowNum = 1
            WHERE
                LS.[OutboxItemId] IS NULL -- No status record exists yet
                OR (LS.[Status] = 2 AND LS.[NotBefore] < GETUTCDATE()) -- TransientFailure and NotBefore has passed        
                OR (LS.[Status] = 3 AND LS.[NotBefore] < GETUTCDATE()) -- Scheduled and NotBefore has passed        
            ORDER BY OI.[Created];";

        return await _dbConnection.QueryAsync<OutboxBatchItem>(sql, new { BatchSize = batchSize });        
    }
}

### And use it...

In [16]:
using System;
using Microsoft.Data.SqlClient;
using System.Threading.Tasks;

var connectionString = "Server=(localdb)\\MSSQLLocalDB;Database=WeatherAppDb;";

var connection = new Microsoft.Data.SqlClient.SqlConnection(connectionString);
var repository = new OutboxBatchRepository(connection);

var batchSize = 10;
var batch = await repository.GetNextBatchAsync(batchSize);

foreach (var item in batch)
{
    $"- {item.Id} {item.MessagingEntityName} {item.Status}".DisplayAs("text/markdown");
}

- 2 PaymentRecievedEventTopic TransientFailure

- 3 PaymentRecievedEventTopic Scheduled
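Once a batch has been fetched, the dispatcher has to record what happened to each item. The following is a minimal in-memory sketch of that step, assuming a hypothetical `send` delegate in place of a real message broker client and a plain list in place of the `OutboxItemStatus` table. The point it illustrates is that outcomes are only ever appended as new status rows, never written back to existing ones.

```csharp
using System;
using System.Collections.Generic;

// Append-only status log standing in for the OutboxItemStatus table
// (1 = Sent, 2 = TransientFailure, as in the OutboxSentStatus enum).
var statusLog = new List<(long OutboxItemId, int Status, DateTimeOffset? NotBefore, DateTimeOffset Created)>();

// Hypothetical dispatch step: try to send one item and append the outcome as a new row.
void Dispatch(long outboxItemId, string messagingEntityName, string serialisedData,
              Action<string, string> send, TimeSpan retryDelay)
{
    var createdAt = DateTimeOffset.UtcNow;
    try
    {
        send(messagingEntityName, serialisedData);
        statusLog.Add((outboxItemId, 1, null, createdAt));
    }
    catch (Exception)
    {
        // Every failure is treated as transient here; a real dispatcher
        // would need to distinguish permanent failures.
        statusLog.Add((outboxItemId, 2, createdAt.Add(retryDelay), createdAt));
    }
}

// One send that succeeds and one that throws, using made-up payloads.
Dispatch(2, "PaymentRecievedEventTopic", "{}", (entity, json) => { }, TimeSpan.FromMinutes(1));
Dispatch(3, "PaymentRecievedEventTopic", "{}",
    (entity, json) => throw new TimeoutException("broker unavailable"), TimeSpan.FromMinutes(1));

foreach (var row in statusLog)
    Console.WriteLine($"{row.OutboxItemId}: status {row.Status}, not before {row.NotBefore}");
```

With the notebook's own types, the two `statusLog.Add` calls would correspond to `AddSentStatus(OutboxSentStatusUpdate.CreateSent(...))` and `AddSentStatus(OutboxSentStatusUpdate.CreateTransientFailure(...))`.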

## To do

- Demonstrate batch locking with two consumers
- Sweeper to delete old records