Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Added support for setting up publisher confirms for Rabbit MQ #153

Closed
wants to merge 8 commits into from

3 participants

Swannee Dru Sellers Chris Patterson
Swannee

Did the following to enable publisher confirms with external tracking:
1) Added publisher-confirm ack/nack support to RabbitMqProducer
2) Added PublisherConfirmSettings class to store the following:

  • UsePublisherConfirms - Determines whether or not publisher confirms are used.
  • RegisterMessageAction - Action called before a publish so that external app can register the message in the buffer of their choice. Provides the Rabbit publish/confirm sequence number.
  • Acktion - Action called when a publisher confirm Ack is returned by the broker.
  • Nacktion - Action called when a publisher confirm Nack is returned by the broker. 3) Updated RabbitMqTransportFactoryBuilder and Impl to allow setting of publisher confirm settings. 4) Added PublisherConfirmFactoryConfigurator and Impl to configure these settings 5) Updated RabbitMqTransportFactoryExtensions to add "UsePublisherConfirms" extension method. 6) Updated OutboundRabbitMqTransport send method to assign a message ID from the header so that it is accessible in the RabbitMqProducer.Publish method.

Configure as follows:
builder.Register(c => ServiceBusFactory.New(sbc =>
{
sbc.UseRabbitMq(conf => conf.UsePublisherConfirms(RegisterMessageAction, Acktion, Nacktion));

        })).SingleInstance();
    }

    public void RegisterMessageAction(ulong sequenceNumber, string correlationId)
    {
        //TODO: store in buffer
    }

    public void Acktion(ulong sequenceNumber, bool isMultiple)
    {
        //TODO: Remove from buffer
    }

    public void Nacktion(ulong sequenceNumber, bool isMultiple)
    {
        //TODO: Store unacked messages to a more permanent place for reprocessing
    }

Please let me know if there's a better way to accomplish this. Thanks,
Eric

Dru Sellers
Owner

@eswann sorry for the very long delay but I wanted to get a note back to you. The publisher confirms are already in place for the latest ga release. They are just immediate and always on. I hope that this is helpful to you.

Dru Sellers drusellers closed this
Swannee

@drusellers Hey, I've looked at the implementation, the main issue i have with the current implementation is that I didn't see a way to pass a delegate to execute when an ack/nack happens. For Rabbit, this is very important because unlike MSMQ, the publisher doesn't have a local place to store messages if a nack happens. If they just sit in memory, you'll eventually overwhelm the system (if you publish the number of messages that we do). After talking to the Rabbit guys, if you ever get a Nack from Rabbit, it essentially means that Rabbit has completely crapped its pants and most probably won't become available without manual intervention, although this should be rare you risk huge message loss if it does happen.

What we do is wrap our publisher with a backup mechanism where it can write messages to disk or DB if Rabbit has issues (nacks or connectivity issues). If the system starts storing locally, it occasionally tries to send a message and if it gets an ack, the system knows it's ok to start publishing all messages again. If we don't have a way to communicate out the ack/nack, we can't take action. So I created the PublisherConfirmSettings that take an ack/nack delegate.

If there's a better way that you can see to approach that, definitely let me know.

Thanks!
Eric

Chris Patterson
Owner

The communication method between MT and the transports will be revisited in 3.0 (and likely changed) to support fully asynchronous operation using the TPL. Until then, the code put into place was a short-term solution. The 3.0 level solution will have several extensions points along the path to inject behaviors (all using TPL continuations) to handle things like failing to send to the broker, waiting for ack, handling immediate and mandatory failures via select, etc.

This will also include an entirely new high-availability client for RabbitMQ and Azure Service Bus.

An unfortunately consequence is that .NET 4.0 and later is all that will be supported, leaving support for .NET 3.5 in the MT 2.x lineage (which will be completely serializer compatible allowing interop with older solutions).

Swannee

