Skip to content

Commit

Permalink
Fixed #551 hang on startup of bus when failed to connect
Browse files Browse the repository at this point in the history
  • Loading branch information
phatboyg committed May 5, 2016
1 parent 2b519cb commit 4af05ec
Show file tree
Hide file tree
Showing 19 changed files with 72 additions and 60 deletions.
Expand Up @@ -34,11 +34,11 @@ public void Accept(StateMachineVisitor visitor)
visitor.Visit(this);
}

async Task Activity<TInstance, TData>.Execute(BehaviorContext<TInstance, TData> context, Behavior<TInstance, TData> next)
Task Activity<TInstance, TData>.Execute(BehaviorContext<TInstance, TData> context, Behavior<TInstance, TData> next)
{
_request.SetRequestId(context.Instance, default(Guid?));

await next.Execute(context).ConfigureAwait(false);
return next.Execute(context);
}

public Task Faulted<TException>(BehaviorExceptionContext<TInstance, TData, TException> context, Behavior<TInstance, TData> next)
Expand Down
Expand Up @@ -60,23 +60,23 @@ async Task Activity<TInstance>.Execute<T>(BehaviorContext<TInstance, T> context,
await next.Execute(context).ConfigureAwait(false);
}

async Task Activity<TInstance>.Faulted<TException>(BehaviorExceptionContext<TInstance, TException> context, Behavior<TInstance> next)
Task Activity<TInstance>.Faulted<TException>(BehaviorExceptionContext<TInstance, TException> context, Behavior<TInstance> next)
{
await next.Faulted(context).ConfigureAwait(false);
return next.Faulted(context);
}

async Task Activity<TInstance>.Faulted<T, TException>(BehaviorExceptionContext<TInstance, T, TException> context, Behavior<TInstance, T> next)
Task Activity<TInstance>.Faulted<T, TException>(BehaviorExceptionContext<TInstance, T, TException> context, Behavior<TInstance, T> next)
{
await next.Faulted(context).ConfigureAwait(false);
return next.Faulted(context);
}

async Task Execute(BehaviorContext<TInstance> context)
Task Execute(BehaviorContext<TInstance> context)
{
var consumeContext = context.CreateConsumeContext();

TMessage message = _messageFactory(consumeContext);

await consumeContext.Publish(message, _publishPipe).ConfigureAwait(false);
return consumeContext.Publish(message, _publishPipe);
}
}

Expand Down Expand Up @@ -121,10 +121,10 @@ void Visitable.Accept(StateMachineVisitor inspector)
await next.Execute(context).ConfigureAwait(false);
}

async Task Activity<TInstance, TData>.Faulted<TException>(BehaviorExceptionContext<TInstance, TData, TException> context,
Task Activity<TInstance, TData>.Faulted<TException>(BehaviorExceptionContext<TInstance, TData, TException> context,
Behavior<TInstance, TData> next)
{
await next.Faulted(context).ConfigureAwait(false);
return next.Faulted(context);
}
}
}
Expand Up @@ -50,23 +50,23 @@ async Task Activity<TInstance>.Execute<T>(BehaviorContext<TInstance, T> context,
await next.Execute(context).ConfigureAwait(false);
}

async Task Activity<TInstance>.Faulted<TException>(BehaviorExceptionContext<TInstance, TException> context, Behavior<TInstance> next)
Task Activity<TInstance>.Faulted<TException>(BehaviorExceptionContext<TInstance, TException> context, Behavior<TInstance> next)
{
await next.Faulted(context).ConfigureAwait(false);
return next.Faulted(context);
}

async Task Activity<TInstance>.Faulted<T, TException>(BehaviorExceptionContext<TInstance, T, TException> context, Behavior<TInstance, T> next)
Task Activity<TInstance>.Faulted<T, TException>(BehaviorExceptionContext<TInstance, T, TException> context, Behavior<TInstance, T> next)
{
await next.Faulted(context).ConfigureAwait(false);
return next.Faulted(context);
}

async Task Execute(BehaviorContext<TInstance> context)
Task Execute(BehaviorContext<TInstance> context)
{
var consumeContext = context.CreateConsumeContext();

TRequest requestMessage = _messageFactory(consumeContext);

await SendRequest(context, consumeContext, requestMessage).ConfigureAwait(false);
return SendRequest(context, consumeContext, requestMessage);
}
}

