Skip to content

Commit

Permalink
Merge pull request #8 from emgdev/master
Browse files Browse the repository at this point in the history
Small adjustments
  • Loading branch information
Kralizek committed Oct 8, 2015
2 parents 746492a + 2270d61 commit 30afbe0
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/CastleWindsor/CastleWindsor.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<description>Castle Windsor support for Nybus</description>
<language>en-US</language>
<dependencies>
<dependency id="Castle.Core" version="3.3.3" />
<dependency id="Castle.Core" version="3.3.0" />
<dependency id="Castle.Windsor" version="3.3.0" />
</dependencies>
</metadata>
Expand Down
24 changes: 20 additions & 4 deletions src/Core/Configuration/NybusBusBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,16 @@ public void SubscribeToEvent<TEvent>(Func<EventContext<TEvent>, Task> handler)
using (var scope = _options.Container.BeginScope())
{
var handler = scope.Resolve<TEventHandler>();
await HandleEventMessage(handler, message).ConfigureAwait(false);
scope.Release(handler);

if (handler != null)
{
await HandleEventMessage(handler, message).ConfigureAwait(false);
scope.Release(handler);
}
else
{
_logger.LogError(new { message = "Handler not found", eventType = typeof(TEvent).FullName, handlerType = typeof(TEventHandler).FullName, correlationId = message.CorrelationId, containerType = _options.Container.GetType().FullName });
}
}
}

Expand Down Expand Up @@ -111,8 +119,16 @@ public void SubscribeToCommand<TCommand>(Func<CommandContext<TCommand>, Task> ha
using (var scope = _options.Container.BeginScope())
{
var handler = scope.Resolve<TCommandHandler>();
await HandleCommandMessage(handler, message).ConfigureAwait(false);
scope.Release(handler);

if (handler != null)
{
await HandleCommandMessage(handler, message).ConfigureAwait(false);
scope.Release(handler);
}
else
{
_logger.LogError(new {message = "Handler not found", commandType = typeof(TCommand).FullName, handlerType = typeof(TCommandHandler).FullName, correlationId = message.CorrelationId, containerType = _options.Container.GetType().FullName });
}
}
}

Expand Down
27 changes: 27 additions & 0 deletions src/Interfaces/BusExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,19 @@ public static async Task InvokeManyCommands<TCommand>(this IBus bus, IEnumerable
await Task.WhenAll(commands.Select(bus.InvokeCommand)).ConfigureAwait(false);
}

public static async Task InvokeManyCommands<TCommand>(this IBus bus, IEnumerable<TCommand> commands, Guid correlationId)
where TCommand : class, ICommand
{
if (bus == null)
{
throw new ArgumentNullException(nameof(bus));
}

if (commands == null) return;

await Task.WhenAll(commands.Select(c => bus.InvokeCommand(c, correlationId))).ConfigureAwait(false);
}

public static async Task RaiseManyEvents<TEvent>(this IBus bus, IEnumerable<TEvent> events)
where TEvent : class, IEvent
{
Expand All @@ -33,5 +46,19 @@ public static async Task RaiseManyEvents<TEvent>(this IBus bus, IEnumerable<TEve

await Task.WhenAll(events.Select(bus.RaiseEvent)).ConfigureAwait(false);
}

public static async Task RaiseManyEvents<TEvent>(this IBus bus, IEnumerable<TEvent> events, Guid correlationId)
where TEvent : class, IEvent
{
if (bus == null)
{
throw new ArgumentNullException(nameof(bus));
}

if (events == null) return;

await Task.WhenAll(events.Select(e => bus.RaiseEvent(e, correlationId))).ConfigureAwait(false);
}

}
}
4 changes: 2 additions & 2 deletions src/MassTransit/MassTransit.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
<description>Nybus bus based on MassTransit</description>
<language>en-US</language>
<dependencies>
<dependency id="MassTransit" version="[2,3)" />
<dependency id="MassTransit.RabbitMQ" version="[2,3)" />
<dependency id="MassTransit" version="[2.10,3)" />
<dependency id="MassTransit.RabbitMQ" version="[2.10,3)" />
</dependencies>
</metadata>
</package>
33 changes: 33 additions & 0 deletions tests/Tests.Core/Configuration/NybusBusBuilderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,22 @@ public async Task SubscribeToEvent_TEvent_is_invoked_when_message_is_received()
mockEventHandler.Verify(p => p.Handle(context), Times.Once);
}

