diff --git a/Directory.Packages.props b/Directory.Packages.props index 6da3902a..cbfda7da 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -34,10 +34,12 @@ + + diff --git a/Pulse.slnx b/Pulse.slnx index 75dd2e56..0e4a8eff 100644 --- a/Pulse.slnx +++ b/Pulse.slnx @@ -35,6 +35,7 @@ + @@ -60,6 +61,7 @@ + diff --git a/src/NetEvolve.Pulse.RabbitMQ/Internals/IRabbitMqChannelAdapter.cs b/src/NetEvolve.Pulse.RabbitMQ/Internals/IRabbitMqChannelAdapter.cs new file mode 100644 index 00000000..b5045b72 --- /dev/null +++ b/src/NetEvolve.Pulse.RabbitMQ/Internals/IRabbitMqChannelAdapter.cs @@ -0,0 +1,35 @@ +namespace NetEvolve.Pulse.Internals; + +using RabbitMQ.Client; + +/// +/// Adapter interface for RabbitMQ channel operations. +/// +internal interface IRabbitMqChannelAdapter : IDisposable +{ + /// + /// Gets a value indicating whether the channel is open. + /// + bool IsOpen { get; } + + /// + /// Publishes a message asynchronously. + /// + /// The type of basic properties. + /// The exchange to publish to. + /// The routing key. + /// Whether the message is mandatory. + /// The message properties. + /// The message body. + /// A token to monitor for cancellation requests. + /// A task representing the asynchronous operation. + ValueTask BasicPublishAsync( + string exchange, + string routingKey, + bool mandatory, + TProperties basicProperties, + ReadOnlyMemory body, + CancellationToken cancellationToken = default + ) + where TProperties : IReadOnlyBasicProperties, IAmqpHeader; +} diff --git a/src/NetEvolve.Pulse.RabbitMQ/Internals/IRabbitMqConnectionAdapter.cs b/src/NetEvolve.Pulse.RabbitMQ/Internals/IRabbitMqConnectionAdapter.cs new file mode 100644 index 00000000..68db2950 --- /dev/null +++ b/src/NetEvolve.Pulse.RabbitMQ/Internals/IRabbitMqConnectionAdapter.cs @@ -0,0 +1,19 @@ +namespace NetEvolve.Pulse.Internals; + +/// +/// Adapter interface for RabbitMQ connection operations. +/// +internal interface IRabbitMqConnectionAdapter +{ + /// + /// Gets a value indicating whether the connection is open. + /// + bool IsOpen { get; } + + /// + /// Creates a new channel asynchronously. + /// + /// A token to monitor for cancellation requests. + /// A task representing the asynchronous operation, containing the created channel adapter. + Task CreateChannelAsync(CancellationToken cancellationToken = default); +} diff --git a/src/NetEvolve.Pulse.RabbitMQ/Internals/RabbitMqChannelAdapter.cs b/src/NetEvolve.Pulse.RabbitMQ/Internals/RabbitMqChannelAdapter.cs new file mode 100644 index 00000000..ef7e9946 --- /dev/null +++ b/src/NetEvolve.Pulse.RabbitMQ/Internals/RabbitMqChannelAdapter.cs @@ -0,0 +1,39 @@ +namespace NetEvolve.Pulse.Internals; + +using RabbitMQ.Client; + +/// +/// Adapter implementation that wraps RabbitMQ.Client IChannel. +/// +internal sealed class RabbitMqChannelAdapter : IRabbitMqChannelAdapter +{ + private readonly IChannel _channel; + + /// + /// Initializes a new instance of the class. + /// + /// The underlying RabbitMQ channel. + public RabbitMqChannelAdapter(IChannel channel) + { + ArgumentNullException.ThrowIfNull(channel); + _channel = channel; + } + + /// + public bool IsOpen => _channel.IsOpen; + + /// + public ValueTask BasicPublishAsync( + string exchange, + string routingKey, + bool mandatory, + TProperties basicProperties, + ReadOnlyMemory body, + CancellationToken cancellationToken = default + ) + where TProperties : IReadOnlyBasicProperties, IAmqpHeader => + _channel.BasicPublishAsync(exchange, routingKey, mandatory, basicProperties, body, cancellationToken); + + /// + public void Dispose() => _channel.Dispose(); +} diff --git a/src/NetEvolve.Pulse.RabbitMQ/Internals/RabbitMqConnectionAdapter.cs b/src/NetEvolve.Pulse.RabbitMQ/Internals/RabbitMqConnectionAdapter.cs new file mode 100644 index 00000000..7588016a --- /dev/null +++ b/src/NetEvolve.Pulse.RabbitMQ/Internals/RabbitMqConnectionAdapter.cs @@ -0,0 +1,31 @@ +namespace NetEvolve.Pulse.Internals; + +using RabbitMQ.Client; + +/// +/// Adapter implementation that wraps RabbitMQ.Client IConnection. +/// +internal sealed class RabbitMqConnectionAdapter : IRabbitMqConnectionAdapter +{ + private readonly IConnection _connection; + + /// + /// Initializes a new instance of the class. + /// + /// The underlying RabbitMQ connection. + public RabbitMqConnectionAdapter(IConnection connection) + { + ArgumentNullException.ThrowIfNull(connection); + _connection = connection; + } + + /// + public bool IsOpen => _connection.IsOpen; + + /// + public async Task CreateChannelAsync(CancellationToken cancellationToken = default) + { + var channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + return new RabbitMqChannelAdapter(channel); + } +} diff --git a/src/NetEvolve.Pulse.RabbitMQ/NetEvolve.Pulse.RabbitMQ.csproj b/src/NetEvolve.Pulse.RabbitMQ/NetEvolve.Pulse.RabbitMQ.csproj new file mode 100644 index 00000000..20ddc494 --- /dev/null +++ b/src/NetEvolve.Pulse.RabbitMQ/NetEvolve.Pulse.RabbitMQ.csproj @@ -0,0 +1,17 @@ + + + $(_ProjectTargetFrameworks) + RabbitMQ transport for the Pulse CQRS mediator outbox. Delivers outbox messages directly to RabbitMQ exchanges using the official .NET client, supporting single and batched sends with health checks for the broker connection. + $(PackageTags);rabbitmq;amqp; + NetEvolve.Pulse + + + + + + + + + + + diff --git a/src/NetEvolve.Pulse.RabbitMQ/Outbox/RabbitMqMessageTransport.cs b/src/NetEvolve.Pulse.RabbitMQ/Outbox/RabbitMqMessageTransport.cs new file mode 100644 index 00000000..204644ab --- /dev/null +++ b/src/NetEvolve.Pulse.RabbitMQ/Outbox/RabbitMqMessageTransport.cs @@ -0,0 +1,168 @@ +namespace NetEvolve.Pulse.Outbox; + +using System.Text; +using Microsoft.Extensions.Options; +using NetEvolve.Pulse.Extensibility.Outbox; +using NetEvolve.Pulse.Internals; +using RabbitMQ.Client; + +/// +/// Message transport that publishes outbox messages to RabbitMQ exchanges. +/// +/// +/// Connection Management: +/// This transport uses an injected connection adapter and creates channels on demand. +/// The connection lifetime is managed externally via dependency injection. +/// Routing Key Resolution: +/// Each message is published with a routing key resolved by . +/// By default, the simple class name of the event type is used (e.g., "OrderCreated"). +/// Payload: +/// The raw JSON payload from is published as the message body. +/// Health Checks: +/// The method verifies that the connection and channel are open. +/// +internal sealed class RabbitMqMessageTransport : IMessageTransport, IDisposable +{ + /// The resolved transport options controlling the RabbitMQ connection and exchange settings. + private readonly RabbitMqTransportOptions _options; + + /// The topic name resolver used to determine the routing key from an outbox message. + private readonly ITopicNameResolver _topicNameResolver; + + /// The RabbitMQ connection adapter. + private readonly IRabbitMqConnectionAdapter _connectionAdapter; + + /// Lazy-initialized RabbitMQ channel for publishing. + private IRabbitMqChannelAdapter? _channel; + + /// Semaphore for thread-safe channel initialization. + private readonly SemaphoreSlim _initializationLock = new(1, 1); + + /// Indicates whether the transport has been disposed. + private bool _disposed; + + /// + /// Initializes a new instance of the class. + /// + /// The RabbitMQ connection adapter. + /// The topic name resolver for determining routing keys from outbox messages. + /// The transport options. + internal RabbitMqMessageTransport( + IRabbitMqConnectionAdapter connectionAdapter, + ITopicNameResolver topicNameResolver, + IOptions options + ) + { + ArgumentNullException.ThrowIfNull(connectionAdapter); + ArgumentNullException.ThrowIfNull(topicNameResolver); + ArgumentNullException.ThrowIfNull(options); + + _connectionAdapter = connectionAdapter; + _topicNameResolver = topicNameResolver; + _options = options.Value; + } + + /// + public async Task SendAsync(OutboxMessage message, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(message); + + var channel = await EnsureChannelAsync(cancellationToken).ConfigureAwait(false); + var routingKey = ResolveRoutingKey(message); + var body = Encoding.UTF8.GetBytes(message.Payload); + + var properties = new BasicProperties + { + MessageId = message.Id.ToString(), + CorrelationId = message.CorrelationId, + ContentType = "application/json", + Timestamp = new AmqpTimestamp(message.CreatedAt.ToUnixTimeSeconds()), + Headers = new Dictionary + { + ["eventType"] = message.EventType, + ["retryCount"] = message.RetryCount, + }, + }; + + await channel + .BasicPublishAsync( + exchange: _options.ExchangeName, + routingKey: routingKey, + mandatory: false, + basicProperties: properties, + body: body, + cancellationToken: cancellationToken + ) + .ConfigureAwait(false); + } + + /// + public Task IsHealthyAsync(CancellationToken cancellationToken = default) + { + try + { + if (_connectionAdapter?.IsOpen != true || _channel?.IsOpen != true) + { + return Task.FromResult(false); + } + + // Perform a lightweight check by verifying channel is still open + // RabbitMQ client maintains the connection state internally + return Task.FromResult(_channel.IsOpen); + } + catch (Exception) when (!cancellationToken.IsCancellationRequested) + { + return Task.FromResult(false); + } + } + + /// + /// Ensures that a channel is available, creating it if necessary. + /// + /// A token to monitor for cancellation requests. + /// The initialized channel. + private async Task EnsureChannelAsync(CancellationToken cancellationToken) + { + if (_channel?.IsOpen == true) + { + return _channel; + } + + await _initializationLock.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + if (_channel?.IsOpen == true) + { + return _channel; + } + + _channel = await _connectionAdapter.CreateChannelAsync(cancellationToken).ConfigureAwait(false); + + return _channel; + } + finally + { + _ = _initializationLock.Release(); + } + } + + /// + /// Resolves the routing key for a given outbox message. + /// + /// The outbox message to resolve the routing key from. + /// The resolved routing key. + private string ResolveRoutingKey(OutboxMessage message) => _topicNameResolver.Resolve(message); + + /// + public void Dispose() + { + if (_disposed) + { + return; + } + + _channel?.Dispose(); + _initializationLock.Dispose(); + _disposed = true; + } +} diff --git a/src/NetEvolve.Pulse.RabbitMQ/Outbox/RabbitMqTransportOptions.cs b/src/NetEvolve.Pulse.RabbitMQ/Outbox/RabbitMqTransportOptions.cs new file mode 100644 index 00000000..7f5fd614 --- /dev/null +++ b/src/NetEvolve.Pulse.RabbitMQ/Outbox/RabbitMqTransportOptions.cs @@ -0,0 +1,18 @@ +namespace NetEvolve.Pulse.Outbox; + +using NetEvolve.Pulse.Extensibility.Outbox; + +/// +/// Configuration options for . +/// +public sealed class RabbitMqTransportOptions +{ + /// + /// Gets or sets the target exchange name for publishing messages. + /// + /// + /// This is the RabbitMQ exchange where all outbox messages will be published. + /// The exchange must already exist; it will not be auto-declared. + /// + public string ExchangeName { get; set; } = string.Empty; +} diff --git a/src/NetEvolve.Pulse.RabbitMQ/README.md b/src/NetEvolve.Pulse.RabbitMQ/README.md new file mode 100644 index 00000000..5a7745b0 --- /dev/null +++ b/src/NetEvolve.Pulse.RabbitMQ/README.md @@ -0,0 +1,321 @@ +# NetEvolve.Pulse.RabbitMQ + +[![NuGet Version](https://img.shields.io/nuget/v/NetEvolve.Pulse.RabbitMQ.svg)](https://www.nuget.org/packages/NetEvolve.Pulse.RabbitMQ/) +[![NuGet Downloads](https://img.shields.io/nuget/dt/NetEvolve.Pulse.RabbitMQ.svg)](https://www.nuget.org/packages/NetEvolve.Pulse.RabbitMQ/) +[![License](https://img.shields.io/github/license/dailydevops/pulse.svg)](https://github.com/dailydevops/pulse/blob/main/LICENSE) + +RabbitMQ transport for the Pulse outbox pattern. Publishes outbox messages directly to RabbitMQ exchanges using the official .NET client, enabling reliable event delivery without routing through Dapr or other intermediaries. + +## Features + +- **Direct Publishing**: Send messages to RabbitMQ exchanges without additional infrastructure +- **Flexible Routing**: Automatic routing key resolution based on event types via `ITopicNameResolver` +- **Health Checks**: Verify connection and channel state for readiness probing +- **Batch Support**: Efficient batch publishing using parallel execution (default implementation) +- **Connection Management**: Singleton connection with lazy channel initialization + +## Installation + +### NuGet Package Manager + +```powershell +Install-Package NetEvolve.Pulse.RabbitMQ +``` + +### .NET CLI + +```bash +dotnet add package NetEvolve.Pulse.RabbitMQ +``` + +### PackageReference + +```xml + +``` + +## Quick Start + +### 1. Add the RabbitMQ client package + +```bash +dotnet add package RabbitMQ.Client +``` + +### 2. Register services + +```csharp +using Microsoft.Extensions.DependencyInjection; +using NetEvolve.Pulse; +using RabbitMQ.Client; + +var services = new ServiceCollection(); + +// Register RabbitMQ connection before UseRabbitMqTransport +services.AddSingleton(sp => +{ + var factory = new ConnectionFactory + { + HostName = "localhost", + Port = 5672, + VirtualHost = "/", + UserName = "guest", + Password = "guest" + }; + return factory.CreateConnectionAsync().GetAwaiter().GetResult(); +}); + +services.AddPulse(config => config + .AddOutbox( + options => options.Schema = "pulse", + processorOptions => processorOptions.BatchSize = 100) + .UseRabbitMqTransport(options => + { + options.ExchangeName = "events"; + })); +``` + +### 3. Store events via IEventOutbox + +Use `IEventOutbox` to store events reliably. The outbox processor picks them up and publishes each one to the configured RabbitMQ exchange: + +```csharp +public class OrderService +{ + private readonly IEventOutbox _outbox; + + public OrderService(IEventOutbox outbox) => _outbox = outbox; + + public async Task CreateOrderAsync(CreateOrderRequest request, CancellationToken ct) + { + // ... business logic ... + + // Stored reliably; published via RabbitMQ when the processor runs + await _outbox.StoreAsync(new OrderCreatedEvent + { + OrderId = Guid.NewGuid(), + CustomerId = request.CustomerId + }, ct); + } +} +``` + +## Transaction Integration + +For reliable at-least-once delivery guarantees, store outbox events within the same database transaction as your business data. Pair the RabbitMQ transport with a persistence provider that supports transaction enlistment: + +```csharp +public class OrderService +{ + private readonly ApplicationDbContext _context; + private readonly IEventOutbox _outbox; + + public OrderService(ApplicationDbContext context, IEventOutbox outbox) + { + _context = context; + _outbox = outbox; + } + + public async Task CreateOrderAsync(CreateOrderRequest request, CancellationToken ct) + { + // Begin transaction + await using var transaction = await _context.Database.BeginTransactionAsync(ct); + + try + { + // Business operation + var order = new Order { CustomerId = request.CustomerId, Total = request.Total }; + _context.Orders.Add(order); + await _context.SaveChangesAsync(ct); + + // Store event in outbox (same transaction) + await _outbox.StoreAsync(new OrderCreatedEvent + { + OrderId = order.Id, + CustomerId = order.CustomerId + }, ct); + + // Commit both business data and event atomically + await transaction.CommitAsync(ct); + } + catch + { + // Rollback discards both business data AND the outbox event + await transaction.RollbackAsync(ct); + throw; + } + } +} +``` + +> [!NOTE] +> The RabbitMQ transport only handles _publishing_. Transactional guarantees are provided by the persistence layer (e.g., `NetEvolve.Pulse.EntityFramework` or `NetEvolve.Pulse.SqlServer`). + +## Configuration + +### `RabbitMqTransportOptions` + +| Property | Type | Default | Description | +| -------------- | -------- | ------- | --------------------------------------------- | +| `ExchangeName` | `string` | `""` | Target exchange for publishing (**required**) | + +### Routing Key Resolution + +By default, the simple class name of the event type is used as the routing key. The assembly qualifier and namespace are stripped automatically via `ITopicNameResolver`. + +| `EventType` | Resolved routing key | +| -------------------------------------- | -------------------- | +| `MyApp.Events.OrderCreated, MyApp` | `OrderCreated` | +| `MyApp.Events.PaymentProcessed, MyApp` | `PaymentProcessed` | + +Override the resolver for custom naming strategies: + +```csharp +services.AddSingleton(); + +services.AddPulse(config => config + .UseRabbitMqTransport(options => + { + options.ExchangeName = "events"; + })); +``` + +## Exchange Setup + +> [!IMPORTANT] +> The target RabbitMQ exchange must already exist. This transport does not auto-declare exchanges or queues. + +### Example: Topic Exchange + +```bash +# Create a topic exchange for event routing +rabbitmqadmin declare exchange name=events type=topic durable=true + +# Create queues and bind them to specific event types +rabbitmqadmin declare queue name=order-service durable=true +rabbitmqadmin declare binding source=events destination=order-service routing_key="OrderCreated" + +rabbitmqadmin declare queue name=payment-service durable=true +rabbitmqadmin declare binding source=events destination=payment-service routing_key="PaymentProcessed" +``` + +### Example: Fanout Exchange + +```bash +# Create a fanout exchange for broadcasting to all subscribers +rabbitmqadmin declare exchange name=notifications type=fanout durable=true + +# Create queues and bind them (no routing key needed for fanout) +rabbitmqadmin declare queue name=email-service durable=true +rabbitmqadmin declare binding source=notifications destination=email-service + +rabbitmqadmin declare queue name=sms-service durable=true +rabbitmqadmin declare binding source=notifications destination=sms-service +``` + +## Consumer Integration + +Consume messages using the official RabbitMQ .NET client or any compatible library: + +```csharp +var factory = new ConnectionFactory { HostName = "localhost" }; +await using var connection = await factory.CreateConnectionAsync(); +await using var channel = await connection.CreateChannelAsync(); + +await channel.QueueDeclareAsync( + queue: "order-service", + durable: true, + exclusive: false, + autoDelete: false); + +await channel.QueueBindAsync( + queue: "order-service", + exchange: "events", + routingKey: "OrderCreated"); + +var consumer = new AsyncEventingBasicConsumer(channel); +consumer.ReceivedAsync += async (sender, ea) => +{ + var body = ea.Body.ToArray(); + var json = Encoding.UTF8.GetString(body); + var @event = JsonSerializer.Deserialize(json); + + // Handle the event + Console.WriteLine($"Order created: {@event.OrderId}"); + + await channel.BasicAckAsync(ea.DeliveryTag, multiple: false); +}; + +await channel.BasicConsumeAsync( + queue: "order-service", + autoAck: false, + consumer: consumer); +``` + +## How It Works + +1. Your application stores events in the outbox via `IEventOutbox.StoreAsync` within a database transaction. +2. The Pulse background processor polls the outbox for pending messages. +3. For each message, `RabbitMqMessageTransport` publishes it to the configured exchange with a routing key resolved by `ITopicNameResolver`. +4. RabbitMQ routes the message to bound queues based on the routing key and exchange type. +5. On success, the message is marked as processed; on failure, it remains pending for the next poll cycle. + +## Performance Considerations + +### Batch Processing + +Configure batch size and polling interval based on your throughput requirements: + +```csharp +.AddOutbox(processorOptions: options => +{ + options.BatchSize = 100; // Messages per poll cycle + options.PollingInterval = TimeSpan.FromSeconds(1); +}) +``` + +### Connection Management + +Register `IConnection` as a singleton in your DI container. The RabbitMQ client library is thread-safe and designed for concurrent use, so a single shared connection is recommended. + +### Channel Management + +Channels are created on demand and reused for subsequent sends. If a channel becomes closed, a new one is automatically created on the next send operation. + +## Requirements + +- .NET 8.0, .NET 9.0, or .NET 10.0 +- RabbitMQ 3.8+ (or compatible AMQP 0-9-1 broker) +- `RabbitMQ.Client` 7.0+ for async API support +- `Microsoft.Extensions.DependencyInjection` for service registration +- `Microsoft.Extensions.Hosting` for the background processor + +## Related Packages + +- [**NetEvolve.Pulse**](https://www.nuget.org/packages/NetEvolve.Pulse/) - Core mediator and outbox abstractions +- [**NetEvolve.Pulse.Extensibility**](https://www.nuget.org/packages/NetEvolve.Pulse.Extensibility/) - Core contracts and abstractions +- [**NetEvolve.Pulse.EntityFramework**](https://www.nuget.org/packages/NetEvolve.Pulse.EntityFramework/) - Entity Framework Core persistence provider +- [**NetEvolve.Pulse.SqlServer**](https://www.nuget.org/packages/NetEvolve.Pulse.SqlServer/) - SQL Server ADO.NET persistence provider +- [**NetEvolve.Pulse.Polly**](https://www.nuget.org/packages/NetEvolve.Pulse.Polly/) - Polly v8 resilience policies integration + +## Documentation + +For complete documentation, please visit the [official documentation](https://github.com/dailydevops/pulse/blob/main/README.md). + +## Contributing + +Contributions are welcome! Please read the [Contributing Guidelines](https://github.com/dailydevops/pulse/blob/main/CONTRIBUTING.md) before submitting a pull request. + +## Support + +- **Issues**: Report bugs or request features on [GitHub Issues](https://github.com/dailydevops/pulse/issues) +- **Documentation**: Read the full documentation at [https://github.com/dailydevops/pulse](https://github.com/dailydevops/pulse) + +## License + +This project is licensed under the MIT License - see the [LICENSE](https://github.com/dailydevops/pulse/blob/main/LICENSE) file for details. + +--- + +> [!NOTE] +> **Made with ❤️ by the NetEvolve Team** diff --git a/src/NetEvolve.Pulse.RabbitMQ/RabbitMqMediatorConfiguratorExtensions.cs b/src/NetEvolve.Pulse.RabbitMQ/RabbitMqMediatorConfiguratorExtensions.cs new file mode 100644 index 00000000..b1b63a36 --- /dev/null +++ b/src/NetEvolve.Pulse.RabbitMQ/RabbitMqMediatorConfiguratorExtensions.cs @@ -0,0 +1,63 @@ +namespace NetEvolve.Pulse; + +using System.Linq; +using Microsoft.Extensions.DependencyInjection; +using NetEvolve.Pulse.Extensibility; +using NetEvolve.Pulse.Extensibility.Outbox; +using NetEvolve.Pulse.Internals; +using NetEvolve.Pulse.Outbox; +using RabbitMQ.Client; + +/// +/// Extension methods for registering the RabbitMQ message transport with the Pulse mediator. +/// +public static class RabbitMqMediatorConfiguratorExtensions +{ + /// + /// Configures the outbox to publish messages via RabbitMQ. + /// + /// The mediator configurator. + /// Optional action to configure . + /// The configurator for chaining. + /// + /// Prerequisites: + /// IConnection must be registered in the DI container before calling this method. + /// The RabbitMQ exchange specified in must already exist. + /// This transport does not auto-declare exchanges or queues. + /// Note: + /// Replaces any previously registered . + /// + /// Thrown if is null. + public static IMediatorConfigurator UseRabbitMqTransport( + this IMediatorConfigurator configurator, + Action? configureOptions = null + ) + { + ArgumentNullException.ThrowIfNull(configurator); + + var services = configurator.Services; + + _ = services.AddOptions(); + if (configureOptions is not null) + { + _ = services.Configure(configureOptions); + } + + // Register the connection adapter + _ = services.AddSingleton(sp => + { + var connection = sp.GetRequiredService(); + return new RabbitMqConnectionAdapter(connection); + }); + + var existing = services.FirstOrDefault(d => d.ServiceType == typeof(IMessageTransport)); + if (existing is not null) + { + _ = services.Remove(existing); + } + + _ = services.AddSingleton(); + + return configurator; + } +} diff --git a/tests/NetEvolve.Pulse.RabbitMQ.Tests.Integration/NetEvolve.Pulse.RabbitMQ.Tests.Integration.csproj b/tests/NetEvolve.Pulse.RabbitMQ.Tests.Integration/NetEvolve.Pulse.RabbitMQ.Tests.Integration.csproj new file mode 100644 index 00000000..94f17ec0 --- /dev/null +++ b/tests/NetEvolve.Pulse.RabbitMQ.Tests.Integration/NetEvolve.Pulse.RabbitMQ.Tests.Integration.csproj @@ -0,0 +1,23 @@ + + + Exe + $(_TestTargetFrameworks) + + + + + + + + + + + + + + + + + + + diff --git a/tests/NetEvolve.Pulse.RabbitMQ.Tests.Integration/RabbitMqTransportIntegrationTests.cs b/tests/NetEvolve.Pulse.RabbitMQ.Tests.Integration/RabbitMqTransportIntegrationTests.cs new file mode 100644 index 00000000..43899fb1 --- /dev/null +++ b/tests/NetEvolve.Pulse.RabbitMQ.Tests.Integration/RabbitMqTransportIntegrationTests.cs @@ -0,0 +1,151 @@ +namespace NetEvolve.Pulse.RabbitMQ.Tests.Integration; + +using System.Text; +using global::RabbitMQ.Client; +using Microsoft.Extensions.Options; +using NetEvolve.Pulse.Extensibility.Outbox; +using NetEvolve.Pulse.Outbox; +using Testcontainers.RabbitMq; +using TUnit.Assertions.Extensions; +using TUnit.Core; + +public sealed class RabbitMqTransportIntegrationTests : IAsyncDisposable +{ + private const string ExchangeName = "pulse-outbox-tests"; + private const string QueueName = "pulse-outbox-queue"; + private const string RoutingKey = "test.events"; + + private RabbitMqContainer? _container; + private IConnection? _connection; + + [Before(Test)] + public async Task StartContainerAsync() + { + _container = new RabbitMqBuilder().WithCleanUp(true).Build(); + + await _container.StartAsync(); + + var factory = new ConnectionFactory { Uri = new Uri(_container.GetConnectionString()) }; + _connection = await factory.CreateConnectionAsync(); + + // Declare exchange and queue for testing + var channel = await _connection.CreateChannelAsync(); + await channel.ExchangeDeclareAsync(ExchangeName, ExchangeType.Topic, durable: false); + await channel.QueueDeclareAsync(QueueName, durable: false, exclusive: false, autoDelete: false); + await channel.QueueBindAsync(QueueName, ExchangeName, RoutingKey); + await channel.CloseAsync(); + } + + [Test] + public async Task SendAsync_Publishes_message_to_rabbitmq() + { + using var transport = CreateTransport(); + var message = CreateOutboxMessage(); + + await transport.SendAsync(message); + + var receivedMessage = await ConsumeMessageAsync(); + + using (Assert.Multiple()) + { + _ = await Assert.That(receivedMessage).IsNotNull(); + _ = await Assert.That(receivedMessage!.Body).IsEqualTo(message.Payload); + _ = await Assert.That(receivedMessage.MessageId).IsEqualTo(message.Id.ToString()); + _ = await Assert.That(receivedMessage.CorrelationId).IsEqualTo(message.CorrelationId); + _ = await Assert.That(receivedMessage.EventType).IsEqualTo(message.EventType); + } + } + + [Test] + public async Task IsHealthyAsync_When_connected_returns_true() + { + using var transport = CreateTransport(); + + // Trigger connection by sending a message + await transport.SendAsync(CreateOutboxMessage()); + + var healthy = await transport.IsHealthyAsync(); + + _ = await Assert.That(healthy).IsTrue(); + } + + [Test] + public async Task IsHealthyAsync_When_disposed_returns_false() + { + var transport = CreateTransport(); + + // Trigger connection + await transport.SendAsync(CreateOutboxMessage()); + + transport.Dispose(); + + var healthy = await transport.IsHealthyAsync(); + + _ = await Assert.That(healthy).IsFalse(); + } + + public async ValueTask DisposeAsync() + { + if (_connection is not null) + { + await _connection.CloseAsync(); + _connection.Dispose(); + } + + if (_container is not null) + { + await _container.DisposeAsync(); + } + } + + private RabbitMqMessageTransport CreateTransport() + { + var options = new RabbitMqTransportOptions { ExchangeName = ExchangeName }; + + var adapter = new NetEvolve.Pulse.Internals.RabbitMqConnectionAdapter(_connection!); + return new RabbitMqMessageTransport(adapter, new FakeTopicNameResolver(), Options.Create(options)); + } + + private static OutboxMessage CreateOutboxMessage() => + new() + { + Id = Guid.NewGuid(), + EventType = "Integration.Event", + Payload = """{"event":"integration"}""", + CorrelationId = "corr-123", + CreatedAt = DateTimeOffset.UtcNow, + UpdatedAt = DateTimeOffset.UtcNow, + RetryCount = 1, + }; + + private async Task ConsumeMessageAsync() + { + var channel = await _connection!.CreateChannelAsync(); + var result = await channel.BasicGetAsync(QueueName, autoAck: true); + + if (result is null) + { + await channel.CloseAsync(); + return null; + } + + var body = Encoding.UTF8.GetString(result.Body.ToArray()); + var messageId = result.BasicProperties.MessageId; + var correlationId = result.BasicProperties.CorrelationId; + var eventType = + result.BasicProperties.Headers?.TryGetValue("eventType", out var et) == true + ? Encoding.UTF8.GetString((byte[])et) + : null; + + await channel.CloseAsync(); + + return new ReceivedMessage(body, messageId, correlationId, eventType); + } + + private sealed record ReceivedMessage(string Body, string? MessageId, string? CorrelationId, string? EventType); + + private sealed class FakeTopicNameResolver : ITopicNameResolver + { + public string Resolve(OutboxMessage message) => RoutingKey; + } +} diff --git a/tests/NetEvolve.Pulse.RabbitMQ.Tests.Unit/NetEvolve.Pulse.RabbitMQ.Tests.Unit.csproj b/tests/NetEvolve.Pulse.RabbitMQ.Tests.Unit/NetEvolve.Pulse.RabbitMQ.Tests.Unit.csproj new file mode 100644 index 00000000..0c0249c0 --- /dev/null +++ b/tests/NetEvolve.Pulse.RabbitMQ.Tests.Unit/NetEvolve.Pulse.RabbitMQ.Tests.Unit.csproj @@ -0,0 +1,21 @@ + + + Exe + $(_TestTargetFrameworks) + + + + + + + + + + + + + + + + + diff --git a/tests/NetEvolve.Pulse.RabbitMQ.Tests.Unit/RabbitMqMediatorConfiguratorExtensionsTests.cs b/tests/NetEvolve.Pulse.RabbitMQ.Tests.Unit/RabbitMqMediatorConfiguratorExtensionsTests.cs new file mode 100644 index 00000000..906d8814 --- /dev/null +++ b/tests/NetEvolve.Pulse.RabbitMQ.Tests.Unit/RabbitMqMediatorConfiguratorExtensionsTests.cs @@ -0,0 +1,107 @@ +namespace NetEvolve.Pulse.RabbitMQ.Tests.Unit; + +using global::RabbitMQ.Client; +using Microsoft.Extensions.DependencyInjection; +using NetEvolve.Pulse.Extensibility; +using NetEvolve.Pulse.Extensibility.Outbox; +using NetEvolve.Pulse.Outbox; +using TUnit.Assertions.Extensions; +using TUnit.Core; + +public sealed class RabbitMqMediatorConfiguratorExtensionsTests +{ + [Test] + public async Task UseRabbitMqTransport_Registers_transport_service() + { + IServiceCollection services = new ServiceCollection(); + _ = services.AddPulse(config => config.UseRabbitMqTransport()); + + var descriptor = services.Single(d => d.ServiceType == typeof(IMessageTransport)); + _ = await Assert.That(descriptor.ImplementationType).IsEqualTo(typeof(RabbitMqMessageTransport)); + _ = await Assert.That(descriptor.Lifetime).IsEqualTo(ServiceLifetime.Singleton); + } + + [Test] + public async Task UseRabbitMqTransport_Configures_options() + { + IServiceCollection services = new ServiceCollection(); + _ = services.AddPulse(config => + config.UseRabbitMqTransport(options => + { + options.ExchangeName = "test-exchange"; + }) + ); + + await using var provider = services.BuildServiceProvider(); + var options = provider.GetRequiredService>(); + + _ = await Assert.That(options.Value.ExchangeName).IsEqualTo("test-exchange"); + } + + [Test] + public async Task UseRabbitMqTransport_Without_configureOptions_registers_default_options() + { + IServiceCollection services = new ServiceCollection(); + _ = services.AddPulse(config => config.UseRabbitMqTransport()); + + await using var provider = services.BuildServiceProvider(); + var options = provider.GetRequiredService>(); + + // Verify default options are accessible + _ = await Assert.That(options.Value).IsNotNull(); + _ = await Assert.That(options.Value.ExchangeName).IsEqualTo(string.Empty); + } + + [Test] + public async Task UseRabbitMqTransport_Replaces_existing_transport() + { + IServiceCollection services = new ServiceCollection(); + _ = services.AddSingleton(new DummyTransport()); + _ = services.AddPulse(config => config.UseRabbitMqTransport()); + + var descriptors = services.Where(d => d.ServiceType == typeof(IMessageTransport)).ToList(); + + using (Assert.Multiple()) + { + _ = await Assert.That(descriptors.Count).IsEqualTo(1); + _ = await Assert.That(descriptors[0].ImplementationType).IsEqualTo(typeof(RabbitMqMessageTransport)); + } + } + + [Test] + public async Task UseRabbitMqTransport_When_configurator_null_throws() + { + IMediatorConfigurator configurator = null!; + + var exception = Assert.Throws(() => configurator.UseRabbitMqTransport()); + + _ = await Assert.That(exception).IsNotNull(); + _ = await Assert.That(exception!.ParamName).IsEqualTo("configurator"); + } + + [Test] + public async Task UseRabbitMqTransport_Returns_configurator_for_chaining() + { + IServiceCollection services = new ServiceCollection(); + IMediatorConfigurator? returnedConfigurator = null; + + _ = services.AddPulse(config => returnedConfigurator = config.UseRabbitMqTransport()); + + _ = await Assert.That(returnedConfigurator).IsNotNull(); + } + +#pragma warning disable CA1812 // Avoid uninstantiated internal classes - instantiated via DI container + private sealed class DummyTransport : IMessageTransport +#pragma warning restore CA1812 + { + public Task IsHealthyAsync(CancellationToken cancellationToken = default) => Task.FromResult(true); + + public Task SendAsync(OutboxMessage message, CancellationToken cancellationToken = default) => + Task.CompletedTask; + + public Task SendBatchAsync( + IEnumerable messages, + CancellationToken cancellationToken = default + ) => Task.CompletedTask; + } +} diff --git a/tests/NetEvolve.Pulse.RabbitMQ.Tests.Unit/RabbitMqMessageTransportTests.cs b/tests/NetEvolve.Pulse.RabbitMQ.Tests.Unit/RabbitMqMessageTransportTests.cs new file mode 100644 index 00000000..569b9986 --- /dev/null +++ b/tests/NetEvolve.Pulse.RabbitMQ.Tests.Unit/RabbitMqMessageTransportTests.cs @@ -0,0 +1,417 @@ +namespace NetEvolve.Pulse.RabbitMQ.Tests.Unit; + +using System.Text; +using global::RabbitMQ.Client; +using Microsoft.Extensions.Options; +using NetEvolve.Pulse.Extensibility.Outbox; +using NetEvolve.Pulse.Internals; +using NetEvolve.Pulse.Outbox; +using TUnit.Assertions.Extensions; +using TUnit.Core; + +public sealed class RabbitMqMessageTransportTests +{ + [Test] + public async Task Constructor_When_connectionAdapter_null_throws() + { + IRabbitMqConnectionAdapter connectionAdapter = null!; + var topicNameResolver = new FakeTopicNameResolver(); + var options = CreateOptions(); + + var exception = Assert.Throws(() => + { + _ = new RabbitMqMessageTransport(connectionAdapter, topicNameResolver, options); + }); + + _ = await Assert.That(exception).IsNotNull(); + _ = await Assert.That(exception!.ParamName).IsEqualTo("connectionAdapter"); + } + + [Test] + public async Task Constructor_When_topicNameResolver_null_throws() + { + var connectionAdapter = new FakeConnectionAdapter(); + ITopicNameResolver topicNameResolver = null!; + var options = CreateOptions(); + + var exception = Assert.Throws(() => + { + _ = new RabbitMqMessageTransport(connectionAdapter, topicNameResolver, options); + }); + + _ = await Assert.That(exception).IsNotNull(); + _ = await Assert.That(exception!.ParamName).IsEqualTo("topicNameResolver"); + } + + [Test] + public async Task Constructor_When_options_null_throws() + { + var connectionAdapter = new FakeConnectionAdapter(); + var topicNameResolver = new FakeTopicNameResolver(); + IOptions options = null!; + + var exception = Assert.Throws(() => + { + _ = new RabbitMqMessageTransport(connectionAdapter, topicNameResolver, options); + }); + + _ = await Assert.That(exception).IsNotNull(); + _ = await Assert.That(exception!.ParamName).IsEqualTo("options"); + } + + [Test] + public async Task SendAsync_When_message_null_throws() + { + var connectionAdapter = new FakeConnectionAdapter(); + var topicNameResolver = new FakeTopicNameResolver(); + using var transport = CreateTransport(connectionAdapter, topicNameResolver); + + var exception = await Assert.ThrowsAsync(() => transport.SendAsync(null!)); + + _ = await Assert.That(exception).IsNotNull(); + _ = await Assert.That(exception!.ParamName).IsEqualTo("message"); + } + + [Test] + public async Task SendAsync_Publishes_message_with_correct_properties() + { + var connectionAdapter = new FakeConnectionAdapter(); + var topicNameResolver = new FakeTopicNameResolver(); + using var transport = CreateTransport(connectionAdapter, topicNameResolver, exchangeName: "test-exchange"); + var outboxMessage = CreateOutboxMessage(); + + await transport.SendAsync(outboxMessage); + + _ = await Assert.That(connectionAdapter.CreateChannelCallCount).IsEqualTo(1); + var channel = connectionAdapter.CreatedChannels.Single(); + _ = await Assert.That(channel.PublishCallCount).IsEqualTo(1); + + var publishCall = channel.PublishCalls.Single(); + using (Assert.Multiple()) + { + _ = await Assert.That(publishCall.Exchange).IsEqualTo("test-exchange"); + _ = await Assert.That(publishCall.RoutingKey).IsEqualTo("Sample.Event.Created"); + _ = await Assert.That(publishCall.Mandatory).IsFalse(); + + var props = publishCall.Properties; + _ = await Assert.That(props.MessageId).IsEqualTo(outboxMessage.Id.ToString()); + _ = await Assert.That(props.CorrelationId).IsEqualTo(outboxMessage.CorrelationId); + _ = await Assert.That(props.ContentType).IsEqualTo("application/json"); + _ = await Assert.That(props.Headers!["eventType"]).IsEqualTo(outboxMessage.EventType); + _ = await Assert.That(props.Headers!["retryCount"]).IsEqualTo(outboxMessage.RetryCount); + + var bodyText = Encoding.UTF8.GetString(publishCall.Body.ToArray()); + _ = await Assert.That(bodyText).IsEqualTo(outboxMessage.Payload); + } + } + + [Test] + public async Task SendAsync_Reuses_open_channel() + { + var connectionAdapter = new FakeConnectionAdapter(); + var topicNameResolver = new FakeTopicNameResolver(); + using var transport = CreateTransport(connectionAdapter, topicNameResolver); + var message1 = CreateOutboxMessage(); + var message2 = CreateOutboxMessage(); + + await transport.SendAsync(message1); + await transport.SendAsync(message2); + + _ = await Assert.That(connectionAdapter.CreateChannelCallCount).IsEqualTo(1); + var channel = connectionAdapter.CreatedChannels.Single(); + _ = await Assert.That(channel.PublishCallCount).IsEqualTo(2); + } + + [Test] + public async Task SendAsync_Creates_new_channel_when_previous_closed() + { + var connectionAdapter = new FakeConnectionAdapter(); + var topicNameResolver = new FakeTopicNameResolver(); + using var transport = CreateTransport(connectionAdapter, topicNameResolver); + var message1 = CreateOutboxMessage(); + var message2 = CreateOutboxMessage(); + + await transport.SendAsync(message1); + var firstChannel = connectionAdapter.CreatedChannels.Single(); + firstChannel.IsOpen = false; // Simulate channel closure + + await transport.SendAsync(message2); + + _ = await Assert.That(connectionAdapter.CreateChannelCallCount).IsEqualTo(2); + _ = await Assert.That(connectionAdapter.CreatedChannels.Count).IsEqualTo(2); + } + + [Test] + public async Task SendAsync_Uses_topic_name_resolver_for_routing_key() + { + var connectionAdapter = new FakeConnectionAdapter(); + var topicNameResolver = new FakeTopicNameResolver { ResolvedName = "custom-routing-key" }; + using var transport = CreateTransport(connectionAdapter, topicNameResolver); + var outboxMessage = CreateOutboxMessage(); + + await transport.SendAsync(outboxMessage); + + var channel = connectionAdapter.CreatedChannels.Single(); + var publishCall = channel.PublishCalls.Single(); + _ = await Assert.That(publishCall.RoutingKey).IsEqualTo("custom-routing-key"); + _ = await Assert.That(topicNameResolver.ResolveCallCount).IsEqualTo(1); + } + + [Test] + public async Task IsHealthyAsync_When_connection_not_open_returns_false() + { + var connectionAdapter = new FakeConnectionAdapter { IsOpen = false }; + var topicNameResolver = new FakeTopicNameResolver(); + using var transport = CreateTransport(connectionAdapter, topicNameResolver); + + var healthy = await transport.IsHealthyAsync(); + + _ = await Assert.That(healthy).IsFalse(); + } + + [Test] + public async Task IsHealthyAsync_When_channel_not_created_returns_false() + { + var connectionAdapter = new FakeConnectionAdapter { IsOpen = true }; + var topicNameResolver = new FakeTopicNameResolver(); + using var transport = CreateTransport(connectionAdapter, topicNameResolver); + + var healthy = await transport.IsHealthyAsync(); + + _ = await Assert.That(healthy).IsFalse(); + } + + [Test] + public async Task IsHealthyAsync_When_channel_not_open_returns_false() + { + var connectionAdapter = new FakeConnectionAdapter { IsOpen = true }; + var topicNameResolver = new FakeTopicNameResolver(); + using var transport = CreateTransport(connectionAdapter, topicNameResolver); + + // Create a channel + await transport.SendAsync(CreateOutboxMessage()); + var channel = connectionAdapter.CreatedChannels.Single(); + channel.IsOpen = false; + + var healthy = await transport.IsHealthyAsync(); + + _ = await Assert.That(healthy).IsFalse(); + } + + [Test] + public async Task IsHealthyAsync_When_connection_and_channel_open_returns_true() + { + var connectionAdapter = new FakeConnectionAdapter { IsOpen = true }; + var topicNameResolver = new FakeTopicNameResolver(); + using var transport = CreateTransport(connectionAdapter, topicNameResolver); + + // Create a channel + await transport.SendAsync(CreateOutboxMessage()); + + var healthy = await transport.IsHealthyAsync(); + + _ = await Assert.That(healthy).IsTrue(); + } + + [Test] + public async Task IsHealthyAsync_When_exception_thrown_returns_false() + { + var connectionAdapter = new FakeConnectionAdapter { IsOpen = true, ThrowOnIsOpen = true }; + var topicNameResolver = new FakeTopicNameResolver(); + using var transport = CreateTransport(connectionAdapter, topicNameResolver); + + var healthy = await transport.IsHealthyAsync(); + + _ = await Assert.That(healthy).IsFalse(); + } + + [Test] + public async Task Dispose_Disposes_channel_and_lock() + { + var connectionAdapter = new FakeConnectionAdapter(); + var topicNameResolver = new FakeTopicNameResolver(); + using var transport = CreateTransport(connectionAdapter, topicNameResolver); + + await transport.SendAsync(CreateOutboxMessage()); + var channel = connectionAdapter.CreatedChannels.Single(); + + transport.Dispose(); + + _ = await Assert.That(channel.DisposeCalled).IsTrue(); + } + + [Test] + public async Task Dispose_Is_idempotent() + { + var connectionAdapter = new FakeConnectionAdapter(); + var topicNameResolver = new FakeTopicNameResolver(); + var transport = CreateTransport(connectionAdapter, topicNameResolver); + + await transport.SendAsync(CreateOutboxMessage()); + var channel = connectionAdapter.CreatedChannels.Single(); + + transport.Dispose(); + transport.Dispose(); + + _ = await Assert.That(channel.DisposeCallCount).IsEqualTo(1); + } + + [Test] + public async Task Options_ExchangeName_can_be_configured() + { + var options = new RabbitMqTransportOptions { ExchangeName = "test-exchange" }; + + _ = await Assert.That(options.ExchangeName).IsEqualTo("test-exchange"); + } + + [Test] + public async Task Options_Default_ExchangeName_is_empty_string() + { + var options = new RabbitMqTransportOptions(); + + _ = await Assert.That(options.ExchangeName).IsEqualTo(string.Empty); + } + + private static RabbitMqMessageTransport CreateTransport( + IRabbitMqConnectionAdapter connectionAdapter, + ITopicNameResolver topicNameResolver, + string exchangeName = "events" + ) + { + var options = CreateOptions(exchangeName); + return new RabbitMqMessageTransport(connectionAdapter, topicNameResolver, options); + } + + private static IOptions CreateOptions(string exchangeName = "events") => + Options.Create(new RabbitMqTransportOptions { ExchangeName = exchangeName }); + + private static OutboxMessage CreateOutboxMessage() => + new() + { + Id = Guid.NewGuid(), + EventType = "Sample.Event.Created", + Payload = """{"event":"sample"}""", + CorrelationId = "corr-123", + CreatedAt = DateTimeOffset.UtcNow, + UpdatedAt = DateTimeOffset.UtcNow, + RetryCount = 1, + ProcessedAt = DateTimeOffset.UtcNow, + }; + + private sealed class FakeTopicNameResolver : ITopicNameResolver + { + public string ResolvedName { get; set; } = "Sample.Event.Created"; + + public int ResolveCallCount { get; private set; } + + public string Resolve(OutboxMessage message) + { + ResolveCallCount++; + return ResolvedName; + } + } + + private sealed class FakeConnectionAdapter : IRabbitMqConnectionAdapter + { + private bool _isOpen = true; + + public bool IsOpen + { + get + { + if (ThrowOnIsOpen) + { + throw new InvalidOperationException("Connection check failed"); + } + + return _isOpen; + } + set => _isOpen = value; + } + + public bool ThrowOnIsOpen { get; set; } + + public int CreateChannelCallCount { get; private set; } + + public List CreatedChannels { get; } = []; + + public Task CreateChannelAsync(CancellationToken cancellationToken = default) + { + CreateChannelCallCount++; + var channel = new FakeChannelAdapter(); + CreatedChannels.Add(channel); + return Task.FromResult(channel); + } + } + + private sealed class FakeChannelAdapter : IRabbitMqChannelAdapter + { + public bool IsOpen { get; set; } = true; + + public int PublishCallCount { get; private set; } + + public List PublishCalls { get; } = []; + + public bool DisposeCalled { get; private set; } + + public int DisposeCallCount { get; private set; } + + public ValueTask BasicPublishAsync( + string exchange, + string routingKey, + bool mandatory, + TProperties basicProperties, + ReadOnlyMemory body, + CancellationToken cancellationToken = default + ) + where TProperties : IReadOnlyBasicProperties, IAmqpHeader + { + PublishCallCount++; + PublishCalls.Add( + new PublishCall + { + Exchange = exchange, + RoutingKey = routingKey, + Mandatory = mandatory, + Properties = ExtractProperties(basicProperties), + Body = body, + } + ); + return ValueTask.CompletedTask; + } + + private static BasicProperties ExtractProperties(TProperties props) + where TProperties : IReadOnlyBasicProperties, IAmqpHeader + { + var result = new BasicProperties + { + MessageId = props.MessageId, + CorrelationId = props.CorrelationId, + ContentType = props.ContentType, + Timestamp = props.Timestamp, + }; + + if (props.Headers is not null) + { + result.Headers = new Dictionary(props.Headers); + } + + return result; + } + + public void Dispose() + { + DisposeCalled = true; + DisposeCallCount++; + } + } + + private sealed record PublishCall + { + public required string Exchange { get; init; } + public required string RoutingKey { get; init; } + public required bool Mandatory { get; init; } + public required BasicProperties Properties { get; init; } + public required ReadOnlyMemory Body { get; init; } + } +}