Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@
<PackageVersion Include="NetEvolve.Extensions.TUnit" Version="3.5.238" />
<PackageVersion Include="Npgsql" Version="10.0.2" />
<PackageVersion Include="Polly.Core" Version="8.6.6" />
<PackageVersion Include="RabbitMQ.Client" Version="7.1.0" />
<PackageVersion Include="Testcontainers" Version="4.11.0" />
<PackageVersion Include="Testcontainers.Kafka" Version="4.11.0" />
<PackageVersion Include="Testcontainers.MsSql" Version="4.11.0" />
<PackageVersion Include="Testcontainers.PostgreSql" Version="4.11.0" />
<PackageVersion Include="Testcontainers.RabbitMq" Version="4.11.0" />
<PackageVersion Include="TUnit" Version="1.23.7" />
<PackageVersion Include="Verify.TUnit" Version="31.13.5" />
</ItemGroup>
Expand Down
2 changes: 2 additions & 0 deletions Pulse.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
<Project Path="src/NetEvolve.Pulse.FluentValidation/NetEvolve.Pulse.FluentValidation.csproj" />
<Project Path="src/NetEvolve.Pulse.Testing/NetEvolve.Pulse.Testing.csproj" />
<Project Path="src/NetEvolve.Pulse.Attributes/NetEvolve.Pulse.Attributes.csproj" />
<Project Path="src/NetEvolve.Pulse.RabbitMQ/NetEvolve.Pulse.RabbitMQ.csproj" />
<Project Path="src/NetEvolve.Pulse.SourceGeneration/NetEvolve.Pulse.SourceGeneration.csproj" />
<Project Path="src/NetEvolve.Pulse.AzureServiceBus/NetEvolve.Pulse.AzureServiceBus.csproj" />
<Project Path="src/NetEvolve.Pulse.SQLite/NetEvolve.Pulse.SQLite.csproj" />
Expand All @@ -60,6 +61,7 @@
<Project Path="tests/NetEvolve.Pulse.Testing.Tests.Unit/NetEvolve.Pulse.Testing.Tests.Unit.csproj" />
<Project Path="tests/NetEvolve.Pulse.AzureServiceBus.Tests.Unit/NetEvolve.Pulse.AzureServiceBus.Tests.Unit.csproj" />
<Project Path="tests/NetEvolve.Pulse.AzureServiceBus.Tests.Integration/NetEvolve.Pulse.AzureServiceBus.Tests.Integration.csproj" />
<Project Path="tests/NetEvolve.Pulse.RabbitMQ.Tests.Unit/NetEvolve.Pulse.RabbitMQ.Tests.Unit.csproj" />
<Project Path="tests/NetEvolve.Pulse.Kafka.Tests.Unit/NetEvolve.Pulse.Kafka.Tests.Unit.csproj" />
<Project Path="tests/NetEvolve.Pulse.Kafka.Tests.Integration/NetEvolve.Pulse.Kafka.Tests.Integration.csproj" />
<Project Path="tests/NetEvolve.Pulse.SourceGeneration.Tests.Unit/NetEvolve.Pulse.SourceGeneration.Tests.Unit.csproj" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
namespace NetEvolve.Pulse.Internals;

using RabbitMQ.Client;