[Test]
public async Task No_exception_is_thrown_when_no_handler_for_TEvent_is_found()
{
var sut = CreateSystemUnderTest();

sut.SubscribeToEvent<TestEvent>();

var message = fixture.Create<EventMessage<TestEvent>>();

mockScope.Setup(p => p.Resolve<IEventHandler<TestEvent>>()).Returns(() => null);

await testBusEngine.HandleEvent(message);

mockScope.Verify(p => p.Resolve<IEventHandler<TestEvent>>(), Times.Once);
}

[Test]
public async Task SubscribeToEvent_TEvent_TEventHandler_instance_is_invoked_when_message_is_received()
{
Expand Down Expand Up @@ -237,6 +253,23 @@ public void SubscribeToCommand_TCommand_TCommandHandler_instance_invokes_bus_eng
Assert.That(testBusEngine.IsCommandHandled<TestCommand>());
}

[Test]
public async Task No_exception_is_thrown_when_no_handler_for_TCommand_is_found()
{
var sut = CreateSystemUnderTest();

sut.SubscribeToCommand<TestCommand>();

var message = fixture.Create<CommandMessage<TestCommand>>();

mockScope.Setup(p => p.Resolve<ICommandHandler<TestCommand>>()).Returns(() => null);

await testBusEngine.HandleCommand(message);

mockScope.Verify(p => p.Resolve<ICommandHandler<TestCommand>>(), Times.Once);
}


[Test]
public async Task SubscribeToCommand_TCommand_is_invoked_when_message_is_received()
{
Expand Down
87 changes: 87 additions & 0 deletions tests/Tests.Interfaces/BusExtensionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public void Initialize()
mockBus = new Mock<IBus>();
}

#region InvokeManyCommands

[Test]
public async Task InvokeManyCommands_forwards_to_IBus()
{
Expand Down Expand Up @@ -53,6 +55,49 @@ public async Task InvokeManyCommands_no_exception_if_commands_is_null()

}

#endregion

#region InvokeManyCommands with CorrelationId

[Test]
public async Task InvokeManyCommands_CorrelationId_forwards_to_IBus()
{
var commands = fixture.CreateMany<TestCommand>().ToArray();

var correlationId = fixture.Create<Guid>();

await BusExtensions.InvokeManyCommands(mockBus.Object, commands, correlationId);

mockBus.Verify(p => p.InvokeCommand(It.IsAny<TestCommand>(), correlationId), Times.Exactly(commands.Length));
}

[Test, ExpectedException]
public async Task InvokeManyCommands_CorrelationId_Bus_is_required()
{
var commands = fixture.CreateMany<TestCommand>().ToArray();

var correlationId = fixture.Create<Guid>();

await BusExtensions.InvokeManyCommands(null, commands, correlationId);
}

[Test]
public async Task InvokeManyCommands_CorrelationId_no_exception_if_commands_is_null()
{
IEnumerable<TestCommand> commands = null;

var correlationId = fixture.Create<Guid>();

await BusExtensions.InvokeManyCommands(mockBus.Object, commands, correlationId);

mockBus.Verify(p => p.InvokeCommand(It.IsAny<TestCommand>()), Times.Never);

}

#endregion

#region RaiseManyEvents

[Test]
public async Task RaiseManyEvents_forwards_to_IBus()
{
Expand Down Expand Up @@ -82,6 +127,48 @@ public async Task RaiseManyEvents_no_exception_if_commands_is_null()

}

#endregion


#region RaiseManyEvents with CorrelationId

[Test]
public async Task RaiseManyEvents_CorrelationId_forwards_to_IBus()
{
var events = fixture.CreateMany<TestEvent>().ToArray();

var correlationId = fixture.Create<Guid>();

await BusExtensions.RaiseManyEvents(mockBus.Object, events, correlationId);

mockBus.Verify(p => p.RaiseEvent(It.IsAny<TestEvent>(), correlationId), Times.Exactly(events.Length));
}

[Test, ExpectedException]
public async Task RaiseManyEvents_CorrelationId_Bus_is_required()
{
var events = fixture.CreateMany<TestEvent>().ToArray();

var correlationId = fixture.Create<Guid>();

await BusExtensions.RaiseManyEvents(null, events, correlationId);
}

[Test]
public async Task RaiseManyEvents_CorrelationId_no_exception_if_commands_is_null()
{
IEnumerable<TestEvent> events = null;

var correlationId = fixture.Create<Guid>();

await BusExtensions.RaiseManyEvents(mockBus.Object, events, correlationId);

mockBus.Verify(p => p.RaiseEvent(It.IsAny<TestEvent>(), It.IsAny<Guid>()), Times.Never);

}

#endregion

public class TestCommand : ICommand
{
public string StringValue { get; set; }
Expand Down

0 comments on commit 30afbe0

Please sign in to comment.