Thanks sounds great. Any idea when that will be ready for primetime?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
Showing with 242 additions and 39 deletions.
  1. +1 −1  src/MassTransit/Context/SendContext.cs
  2. +4 −1 src/Transports/MassTransit.Transports.RabbitMq/Configuration/Builders/RabbitMqTransportFactoryBuilder.cs
  3. +9 −3 src/Transports/MassTransit.Transports.RabbitMq/Configuration/Builders/RabbitMqTransportFactoryBuilderImpl.cs
  4. +24 −0 ...ansports/MassTransit.Transports.RabbitMq/Configuration/Configurators/PublisherConfirmFactoryConfigurator.cs
  5. +68 −0 ...orts/MassTransit.Transports.RabbitMq/Configuration/Configurators/PublisherConfirmFactoryConfiguratorImpl.cs
  6. +9 −0 src/Transports/MassTransit.Transports.RabbitMq/Configuration/RabbitMqTransportFactoryExtensions.cs
  7. +15 −13 src/Transports/MassTransit.Transports.RabbitMq/MassTransit.Transports.RabbitMq.csproj
  8. +17 −9 src/Transports/MassTransit.Transports.RabbitMq/OutboundRabbitMqTransport.cs
  9. +16 −0 src/Transports/MassTransit.Transports.RabbitMq/PublisherConfirm/IPublisherConfirmSettings.cs
  10. +18 −0 src/Transports/MassTransit.Transports.RabbitMq/PublisherConfirm/PublisherConfirmSettings.cs
  11. +47 −4 src/Transports/MassTransit.Transports.RabbitMq/RabbitMqProducer.cs
  12. +8 −8 src/Transports/MassTransit.Transports.RabbitMq/RabbitMqTransportFactory.cs
  13. +5 −0 src/Transports/MassTransit.Transports.RabbitMq/packages.config
  14. +1 −0  src/packages/repositories.config
