Skip to content

Commit

Permalink
Simplified IBusEngine (#42)
Browse files Browse the repository at this point in the history
* SendCommandAsync removed
* SendEventAsync removed
* SendMessageAsync added
  • Loading branch information
Kralizek committed Jan 20, 2019
1 parent fe00ae8 commit 3808bd2
Show file tree
Hide file tree
Showing 12 changed files with 61 additions and 82 deletions.
8 changes: 2 additions & 6 deletions src/Nybus.Abstractions/IBusEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ namespace Nybus
{
public interface IBusEngine
{
Task SendCommandAsync<TCommand>(CommandMessage<TCommand> message) where TCommand : class, ICommand;

Task SendEventAsync<TEvent>(EventMessage<TEvent> message) where TEvent : class, IEvent;

Task<IObservable<Message>> StartAsync();

Task StopAsync();
Expand All @@ -17,10 +13,10 @@ public interface IBusEngine

void SubscribeToEvent<TEvent>() where TEvent : class, IEvent;

Task SendMessageAsync(Message message);

Task NotifySuccessAsync(Message message);

Task NotifyFailAsync(Message message);
}

public delegate Task MessageReceived(Message message);
}
2 changes: 1 addition & 1 deletion src/Nybus.Abstractions/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public abstract class Message

public MessageDescriptor Descriptor => new MessageDescriptor(Type);

protected object Item { get; set; }
public object Item { get; protected set; }
}

public enum MessageType
Expand Down
4 changes: 2 additions & 2 deletions src/Nybus/Filters/RetryErrorFilter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public async Task HandleErrorAsync<TCommand>(ICommandContext<TCommand> context,

message.Headers[Headers.RetryCount] = retryCount.Stringfy();

await _engine.SendCommandAsync(message).ConfigureAwait(false);
await _engine.SendMessageAsync(message).ConfigureAwait(false);
}
else
{
Expand All @@ -89,7 +89,7 @@ public async Task HandleErrorAsync<TEvent>(IEventContext<TEvent> context, Except

message.Headers[Headers.RetryCount] = retryCount.Stringfy();

await _engine.SendEventAsync(message).ConfigureAwait(false);
await _engine.SendMessageAsync(message).ConfigureAwait(false);
}
else
{
Expand Down
4 changes: 2 additions & 2 deletions src/Nybus/NybusHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public Task InvokeCommandAsync<TCommand>(TCommand command, Guid correlationId)
};

_logger.LogTrace(new { type = typeof(TCommand).FullName, correlationId = correlationId, command }, arg => $"Invoking command of type {arg.type} with correlationId {arg.correlationId}. Command: {arg.command.ToString()}");
return _engine.SendCommandAsync(message);
return _engine.SendMessageAsync(message);
}

public Task RaiseEventAsync<TEvent>(TEvent @event, Guid correlationId)
Expand All @@ -65,7 +65,7 @@ public Task RaiseEventAsync<TEvent>(TEvent @event, Guid correlationId)
};

_logger.LogTrace(new { type = typeof(TEvent).FullName, correlationId = correlationId, @event }, arg => $"Raising event of type {arg.type} with correlationId {arg.correlationId}. Event: {arg.@event.ToString()}");
return _engine.SendEventAsync(message);
return _engine.SendMessageAsync(message);
}

