Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for setting up publisher confirms for Rabbit MQ #153

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/MassTransit/Context/SendContext.cs
Expand Up @@ -29,7 +29,7 @@ public SendContext(T message)
{ {
_id = NewId.NextGuid(); _id = NewId.NextGuid();
_message = message; _message = message;

this.SetMessageType(typeof (T)); this.SetMessageType(typeof (T));
DeclaringMessageType = typeof (T); DeclaringMessageType = typeof (T);
} }
Expand Down
Expand Up @@ -13,9 +13,12 @@
namespace MassTransit.Transports.RabbitMq.Configuration.Builders namespace MassTransit.Transports.RabbitMq.Configuration.Builders
{ {
using System; using System;
using PublisherConfirm;


public interface RabbitMqTransportFactoryBuilder public interface RabbitMqTransportFactoryBuilder
{ {
void AddConnectionFactoryBuilder(Uri uri, ConnectionFactoryBuilder connectionFactoryBuilder); void AddConnectionFactoryBuilder(Uri uri, ConnectionFactoryBuilder connectionFactoryBuilder);

void SetPublisherConfirmSettings(IPublisherConfirmSettings publisherConfirmSettings);
} }
} }
Expand Up @@ -14,11 +14,12 @@ namespace MassTransit.Transports.RabbitMq.Configuration.Builders
{ {
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using PublisherConfirm;


public class RabbitMqTransportFactoryBuilderImpl : public class RabbitMqTransportFactoryBuilderImpl : RabbitMqTransportFactoryBuilder
RabbitMqTransportFactoryBuilder
{ {
readonly IDictionary<Uri, ConnectionFactoryBuilder> _connectionFactoryBuilders; readonly IDictionary<Uri, ConnectionFactoryBuilder> _connectionFactoryBuilders;
IPublisherConfirmSettings _publisherConfirmSettings = new PublisherConfirmSettings();


public RabbitMqTransportFactoryBuilderImpl() public RabbitMqTransportFactoryBuilderImpl()
{ {
Expand All @@ -30,9 +31,14 @@ public void AddConnectionFactoryBuilder(Uri uri, ConnectionFactoryBuilder connec
_connectionFactoryBuilders[uri] = connectionFactoryBuilder; _connectionFactoryBuilders[uri] = connectionFactoryBuilder;
} }


public void SetPublisherConfirmSettings(IPublisherConfirmSettings publisherConfirmSettings)
{
_publisherConfirmSettings = publisherConfirmSettings;
}

public RabbitMqTransportFactory Build() public RabbitMqTransportFactory Build()
{ {
var factory = new RabbitMqTransportFactory(_connectionFactoryBuilders); var factory = new RabbitMqTransportFactory(_connectionFactoryBuilders, _publisherConfirmSettings);


return factory; return factory;
} }
Expand Down
@@ -0,0 +1,24 @@
// Copyright 2007-2011 Chris Patterson, Dru Sellers, Travis Smith, et. al.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
namespace MassTransit.Transports.RabbitMq.Configuration.Configurators
{
using System;

/// <summary>
/// Configures SSL/TLS for RabbitMQ. See http://www.rabbitmq.com/ssl.html
/// for details on how to set up RabbitMQ for SSL.
/// </summary>
public interface PublisherConfirmFactoryConfigurator
{
}
}
@@ -0,0 +1,68 @@
// Copyright 2007-2011 Chris Patterson, Dru Sellers, Travis Smith, et. al.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
namespace MassTransit.Transports.RabbitMq.Configuration.Configurators
{
using System;
using System.Collections.Generic;
using Builders;
using MassTransit.Configurators;
using PublisherConfirm;

public class PublisherConfirmFactoryConfiguratorImpl :
PublisherConfirmFactoryConfigurator,
RabbitMqTransportFactoryBuilderConfigurator
{
readonly bool _usePublisherConfirms;
readonly Action<ulong, string> _registerMessageAction;
readonly Action<ulong, bool> _acktion;
readonly Action<ulong, bool> _nacktion;

public PublisherConfirmFactoryConfiguratorImpl(bool usePublisherConfirms, Action<ulong, string> registerMessageAction,
Action<ulong, bool> acktion, Action<ulong, bool> nacktion)
{
_usePublisherConfirms = usePublisherConfirms;
_registerMessageAction = registerMessageAction;
_acktion = acktion;
_nacktion = nacktion;
}

public RabbitMqTransportFactoryBuilder Configure(RabbitMqTransportFactoryBuilder builder)
{
builder.SetPublisherConfirmSettings(
new PublisherConfirmSettings
{
UsePublisherConfirms = _usePublisherConfirms,
RegisterMessageAction = _registerMessageAction,
Acktion = _acktion,
Nacktion = _nacktion
}
);

return builder;
}

public IEnumerable<ValidationResult> Validate()
{
if (_usePublisherConfirms)
{
if (_registerMessageAction == null)
yield return this.Failure("RegisterMessageAction", "RegisterMessageAction must be specified if publisher confirms are enabled");
if (_acktion == null)
yield return this.Failure("Acktion", "Acktion must be specified if publisher confirms are enabled");
if(_nacktion == null)
yield return this.Failure("Nacktion", "Nacktion must be specified if publisher confirms are enabled");
}
}

}
}
Expand Up @@ -26,5 +26,14 @@ public static class RabbitMqTransportFactoryExtensions


configurator.AddConfigurator(hostConfigurator); configurator.AddConfigurator(hostConfigurator);
} }

public static void UsePublisherConfirms(this RabbitMqTransportFactoryConfigurator configurator,
Action<ulong, string> registerMessageAction, Action<ulong, bool> acktion, Action<ulong, bool> nacktion)
{
var hostConfigurator = new PublisherConfirmFactoryConfiguratorImpl(true, registerMessageAction, acktion, nacktion);

configurator.AddConfigurator(hostConfigurator);
}

} }
} }
Expand Up @@ -56,20 +56,22 @@
<NoWarn>1591</NoWarn> <NoWarn>1591</NoWarn>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<Reference Include="Magnum" Condition="'$(TargetFrameworkVersion)' == 'v3.5'"> <Reference Include="Magnum">
<HintPath>..\..\..\lib\Magnum\net-3.5\Magnum.dll</HintPath>
</Reference>
<Reference Include="Magnum" Condition="'$(TargetFrameworkVersion)' == 'v4.0'">
<HintPath>..\..\..\lib\Magnum\net-4.0\Magnum.dll</HintPath> <HintPath>..\..\..\lib\Magnum\net-4.0\Magnum.dll</HintPath>
</Reference> </Reference>
<Reference Include="RabbitMQ.Client"> <Reference Include="MassTransit">
<HintPath>..\..\packages\MassTransit.2.7.0\lib\net40\MassTransit.dll</HintPath>
</Reference>
<Reference Include="RabbitMQ.Client, Version=2.8.7.0, Culture=neutral, PublicKeyToken=89e7d7c5feba84ce, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion> <SpecificVersion>False</SpecificVersion>
<HintPath>..\..\..\lib\RabbitMQ\RabbitMQ.Client.dll</HintPath> <HintPath>..\..\packages\RabbitMQ.Client.2.8.7\lib\net30\RabbitMQ.Client.dll</HintPath>
</Reference> </Reference>
<Reference Include="System" /> <Reference Include="System" />
<Reference Include="System.Configuration" />
<Reference Include="System.Core"> <Reference Include="System.Core">
<RequiredTargetFramework>3.5</RequiredTargetFramework> <RequiredTargetFramework>3.5</RequiredTargetFramework>
</Reference> </Reference>
<Reference Include="System.Data" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<Compile Include="..\..\SolutionVersion.cs"> <Compile Include="..\..\SolutionVersion.cs">
Expand All @@ -82,6 +84,8 @@
<Compile Include="Configuration\Configurators\ConnectionFactoryBuilderConfigurator.cs" /> <Compile Include="Configuration\Configurators\ConnectionFactoryBuilderConfigurator.cs" />
<Compile Include="Configuration\Configurators\ConnectionFactoryConfigurator.cs" /> <Compile Include="Configuration\Configurators\ConnectionFactoryConfigurator.cs" />
<Compile Include="Configuration\Configurators\ConnectionFactoryConfiguratorImpl.cs" /> <Compile Include="Configuration\Configurators\ConnectionFactoryConfiguratorImpl.cs" />
<Compile Include="Configuration\Configurators\PublisherConfirmFactoryConfigurator.cs" />
<Compile Include="Configuration\Configurators\PublisherConfirmFactoryConfiguratorImpl.cs" />
<Compile Include="Configuration\Configurators\SslConnectionFactoryConfigurator.cs" /> <Compile Include="Configuration\Configurators\SslConnectionFactoryConfigurator.cs" />
<Compile Include="Configuration\Configurators\SslConnectionFactoryConfiguratorImpl.cs" /> <Compile Include="Configuration\Configurators\SslConnectionFactoryConfiguratorImpl.cs" />
<Compile Include="Configuration\Configurators\RabbitMqTransportFactoryBuilderConfigurator.cs" /> <Compile Include="Configuration\Configurators\RabbitMqTransportFactoryBuilderConfigurator.cs" />
Expand All @@ -95,6 +99,8 @@
<Compile Include="OutboundRabbitMqTransport.cs" /> <Compile Include="OutboundRabbitMqTransport.cs" />
<Compile Include="PublishEndpointInterceptor.cs" /> <Compile Include="PublishEndpointInterceptor.cs" />
<Compile Include="PublishEndpointSinkLocator.cs" /> <Compile Include="PublishEndpointSinkLocator.cs" />
<Compile Include="PublisherConfirm\IPublisherConfirmSettings.cs" />
<Compile Include="PublisherConfirm\PublisherConfirmSettings.cs" />
<Compile Include="RabbitMqConnection.cs" /> <Compile Include="RabbitMqConnection.cs" />
<Compile Include="RabbitMqConsumer.cs" /> <Compile Include="RabbitMqConsumer.cs" />
<Compile Include="RabbitMqEndpointAddress.cs" /> <Compile Include="RabbitMqEndpointAddress.cs" />
Expand All @@ -110,13 +116,6 @@
<Compile Include="Testing\BusTestScenarioExtensions.cs" /> <Compile Include="Testing\BusTestScenarioExtensions.cs" />
<Compile Include="Testing\RabbitMqBusScenarioBuilder.cs" /> <Compile Include="Testing\RabbitMqBusScenarioBuilder.cs" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\MassTransit\MassTransit.csproj">
<Project>{6EFD69FC-CBCC-4F85-AEE0-EFBA73F4D273}</Project>
<Name>MassTransit</Name>
<Private>False</Private>
</ProjectReference>
</ItemGroup>
<ItemGroup> <ItemGroup>
<BootstrapperPackage Include="Microsoft.Net.Client.3.5"> <BootstrapperPackage Include="Microsoft.Net.Client.3.5">
<Visible>False</Visible> <Visible>False</Visible>
Expand All @@ -137,6 +136,9 @@
<ItemGroup> <ItemGroup>
<None Include="README.md" /> <None Include="README.md" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<WCFMetadata Include="Service References\" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it. <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets. Other similar extension points exist, see Microsoft.Common.targets.
Expand Down
Expand Up @@ -16,6 +16,7 @@ namespace MassTransit.Transports.RabbitMq
using System.Collections; using System.Collections;
using System.IO; using System.IO;
using Magnum; using Magnum;
using PublisherConfirm;
using RabbitMQ.Client; using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Exceptions;