Expand Down Expand Up @@ -103,10 +103,10 @@ public async Task Execute(BehaviorContext<TInstance, TData> context, Behavior<TI
await next.Execute(context).ConfigureAwait(false);
}

public async Task Faulted<TException>(BehaviorExceptionContext<TInstance, TData, TException> context, Behavior<TInstance, TData> next)
public Task Faulted<TException>(BehaviorExceptionContext<TInstance, TData, TException> context, Behavior<TInstance, TData> next)
where TException : Exception
{
await next.Faulted(context).ConfigureAwait(false);
return next.Faulted(context);
}
}
}
Expand Up @@ -58,10 +58,10 @@ void Visitable.Accept(StateMachineVisitor inspector)
await next.Execute(context).ConfigureAwait(false);
}

async Task Activity<TInstance, TData>.Faulted<TException>(BehaviorExceptionContext<TInstance, TData, TException> context,
Task Activity<TInstance, TData>.Faulted<TException>(BehaviorExceptionContext<TInstance, TData, TException> context,
Behavior<TInstance, TData> next)
{
await next.Faulted(context).ConfigureAwait(false);
return next.Faulted(context);
}
}
}
Expand Up @@ -68,14 +68,14 @@ async Task Activity<TInstance>.Execute<T>(BehaviorContext<TInstance, T> context,
await next.Execute(context).ConfigureAwait(false);
}

async Task Activity<TInstance>.Faulted<TException>(BehaviorExceptionContext<TInstance, TException> context, Behavior<TInstance> next)
Task Activity<TInstance>.Faulted<TException>(BehaviorExceptionContext<TInstance, TException> context, Behavior<TInstance> next)
{
await next.Faulted(context).ConfigureAwait(false);
return next.Faulted(context);
}

async Task Activity<TInstance>.Faulted<T, TException>(BehaviorExceptionContext<TInstance, T, TException> context, Behavior<TInstance, T> next)
Task Activity<TInstance>.Faulted<T, TException>(BehaviorExceptionContext<TInstance, T, TException> context, Behavior<TInstance, T> next)
{
await next.Faulted(context).ConfigureAwait(false);
return next.Faulted(context);
}

async Task Execute(BehaviorContext<TInstance> context)
Expand Down Expand Up @@ -161,10 +161,10 @@ void Visitable.Accept(StateMachineVisitor inspector)
await next.Execute(context).ConfigureAwait(false);
}

async Task Activity<TInstance, TData>.Faulted<TException>(BehaviorExceptionContext<TInstance, TData, TException> context,
Task Activity<TInstance, TData>.Faulted<TException>(BehaviorExceptionContext<TInstance, TData, TException> context,
Behavior<TInstance, TData> next)
{
await next.Faulted(context).ConfigureAwait(false);
return next.Faulted(context);
}
}
}
Expand Up @@ -47,14 +47,14 @@ async Task Activity<TInstance>.Execute<T>(BehaviorContext<TInstance, T> context,
await next.Execute(context).ConfigureAwait(false);
}

async Task Activity<TInstance>.Faulted<TException>(BehaviorExceptionContext<TInstance, TException> context, Behavior<TInstance> next)
Task Activity<TInstance>.Faulted<TException>(BehaviorExceptionContext<TInstance, TException> context, Behavior<TInstance> next)
{
await next.Faulted(context).ConfigureAwait(false);
return next.Faulted(context);
}

async Task Activity<TInstance>.Faulted<T, TException>(BehaviorExceptionContext<TInstance, T, TException> context, Behavior<TInstance, T> next)
Task Activity<TInstance>.Faulted<T, TException>(BehaviorExceptionContext<TInstance, T, TException> context, Behavior<TInstance, T> next)
{
await next.Faulted(context).ConfigureAwait(false);
return next.Faulted(context);
}

async Task Execute(BehaviorContext<TInstance> context)
Expand Down
Expand Up @@ -111,7 +111,7 @@ async Task WriteSagaState(MessageSessionContext context, TSaga saga)
BsonMessageSerializer.Serializer.Serialize(bsonWriter, saga);

bsonWriter.Flush();
serializeStream.Flush();
await serializeStream.FlushAsync().ConfigureAwait(false);