2  src/MassTransit/Context/SendContext.cs
View
@@ -29,7 +29,7 @@ public SendContext(T message)
{
_id = NewId.NextGuid();
_message = message;
-
+
this.SetMessageType(typeof (T));
DeclaringMessageType = typeof (T);
}
5 src/Transports/MassTransit.Transports.RabbitMq/Configuration/Builders/RabbitMqTransportFactoryBuilder.cs
View
@@ -13,9 +13,12 @@
namespace MassTransit.Transports.RabbitMq.Configuration.Builders
{
using System;
+ using PublisherConfirm;
- public interface RabbitMqTransportFactoryBuilder
+ public interface RabbitMqTransportFactoryBuilder
{
void AddConnectionFactoryBuilder(Uri uri, ConnectionFactoryBuilder connectionFactoryBuilder);
+
+ void SetPublisherConfirmSettings(IPublisherConfirmSettings publisherConfirmSettings);
}
}
12 ...Transports/MassTransit.Transports.RabbitMq/Configuration/Builders/RabbitMqTransportFactoryBuilderImpl.cs
View
@@ -14,11 +14,12 @@ namespace MassTransit.Transports.RabbitMq.Configuration.Builders
{
using System;
using System.Collections.Generic;
+ using PublisherConfirm;
- public class RabbitMqTransportFactoryBuilderImpl :
- RabbitMqTransportFactoryBuilder
+ public class RabbitMqTransportFactoryBuilderImpl : RabbitMqTransportFactoryBuilder
{
readonly IDictionary<Uri, ConnectionFactoryBuilder> _connectionFactoryBuilders;
+ IPublisherConfirmSettings _publisherConfirmSettings = new PublisherConfirmSettings();
public RabbitMqTransportFactoryBuilderImpl()
{
@@ -30,9 +31,14 @@ public void AddConnectionFactoryBuilder(Uri uri, ConnectionFactoryBuilder connec
_connectionFactoryBuilders[uri] = connectionFactoryBuilder;
}
+ public void SetPublisherConfirmSettings(IPublisherConfirmSettings publisherConfirmSettings)
+ {
+ _publisherConfirmSettings = publisherConfirmSettings;
+ }
+
public RabbitMqTransportFactory Build()
{
- var factory = new RabbitMqTransportFactory(_connectionFactoryBuilders);
+ var factory = new RabbitMqTransportFactory(_connectionFactoryBuilders, _publisherConfirmSettings);
return factory;
}
24 ...ports/MassTransit.Transports.RabbitMq/Configuration/Configurators/PublisherConfirmFactoryConfigurator.cs
View
@@ -0,0 +1,24 @@
+// Copyright 2007-2011 Chris Patterson, Dru Sellers, Travis Smith, et. al.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
+// this file except in compliance with the License. You may obtain a copy of the
+// License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed
+// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+// CONDITIONS OF ANY KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations under the License.
+namespace MassTransit.Transports.RabbitMq.Configuration.Configurators
+{
+ using System;
+
+ /// <summary>
+ /// Configures SSL/TLS for RabbitMQ. See http://www.rabbitmq.com/ssl.html
+ /// for details on how to set up RabbitMQ for SSL.
+ /// </summary>
+ public interface PublisherConfirmFactoryConfigurator
+ {
+ }
+}
68 ...s/MassTransit.Transports.RabbitMq/Configuration/Configurators/PublisherConfirmFactoryConfiguratorImpl.cs
View
@@ -0,0 +1,68 @@
+// Copyright 2007-2011 Chris Patterson, Dru Sellers, Travis Smith, et. al.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
+// this file except in compliance with the License. You may obtain a copy of the
+// License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed
+// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+// CONDITIONS OF ANY KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations under the License.
+namespace MassTransit.Transports.RabbitMq.Configuration.Configurators
+{
+ using System;
+ using System.Collections.Generic;
+ using Builders;
+ using MassTransit.Configurators;
+ using PublisherConfirm;
+
+ public class PublisherConfirmFactoryConfiguratorImpl :
+ PublisherConfirmFactoryConfigurator,
+ RabbitMqTransportFactoryBuilderConfigurator
+ {
+ readonly bool _usePublisherConfirms;
+ readonly Action<ulong, string> _registerMessageAction;
+ readonly Action<ulong, bool> _acktion;
+ readonly Action<ulong, bool> _nacktion;
+
+ public PublisherConfirmFactoryConfiguratorImpl(bool usePublisherConfirms, Action<ulong, string> registerMessageAction,
+ Action<ulong, bool> acktion, Action<ulong, bool> nacktion)
+ {
+ _usePublisherConfirms = usePublisherConfirms;
+ _registerMessageAction = registerMessageAction;
+ _acktion = acktion;
+ _nacktion = nacktion;
+ }
+
+ public RabbitMqTransportFactoryBuilder Configure(RabbitMqTransportFactoryBuilder builder)
+ {
+ builder.SetPublisherConfirmSettings(
+ new PublisherConfirmSettings
+ {
+ UsePublisherConfirms = _usePublisherConfirms,
+ RegisterMessageAction = _registerMessageAction,
+ Acktion = _acktion,
+ Nacktion = _nacktion
+ }
+ );
+
+ return builder;
+ }
+
+ public IEnumerable<ValidationResult> Validate()
+ {
+ if (_usePublisherConfirms)
+ {
+ if (_registerMessageAction == null)
+ yield return this.Failure("RegisterMessageAction", "RegisterMessageAction must be specified if publisher confirms are enabled");
+ if (_acktion == null)
+ yield return this.Failure("Acktion", "Acktion must be specified if publisher confirms are enabled");
+ if(_nacktion == null)
+ yield return this.Failure("Nacktion", "Nacktion must be specified if publisher confirms are enabled");
+ }
+ }
+
+ }
+}
9 src/Transports/MassTransit.Transports.RabbitMq/Configuration/RabbitMqTransportFactoryExtensions.cs
View
@@ -26,5 +26,14 @@ public static class RabbitMqTransportFactoryExtensions
configurator.AddConfigurator(hostConfigurator);
}
+
+ public static void UsePublisherConfirms(this RabbitMqTransportFactoryConfigurator configurator,
+ Action<ulong, string> registerMessageAction, Action<ulong, bool> acktion, Action<ulong, bool> nacktion)
+ {
+ var hostConfigurator = new PublisherConfirmFactoryConfiguratorImpl(true, registerMessageAction, acktion, nacktion);
+
+ configurator.AddConfigurator(hostConfigurator);
+ }
+
}
}
28 src/Transports/MassTransit.Transports.RabbitMq/MassTransit.Transports.RabbitMq.csproj
View
@@ -56,20 +56,22 @@
<NoWarn>1591</NoWarn>
</PropertyGroup>
<ItemGroup>
- <Reference Include="Magnum" Condition="'$(TargetFrameworkVersion)' == 'v3.5'">
- <HintPath>..\..\..\lib\Magnum\net-3.5\Magnum.dll</HintPath>
- </Reference>
- <Reference Include="Magnum" Condition="'$(TargetFrameworkVersion)' == 'v4.0'">
+ <Reference Include="Magnum">
<HintPath>..\..\..\lib\Magnum\net-4.0\Magnum.dll</HintPath>
</Reference>
- <Reference Include="RabbitMQ.Client">
+ <Reference Include="MassTransit">
+ <HintPath>..\..\packages\MassTransit.2.7.0\lib\net40\MassTransit.dll</HintPath>
+ </Reference>
+ <Reference Include="RabbitMQ.Client, Version=2.8.7.0, Culture=neutral, PublicKeyToken=89e7d7c5feba84ce, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
- <HintPath>..\..\..\lib\RabbitMQ\RabbitMQ.Client.dll</HintPath>
+ <HintPath>..\..\packages\RabbitMQ.Client.2.8.7\lib\net30\RabbitMQ.Client.dll</HintPath>
</Reference>
<Reference Include="System" />
+ <Reference Include="System.Configuration" />
<Reference Include="System.Core">
<RequiredTargetFramework>3.5</RequiredTargetFramework>
</Reference>
+ <Reference Include="System.Data" />
</ItemGroup>
<ItemGroup>
<Compile Include="..\..\SolutionVersion.cs">
@@ -82,6 +84,8 @@
<Compile Include="Configuration\Configurators\ConnectionFactoryBuilderConfigurator.cs" />
<Compile Include="Configuration\Configurators\ConnectionFactoryConfigurator.cs" />
<Compile Include="Configuration\Configurators\ConnectionFactoryConfiguratorImpl.cs" />
+ <Compile Include="Configuration\Configurators\PublisherConfirmFactoryConfigurator.cs" />
+ <Compile Include="Configuration\Configurators\PublisherConfirmFactoryConfiguratorImpl.cs" />
<Compile Include="Configuration\Configurators\SslConnectionFactoryConfigurator.cs" />
<Compile Include="Configuration\Configurators\SslConnectionFactoryConfiguratorImpl.cs" />
<Compile Include="Configuration\Configurators\RabbitMqTransportFactoryBuilderConfigurator.cs" />
@@ -95,6 +99,8 @@
<Compile Include="OutboundRabbitMqTransport.cs" />
<Compile Include="PublishEndpointInterceptor.cs" />
<Compile Include="PublishEndpointSinkLocator.cs" />
+ <Compile Include="PublisherConfirm\IPublisherConfirmSettings.cs" />
+ <Compile Include="PublisherConfirm\PublisherConfirmSettings.cs" />
<Compile Include="RabbitMqConnection.cs" />
<Compile Include="RabbitMqConsumer.cs" />
<Compile Include="RabbitMqEndpointAddress.cs" />
@@ -111,13 +117,6 @@
<Compile Include="Testing\RabbitMqBusScenarioBuilder.cs" />
</ItemGroup>
<ItemGroup>
- <ProjectReference Include="..\..\MassTransit\MassTransit.csproj">
- <Project>{6EFD69FC-CBCC-4F85-AEE0-EFBA73F4D273}</Project>
- <Name>MassTransit</Name>
- <Private>False</Private>
- </ProjectReference>
- </ItemGroup>
- <ItemGroup>
<BootstrapperPackage Include="Microsoft.Net.Client.3.5">
<Visible>False</Visible>
<ProductName>.NET Framework 3.5 SP1 Client Profile</ProductName>
@@ -137,6 +136,9 @@
<ItemGroup>
<None Include="README.md" />
</ItemGroup>
+ <ItemGroup>
+ <WCFMetadata Include="Service References\" />
+ </ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
26 src/Transports/MassTransit.Transports.RabbitMq/OutboundRabbitMqTransport.cs
View
@@ -16,6 +16,7 @@ namespace MassTransit.Transports.RabbitMq
using System.Collections;
using System.IO;
using Magnum;
+ using PublisherConfirm;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
@@ -25,14 +26,18 @@ public class OutboundRabbitMqTransport :
readonly IRabbitMqEndpointAddress _address;
readonly bool _bindToQueue;
readonly ConnectionHandler<RabbitMqConnection> _connectionHandler;
+ readonly IPublisherConfirmSettings _publisherConfirmSettings;
RabbitMqProducer _producer;
public OutboundRabbitMqTransport(IRabbitMqEndpointAddress address,
- ConnectionHandler<RabbitMqConnection> connectionHandler, bool bindToQueue)
+ ConnectionHandler<RabbitMqConnection> connectionHandler,
+ IPublisherConfirmSettings publisherConfirmSettings,
+ bool bindToQueue)
{
_address = address;
_connectionHandler = connectionHandler;
_bindToQueue = bindToQueue;
+ _publisherConfirmSettings = publisherConfirmSettings;
}
public IEndpointAddress Address
@@ -52,21 +57,24 @@ public void Send(ISendContext context)
properties.SetPersistent(true);
properties.MessageId = context.MessageId ?? properties.MessageId ?? NewId.Next().ToString();
+
if (context.ExpirationTime.HasValue)
{
DateTime value = context.ExpirationTime.Value;
- properties.Expiration =
- (value.Kind == DateTimeKind.Utc
- ? value - SystemUtil.UtcNow
- : value - SystemUtil.Now).
- ToString();
+ properties.Expiration = (value.Kind == DateTimeKind.Utc ? value - SystemUtil.UtcNow : value - SystemUtil.Now).ToString();
+ }
+
+ properties.Headers = new Hashtable { { "Content-Type", context.ContentType } };
+
+ if (context.Headers[PublisherConfirmSettings.ClientMessageId] != null)
+ {
+ properties.Headers.Add(PublisherConfirmSettings.ClientMessageId, context.Headers[PublisherConfirmSettings.ClientMessageId]);
}
using (var body = new MemoryStream())
{
context.SerializeTo(body);
- properties.Headers = new Hashtable {{"Content-Type", context.ContentType}};
-
+
_producer.Publish(_address.Name, properties, body.ToArray());
_address.LogSent(context.MessageId ?? "", context.MessageType);
@@ -97,7 +105,7 @@ void AddProducerBinding()
if (_producer != null)
return;
- _producer = new RabbitMqProducer(_address, _bindToQueue);
+ _producer = new RabbitMqProducer(_address, _publisherConfirmSettings, _bindToQueue);
_connectionHandler.AddBinding(_producer);
}
16 src/Transports/MassTransit.Transports.RabbitMq/PublisherConfirm/IPublisherConfirmSettings.cs
View
@@ -0,0 +1,16 @@
+namespace MassTransit.Transports.RabbitMq.PublisherConfirm
+{
+ using System;
+
+ public interface IPublisherConfirmSettings
+ {
+ bool UsePublisherConfirms { get; set; }
+
+ Action<ulong, string> RegisterMessageAction { get; set; }
+
+ Action<ulong, bool> Acktion { get; set; }
+
+ Action<ulong, bool> Nacktion { get; set; }
+
+ }
+}
18 src/Transports/MassTransit.Transports.RabbitMq/PublisherConfirm/PublisherConfirmSettings.cs
View
@@ -0,0 +1,18 @@
+namespace MassTransit.Transports.RabbitMq.PublisherConfirm
+{
+ using System;
+
+ public class PublisherConfirmSettings : IPublisherConfirmSettings
+ {
+ public const string ClientMessageId = "ClientMessageId";
+
+ public bool UsePublisherConfirms { get; set; }
+
+ public Action<ulong, string> RegisterMessageAction { get; set; }
+
+ public Action<ulong, bool> Acktion { get; set; }
+
+ public Action<ulong, bool> Nacktion { get; set; }
+
+ }
+}
51 src/Transports/MassTransit.Transports.RabbitMq/RabbitMqProducer.cs
View
@@ -12,20 +12,25 @@
// specific language governing permissions and limitations under the License.
namespace MassTransit.Transports.RabbitMq
{
+ using System;
using Management;
+ using PublisherConfirm;
using RabbitMQ.Client;
+ using RabbitMQ.Client.Events;
- public class RabbitMqProducer :
- ConnectionBinding<RabbitMqConnection>
+ public class RabbitMqProducer : ConnectionBinding<RabbitMqConnection>
{
+ readonly IPublisherConfirmSettings _publisherConfirmSettings;
readonly IRabbitMqEndpointAddress _address;
readonly bool _bindToQueue;
IModel _channel;
+ readonly object _channelLock = new object();
- public RabbitMqProducer(IRabbitMqEndpointAddress address, bool bindToQueue)
+ public RabbitMqProducer(IRabbitMqEndpointAddress address, IPublisherConfirmSettings publisherConfirmSettings, bool bindToQueue)
{
_address = address;
_bindToQueue = bindToQueue;
+ _publisherConfirmSettings = publisherConfirmSettings;
}
public IBasicProperties CreateProperties()
@@ -41,13 +46,33 @@ public void Publish(string exchangeName, IBasicProperties properties, byte[] bod
if (_channel == null)
throw new InvalidConnectionException(_address.Uri, "Channel should not be null");
- _channel.BasicPublish(exchangeName, "", properties, body);
+ if (_publisherConfirmSettings.UsePublisherConfirms)
+ {
+ var clientMessageId = (string)properties.Headers[PublisherConfirmSettings.ClientMessageId];
+
+ if (clientMessageId != null)
+ {
+ _publisherConfirmSettings.RegisterMessageAction(_channel.NextPublishSeqNo, clientMessageId);
+ }
+ }
+
+ lock (_channelLock)
+ {
+ _channel.BasicPublish(exchangeName, "", properties, body);
+ }
}
public void Bind(RabbitMqConnection connection)
{
_channel = connection.Connection.CreateModel();
+ if (_publisherConfirmSettings.UsePublisherConfirms)
+ {
+ _channel.ConfirmSelect();
+ _channel.BasicAcks += OnBasicAcks;
+ _channel.BasicNacks += OnBasicNacks;
+ }
+
_channel.ExchangeDeclare(_address.Name, ExchangeType.Fanout, true);
if (_bindToQueue)
@@ -66,9 +91,27 @@ public void Unbind(RabbitMqConnection connection)
{
if (_channel.IsOpen)
_channel.Close(200, "producer unbind");
+
+ if(_publisherConfirmSettings.UsePublisherConfirms)
+ {
+ _channel.BasicAcks -= OnBasicAcks;
+ _channel.BasicNacks -= OnBasicNacks;
+ }
_channel.Dispose();
_channel = null;
}
}
+
+ private void OnBasicNacks(IModel model, BasicNackEventArgs args)
+ {
+ _publisherConfirmSettings.Acktion(args.DeliveryTag, args.Multiple);
+
+ }
+
+ private void OnBasicAcks(IModel model, BasicAckEventArgs args)
+ {
+
+ _publisherConfirmSettings.Acktion(args.DeliveryTag, args.Multiple);
+ }
}
}
16 src/Transports/MassTransit.Transports.RabbitMq/RabbitMqTransportFactory.cs
View
@@ -20,21 +20,23 @@ namespace MassTransit.Transports.RabbitMq
using Exceptions;
using Magnum.Extensions;
using Magnum.Threading;
+ using PublisherConfirm;
using RabbitMQ.Client;
- public class RabbitMqTransportFactory :
- ITransportFactory
+ public class RabbitMqTransportFactory : ITransportFactory
{
readonly ReaderWriterLockedDictionary<Uri, ConnectionHandler<RabbitMqConnection>> _connectionCache;
readonly IDictionary<Uri, ConnectionFactoryBuilder> _connectionFactoryBuilders;
readonly IMessageNameFormatter _messageNameFormatter;
+ readonly IPublisherConfirmSettings _publisherConfirmSettings;
bool _disposed;
- public RabbitMqTransportFactory(IDictionary<Uri, ConnectionFactoryBuilder> connectionFactoryBuilders)
+ public RabbitMqTransportFactory(IDictionary<Uri, ConnectionFactoryBuilder> connectionFactoryBuilders, IPublisherConfirmSettings publisherConfirmSettings)
{
_connectionCache = new ReaderWriterLockedDictionary<Uri, ConnectionHandler<RabbitMqConnection>>();
_connectionFactoryBuilders = connectionFactoryBuilders;
_messageNameFormatter = new RabbitMqMessageNameFormatter();
+ _publisherConfirmSettings = publisherConfirmSettings;
}
public RabbitMqTransportFactory()
@@ -42,6 +44,7 @@ public RabbitMqTransportFactory()
_connectionCache = new ReaderWriterLockedDictionary<Uri, ConnectionHandler<RabbitMqConnection>>();
_connectionFactoryBuilders = new Dictionary<Uri, ConnectionFactoryBuilder>();
_messageNameFormatter = new RabbitMqMessageNameFormatter();
+ _publisherConfirmSettings = new PublisherConfirmSettings();
}
public void Dispose()
@@ -81,7 +84,6 @@ public IDuplexTransport BuildLoopback(ITransportSettings settings)
public IInboundTransport BuildInbound(ITransportSettings settings)
{
RabbitMqEndpointAddress address = RabbitMqEndpointAddress.Parse(settings.Address.Uri);
-
EnsureProtocolIsCorrect(address.Uri);
ConnectionHandler<RabbitMqConnection> connectionHandler = GetConnection(address);
@@ -93,23 +95,21 @@ public IInboundTransport BuildInbound(ITransportSettings settings)
public IOutboundTransport BuildOutbound(ITransportSettings settings)
{
RabbitMqEndpointAddress address = RabbitMqEndpointAddress.Parse(settings.Address.Uri);
-
EnsureProtocolIsCorrect(address.Uri);
ConnectionHandler<RabbitMqConnection> connectionHandler = GetConnection(address);
- return new OutboundRabbitMqTransport(address, connectionHandler, false);
+ return new OutboundRabbitMqTransport(address, connectionHandler, _publisherConfirmSettings, false);
}
public IOutboundTransport BuildError(ITransportSettings settings)
{
RabbitMqEndpointAddress address = RabbitMqEndpointAddress.Parse(settings.Address.Uri);
-
EnsureProtocolIsCorrect(address.Uri);
ConnectionHandler<RabbitMqConnection> connection = GetConnection(address);
- return new OutboundRabbitMqTransport(address, connection, true);
+ return new OutboundRabbitMqTransport(address, connection, _publisherConfirmSettings, true);
}
public IMessageNameFormatter MessageNameFormatter
5 src/Transports/MassTransit.Transports.RabbitMq/packages.config
View
@@ -0,0 +1,5 @@
+<?xml version="1.0" encoding="utf-8"?>
+<packages>
+ <package id="Magnum" version="2.1.0" targetFramework="net40" />
+ <package id="Newtonsoft.Json" version="4.5.10" targetFramework="net40" />
+</packages>
1  src/packages/repositories.config
View
@@ -1,4 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<repositories>
<repository path="..\Persistence\MassTransit.NHibernateIntegration.Tests\packages.config" />
+ <repository path="..\Transports\MassTransit.Transports.RabbitMq\packages.config" />
</repositories>
Something went wrong with that request. Please try again.