Expand All @@ -25,14 +26,18 @@ public class OutboundRabbitMqTransport :
readonly IRabbitMqEndpointAddress _address; readonly IRabbitMqEndpointAddress _address;
readonly bool _bindToQueue; readonly bool _bindToQueue;
readonly ConnectionHandler<RabbitMqConnection> _connectionHandler; readonly ConnectionHandler<RabbitMqConnection> _connectionHandler;
readonly IPublisherConfirmSettings _publisherConfirmSettings;
RabbitMqProducer _producer; RabbitMqProducer _producer;


public OutboundRabbitMqTransport(IRabbitMqEndpointAddress address, public OutboundRabbitMqTransport(IRabbitMqEndpointAddress address,
ConnectionHandler<RabbitMqConnection> connectionHandler, bool bindToQueue) ConnectionHandler<RabbitMqConnection> connectionHandler,
IPublisherConfirmSettings publisherConfirmSettings,
bool bindToQueue)
{ {
_address = address; _address = address;
_connectionHandler = connectionHandler; _connectionHandler = connectionHandler;
_bindToQueue = bindToQueue; _bindToQueue = bindToQueue;
_publisherConfirmSettings = publisherConfirmSettings;
} }


public IEndpointAddress Address public IEndpointAddress Address
Expand All @@ -52,21 +57,24 @@ public void Send(ISendContext context)