using (var stateStream = new MemoryStream(serializeStream.ToArray(), false))
{
Expand Down
Expand Up @@ -112,7 +112,7 @@ async Task ISendTransport.Send<T>(T message, IPipe<SendContext<T>> pipe, Cancell
}
catch (Exception ex)
{
_observers.SendFault(context, ex).Wait(cancelSend);
await _observers.SendFault(context, ex).ConfigureAwait(false);

throw;
}
Expand Down
Expand Up @@ -24,7 +24,7 @@ public class Sending_to_a_publish_exchange :
[Test]
public async Task Should_arrive_on_the_receive_endpoint()
{
var sendSettings = _host.GetSendSettings(typeof(A));
var sendSettings = _host.Settings.GetSendSettings(typeof(A));
var sendAddress = _host.Settings.GetSendAddress(sendSettings);

var endpoint = await Bus.GetSendEndpoint(sendAddress);
Expand Down
Expand Up @@ -23,6 +23,7 @@ namespace MassTransit.RabbitMqTransport.Tests
using NUnit.Framework;
using TestFramework;
using TestFramework.Courier;
using Util;


[TestFixture]
Expand Down Expand Up @@ -261,7 +262,7 @@ public void Setup()

RoutingSlip routingSlip = builder.Build();

Bus.Execute(routingSlip);
TaskUtil.Await(() => Bus.Execute(routingSlip));

_sentRoutingSlips.Add(routingSlip.TrackingNumber);
}
Expand Down
Expand Up @@ -51,10 +51,10 @@ public RabbitMqConnectionContext(IConnection connection, RabbitMqHostSettings ho

public RabbitMqHostSettings HostSettings => _hostSettings;

public async Task<IModel> CreateModel()
public Task<IModel> CreateModel()
{
return await Task.Factory.StartNew(() => _connection.CreateModel(),
_participant.StoppedToken, TaskCreationOptions.HideScheduler, _taskScheduler).ConfigureAwait(false);
return Task.Factory.StartNew(() => _connection.CreateModel(),
_participant.StoppedToken, TaskCreationOptions.HideScheduler, _taskScheduler);
}

public IConnection Connection => _connection;
Expand Down
2 changes: 1 addition & 1 deletion src/MassTransit.RabbitMqTransport/RabbitMqExtensions.cs
Expand Up @@ -34,7 +34,7 @@ public static void Cleanup(this IModel model, ushort replyCode = 200, string mes
if (model.IsOpen)
model.Close(replyCode, message);
}
catch (Exception ex)
catch (Exception)
{
}

Expand Down
2 changes: 1 addition & 1 deletion src/MassTransit.RabbitMqTransport/RabbitMqSendTransport.cs
Expand Up @@ -157,7 +157,7 @@ async Task ISendTransport.Move(ReceiveContext context, IPipe<SendContext> pipe)
{
using (var bodyStream = context.GetBody())
{
bodyStream.CopyTo(memoryStream);
await bodyStream.CopyToAsync(memoryStream).ConfigureAwait(false);
}
body = memoryStream.ToArray();
Expand Down
Expand Up @@ -42,7 +42,7 @@ protected override void ConfigureBus(IInMemoryBusFactoryConfigurator configurato
{
base.ConfigureBus(configurator);

configurator.EnabledPerformanceCounters();
configurator.EnablePerformanceCounters();
}

protected override void ConfigureInputQueueEndpoint(IInMemoryReceiveEndpointConfigurator configurator)
Expand Down
3 changes: 0 additions & 3 deletions src/MassTransit/MassTransit.csproj
Expand Up @@ -1132,9 +1132,6 @@
<ItemGroup>
<Folder Include="Configuration\Configuration\" />
</ItemGroup>
<ItemGroup>
<Analyzer Include="..\packages\AsyncFixer.1.1.3\analyzers\dotnet\cs\AsyncFixer.dll" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
<Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
Expand Down
39 changes: 27 additions & 12 deletions src/MassTransit/MassTransitBus.cs
@@ -1,4 +1,4 @@
// Copyright 2007-2015 Chris Patterson, Dru Sellers, Travis Smith, et. al.
// Copyright 2007-2016 Chris Patterson, Dru Sellers, Travis Smith, et. al.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
Expand Down Expand Up @@ -161,22 +161,22 @@ public async Task<BusHandle> StartAsync(CancellationToken cancellationToken)
if (_log.IsDebugEnabled)
_log.DebugFormat("Starting bus hosts...");

foreach (IBusHostControl host in _hosts)
foreach (var host in _hosts)
{
HostHandle hostHandle = host.Start();
var hostHandle = host.Start();

hosts.Add(hostHandle);
}

if (_log.IsDebugEnabled)
_log.DebugFormat("Starting receive endpoints...");

foreach (IReceiveEndpoint endpoint in _receiveEndpoints)
foreach (var endpoint in _receiveEndpoints)
{
ConnectHandle observerHandle = endpoint.ConnectReceiveObserver(_receiveObservers);
var observerHandle = endpoint.ConnectReceiveObserver(_receiveObservers);
observers.Add(observerHandle);

ReceiveEndpointHandle handle = endpoint.Start();
var handle = endpoint.Start();

endpoints.Add(handle);
}
Expand Down Expand Up @@ -254,16 +254,16 @@ public ConnectHandle ConnectPublishObserver(IPublishObserver observer)

void IProbeSite.Probe(ProbeContext context)
{
ProbeContext scope = context.CreateScope("bus");
var scope = context.CreateScope("bus");
scope.Set(new
{
Address
});

foreach (IBusHostControl host in _hosts)
foreach (var host in _hosts)
host.Probe(scope);

foreach (IReceiveEndpoint receiveEndpoint in _receiveEndpoints)
foreach (var receiveEndpoint in _receiveEndpoints)
receiveEndpoint.Probe(scope);
}

Expand All @@ -288,14 +288,29 @@ void IDisposable.Dispose()

class BusReady
{
readonly ReadyObserver[] _observers;

public BusReady(IEnumerable<IReceiveEndpoint> receiveEndpoints)
{
var observers = receiveEndpoints.Select(x => new ReadyObserver(x).Ready).ToArray();
_observers = receiveEndpoints.Select(x => new ReadyObserver(x)).ToArray();
}

Ready = Task.WhenAll(observers);
public Task<ReceiveEndpointReady[]> Ready
{
get { return ReadyOrNot(_observers.Select(x => x.Ready)); }
}

public Task<ReceiveEndpointReady[]> Ready { get; }
async Task<ReceiveEndpointReady[]> ReadyOrNot(IEnumerable<Task<ReceiveEndpointReady>> observers)
{
var tasks = observers as Task<ReceiveEndpointReady>[] ?? observers.ToArray();

foreach (Task<ReceiveEndpointReady> observer in tasks)
{
await observer.ConfigureAwait(false);
}

return await Task.WhenAll(tasks).ConfigureAwait(false);
}


class ReadyObserver :
Expand Down
1 change: 0 additions & 1 deletion src/MassTransit/packages.config
@@ -1,6 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="AsyncFixer" version="1.1.3" targetFramework="net452" />
<package id="NewId" version="2.1.3" targetFramework="net45" />
<package id="Newtonsoft.Json" version="7.0.1" targetFramework="net45" />
</packages>
Expand Up @@ -48,7 +48,7 @@ public NHibernateSagaRepository(ISessionFactory sessionFactory, System.Data.Isol
_insertIsolationLevel = isolationLevel;
}

public async Task<IEnumerable<Guid>> Find(ISagaQuery<TSaga> query)
public Task<IEnumerable<Guid>> Find(ISagaQuery<TSaga> query)
{
using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
using (ISession session = _sessionFactory.OpenSession())
Expand All @@ -60,7 +60,7 @@ public async Task<IEnumerable<Guid>> Find(ISagaQuery<TSaga> query)

scope.Complete();

return result;
return Task.FromResult<IEnumerable<Guid>>(result);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/SolutionVersion.cs
Expand Up @@ -5,8 +5,8 @@
[assembly: AssemblyDescriptionAttribute("MassTransit is a distributed application framework for .NET http://masstransit-project.com")]
[assembly: AssemblyProductAttribute("MassTransit")]
[assembly: AssemblyVersionAttribute("3.3.1.0")]
[assembly: AssemblyFileVersionAttribute("3.3.1.0")]
[assembly: AssemblyInformationalVersionAttribute("3.3.1.0 (develop/71cb3ebe)")]
[assembly: AssemblyFileVersionAttribute("3.3.2.0")]
[assembly: AssemblyInformationalVersionAttribute("3.3.2.0 (develop/2b519cbb)")]
namespace System {
internal static class AssemblyVersionInformation {
internal const string Version = "3.3.1.0";
Expand Down

0 comments on commit 4af05ec

Please sign in to comment.