private bool _isStarted;
Expand Down
31 changes: 13 additions & 18 deletions src/engines/Nybus.Engine.InMemory/InMemory/IEnvelopeService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ namespace Nybus.InMemory
{
public interface IEnvelopeService
{
Envelope CreateEnvelope(Message message);

Envelope CreateEnvelope<T>(CommandMessage<T> message) where T : class, ICommand;

Envelope CreateEnvelope<T>(EventMessage<T> message) where T : class, IEvent;
Expand All @@ -24,8 +26,7 @@ public EnvelopeService(ISerializer serializer)
_serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
}

public Envelope CreateEnvelope<T>(CommandMessage<T> message)
where T : class, ICommand
public Envelope CreateEnvelope(Message message)
{
if (message == null)
{
Expand All @@ -35,29 +36,23 @@ public Envelope CreateEnvelope<T>(CommandMessage<T> message)
return new Envelope
{
Headers = message.Headers,
Content = _serializer.SerializeObject(message.Command),
MessageType = MessageType.Command,
MessageType = message.MessageType,
MessageId = message.MessageId,
Type = message.Type
Type = message.Type,
Content = _serializer.SerializeObject(message.Item)
};
}

public Envelope CreateEnvelope<T>(CommandMessage<T> message)
where T : class, ICommand
{
return CreateEnvelope(message as Message);
}

public Envelope CreateEnvelope<T>(EventMessage<T> message)
where T : class, IEvent
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}

return new Envelope
{
Headers = message.Headers,
Content = _serializer.SerializeObject(message.Event),
MessageType = MessageType.Event,
MessageId = message.MessageId,
Type = message.Type
};
return CreateEnvelope(message as Message);
}

public CommandMessage CreateCommandMessage(Envelope envelope, Type commandType)
Expand Down
29 changes: 9 additions & 20 deletions src/engines/Nybus.Engine.InMemory/InMemory/InMemoryBusEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,19 @@ public Task StopAsync()
return Task.CompletedTask;
}

public Task SendCommandAsync<TCommand>(CommandMessage<TCommand> message) where TCommand : class, ICommand
public void SubscribeToCommand<TCommand>() where TCommand : class, ICommand
{
if (_isStarted)
{
var envelope = _envelopeService.CreateEnvelope(message);
_sequenceOfMessages.OnNext(envelope);
}
_messageDescriptorStore.RegisterCommandType<TCommand>();
_acceptedTypes.Add(typeof(TCommand));
}

return Task.CompletedTask;
public void SubscribeToEvent<TEvent>() where TEvent : class, IEvent
{
_messageDescriptorStore.RegisterEventType<TEvent>();
_acceptedTypes.Add(typeof(TEvent));
}