properties.SetPersistent(true); properties.SetPersistent(true);
properties.MessageId = context.MessageId ?? properties.MessageId ?? NewId.Next().ToString(); properties.MessageId = context.MessageId ?? properties.MessageId ?? NewId.Next().ToString();

if (context.ExpirationTime.HasValue) if (context.ExpirationTime.HasValue)
{ {
DateTime value = context.ExpirationTime.Value; DateTime value = context.ExpirationTime.Value;
properties.Expiration = properties.Expiration = (value.Kind == DateTimeKind.Utc ? value - SystemUtil.UtcNow : value - SystemUtil.Now).ToString();
(value.Kind == DateTimeKind.Utc }
? value - SystemUtil.UtcNow
: value - SystemUtil.Now). properties.Headers = new Hashtable { { "Content-Type", context.ContentType } };
ToString();
if (context.Headers[PublisherConfirmSettings.ClientMessageId] != null)
{
properties.Headers.Add(PublisherConfirmSettings.ClientMessageId, context.Headers[PublisherConfirmSettings.ClientMessageId]);
} }


using (var body = new MemoryStream()) using (var body = new MemoryStream())
{ {
context.SerializeTo(body); context.SerializeTo(body);
properties.Headers = new Hashtable {{"Content-Type", context.ContentType}};

_producer.Publish(_address.Name, properties, body.ToArray()); _producer.Publish(_address.Name, properties, body.ToArray());


_address.LogSent(context.MessageId ?? "", context.MessageType); _address.LogSent(context.MessageId ?? "", context.MessageType);
Expand Down Expand Up @@ -97,7 +105,7 @@ void AddProducerBinding()
if (_producer != null) if (_producer != null)
return; return;


_producer = new RabbitMqProducer(_address, _bindToQueue); _producer = new RabbitMqProducer(_address, _publisherConfirmSettings, _bindToQueue);


_connectionHandler.AddBinding(_producer); _connectionHandler.AddBinding(_producer);
} }
Expand Down
@@ -0,0 +1,16 @@
namespace MassTransit.Transports.RabbitMq.PublisherConfirm
{
using System;

public interface IPublisherConfirmSettings
{
bool UsePublisherConfirms { get; set; }

Action<ulong, string> RegisterMessageAction { get; set; }

Action<ulong, bool> Acktion { get; set; }

Action<ulong, bool> Nacktion { get; set; }

}
}
@@ -0,0 +1,18 @@
namespace MassTransit.Transports.RabbitMq.PublisherConfirm
{
using System;

public class PublisherConfirmSettings : IPublisherConfirmSettings
{
public const string ClientMessageId = "ClientMessageId";

public bool UsePublisherConfirms { get; set; }

public Action<ulong, string> RegisterMessageAction { get; set; }

public Action<ulong, bool> Acktion { get; set; }

public Action<ulong, bool> Nacktion { get; set; }

}
}