Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Tuned some startup, added RabbitMQ heartbeat

  • Loading branch information...
commit 40cb1cfc7dbe13a2c343561dfa7bb053768f939f 1 parent 1b91d0c
@phatboyg phatboyg authored
View
2  rakefile.rb
@@ -262,7 +262,7 @@ def copyOutputFiles(fromDir, filePattern, outDir)
nunit :unit_tests => [:compile] do |nunit|
nunit.command = File.join('lib', 'nunit', 'net-2.0', "nunit-console#{(BUILD_PLATFORM.empty? ? '' : "-#{BUILD_PLATFORM}")}.exe")
- nunit.options = "/framework=#{CLR_TOOLS_VERSION}", '/nothread', '/exclude:NotOnTeamCity', '/nologo', '/labels', "\"/xml=#{File.join(props[:artifacts], 'nunit-test-results.xml')}\""
+ nunit.options = "/framework=#{CLR_TOOLS_VERSION}", '/exclude:NotOnTeamCity', '/noshadow', '/nologo', '/labels', "\"/xml=#{File.join(props[:artifacts], 'nunit-test-results.xml')}\""
nunit.assemblies = FileList["tests/MassTransit.Tests.dll", "tests/MassTransit.Containers.Tests.dll"]
end
View
286 src/MassTransit/Configuration/Builders/ServiceBusBuilderImpl.cs
@@ -12,147 +12,147 @@
// specific language governing permissions and limitations under the License.
namespace MassTransit.Builders
{
- using System;
- using System.Collections.Generic;
- using BusServiceConfigurators;
- using Configuration;
- using Exceptions;
- using Magnum;
- using Magnum.Extensions;
- using Pipeline.Configuration;
- using Util;
-
- public class ServiceBusBuilderImpl :
- ServiceBusBuilder
- {
- readonly IList<BusServiceConfigurator> _busServiceConfigurators;
- readonly IList<Action<ServiceBus>> _postCreateActions;
- readonly BusSettings _settings;
-
- public ServiceBusBuilderImpl(BusSettings settings)
- {
- Guard.AgainstNull(settings, "settings");
-
- Guard.AgainstNull(settings.EndpointCache, "endpointCache");
-
- _settings = settings;
-
- _postCreateActions = new List<Action<ServiceBus>>();
- _busServiceConfigurators = new List<BusServiceConfigurator>();
- }
-
- public BusSettings Settings
- {
- get { return _settings; }
- }
-
- public IControlBus Build()
- {
- ServiceBus bus = CreateServiceBus(_settings.EndpointCache);
-
- ConfigureBusSettings(bus);
-
- ConfigureMessageInterceptors(bus);
-
- RunPostCreateActions(bus);
-
- RunBusServiceConfigurators(bus);
-
- if (_settings.AutoStart)
- {
- bus.Start();
- }
-
- return bus;
- }
-
- public void UseControlBus(IControlBus controlBus)
- {
- _postCreateActions.Add(bus => bus.ControlBus = controlBus);
- }
-
- public void AddPostCreateAction(Action<ServiceBus> postCreateAction)
- {
- _postCreateActions.Add(postCreateAction);
- }
-
- public void AddBusServiceConfigurator(BusServiceConfigurator configurator)
- {
- _busServiceConfigurators.Add(configurator);
- }
-
- public void Match<T>([NotNull] Action<T> callback)
- where T : class, BusBuilder
- {
- Guard.AgainstNull(callback);
-
- if (typeof (T).IsAssignableFrom(GetType()))
- callback(this as T);
- }
-
- void RunBusServiceConfigurators(ServiceBus bus)
- {
- foreach (BusServiceConfigurator busServiceConfigurator in _busServiceConfigurators)
- {
- try
- {
- IBusService busService = busServiceConfigurator.Create(bus);
-
- bus.AddService(busServiceConfigurator.Layer, busService);
- }
- catch (Exception ex)
- {
- throw new ConfigurationException("Failed to create the bus service: " +
- busServiceConfigurator.ServiceType.ToShortTypeName(), ex);
- }
- }
- }
-
- void RunPostCreateActions(ServiceBus bus)
- {
- foreach (var postCreateAction in _postCreateActions)
- {
- try
- {
- postCreateAction(bus);
- }
- catch (Exception ex)
- {
- throw new ConfigurationException("An exception was thrown while running post-creation actions", ex);
- }
- }
- }
-
- ServiceBus CreateServiceBus(IEndpointCache endpointCache)
- {
- IEndpoint endpoint = endpointCache.GetEndpoint(_settings.InputAddress);
-
- var serviceBus = new ServiceBus(endpoint, endpointCache);
-
- return serviceBus;
- }
-
- void ConfigureBusSettings(ServiceBus bus)
- {
- if (_settings.ConcurrentConsumerLimit > 0)
- bus.MaximumConsumerThreads = _settings.ConcurrentConsumerLimit;
-
- if (_settings.ConcurrentReceiverLimit > 0)
- bus.ConcurrentReceiveThreads = _settings.ConcurrentReceiverLimit;
-
- bus.ReceiveTimeout = _settings.ReceiveTimeout;
- }
-
- void ConfigureMessageInterceptors(IServiceBus bus)
- {
- if (_settings.BeforeConsume != null || _settings.AfterConsume != null)
- {
- var configurator = new InboundMessageInterceptorConfigurator(bus.InboundPipeline);
-
- var interceptor = new DelegateMessageInterceptor(_settings.BeforeConsume, _settings.AfterConsume);
-
- configurator.Create(interceptor);
- }
- }
- }
+ using System;
+ using System.Collections.Generic;
+ using BusServiceConfigurators;
+ using Configuration;
+ using Exceptions;
+ using Magnum;
+ using Magnum.Extensions;
+ using Pipeline.Configuration;
+ using Util;
+
+ public class ServiceBusBuilderImpl :
+ ServiceBusBuilder
+ {
+ readonly IList<BusServiceConfigurator> _busServiceConfigurators;
+ readonly IList<Action<ServiceBus>> _postCreateActions;
+ readonly BusSettings _settings;
+
+ public ServiceBusBuilderImpl(BusSettings settings)
+ {
+ Guard.AgainstNull(settings, "settings");
+
+ Guard.AgainstNull(settings.EndpointCache, "endpointCache");
+
+ _settings = settings;
+
+ _postCreateActions = new List<Action<ServiceBus>>();
+ _busServiceConfigurators = new List<BusServiceConfigurator>();
+ }
+
+ public BusSettings Settings
+ {
+ get { return _settings; }
+ }
+
+ public IControlBus Build()
+ {
+ ServiceBus bus = CreateServiceBus(_settings.EndpointCache);
+
+ ConfigureBusSettings(bus);
+
+ RunPostCreateActions(bus);
+
+ ConfigureMessageInterceptors(bus);
+
+ RunBusServiceConfigurators(bus);
+
+ if (_settings.AutoStart)
+ {
+ bus.Start();
+ }
+
+ return bus;
+ }
+
+ public void UseControlBus(IControlBus controlBus)
+ {
+ _postCreateActions.Add(bus => bus.ControlBus = controlBus);
+ }
+
+ public void AddPostCreateAction(Action<ServiceBus> postCreateAction)
+ {
+ _postCreateActions.Add(postCreateAction);
+ }
+
+ public void AddBusServiceConfigurator(BusServiceConfigurator configurator)
+ {
+ _busServiceConfigurators.Add(configurator);
+ }
+
+ public void Match<T>([NotNull] Action<T> callback)
+ where T : class, BusBuilder
+ {
+ Guard.AgainstNull(callback);
+
+ if (typeof (T).IsAssignableFrom(GetType()))
+ callback(this as T);
+ }
+
+ void RunBusServiceConfigurators(ServiceBus bus)
+ {
+ foreach (BusServiceConfigurator busServiceConfigurator in _busServiceConfigurators)
+ {
+ try
+ {
+ IBusService busService = busServiceConfigurator.Create(bus);
+
+ bus.AddService(busServiceConfigurator.Layer, busService);
+ }
+ catch (Exception ex)
+ {
+ throw new ConfigurationException("Failed to create the bus service: " +
+ busServiceConfigurator.ServiceType.ToShortTypeName(), ex);
+ }
+ }
+ }
+
+ void RunPostCreateActions(ServiceBus bus)
+ {
+ foreach (var postCreateAction in _postCreateActions)
+ {
+ try
+ {
+ postCreateAction(bus);
+ }
+ catch (Exception ex)
+ {
+ throw new ConfigurationException("An exception was thrown while running post-creation actions", ex);
+ }
+ }
+ }
+
+ ServiceBus CreateServiceBus(IEndpointCache endpointCache)
+ {
+ IEndpoint endpoint = endpointCache.GetEndpoint(_settings.InputAddress);
+
+ var serviceBus = new ServiceBus(endpoint, endpointCache);
+
+ return serviceBus;
+ }
+
+ void ConfigureBusSettings(ServiceBus bus)
+ {
+ if (_settings.ConcurrentConsumerLimit > 0)
+ bus.MaximumConsumerThreads = _settings.ConcurrentConsumerLimit;
+
+ if (_settings.ConcurrentReceiverLimit > 0)
+ bus.ConcurrentReceiveThreads = _settings.ConcurrentReceiverLimit;
+
+ bus.ReceiveTimeout = _settings.ReceiveTimeout;
+ }
+
+ void ConfigureMessageInterceptors(IServiceBus bus)
+ {
+ if (_settings.BeforeConsume != null || _settings.AfterConsume != null)
+ {
+ var configurator = new InboundMessageInterceptorConfigurator(bus.InboundPipeline);
+
+ var interceptor = new DelegateMessageInterceptor(_settings.BeforeConsume, _settings.AfterConsume);
+
+ configurator.Create(interceptor);
+ }
+ }
+ }
}
View
143 src/MassTransit/Configuration/BusConfigurators/ControlBusConfiguratorImpl.cs
@@ -1,82 +1,85 @@
-// Copyright 2007-2011 Chris Patterson, Dru Sellers, Travis Smith, et. al.
+// Copyright 2007-2012 Chris Patterson, Dru Sellers, Travis Smith, et. al.
//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
-// Unless required by applicable law or agreed to in writing, software distributed
+// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
namespace MassTransit.BusConfigurators
{
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using Builders;
- using Configurators;
- using Logging;
- using Util;
-
- public class ControlBusConfiguratorImpl :
- ControlBusConfigurator,
- BusBuilderConfigurator
- {
- static readonly ILog _log = Logger.Get(typeof (ControlBusConfiguratorImpl));
-
- readonly IList<BusBuilderConfigurator> _configurators;
- Uri _uri;
-
- public ControlBusConfiguratorImpl()
- {
- _configurators = new List<BusBuilderConfigurator>();
- }
-
- public BusBuilder Configure(BusBuilder builder)
- {
- if (builder == null) throw new ArgumentNullException("builder");
- builder.Match<ServiceBusBuilder>(x =>
- {
- var settings = new ServiceBusSettings(builder.Settings);
-
- settings.InputAddress = _uri ?? builder.Settings.InputAddress.AppendToPath("_control");
-
- // the endpoint factory is already created, so we can't set the endpoint here
- // we really need this to be part of another step, but i don't have a clue how yet.
- //_configurator.ConfigureEndpoint(_uri, x => x.PurgeExistingMessages());
-
- if (_log.IsDebugEnabled)
- _log.DebugFormat("Configuring control bus for {0} at {1}", builder.Settings.InputAddress, settings.InputAddress);
-
- settings.ConcurrentConsumerLimit = 1;
- settings.ConcurrentReceiverLimit = 1;
- settings.AutoStart = true;
-
- BusBuilder controlBusBuilder = new ControlBusBuilderImpl(settings);
-
- controlBusBuilder = _configurators
- .Aggregate(controlBusBuilder, (current, configurator) => configurator.Configure(current));
-
- IControlBus controlBus = controlBusBuilder.Build();
-
- x.UseControlBus(controlBus);
- });
-
- return builder;
- }
-
- public IEnumerable<ValidationResult> Validate()
- {
- return from configurator in _configurators
- from result in configurator.Validate()
- select result.WithParentKey("ControlBus");
- }
-
- public void ReceiveFrom(Uri uri)
- {
- _uri = uri;
- }
- }
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using Builders;
+ using Configurators;
+ using Logging;
+ using Util;
+
+ public class ControlBusConfiguratorImpl :
+ ControlBusConfigurator,
+ BusBuilderConfigurator
+ {
+ static readonly ILog _log = Logger.Get(typeof(ControlBusConfiguratorImpl));
+
+ readonly IList<BusBuilderConfigurator> _configurators;
+ Uri _uri;
+
+ public ControlBusConfiguratorImpl()
+ {
+ _configurators = new List<BusBuilderConfigurator>();
+ }
+
+ public BusBuilder Configure(BusBuilder builder)
+ {
+ if (builder == null)
+ throw new ArgumentNullException("builder");
+
+ builder.Match<ServiceBusBuilder>(x =>
+ {
+ var settings = new ServiceBusSettings(builder.Settings);
+
+ settings.InputAddress = _uri ?? builder.Settings.InputAddress.AppendToPath("_control");
+
+ // the endpoint factory is already created, so we can't set the endpoint here
+ // we really need this to be part of another step, but i don't have a clue how yet.
+ //_configurator.ConfigureEndpoint(_uri, x => x.PurgeExistingMessages());
+
+ if (_log.IsDebugEnabled)
+ _log.DebugFormat("Configuring control bus for {0} at {1}", builder.Settings.InputAddress,
+ settings.InputAddress);
+
+ settings.ConcurrentConsumerLimit = 1;
+ settings.ConcurrentReceiverLimit = 1;
+ settings.AutoStart = true;
+
+ BusBuilder controlBusBuilder = new ControlBusBuilderImpl(settings);
+
+ controlBusBuilder = _configurators
+ .Aggregate(controlBusBuilder, (current, configurator) => configurator.Configure(current));
+
+ IControlBus controlBus = controlBusBuilder.Build();
+
+ x.UseControlBus(controlBus);
+ });
+
+ return builder;
+ }
+
+ public IEnumerable<ValidationResult> Validate()
+ {
+ return from configurator in _configurators
+ from result in configurator.Validate()
+ select result.WithParentKey("ControlBus");
+ }
+
+ public void ReceiveFrom(Uri uri)
+ {
+ _uri = uri;
+ }
+ }
}
View
34 src/MassTransit/Configuration/BusConfigurators/ServiceBusConfiguratorImpl.cs
@@ -27,10 +27,10 @@ namespace MassTransit.BusConfigurators
using SubscriptionConfigurators;
using Transports;
- /// <summary>
- /// <see cref="ServiceBusConfigurator"/>. Core implementation of service bus
- /// configurator.
- /// </summary>
+ /// <summary>
+ /// <see cref="ServiceBusConfigurator"/>. Core implementation of service bus
+ /// configurator.
+ /// </summary>
public class ServiceBusConfiguratorImpl :
ServiceBusConfigurator
{
@@ -156,8 +156,8 @@ public IServiceBus CreateServiceBus()
_subscriptionRouterConfigurator.SetNetwork(_settings.Network);
- // run through all configurators that have been set and let
- // them do their magic
+ // run through all configurators that have been set and let
+ // them do their magic
foreach (BusBuilderConfigurator configurator in _configurators)
{
builder = configurator.Configure(builder);
@@ -168,19 +168,19 @@ public IServiceBus CreateServiceBus()
return bus;
}
- /// <summary>
- /// This lets you change the bus settings without
- /// having to implement a <see cref="BusBuilderConfigurator"/>
- /// first. Use with caution.
- /// </summary>
- /// <param name="callback">The callback that changes the settings.</param>
+ /// <summary>
+ /// This lets you change the bus settings without
+ /// having to implement a <see cref="BusBuilderConfigurator"/>
+ /// first. Use with caution.
+ /// </summary>
+ /// <param name="callback">The callback that changes the settings.</param>
public void ChangeSettings([NotNull] Action<ServiceBusSettings> callback)
- {
- if (callback == null) throw new ArgumentNullException("callback");
- callback(_settings);
- }
+ {
+ if (callback == null) throw new ArgumentNullException("callback");
+ callback(_settings);
+ }
- IEndpointCache CreateEndpointCache()
+ IEndpointCache CreateEndpointCache()
{
if (_settings.EndpointCache != null)
return _settings.EndpointCache;
View
2  src/MassTransit/Diagnostics/Introspection/IntrospectionServiceConfigurator.cs
@@ -25,7 +25,7 @@ public Type ServiceType
public BusServiceLayer Layer
{
- get { return BusServiceLayer.Application; }
+ get { return BusServiceLayer.Presentation; }
}
public IBusService Create(IServiceBus bus)
View
2  src/MassTransit/Diagnostics/TracingExtensions.cs
@@ -23,7 +23,7 @@ public static void EnableMessageTracing(this ServiceBusConfigurator configurator
{
var service = new MessageTraceBusService(bus.EventChannel);
- bus.AddService(BusServiceLayer.Network, service);
+ bus.AddService(BusServiceLayer.Presentation, service);
});
configurator.AddBusConfigurator(busConfigurator);
View
2  src/MassTransit/Services/Routing/Configuration/RoutingConfigurator.cs
@@ -30,7 +30,7 @@ public Type ServiceType
public BusServiceLayer Layer
{
- get { return BusServiceLayer.Presentation; }
+ get { return BusServiceLayer.Session; }
}
public IBusService Create(IServiceBus bus)
View
18 src/Transports/MassTransit.Transports.RabbitMq/IRabbitMqEndpointAddress.cs
@@ -16,14 +16,14 @@ namespace MassTransit.Transports.RabbitMq
using System.Collections;
using RabbitMQ.Client;
- public interface IRabbitMqEndpointAddress :
- IEndpointAddress
- {
- ConnectionFactory ConnectionFactory { get; }
- string Name { get; }
+ public interface IRabbitMqEndpointAddress :
+ IEndpointAddress
+ {
+ ConnectionFactory ConnectionFactory { get; }
+ string Name { get; }
- IRabbitMqEndpointAddress ForQueue(string name);
- IDictionary QueueArguments();
- void SetTtl(TimeSpan ttl);
- }
+ IRabbitMqEndpointAddress ForQueue(string name);
+ IDictionary QueueArguments();
+ void SetTtl(TimeSpan ttl);
+ }
}
View
6 src/Transports/MassTransit.Transports.RabbitMq/RabbitMqEndpointAddress.cs
@@ -27,6 +27,8 @@ public class RabbitMqEndpointAddress :
const string FormatErrorMsg =
"The path can be empty, or a sequence of these characters: letters, digits, hyphen, underscore, period, or colon.";
+ const int DefaultHeartbeat = 3;
+
static readonly string LocalMachineName = Environment.MachineName.ToLowerInvariant();
static readonly Regex _regex = new Regex(@"^[A-Za-z0-9\-_\.:]+$");
@@ -39,7 +41,6 @@ public class RabbitMqEndpointAddress :
Func<bool> _isLocal;
int _ttl;
-
public RabbitMqEndpointAddress(Uri uri, ConnectionFactory connectionFactory, string name)
{
_uri = new Uri(uri.GetLeftPart(UriPartial.Path));
@@ -160,6 +161,9 @@ public static RabbitMqEndpointAddress Parse(Uri address)
name = pathSegments[1];
}
+ var heartbeat = address.Query.GetValueFromQueryString("heartbeat", (ushort)DefaultHeartbeat);
+ connectionFactory.RequestedHeartbeat = heartbeat;
+
VerifyQueueOrExchangeNameIsLegal(name);
return new RabbitMqEndpointAddress(address, connectionFactory, name);
View
26 src/Transports/MassTransit.Transports.RabbitMq/RabbitMqTransportFactory.cs
@@ -50,19 +50,19 @@ public void Dispose()
GC.SuppressFinalize(this);
}
- void Dispose(bool disposing)
- {
- if (_disposed) return;
- if (disposing)
- {
- _connectionCache.Values.Each(x => x.Dispose());
- _connectionCache.Clear();
-
- _connectionCache.Dispose();
- }
-
- _disposed = true;
- }
+ void Dispose(bool disposing)
+ {
+ if (_disposed) return;
+ if (disposing)
+ {
+ _connectionCache.Values.Each(x => x.Dispose());
+ _connectionCache.Clear();
+
+ _connectionCache.Dispose();
+ }
+
+ _disposed = true;
+ }
public string Scheme
{
Please sign in to comment.
Something went wrong with that request. Please try again.