public Task SendEventAsync<TEvent>(EventMessage<TEvent> message) where TEvent : class, IEvent
public Task SendMessageAsync(Message message)
{
if (_isStarted)
{
Expand All @@ -113,18 +114,6 @@ public Task StopAsync()
return Task.CompletedTask;
}

public void SubscribeToCommand<TCommand>() where TCommand : class, ICommand
{
_messageDescriptorStore.RegisterCommandType<TCommand>();
_acceptedTypes.Add(typeof(TCommand));
}

public void SubscribeToEvent<TEvent>() where TEvent : class, IEvent
{
_messageDescriptorStore.RegisterEventType<TEvent>();
_acceptedTypes.Add(typeof(TEvent));
}

public Task NotifySuccessAsync(Message message)
{
OnMessageNotifySuccess?.Invoke(this, new MessageEventArgs(message));
Expand Down
32 changes: 13 additions & 19 deletions src/engines/Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,23 @@ public Task StopAsync()
return Task.CompletedTask;
}

public Task SendCommandAsync<TCommand>(CommandMessage<TCommand> message) where TCommand : class, ICommand
=> SendItemAsync(message, message.Command);
public void SubscribeToCommand<TCommand>()
where TCommand : class, ICommand
{
_messageDescriptorStore.RegisterCommandType<TCommand>();
}

public Task SendEventAsync<TEvent>(EventMessage<TEvent> message) where TEvent : class, IEvent
=> SendItemAsync(message, message.Event);
public void SubscribeToEvent<TEvent>()
where TEvent : class, IEvent
{
_messageDescriptorStore.RegisterEventType<TEvent>();
}

private Task SendItemAsync<T>(Message message, T item)
public Task SendMessageAsync(Message message)
{
var type = typeof(T);
var type = message.Type;

var body = _configuration.Serializer.SerializeObject(item, _configuration.OutboundEncoding);
var body = _configuration.Serializer.SerializeObject(message.Item, _configuration.OutboundEncoding);

var properties = _channel.CreateBasicProperties();
properties.ContentEncoding = _configuration.OutboundEncoding.WebName;
Expand All @@ -211,18 +217,6 @@ private Task SendItemAsync<T>(Message message, T item)

}

public void SubscribeToCommand<TCommand>()
where TCommand : class, ICommand
{
_messageDescriptorStore.RegisterCommandType<TCommand>();
}

public void SubscribeToEvent<TEvent>()
where TEvent : class, IEvent
{
_messageDescriptorStore.RegisterEventType<TEvent>();
}

public Task NotifySuccessAsync(Message message)
{
if (message.Headers.TryGetValue(RabbitMqHeaders.DeliveryTag, out var headerValue) && ulong.TryParse(headerValue, out var deliveryTag))
Expand Down
5 changes: 5 additions & 0 deletions tests/TestUtils/TestBusEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public void SubscribeToEvent<TEvent>()
throw new NotImplementedException();
}

public Task SendMessageAsync(Message message)
{
throw new NotImplementedException();
}

public Task NotifySuccessAsync(Message message)
{
throw new NotImplementedException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public async Task Sent_commands_are_received([Frozen] IEnvelopeService envelopeS

var items = sequence.DumpInList();

await sut.SendCommandAsync(testMessage);
await sut.SendMessageAsync(testMessage);

Assert.That(items.First(), Is.EqualTo(testMessage).Using<CommandMessage<FirstTestCommand>>((x, y) => x.MessageId == y.MessageId));
}
Expand All @@ -83,7 +83,7 @@ public async Task Sent_events_are_received([Frozen] IEnvelopeService envelopeSer

var items = sequence.DumpInList();

await sut.SendEventAsync(testMessage);
await sut.SendMessageAsync(testMessage);

Assert.That(items.First(), Is.EqualTo(testMessage).Using<EventMessage<FirstTestEvent>>((x, y) => x.MessageId == y.MessageId));
}
Expand Down Expand Up @@ -206,7 +206,7 @@ public async Task Commands_are_ignored_if_not_registered([Frozen] IMessageDescri

var items = sequence.DumpInList();

await sut.SendCommandAsync(testMessage);
await sut.SendMessageAsync(testMessage);

Assert.That(items, Is.Empty);
}
Expand All @@ -229,7 +229,7 @@ public async Task Events_are_ignored_if_not_registered([Frozen] IMessageDescript

var items = sequence.DumpInList();

await sut.SendEventAsync(testMessage);
await sut.SendMessageAsync(testMessage);

Assert.That(items, Is.Empty);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ public async Task Commands_can_be_sent([Frozen] IRabbitMqConfiguration configura
{
await sut.StartAsync().ConfigureAwait(false);

await sut.SendCommandAsync(message);
await sut.SendMessageAsync(message);

Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.BasicPublish(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>(), It.IsAny<IBasicProperties>(), It.IsAny<byte[]>()));
}
Expand All @@ -439,7 +439,7 @@ public async Task Arbitrary_headers_are_forwarded_when_sending_commands([Frozen]

await sut.StartAsync().ConfigureAwait(false);

await sut.SendCommandAsync(message);
await sut.SendMessageAsync(message);

Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.BasicPublish(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>(), It.Is<IBasicProperties>(o => o.Headers.ContainsKey($"Nybus:{headerKey}")), It.IsAny<byte[]>()));
}
Expand All @@ -449,7 +449,7 @@ public async Task Events_can_be_sent([Frozen] IRabbitMqConfiguration configurati
{
await sut.StartAsync().ConfigureAwait(false);

await sut.SendEventAsync(message);
await sut.SendMessageAsync(message);

Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.BasicPublish(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>(), It.IsAny<IBasicProperties>(), It.IsAny<byte[]>()));

Expand All @@ -462,7 +462,7 @@ public async Task Arbitrary_headers_are_forwarded_when_sending_events([Frozen] I

await sut.StartAsync().ConfigureAwait(false);

await sut.SendEventAsync(message);
await sut.SendMessageAsync(message);

Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.BasicPublish(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>(), It.Is<IBasicProperties>(o => o.Headers.ContainsKey($"Nybus:{headerKey}")), It.IsAny<byte[]>()));
}
Expand Down
8 changes: 4 additions & 4 deletions tests/Tests.Nybus/Filters/RetryErrorFilterProviderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public async Task HandleError_retries_if_retry_count_less_than_maxRetries([Froze

await sut.HandleErrorAsync(context, error, next);

Mock.Get(engine).Verify(p => p.SendCommandAsync(context.CommandMessage));
Mock.Get(engine).Verify(p => p.SendMessageAsync(context.CommandMessage));
}

[Test, CustomAutoMoqData]
Expand All @@ -114,7 +114,7 @@ public async Task HandleError_retries_if_retry_count_not_present([Frozen] IBusEn

await sut.HandleErrorAsync(context, error, next);

Mock.Get(engine).Verify(p => p.SendCommandAsync(context.CommandMessage));
Mock.Get(engine).Verify(p => p.SendMessageAsync(context.CommandMessage));
}

[Test, CustomAutoMoqData]
Expand Down Expand Up @@ -160,7 +160,7 @@ public async Task HandleError_retries_if_retry_count_less_than_maxRetries([Froze

await sut.HandleErrorAsync(context, error, next);

Mock.Get(engine).Verify(p => p.SendEventAsync(context.EventMessage));
Mock.Get(engine).Verify(p => p.SendMessageAsync(context.EventMessage));
}

[Test, CustomAutoMoqData]
Expand All @@ -183,7 +183,7 @@ public async Task HandleError_retries_if_retry_count_not_present([Frozen] IBusEn

await sut.HandleErrorAsync(context, error, next);

Mock.Get(engine).Verify(p => p.SendEventAsync(context.EventMessage));
Mock.Get(engine).Verify(p => p.SendMessageAsync(context.EventMessage));
}

[Test, CustomAutoMoqData]
Expand Down
4 changes: 2 additions & 2 deletions tests/Tests.Nybus/NybusHostTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ public async Task InvokeCommandAsync_forwards_message_to_engine([Frozen] IBusEng
{
await sut.InvokeCommandAsync(testCommand, correlationId);

Mock.Get(engine).Verify(p => p.SendCommandAsync(It.Is<CommandMessage<FirstTestCommand>>(m => ReferenceEquals(m.Command, testCommand) && m.Headers.CorrelationId == correlationId)), Times.Once);
Mock.Get(engine).Verify(p => p.SendMessageAsync(It.Is<CommandMessage<FirstTestCommand>>(m => ReferenceEquals(m.Command, testCommand) && m.Headers.CorrelationId == correlationId)), Times.Once);
}

[Test, CustomAutoMoqData]
public async Task RaiseEventAsync_forwards_message_to_engine([Frozen] IBusEngine engine, NybusHost sut, FirstTestEvent testEvent, Guid correlationId)
{
await sut.RaiseEventAsync(testEvent, correlationId);

Mock.Get(engine).Verify(p => p.SendEventAsync(It.Is<EventMessage<FirstTestEvent>>(m => ReferenceEquals(m.Event, testEvent) && m.Headers.CorrelationId == correlationId)), Times.Once);
Mock.Get(engine).Verify(p => p.SendMessageAsync(It.Is<EventMessage<FirstTestEvent>>(m => ReferenceEquals(m.Event, testEvent) && m.Headers.CorrelationId == correlationId)), Times.Once);
}

[Test, CustomAutoMoqData]
Expand Down

0 comments on commit 3808bd2

Please sign in to comment.