From 4af05ec0f8d4095c6ceb039eed485cef61768a10 Mon Sep 17 00:00:00 2001 From: Chris Patterson Date: Thu, 5 May 2016 11:54:16 -0700 Subject: [PATCH] Fixed #551 hang on startup of bus when failed to connect --- .../Activities/ClearRequestActivity.cs | 4 +- .../Activities/PublishActivity.cs | 16 ++++---- .../Activities/RequestActivity.cs | 16 ++++---- .../Activities/RespondActivity.cs | 4 +- .../Activities/ScheduleActivity.cs | 12 +++--- .../Activities/UnscheduleActivity.cs | 8 ++-- .../MessageSessionSagaRepository.cs | 2 +- .../ServiceBusSendTransport.cs | 2 +- .../SendToPublishExchange_Specs.cs | 2 +- .../TwoActivityCourier_Specs.cs | 3 +- .../Contexts/RabbitMqConnectionContext.cs | 6 +-- .../RabbitMqExtensions.cs | 2 +- .../RabbitMqSendTransport.cs | 2 +- .../Diagnostics/PerformanceCounter_Specs.cs | 2 +- src/MassTransit/MassTransit.csproj | 3 -- src/MassTransit/MassTransitBus.cs | 39 +++++++++++++------ src/MassTransit/packages.config | 1 - .../Saga/NHibernateSagaRepository.cs | 4 +- src/SolutionVersion.cs | 4 +- 19 files changed, 72 insertions(+), 60 deletions(-) diff --git a/src/MassTransit.AutomatonymousIntegration/Activities/ClearRequestActivity.cs b/src/MassTransit.AutomatonymousIntegration/Activities/ClearRequestActivity.cs index 6b5416fca3..ff9adfa4a4 100644 --- a/src/MassTransit.AutomatonymousIntegration/Activities/ClearRequestActivity.cs +++ b/src/MassTransit.AutomatonymousIntegration/Activities/ClearRequestActivity.cs @@ -34,11 +34,11 @@ public void Accept(StateMachineVisitor visitor) visitor.Visit(this); } - async Task Activity.Execute(BehaviorContext context, Behavior next) + Task Activity.Execute(BehaviorContext context, Behavior next) { _request.SetRequestId(context.Instance, default(Guid?)); - await next.Execute(context).ConfigureAwait(false); + return next.Execute(context); } public Task Faulted(BehaviorExceptionContext context, Behavior next) diff --git a/src/MassTransit.AutomatonymousIntegration/Activities/PublishActivity.cs b/src/MassTransit.AutomatonymousIntegration/Activities/PublishActivity.cs index a6e7917213..ae888e173d 100644 --- a/src/MassTransit.AutomatonymousIntegration/Activities/PublishActivity.cs +++ b/src/MassTransit.AutomatonymousIntegration/Activities/PublishActivity.cs @@ -60,23 +60,23 @@ async Task Activity.Execute(BehaviorContext context, await next.Execute(context).ConfigureAwait(false); } - async Task Activity.Faulted(BehaviorExceptionContext context, Behavior next) + Task Activity.Faulted(BehaviorExceptionContext context, Behavior next) { - await next.Faulted(context).ConfigureAwait(false); + return next.Faulted(context); } - async Task Activity.Faulted(BehaviorExceptionContext context, Behavior next) + Task Activity.Faulted(BehaviorExceptionContext context, Behavior next) { - await next.Faulted(context).ConfigureAwait(false); + return next.Faulted(context); } - async Task Execute(BehaviorContext context) + Task Execute(BehaviorContext context) { var consumeContext = context.CreateConsumeContext(); TMessage message = _messageFactory(consumeContext); - await consumeContext.Publish(message, _publishPipe).ConfigureAwait(false); + return consumeContext.Publish(message, _publishPipe); } } @@ -121,10 +121,10 @@ void Visitable.Accept(StateMachineVisitor inspector) await next.Execute(context).ConfigureAwait(false); } - async Task Activity.Faulted(BehaviorExceptionContext context, + Task Activity.Faulted(BehaviorExceptionContext context, Behavior next) { - await next.Faulted(context).ConfigureAwait(false); + return next.Faulted(context); } } } \ No newline at end of file diff --git a/src/MassTransit.AutomatonymousIntegration/Activities/RequestActivity.cs b/src/MassTransit.AutomatonymousIntegration/Activities/RequestActivity.cs index 79a145a5e4..4f774cca96 100644 --- a/src/MassTransit.AutomatonymousIntegration/Activities/RequestActivity.cs +++ b/src/MassTransit.AutomatonymousIntegration/Activities/RequestActivity.cs @@ -50,23 +50,23 @@ async Task Activity.Execute(BehaviorContext context, await next.Execute(context).ConfigureAwait(false); } - async Task Activity.Faulted(BehaviorExceptionContext context, Behavior next) + Task Activity.Faulted(BehaviorExceptionContext context, Behavior next) { - await next.Faulted(context).ConfigureAwait(false); + return next.Faulted(context); } - async Task Activity.Faulted(BehaviorExceptionContext context, Behavior next) + Task Activity.Faulted(BehaviorExceptionContext context, Behavior next) { - await next.Faulted(context).ConfigureAwait(false); + return next.Faulted(context); } - async Task Execute(BehaviorContext context) + Task Execute(BehaviorContext context) { var consumeContext = context.CreateConsumeContext(); TRequest requestMessage = _messageFactory(consumeContext); - await SendRequest(context, consumeContext, requestMessage).ConfigureAwait(false); + return SendRequest(context, consumeContext, requestMessage); } } @@ -103,10 +103,10 @@ public async Task Execute(BehaviorContext context, Behavior(BehaviorExceptionContext context, Behavior next) + public Task Faulted(BehaviorExceptionContext context, Behavior next) where TException : Exception { - await next.Faulted(context).ConfigureAwait(false); + return next.Faulted(context); } } } \ No newline at end of file diff --git a/src/MassTransit.AutomatonymousIntegration/Activities/RespondActivity.cs b/src/MassTransit.AutomatonymousIntegration/Activities/RespondActivity.cs index 88284f2cff..fdb584d477 100644 --- a/src/MassTransit.AutomatonymousIntegration/Activities/RespondActivity.cs +++ b/src/MassTransit.AutomatonymousIntegration/Activities/RespondActivity.cs @@ -58,10 +58,10 @@ void Visitable.Accept(StateMachineVisitor inspector) await next.Execute(context).ConfigureAwait(false); } - async Task Activity.Faulted(BehaviorExceptionContext context, + Task Activity.Faulted(BehaviorExceptionContext context, Behavior next) { - await next.Faulted(context).ConfigureAwait(false); + return next.Faulted(context); } } } \ No newline at end of file diff --git a/src/MassTransit.AutomatonymousIntegration/Activities/ScheduleActivity.cs b/src/MassTransit.AutomatonymousIntegration/Activities/ScheduleActivity.cs index 15ba582c56..691c0984ef 100644 --- a/src/MassTransit.AutomatonymousIntegration/Activities/ScheduleActivity.cs +++ b/src/MassTransit.AutomatonymousIntegration/Activities/ScheduleActivity.cs @@ -68,14 +68,14 @@ async Task Activity.Execute(BehaviorContext context, await next.Execute(context).ConfigureAwait(false); } - async Task Activity.Faulted(BehaviorExceptionContext context, Behavior next) + Task Activity.Faulted(BehaviorExceptionContext context, Behavior next) { - await next.Faulted(context).ConfigureAwait(false); + return next.Faulted(context); } - async Task Activity.Faulted(BehaviorExceptionContext context, Behavior next) + Task Activity.Faulted(BehaviorExceptionContext context, Behavior next) { - await next.Faulted(context).ConfigureAwait(false); + return next.Faulted(context); } async Task Execute(BehaviorContext context) @@ -161,10 +161,10 @@ void Visitable.Accept(StateMachineVisitor inspector) await next.Execute(context).ConfigureAwait(false); } - async Task Activity.Faulted(BehaviorExceptionContext context, + Task Activity.Faulted(BehaviorExceptionContext context, Behavior next) { - await next.Faulted(context).ConfigureAwait(false); + return next.Faulted(context); } } } \ No newline at end of file diff --git a/src/MassTransit.AutomatonymousIntegration/Activities/UnscheduleActivity.cs b/src/MassTransit.AutomatonymousIntegration/Activities/UnscheduleActivity.cs index 371001581b..2c41ac9502 100644 --- a/src/MassTransit.AutomatonymousIntegration/Activities/UnscheduleActivity.cs +++ b/src/MassTransit.AutomatonymousIntegration/Activities/UnscheduleActivity.cs @@ -47,14 +47,14 @@ async Task Activity.Execute(BehaviorContext context, await next.Execute(context).ConfigureAwait(false); } - async Task Activity.Faulted(BehaviorExceptionContext context, Behavior next) + Task Activity.Faulted(BehaviorExceptionContext context, Behavior next) { - await next.Faulted(context).ConfigureAwait(false); + return next.Faulted(context); } - async Task Activity.Faulted(BehaviorExceptionContext context, Behavior next) + Task Activity.Faulted(BehaviorExceptionContext context, Behavior next) { - await next.Faulted(context).ConfigureAwait(false); + return next.Faulted(context); } async Task Execute(BehaviorContext context) diff --git a/src/MassTransit.AzureServiceBusTransport/MessageSessionSagaRepository.cs b/src/MassTransit.AzureServiceBusTransport/MessageSessionSagaRepository.cs index 8813418d87..ee3b4f9ff8 100644 --- a/src/MassTransit.AzureServiceBusTransport/MessageSessionSagaRepository.cs +++ b/src/MassTransit.AzureServiceBusTransport/MessageSessionSagaRepository.cs @@ -111,7 +111,7 @@ async Task WriteSagaState(MessageSessionContext context, TSaga saga) BsonMessageSerializer.Serializer.Serialize(bsonWriter, saga); bsonWriter.Flush(); - serializeStream.Flush(); + await serializeStream.FlushAsync().ConfigureAwait(false); using (var stateStream = new MemoryStream(serializeStream.ToArray(), false)) { diff --git a/src/MassTransit.AzureServiceBusTransport/ServiceBusSendTransport.cs b/src/MassTransit.AzureServiceBusTransport/ServiceBusSendTransport.cs index cfff080848..5bbb85b1b3 100644 --- a/src/MassTransit.AzureServiceBusTransport/ServiceBusSendTransport.cs +++ b/src/MassTransit.AzureServiceBusTransport/ServiceBusSendTransport.cs @@ -112,7 +112,7 @@ async Task ISendTransport.Send(T message, IPipe> pipe, Cancell } catch (Exception ex) { - _observers.SendFault(context, ex).Wait(cancelSend); + await _observers.SendFault(context, ex).ConfigureAwait(false); throw; } diff --git a/src/MassTransit.RabbitMqTransport.Tests/SendToPublishExchange_Specs.cs b/src/MassTransit.RabbitMqTransport.Tests/SendToPublishExchange_Specs.cs index e703b3ac7f..882a7e4e42 100644 --- a/src/MassTransit.RabbitMqTransport.Tests/SendToPublishExchange_Specs.cs +++ b/src/MassTransit.RabbitMqTransport.Tests/SendToPublishExchange_Specs.cs @@ -24,7 +24,7 @@ public class Sending_to_a_publish_exchange : [Test] public async Task Should_arrive_on_the_receive_endpoint() { - var sendSettings = _host.GetSendSettings(typeof(A)); + var sendSettings = _host.Settings.GetSendSettings(typeof(A)); var sendAddress = _host.Settings.GetSendAddress(sendSettings); var endpoint = await Bus.GetSendEndpoint(sendAddress); diff --git a/src/MassTransit.RabbitMqTransport.Tests/TwoActivityCourier_Specs.cs b/src/MassTransit.RabbitMqTransport.Tests/TwoActivityCourier_Specs.cs index 5775f2b0d0..023021b6f6 100644 --- a/src/MassTransit.RabbitMqTransport.Tests/TwoActivityCourier_Specs.cs +++ b/src/MassTransit.RabbitMqTransport.Tests/TwoActivityCourier_Specs.cs @@ -23,6 +23,7 @@ namespace MassTransit.RabbitMqTransport.Tests using NUnit.Framework; using TestFramework; using TestFramework.Courier; + using Util; [TestFixture] @@ -261,7 +262,7 @@ public void Setup() RoutingSlip routingSlip = builder.Build(); - Bus.Execute(routingSlip); + TaskUtil.Await(() => Bus.Execute(routingSlip)); _sentRoutingSlips.Add(routingSlip.TrackingNumber); } diff --git a/src/MassTransit.RabbitMqTransport/Contexts/RabbitMqConnectionContext.cs b/src/MassTransit.RabbitMqTransport/Contexts/RabbitMqConnectionContext.cs index e672f3ab60..6a47221448 100644 --- a/src/MassTransit.RabbitMqTransport/Contexts/RabbitMqConnectionContext.cs +++ b/src/MassTransit.RabbitMqTransport/Contexts/RabbitMqConnectionContext.cs @@ -51,10 +51,10 @@ public RabbitMqConnectionContext(IConnection connection, RabbitMqHostSettings ho public RabbitMqHostSettings HostSettings => _hostSettings; - public async Task CreateModel() + public Task CreateModel() { - return await Task.Factory.StartNew(() => _connection.CreateModel(), - _participant.StoppedToken, TaskCreationOptions.HideScheduler, _taskScheduler).ConfigureAwait(false); + return Task.Factory.StartNew(() => _connection.CreateModel(), + _participant.StoppedToken, TaskCreationOptions.HideScheduler, _taskScheduler); } public IConnection Connection => _connection; diff --git a/src/MassTransit.RabbitMqTransport/RabbitMqExtensions.cs b/src/MassTransit.RabbitMqTransport/RabbitMqExtensions.cs index 2ff5eec9c7..63ca055283 100644 --- a/src/MassTransit.RabbitMqTransport/RabbitMqExtensions.cs +++ b/src/MassTransit.RabbitMqTransport/RabbitMqExtensions.cs @@ -34,7 +34,7 @@ public static void Cleanup(this IModel model, ushort replyCode = 200, string mes if (model.IsOpen) model.Close(replyCode, message); } - catch (Exception ex) + catch (Exception) { } diff --git a/src/MassTransit.RabbitMqTransport/RabbitMqSendTransport.cs b/src/MassTransit.RabbitMqTransport/RabbitMqSendTransport.cs index 3e442640ef..d7399a101f 100644 --- a/src/MassTransit.RabbitMqTransport/RabbitMqSendTransport.cs +++ b/src/MassTransit.RabbitMqTransport/RabbitMqSendTransport.cs @@ -157,7 +157,7 @@ async Task ISendTransport.Move(ReceiveContext context, IPipe pipe) { using (var bodyStream = context.GetBody()) { - bodyStream.CopyTo(memoryStream); + await bodyStream.CopyToAsync(memoryStream).ConfigureAwait(false); } body = memoryStream.ToArray(); diff --git a/src/MassTransit.Tests/Diagnostics/PerformanceCounter_Specs.cs b/src/MassTransit.Tests/Diagnostics/PerformanceCounter_Specs.cs index d409d730cb..9911fa4084 100644 --- a/src/MassTransit.Tests/Diagnostics/PerformanceCounter_Specs.cs +++ b/src/MassTransit.Tests/Diagnostics/PerformanceCounter_Specs.cs @@ -42,7 +42,7 @@ protected override void ConfigureBus(IInMemoryBusFactoryConfigurator configurato { base.ConfigureBus(configurator); - configurator.EnabledPerformanceCounters(); + configurator.EnablePerformanceCounters(); } protected override void ConfigureInputQueueEndpoint(IInMemoryReceiveEndpointConfigurator configurator) diff --git a/src/MassTransit/MassTransit.csproj b/src/MassTransit/MassTransit.csproj index 91cfba4ec8..fcd966e1ae 100644 --- a/src/MassTransit/MassTransit.csproj +++ b/src/MassTransit/MassTransit.csproj @@ -1132,9 +1132,6 @@ - - - diff --git a/src/MassTransit/MassTransitBus.cs b/src/MassTransit/MassTransitBus.cs index f4f1711746..e6f579d96f 100644 --- a/src/MassTransit/MassTransitBus.cs +++ b/src/MassTransit/MassTransitBus.cs @@ -1,4 +1,4 @@ -// Copyright 2007-2015 Chris Patterson, Dru Sellers, Travis Smith, et. al. +// Copyright 2007-2016 Chris Patterson, Dru Sellers, Travis Smith, et. al. // // Licensed under the Apache License, Version 2.0 (the "License"); you may not use // this file except in compliance with the License. You may obtain a copy of the @@ -161,9 +161,9 @@ public async Task StartAsync(CancellationToken cancellationToken) if (_log.IsDebugEnabled) _log.DebugFormat("Starting bus hosts..."); - foreach (IBusHostControl host in _hosts) + foreach (var host in _hosts) { - HostHandle hostHandle = host.Start(); + var hostHandle = host.Start(); hosts.Add(hostHandle); } @@ -171,12 +171,12 @@ public async Task StartAsync(CancellationToken cancellationToken) if (_log.IsDebugEnabled) _log.DebugFormat("Starting receive endpoints..."); - foreach (IReceiveEndpoint endpoint in _receiveEndpoints) + foreach (var endpoint in _receiveEndpoints) { - ConnectHandle observerHandle = endpoint.ConnectReceiveObserver(_receiveObservers); + var observerHandle = endpoint.ConnectReceiveObserver(_receiveObservers); observers.Add(observerHandle); - ReceiveEndpointHandle handle = endpoint.Start(); + var handle = endpoint.Start(); endpoints.Add(handle); } @@ -254,16 +254,16 @@ public ConnectHandle ConnectPublishObserver(IPublishObserver observer) void IProbeSite.Probe(ProbeContext context) { - ProbeContext scope = context.CreateScope("bus"); + var scope = context.CreateScope("bus"); scope.Set(new { Address }); - foreach (IBusHostControl host in _hosts) + foreach (var host in _hosts) host.Probe(scope); - foreach (IReceiveEndpoint receiveEndpoint in _receiveEndpoints) + foreach (var receiveEndpoint in _receiveEndpoints) receiveEndpoint.Probe(scope); } @@ -288,14 +288,29 @@ void IDisposable.Dispose() class BusReady { + readonly ReadyObserver[] _observers; + public BusReady(IEnumerable receiveEndpoints) { - var observers = receiveEndpoints.Select(x => new ReadyObserver(x).Ready).ToArray(); + _observers = receiveEndpoints.Select(x => new ReadyObserver(x)).ToArray(); + } - Ready = Task.WhenAll(observers); + public Task Ready + { + get { return ReadyOrNot(_observers.Select(x => x.Ready)); } } - public Task Ready { get; } + async Task ReadyOrNot(IEnumerable> observers) + { + var tasks = observers as Task[] ?? observers.ToArray(); + + foreach (Task observer in tasks) + { + await observer.ConfigureAwait(false); + } + + return await Task.WhenAll(tasks).ConfigureAwait(false); + } class ReadyObserver : diff --git a/src/MassTransit/packages.config b/src/MassTransit/packages.config index ef6fbfaa9b..4d5e7dfab8 100644 --- a/src/MassTransit/packages.config +++ b/src/MassTransit/packages.config @@ -1,6 +1,5 @@  - \ No newline at end of file diff --git a/src/Persistence/MassTransit.NHibernateIntegration/Saga/NHibernateSagaRepository.cs b/src/Persistence/MassTransit.NHibernateIntegration/Saga/NHibernateSagaRepository.cs index a90bbf6a55..044c430cfa 100644 --- a/src/Persistence/MassTransit.NHibernateIntegration/Saga/NHibernateSagaRepository.cs +++ b/src/Persistence/MassTransit.NHibernateIntegration/Saga/NHibernateSagaRepository.cs @@ -48,7 +48,7 @@ public NHibernateSagaRepository(ISessionFactory sessionFactory, System.Data.Isol _insertIsolationLevel = isolationLevel; } - public async Task> Find(ISagaQuery query) + public Task> Find(ISagaQuery query) { using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew)) using (ISession session = _sessionFactory.OpenSession()) @@ -60,7 +60,7 @@ public async Task> Find(ISagaQuery query) scope.Complete(); - return result; + return Task.FromResult>(result); } } diff --git a/src/SolutionVersion.cs b/src/SolutionVersion.cs index 97d2ad7dc0..7082cc75e5 100644 --- a/src/SolutionVersion.cs +++ b/src/SolutionVersion.cs @@ -5,8 +5,8 @@ [assembly: AssemblyDescriptionAttribute("MassTransit is a distributed application framework for .NET http://masstransit-project.com")] [assembly: AssemblyProductAttribute("MassTransit")] [assembly: AssemblyVersionAttribute("3.3.1.0")] -[assembly: AssemblyFileVersionAttribute("3.3.1.0")] -[assembly: AssemblyInformationalVersionAttribute("3.3.1.0 (develop/71cb3ebe)")] +[assembly: AssemblyFileVersionAttribute("3.3.2.0")] +[assembly: AssemblyInformationalVersionAttribute("3.3.2.0 (develop/2b519cbb)")] namespace System { internal static class AssemblyVersionInformation { internal const string Version = "3.3.1.0";