Skip to content

Commit

Permalink
Automatically populate IBasicProperties.Type with msg body type and a…
Browse files Browse the repository at this point in the history
…dd Publish/GetMessage filters on RabbitMq Producer/Factory/Server
  • Loading branch information
mythz committed Oct 30, 2014
1 parent 240c411 commit 9e5c19f
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 2 deletions.
5 changes: 5 additions & 0 deletions src/ServiceStack.RabbitMq/RabbitMqExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ public static void PopulateFromMessage(this IBasicProperties props, IMessage mes
props.Timestamp = new AmqpTimestamp(message.CreatedDate.ToUnixTime());
props.Priority = (byte)message.Priority;
props.ContentType = MimeTypes.Json;

if (message.Body != null)
{
props.Type = message.Body.GetType().Name;
}

if (message.ReplyTo != null)
{
Expand Down
8 changes: 7 additions & 1 deletion src/ServiceStack.RabbitMq/RabbitMqMessageFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ namespace ServiceStack.RabbitMq
public class RabbitMqMessageFactory : IMessageFactory
{
public ConnectionFactory ConnectionFactory { get; private set; }
public Action<string, IBasicProperties, IMessage> PublishMessageFilter { get; set; }
public Action<string, BasicGetResult> GetMessageFilter { get; set; }

private int retryCount;
public int RetryCount
Expand All @@ -22,7 +24,7 @@ public int RetryCount
}
}

public bool UsePolling { get; set; }
public bool UsePolling { get; set; }

public RabbitMqMessageFactory(string connectionString = "localhost",
string username = null, string password = null)
Expand Down Expand Up @@ -65,13 +67,17 @@ public IMessageQueueClient CreateMessageQueueClient()
{
return new RabbitMqQueueClient(this) {
RetryCount = RetryCount,
PublishMessageFilter = PublishMessageFilter,
GetMessageFilter = GetMessageFilter,
};
}

public IMessageProducer CreateMessageProducer()
{
return new RabbitMqProducer(this) {
RetryCount = RetryCount,
PublishMessageFilter = PublishMessageFilter,
GetMessageFilter = GetMessageFilter,
};
}

Expand Down
16 changes: 15 additions & 1 deletion src/ServiceStack.RabbitMq/RabbitMqProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ public class RabbitMqProducer : IMessageProducer, IOneWayClient
protected readonly RabbitMqMessageFactory msgFactory;
public int RetryCount { get; set; }
public Action OnPublishedCallback { get; set; }
public Action<string, IBasicProperties, IMessage> PublishMessageFilter { get; set; }
public Action<string, BasicGetResult> GetMessageFilter { get; set; }

private IConnection connection;
public IConnection Connection
Expand Down Expand Up @@ -89,6 +91,11 @@ public void Publish(string queueName, IMessage message, string exchange)
props.SetPersistent(true);
props.PopulateFromMessage(message);

if (PublishMessageFilter != null)
{
PublishMessageFilter(queueName, props, message);
}

var messageBytes = message.Body.ToJson().ToUtf8Bytes();

PublishMessage(exchange ?? QueueNames.Exchange,
Expand Down Expand Up @@ -155,7 +162,14 @@ public BasicGetResult GetMessage(string queueName, bool noAck)
Queues = new HashSet<string>(Queues) { queueName };
}

return Channel.BasicGet(queueName, noAck: noAck);
var basicMsg = Channel.BasicGet(queueName, noAck: noAck);

if (GetMessageFilter != null)
{
GetMessageFilter(queueName, basicMsg);
}

return basicMsg;
}
catch (OperationInterruptedException ex)
{
Expand Down
12 changes: 12 additions & 0 deletions src/ServiceStack.RabbitMq/RabbitMqServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ public IMessageFactory MessageFactory
get { return messageFactory; }
}

public Action<string, IBasicProperties, IMessage> PublishMessageFilter
{
get { return messageFactory.PublishMessageFilter; }
set { messageFactory.PublishMessageFilter = value; }
}

public Action<string, BasicGetResult> GetMessageFilter
{
get { return messageFactory.GetMessageFilter; }
set { messageFactory.GetMessageFilter = value; }
}

/// <summary>
/// Execute global transformation or custom logic before a request is processed.
/// Must be thread-safe.
Expand Down
6 changes: 6 additions & 0 deletions src/ServiceStack.RabbitMq/RabbitMqWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ private void StartSubscription()
try
{
var e = consumer.Queue.Dequeue();

if (mqFactory.GetMessageFilter != null)
{
mqFactory.GetMessageFilter(QueueName, e);
}

messageHandler.ProcessMessage(mqClient, e);
}
catch (Exception ex)
Expand Down
55 changes: 55 additions & 0 deletions tests/ServiceStack.Server.Tests/Messaging/RabbitMqServerTests.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using NUnit.Framework;
using RabbitMQ.Client;
using ServiceStack.Logging;
using ServiceStack.Messaging;
using ServiceStack.Messaging.Redis;
Expand Down Expand Up @@ -420,5 +422,58 @@ public void Can_publish_and_receive_messages_with_MessageFactory()
}
}

[Test]
public void Can_filter_published_and_received_messages()
{
string receivedMsgApp = null;
string receivedMsgType = null;

var mqServer = CreateMqServer();
mqServer.PublishMessageFilter = (queueName, properties, msg) =>
{
properties.AppId = "app:{0}".Fmt(queueName);
};
mqServer.GetMessageFilter = (queueName, basicMsg) =>
{
var props = basicMsg.BasicProperties;
receivedMsgType = props.Type; //automatically added by RabbitMqProducer
receivedMsgApp = props.AppId;
};

mqServer.RegisterHandler<Hello>(m => {
return new HelloResponse { Result = "Hello, {0}!".Fmt(m.GetBody().Name) };
});

mqServer.Start();

using (var mqClient = mqServer.CreateMessageQueueClient())
{
mqClient.Publish(new Hello { Name = "Bugs Bunny" });
}

Thread.Sleep(100);

mqServer.Dispose();

Assert.That(receivedMsgApp, Is.EqualTo("app:{0}".Fmt(QueueNames<Hello>.In)));
Assert.That(receivedMsgType, Is.EqualTo(typeof(Hello).Name));

using (IConnection connection = mqServer.ConnectionFactory.CreateConnection())
using (IModel channel = connection.CreateModel())
{
var queueName = QueueNames<HelloResponse>.In;
channel.RegisterQueue(queueName);

var basicMsg = channel.BasicGet(queueName, noAck: true);
var props = basicMsg.BasicProperties;

Assert.That(props.Type, Is.EqualTo(typeof(HelloResponse).Name));
Assert.That(props.AppId, Is.EqualTo("app:{0}".Fmt(queueName)));

var msg = basicMsg.ToMessage<HelloResponse>();
Assert.That(msg.GetBody().Result, Is.EqualTo("Hello, Bugs Bunny!"));
}

}
}
}

0 comments on commit 9e5c19f

Please sign in to comment.