/// <summary>
/// Adapter interface for RabbitMQ channel operations.
/// </summary>
internal interface IRabbitMqChannelAdapter : IDisposable
{
/// <summary>
/// Gets a value indicating whether the channel is open.
/// </summary>
bool IsOpen { get; }

/// <summary>
/// Publishes a message asynchronously.
/// </summary>
/// <typeparam name="TProperties">The type of basic properties.</typeparam>
/// <param name="exchange">The exchange to publish to.</param>
/// <param name="routingKey">The routing key.</param>
/// <param name="mandatory">Whether the message is mandatory.</param>
/// <param name="basicProperties">The message properties.</param>
/// <param name="body">The message body.</param>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>A task representing the asynchronous operation.</returns>
ValueTask BasicPublishAsync<TProperties>(
string exchange,
string routingKey,
bool mandatory,
TProperties basicProperties,
ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default
)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace NetEvolve.Pulse.Internals;

/// <summary>
/// Adapter interface for RabbitMQ connection operations.
/// </summary>
internal interface IRabbitMqConnectionAdapter
{
/// <summary>
/// Gets a value indicating whether the connection is open.
/// </summary>
bool IsOpen { get; }

/// <summary>
/// Creates a new channel asynchronously.
/// </summary>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>A task representing the asynchronous operation, containing the created channel adapter.</returns>
Task<IRabbitMqChannelAdapter> CreateChannelAsync(CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
namespace NetEvolve.Pulse.Internals;

using RabbitMQ.Client;

/// <summary>
/// Adapter implementation that wraps RabbitMQ.Client IChannel.
/// </summary>
internal sealed class RabbitMqChannelAdapter : IRabbitMqChannelAdapter
{
private readonly IChannel _channel;

/// <summary>
/// Initializes a new instance of the <see cref="RabbitMqChannelAdapter"/> class.
/// </summary>
/// <param name="channel">The underlying RabbitMQ channel.</param>
public RabbitMqChannelAdapter(IChannel channel)
{
ArgumentNullException.ThrowIfNull(channel);
_channel = channel;
}

/// <inheritdoc />
public bool IsOpen => _channel.IsOpen;

/// <inheritdoc />
public ValueTask BasicPublishAsync<TProperties>(
string exchange,
string routingKey,
bool mandatory,
TProperties basicProperties,
ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default
)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader =>
_channel.BasicPublishAsync(exchange, routingKey, mandatory, basicProperties, body, cancellationToken);

/// <inheritdoc />
public void Dispose() => _channel.Dispose();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
namespace NetEvolve.Pulse.Internals;

using RabbitMQ.Client;

/// <summary>
/// Adapter implementation that wraps RabbitMQ.Client IConnection.
/// </summary>
internal sealed class RabbitMqConnectionAdapter : IRabbitMqConnectionAdapter
{
private readonly IConnection _connection;

/// <summary>
/// Initializes a new instance of the <see cref="RabbitMqConnectionAdapter"/> class.
/// </summary>
/// <param name="connection">The underlying RabbitMQ connection.</param>
public RabbitMqConnectionAdapter(IConnection connection)
{
ArgumentNullException.ThrowIfNull(connection);
_connection = connection;
}

/// <inheritdoc />
public bool IsOpen => _connection.IsOpen;

/// <inheritdoc />
public async Task<IRabbitMqChannelAdapter> CreateChannelAsync(CancellationToken cancellationToken = default)
{
var channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
return new RabbitMqChannelAdapter(channel);
}
}
17 changes: 17 additions & 0 deletions src/NetEvolve.Pulse.RabbitMQ/NetEvolve.Pulse.RabbitMQ.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>$(_ProjectTargetFrameworks)</TargetFrameworks>
<Description>RabbitMQ transport for the Pulse CQRS mediator outbox. Delivers outbox messages directly to RabbitMQ exchanges using the official .NET client, supporting single and batched sends with health checks for the broker connection.</Description>
<PackageTags>$(PackageTags);rabbitmq;amqp;</PackageTags>
<RootNamespace>NetEvolve.Pulse</RootNamespace>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Options" />
<PackageReference Include="RabbitMQ.Client" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\NetEvolve.Pulse.Extensibility\NetEvolve.Pulse.Extensibility.csproj" />
</ItemGroup>
</Project>
168 changes: 168 additions & 0 deletions src/NetEvolve.Pulse.RabbitMQ/Outbox/RabbitMqMessageTransport.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
namespace NetEvolve.Pulse.Outbox;

using System.Text;
using Microsoft.Extensions.Options;
using NetEvolve.Pulse.Extensibility.Outbox;
using NetEvolve.Pulse.Internals;
using RabbitMQ.Client;

/// <summary>
/// Message transport that publishes outbox messages to RabbitMQ exchanges.
/// </summary>
/// <remarks>
/// <para><strong>Connection Management:</strong></para>
/// This transport uses an injected connection adapter and creates channels on demand.
/// The connection lifetime is managed externally via dependency injection.
/// <para><strong>Routing Key Resolution:</strong></para>
/// Each message is published with a routing key resolved by <see cref="ITopicNameResolver"/>.
/// By default, the simple class name of the event type is used (e.g., <c>"OrderCreated"</c>).
/// <para><strong>Payload:</strong></para>
/// The raw JSON payload from <see cref="OutboxMessage.Payload"/> is published as the message body.
/// <para><strong>Health Checks:</strong></para>
/// The <see cref="IsHealthyAsync"/> method verifies that the connection and channel are open.
/// </remarks>
internal sealed class RabbitMqMessageTransport : IMessageTransport, IDisposable
{
/// <summary>The resolved transport options controlling the RabbitMQ connection and exchange settings.</summary>
private readonly RabbitMqTransportOptions _options;

/// <summary>The topic name resolver used to determine the routing key from an outbox message.</summary>
private readonly ITopicNameResolver _topicNameResolver;

/// <summary>The RabbitMQ connection adapter.</summary>
private readonly IRabbitMqConnectionAdapter _connectionAdapter;

/// <summary>Lazy-initialized RabbitMQ channel for publishing.</summary>
private IRabbitMqChannelAdapter? _channel;

/// <summary>Semaphore for thread-safe channel initialization.</summary>
private readonly SemaphoreSlim _initializationLock = new(1, 1);

/// <summary>Indicates whether the transport has been disposed.</summary>
private bool _disposed;

/// <summary>
/// Initializes a new instance of the <see cref="RabbitMqMessageTransport"/> class.
/// </summary>
/// <param name="connectionAdapter">The RabbitMQ connection adapter.</param>
/// <param name="topicNameResolver">The topic name resolver for determining routing keys from outbox messages.</param>
/// <param name="options">The transport options.</param>
internal RabbitMqMessageTransport(
IRabbitMqConnectionAdapter connectionAdapter,
ITopicNameResolver topicNameResolver,
IOptions<RabbitMqTransportOptions> options
)
{
ArgumentNullException.ThrowIfNull(connectionAdapter);
ArgumentNullException.ThrowIfNull(topicNameResolver);
ArgumentNullException.ThrowIfNull(options);

_connectionAdapter = connectionAdapter;
_topicNameResolver = topicNameResolver;
_options = options.Value;
}

/// <inheritdoc />
public async Task SendAsync(OutboxMessage message, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(message);

var channel = await EnsureChannelAsync(cancellationToken).ConfigureAwait(false);
var routingKey = ResolveRoutingKey(message);
Comment thread
samtrion marked this conversation as resolved.
var body = Encoding.UTF8.GetBytes(message.Payload);

var properties = new BasicProperties
{
MessageId = message.Id.ToString(),
CorrelationId = message.CorrelationId,
ContentType = "application/json",
Timestamp = new AmqpTimestamp(message.CreatedAt.ToUnixTimeSeconds()),
Headers = new Dictionary<string, object?>
{
["eventType"] = message.EventType,
["retryCount"] = message.RetryCount,
},
};

await channel
.BasicPublishAsync(
exchange: _options.ExchangeName,
routingKey: routingKey,
mandatory: false,
basicProperties: properties,
body: body,
cancellationToken: cancellationToken
)
.ConfigureAwait(false);
}

/// <inheritdoc />
public Task<bool> IsHealthyAsync(CancellationToken cancellationToken = default)
{
try
{
if (_connectionAdapter?.IsOpen != true || _channel?.IsOpen != true)
{
return Task.FromResult(false);
}

// Perform a lightweight check by verifying channel is still open
// RabbitMQ client maintains the connection state internally
return Task.FromResult(_channel.IsOpen);
}
catch (Exception) when (!cancellationToken.IsCancellationRequested)
{
return Task.FromResult(false);
}
}

/// <summary>
/// Ensures that a channel is available, creating it if necessary.
/// </summary>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>The initialized channel.</returns>
private async Task<IRabbitMqChannelAdapter> EnsureChannelAsync(CancellationToken cancellationToken)
{
if (_channel?.IsOpen == true)
{
return _channel;
}

await _initializationLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (_channel?.IsOpen == true)
{
return _channel;
}

_channel = await _connectionAdapter.CreateChannelAsync(cancellationToken).ConfigureAwait(false);

Comment thread
samtrion marked this conversation as resolved.
return _channel;
}
finally
{
_ = _initializationLock.Release();
}
}

/// <summary>
/// Resolves the routing key for a given outbox message.
/// </summary>
/// <param name="message">The outbox message to resolve the routing key from.</param>
/// <returns>The resolved routing key.</returns>
private string ResolveRoutingKey(OutboxMessage message) => _topicNameResolver.Resolve(message);

/// <inheritdoc />
public void Dispose()
{
if (_disposed)
{
return;
}

_channel?.Dispose();
_initializationLock.Dispose();
_disposed = true;
}
}
18 changes: 18 additions & 0 deletions src/NetEvolve.Pulse.RabbitMQ/Outbox/RabbitMqTransportOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace NetEvolve.Pulse.Outbox;

using NetEvolve.Pulse.Extensibility.Outbox;

/// <summary>
/// Configuration options for <see cref="RabbitMqMessageTransport"/>.
/// </summary>
public sealed class RabbitMqTransportOptions
{
/// <summary>
/// Gets or sets the target exchange name for publishing messages.
/// </summary>
/// <remarks>
/// This is the RabbitMQ exchange where all outbox messages will be published.
/// The exchange must already exist; it will not be auto-declared.
/// </remarks>
public string ExchangeName { get; set; } = string.Empty;
}
Loading
Loading