Skip to content

Commit

Permalink
Topic based routing rules for Rabbit MQ. Closes GH-708
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Jan 26, 2024
1 parent 9decdb3 commit bb78e0b
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 25 deletions.
2 changes: 1 addition & 1 deletion docs/guide/http/marten.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,5 +290,5 @@ public class ApprovedInvoicedCompiledQuery : ICompiledListQuery<Invoice>
}
}
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Marten/Documents.cs#L82-L92' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_compiled_query_return_query' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Marten/Documents.cs#L94-L104' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_compiled_query_return_query' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->
6 changes: 4 additions & 2 deletions docs/guide/messaging/broadcast-to-topic.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ theSender = Host.CreateDefaultBuilder()
exchange.BindTopic("color.blue").ToQueue("blue");
exchange.BindTopic("color.*").ToQueue("all");
});

opts.PublishMessagesToRabbitMqExchange<RoutedMessage>("wolverine.topics", m => m.TopicName);
}).Start();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/send_by_topics.cs#L24-L38' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_binding_topics_and_topic_patterns_to_queues' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/send_by_topics.cs#L25-L41' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_binding_topics_and_topic_patterns_to_queues' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

You can explicitly publish a message to a topic through this syntax:
Expand All @@ -31,7 +33,7 @@ var publisher = theSender.Services

await publisher.BroadcastToTopicAsync("color.purple", new Message1());
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/send_by_topics.cs#L72-L79' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_send_to_topic' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/send_by_topics.cs#L75-L82' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_send_to_topic' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Topic Sending as Cascading Message
Expand Down
2 changes: 1 addition & 1 deletion docs/guide/messaging/transports/mqtt.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public class FirstMessage
public Guid Id { get; set; } = Guid.NewGuid();
}
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/send_by_topics.cs#L150-L158' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_topic_attribute-1' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/send_by_topics.cs#L182-L190' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_topic_attribute-1' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Publishing by Topic Rules
Expand Down
47 changes: 45 additions & 2 deletions docs/guide/messaging/transports/rabbitmq/topics.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class FirstMessage
public Guid Id { get; set; } = Guid.NewGuid();
}
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/send_by_topics.cs#L150-L158' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_topic_attribute-1' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/send_by_topics.cs#L182-L190' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_topic_attribute-1' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Of course, you can always explicitly send a message to a specific topic with this syntax:
Expand Down Expand Up @@ -98,7 +98,50 @@ theSender = Host.CreateDefaultBuilder()
exchange.BindTopic("color.blue").ToQueue("blue");
exchange.BindTopic("color.*").ToQueue("all");
});

opts.PublishMessagesToRabbitMqExchange<RoutedMessage>("wolverine.topics", m => m.TopicName);
}).Start();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/send_by_topics.cs#L24-L38' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_binding_topics_and_topic_patterns_to_queues' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/send_by_topics.cs#L25-L41' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_binding_topics_and_topic_patterns_to_queues' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Publishing by Topic Rule

As of Wolverine 1.16, you can specify publishing rules for messages by supplying
the logic to determine the topic name from the message itself. Let's say that we have
an interface that several of our message types implement like so:

<!-- snippet: sample_rabbit_itenantmessage -->
<a id='snippet-sample_rabbit_itenantmessage'></a>
```cs
public interface ITenantMessage
{
string TenantId { get; }
}
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs#L501-L508' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_rabbit_itenantmessage' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Let's say that any message that implements that interface, we want published to the
topic for that messages `TenantId`. We can implement that rule like so:

<!-- snippet: sample_rabbit_topic_rules -->
<a id='snippet-sample_rabbit_topic_rules'></a>
```cs
using var host = await Host.CreateDefaultBuilder()
.UseWolverine((context, opts) =>
{
opts.UseRabbitMq();

// Publish any message that implements ITenantMessage to
// a Rabbit MQ "Topic" exchange named "tenant.messages"
opts.PublishMessagesToRabbitMqExchange<ITenantMessage>("tenant.messages",m => $"{m.GetType().Name.ToLower()}/{m.TenantId}")

// Specify or configure sending through Wolverine for all
// messages through this Exchange
.BufferedInMemory();
})
.StartAsync();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs#L480-L497' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_rabbit_topic_rules' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->
17 changes: 16 additions & 1 deletion docs/guide/messaging/transports/sqs/deadletterqueues.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,22 @@ var host = await Host.CreateDefaultBuilder()
In one stroke, you can disable all usage of native SQS queues for dead letter queueing with this
syntax:

snippet: sample_disabling_all_sqs_dead_letter_queueing
<!-- snippet: sample_disabling_all_sqs_dead_letter_queueing -->
<a id='snippet-sample_disabling_all_sqs_dead_letter_queueing'></a>
```cs
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseAmazonSqsTransportLocally()
// Disable all native SQS dead letter queueing
.DisableAllNativeDeadLetterQueues()
.AutoProvision();

opts.ListenToSqsQueue("incoming");
}).StartAsync();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/AWS/Wolverine.AmazonSqs.Tests/Bugs/disabling_dead_letter_queue.cs#L15-L28' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_disabling_all_sqs_dead_letter_queueing' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

This would force Wolverine to use any persistent envelope storage for dead letter queueing.

Expand Down
32 changes: 32 additions & 0 deletions src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,39 @@ public static async Task configuring_custom_interop()

#endregion
}


public static async Task publish_by_topic_rules()
{
#region sample_rabbit_topic_rules

using var host = await Host.CreateDefaultBuilder()
.UseWolverine((context, opts) =>
{
opts.UseRabbitMq();
// Publish any message that implements ITenantMessage to
// a Rabbit MQ "Topic" exchange named "tenant.messages"
opts.PublishMessagesToRabbitMqExchange<ITenantMessage>("tenant.messages",m => $"{m.GetType().Name.ToLower()}/{m.TenantId}")
// Specify or configure sending through Wolverine for all
// messages through this Exchange
.BufferedInMemory();
})
.StartAsync();

#endregion
}
}

#region sample_rabbit_itenantmessage

public interface ITenantMessage
{
string TenantId { get; }
}

#endregion


public record SendEmail();
65 changes: 48 additions & 17 deletions src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/send_by_topics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ namespace Wolverine.RabbitMQ.Tests;

public class send_by_topics : IDisposable
{
private readonly IHost theFirstReceiver;
private readonly IHost theSecondReceiver;
private readonly IHost theGreenReceiver;
private readonly IHost theBlueReceiver;
private readonly IHost theSender;
private readonly IHost theThirdReceiver;

Expand All @@ -34,20 +34,22 @@ public send_by_topics()
exchange.BindTopic("color.blue").ToQueue("blue");
exchange.BindTopic("color.*").ToQueue("all");
});
opts.PublishMessagesToRabbitMqExchange<RoutedMessage>("wolverine.topics", m => m.TopicName);
}).Start();

#endregion

theFirstReceiver = WolverineHost.For(opts =>
theGreenReceiver = WolverineHost.For(opts =>
{
opts.ServiceName = "First";
opts.ServiceName = "Green";
opts.ListenToRabbitQueue("green");
opts.UseRabbitMq();
});

theSecondReceiver = WolverineHost.For(opts =>
theBlueReceiver = WolverineHost.For(opts =>
{
opts.ServiceName = "Second";
opts.ServiceName = "Blue";
opts.ListenToRabbitQueue("blue");
opts.UseRabbitMq();
});
Expand All @@ -63,8 +65,8 @@ public send_by_topics()
public void Dispose()
{
theSender?.Dispose();
theFirstReceiver?.Dispose();
theSecondReceiver?.Dispose();
theGreenReceiver?.Dispose();
theBlueReceiver?.Dispose();
theThirdReceiver?.Dispose();
}

Expand All @@ -86,7 +88,7 @@ public async Task send_by_message_topic()
var session = await theSender
.TrackActivity()
.IncludeExternalTransports()
.AlsoTrack(theFirstReceiver, theSecondReceiver, theThirdReceiver)
.AlsoTrack(theGreenReceiver, theBlueReceiver, theThirdReceiver)
.SendMessageAndWaitAsync(new PurpleMessage());

session.FindEnvelopesWithMessageType<PurpleMessage>()
Expand All @@ -101,13 +103,13 @@ public async Task send_by_message_topic_to_multiple_listeners()
var session = await theSender
.TrackActivity()
.IncludeExternalTransports()
.AlsoTrack(theFirstReceiver, theSecondReceiver, theThirdReceiver)
.AlsoTrack(theGreenReceiver, theBlueReceiver, theThirdReceiver)
.SendMessageAndWaitAsync(new FirstMessage());

session.FindEnvelopesWithMessageType<FirstMessage>()
.Where(x => x.MessageEventType == MessageEventType.Received)
.Select(x => x.ServiceName)
.OrderBy(x => x).ShouldHaveTheSameElementsAs("Second", "Third");
.OrderBy(x => x).ShouldHaveTheSameElementsAs("Blue", "Third");
}

[Fact]
Expand All @@ -116,14 +118,14 @@ public async Task send_by_explicit_topic()
var session = await theSender
.TrackActivity()
.IncludeExternalTransports()
.AlsoTrack(theFirstReceiver, theSecondReceiver, theThirdReceiver)
.AlsoTrack(theGreenReceiver, theBlueReceiver, theThirdReceiver)
.BroadcastMessageToTopicAndWaitAsync("color.green", new PurpleMessage());

session.FindEnvelopesWithMessageType<PurpleMessage>()
.Where(x => x.MessageEventType == MessageEventType.Received)
.Select(x => x.ServiceName)
.OrderBy(x => x)
.ShouldHaveTheSameElementsAs("First", "Third");
.ShouldHaveTheSameElementsAs("Green", "Third");
}

[Fact] // this is occasionally failing with timeouts when running in combination with the entire suite
Expand All @@ -132,14 +134,14 @@ public async Task send_by_explicit_topic_2()
var session = await theSender
.TrackActivity()
.IncludeExternalTransports()
.AlsoTrack(theFirstReceiver, theSecondReceiver, theThirdReceiver)
.AlsoTrack(theGreenReceiver, theBlueReceiver, theThirdReceiver)
.BroadcastMessageToTopicAndWaitAsync("color.blue", new PurpleMessage());

session.FindEnvelopesWithMessageType<PurpleMessage>()
.Where(x => x.MessageEventType == MessageEventType.Received)
.Select(x => x.ServiceName)
.OrderBy(x => x)
.ShouldHaveTheSameElementsAs("Second", "Third");
.ShouldHaveTheSameElementsAs("Blue", "Third");
}

[Fact]
Expand All @@ -148,10 +150,28 @@ public async Task send_to_topic_with_delay()
var session = await theSender
.TrackActivity()
.IncludeExternalTransports()
.WaitForMessageToBeReceivedAt<FirstMessage>(theSecondReceiver)
.AlsoTrack(theFirstReceiver, theSecondReceiver, theThirdReceiver)
.WaitForMessageToBeReceivedAt<FirstMessage>(theBlueReceiver)
.AlsoTrack(theGreenReceiver, theBlueReceiver, theThirdReceiver)
.InvokeMessageAndWaitAsync(new TriggerTopicMessage());
}

[Fact]
public async Task publish_by_user_message_topic_logic()
{
var routed = new RoutedMessage { TopicName = "color.blue" };

var session = await theSender
.TrackActivity()
.IncludeExternalTransports()
.WaitForMessageToBeReceivedAt<RoutedMessage>(theBlueReceiver)
.AlsoTrack(theGreenReceiver, theBlueReceiver, theThirdReceiver)
.SendMessageAndWaitAsync(routed);

var record = session.Received.RecordsInOrder().Single(x => x.ServiceName == "Blue");

record.Envelope.Message.ShouldBeOfType<RoutedMessage>()
.Id.ShouldBe(routed.Id);
}
}

[Topic("color.purple")]
Expand All @@ -177,10 +197,21 @@ public class ThirdMessage : FirstMessage
{
}

public class RoutedMessage
{
public string TopicName { get; set; }
public Guid Id { get; set; } = Guid.NewGuid();
}

public class TriggerTopicMessage{}

public class MessagesHandler
{
public static void Handle(RoutedMessage message)
{

}

public object Handle(TriggerTopicMessage message)
{
return new FirstMessage().ToTopic("color.blue", new DeliveryOptions { ScheduleDelay = 3.Seconds() });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ internal RabbitMqExchange(string name, RabbitMqTransport parent)

internal LightweightCache<string, RabbitMqTopicEndpoint> Topics { get; }
internal LightweightCache<string, RabbitMqRouting> Routings { get; }


public override bool AutoStartSendingAgent()
{
return base.AutoStartSendingAgent() || ExchangeType == ExchangeType.Topic;
}

public bool HasDeclared { get; private set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using RabbitMQ.Client;
using Wolverine.Configuration;
using Wolverine.Runtime.Routing;
using Wolverine.Transports;

namespace Wolverine.RabbitMQ.Internal;
Expand Down Expand Up @@ -246,4 +247,6 @@ public RabbitMqTransportExpression CustomizeDeadLetterQueueing(DeadLetterQueue d

return this;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using RabbitMQ.Client;
using Wolverine.Configuration;
using Wolverine.RabbitMQ.Internal;
using Wolverine.Runtime.Routing;

namespace Wolverine.RabbitMQ;

Expand All @@ -20,6 +21,28 @@ internal static RabbitMqTransport RabbitMqTransport(this WolverineOptions endpoi

return transports.GetOrCreate<RabbitMqTransport>();
}

/// <summary>
/// Publish messages that are of type T or could be cast to type T to a Rabbit MQ
/// topic exchange using the supplied function to determine the topic for the message
/// </summary>
/// <param name="exchangeName"></param>
/// <param name="topicSource"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static RabbitMqExchangeConfiguration PublishMessagesToRabbitMqExchange<T>(this WolverineOptions options, string exchangeName,
Func<T, string> topicSource)
{
var transport = options.RabbitMqTransport();
var exchange = transport.Exchanges[exchangeName];
exchange.ExchangeType = ExchangeType.Topic;
exchange.RoutingType = RoutingMode.ByTopic;

var routing = new TopicRouting<T>(topicSource, exchange);
options.PublishWithMessageRoutingSource(routing);

return new RabbitMqExchangeConfiguration(exchange);
}

/// <summary>
/// Configure connection and authentication information about the Rabbit MQ usage
Expand Down

0 comments on commit bb78e0b

Please sign in to comment.