Skip to content

Commit

Permalink
Improvements to async/await (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kralizek committed Jan 7, 2019
1 parent 1d40e47 commit 73024f0
Show file tree
Hide file tree
Showing 17 changed files with 136 additions and 130 deletions.
2 changes: 1 addition & 1 deletion Nybus.sln.DotSettings
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/FilterSettingsManager/AttributeFilterXml/@EntryValue">&lt;data /&gt;</s:String>
<s:String x:Key="/Default/FilterSettingsManager/CoverageFilterXml/@EntryValue">&lt;data&gt;&lt;IncludeFilters /&gt;&lt;ExcludeFilters /&gt;&lt;/data&gt;</s:String>
<s:String x:Key="/Default/FilterSettingsManager/CoverageFilterXml/@EntryValue">&lt;data&gt;&lt;IncludeFilters&gt;&lt;Filter ModuleMask="Nybus*" ModuleVersionMask="*" ClassMask="*" FunctionMask="*" IsEnabled="True" /&gt;&lt;/IncludeFilters&gt;&lt;ExcludeFilters&gt;&lt;Filter ModuleMask="*" ModuleVersionMask="*" ClassMask="*" FunctionMask="*" IsEnabled="True" /&gt;&lt;/ExcludeFilters&gt;&lt;/data&gt;</s:String>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Nybus/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
2 changes: 1 addition & 1 deletion src/Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>
<PropertyGroup>
<Authors>Renato Golia</Authors>
<Version>1.0.0-alpha004</Version>
<Version>1.0.0-alpha005</Version>
<Company>Nybus Project</Company>
<Product>Nybus</Product>
<license>https://github.com/Nybus-project/Nybus/blob/master/LICENSE</license>
Expand Down
8 changes: 4 additions & 4 deletions src/Nybus.Abstractions/IBusEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ public interface IBusEngine

Task SendEventAsync<TEvent>(EventMessage<TEvent> message) where TEvent : class, IEvent;

IObservable<Message> Start();
Task<IObservable<Message>> StartAsync();

void Stop();
Task StopAsync();

void SubscribeToCommand<TCommand>() where TCommand : class, ICommand;

void SubscribeToEvent<TEvent>() where TEvent : class, IEvent;

Task NotifySuccess(Message message);
Task NotifySuccessAsync(Message message);

Task NotifyFail(Message message);
Task NotifyFailAsync(Message message);
}

public delegate Task MessageReceived(Message message);
Expand Down
4 changes: 2 additions & 2 deletions src/Nybus.Abstractions/IBusHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ public interface IDispatcher

public interface IBusExecutionEnvironment
{
Task ExecuteCommandHandler<TCommand>(IDispatcher dispatcher, ICommandContext<TCommand> context, Type handlerType)
Task ExecuteCommandHandlerAsync<TCommand>(IDispatcher dispatcher, ICommandContext<TCommand> context, Type handlerType)
where TCommand : class, ICommand;

Task ExecuteEventHandler<TEvent>(IDispatcher dispatcher, IEventContext<TEvent> context, Type handlerType)
Task ExecuteEventHandlerAsync<TEvent>(IDispatcher dispatcher, IEventContext<TEvent> context, Type handlerType)
where TEvent : class, IEvent;
}
}
4 changes: 2 additions & 2 deletions src/Nybus.Abstractions/Policies/IErrorPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ namespace Nybus.Policies

public interface IErrorPolicy
{
Task HandleError<TCommand>(IBusEngine engine, Exception exception, CommandMessage<TCommand> message) where TCommand : class, ICommand;
Task HandleErrorAsync<TCommand>(IBusEngine engine, Exception exception, CommandMessage<TCommand> message) where TCommand : class, ICommand;

Task HandleError<TEvent>(IBusEngine engine, Exception exception, EventMessage<TEvent> message) where TEvent : class, IEvent;
Task HandleErrorAsync<TEvent>(IBusEngine engine, Exception exception, EventMessage<TEvent> message) where TEvent : class, IEvent;
}

public interface IErrorPolicyProvider
Expand Down
12 changes: 7 additions & 5 deletions src/Nybus/InMemoryBusEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class InMemoryBusEngine : IBusEngine
private bool _isStarted;
private readonly ISet<Type> _acceptedTypes = new HashSet<Type>();

public IObservable<Message> Start()
public Task<IObservable<Message>> StartAsync()
{
_sequenceOfMessages = new Subject<Message>();

Expand All @@ -33,16 +33,18 @@ public IObservable<Message> Start()

_isStarted = true;

return Observable.Merge(commands, events);
return Task.FromResult(Observable.Merge(commands, events));
}

public void Stop()
public Task StopAsync()
{
if (_isStarted)
{
_sequenceOfMessages.OnCompleted();
_sequenceOfMessages = null;
}

return Task.CompletedTask;
}

public Task SendCommandAsync<TCommand>(CommandMessage<TCommand> message) where TCommand : class, ICommand
Expand Down Expand Up @@ -75,12 +77,12 @@ public void Stop()
_acceptedTypes.Add(typeof(TEvent));
}

public Task NotifySuccess(Message message)
public Task NotifySuccessAsync(Message message)
{
return Task.CompletedTask;
}

public Task NotifyFail(Message message)
public Task NotifyFailAsync(Message message)
{
return Task.CompletedTask;
}
Expand Down
28 changes: 14 additions & 14 deletions src/Nybus/NybusHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public NybusHost(IBusEngine busEngine, INybusConfiguration configuration, IServi
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
}

public async Task InvokeCommandAsync<TCommand>(TCommand command, Guid correlationId) where TCommand : class, ICommand
public Task InvokeCommandAsync<TCommand>(TCommand command, Guid correlationId) where TCommand : class, ICommand
{
var message = new CommandMessage<TCommand>
{
Expand All @@ -40,10 +40,10 @@ public NybusHost(IBusEngine busEngine, INybusConfiguration configuration, IServi
};

_logger.LogTrace(new { type = typeof(TCommand).FullName, correlationId = correlationId, command }, arg => $"Invoking command of type {arg.type} with correlationId {arg.correlationId}. Command: {arg.command.ToString()}");
await _engine.SendCommandAsync(message).ConfigureAwait(false);
return _engine.SendCommandAsync(message);
}

public async Task RaiseEventAsync<TEvent>(TEvent @event, Guid correlationId) where TEvent : class, IEvent
public Task RaiseEventAsync<TEvent>(TEvent @event, Guid correlationId) where TEvent : class, IEvent
{
var message = new EventMessage<TEvent>
{
Expand All @@ -57,7 +57,7 @@ public NybusHost(IBusEngine busEngine, INybusConfiguration configuration, IServi
};

_logger.LogTrace(new { type = typeof(TEvent).FullName, correlationId = correlationId, @event }, arg => $"Raising event of type {arg.type} with correlationId {arg.correlationId}. Event: {arg.@event.ToString()}");
await _engine.SendEventAsync(message).ConfigureAwait(false);
return _engine.SendEventAsync(message);
}

private bool _isStarted;
Expand All @@ -67,7 +67,7 @@ public Task StartAsync()
{
_logger.LogTrace("Bus starting");

var incomingMessages = _engine.Start();
var incomingMessages = _engine.StartAsync().Result;

var observable = from message in incomingMessages
where message != null
Expand All @@ -90,7 +90,7 @@ public Task StopAsync()
{
_logger.LogTrace("Bus stopping");

_engine.Stop();
_engine.StopAsync().Wait();
_disposable.Dispose();

_logger.LogTrace("Bus stopped");
Expand Down Expand Up @@ -133,13 +133,13 @@ async Task ExecuteHandler(ICommandContext<TCommand> context)

async Task NotifySuccess(CommandMessage<TCommand> message)
{
await _engine.NotifySuccess(message).ConfigureAwait(false);
await _engine.NotifySuccessAsync(message).ConfigureAwait(false);
}

async Task HandleError(Exception exception, CommandMessage<TCommand> message, ICommandContext<TCommand> context)
Task HandleError(Exception exception, CommandMessage<TCommand> message, ICommandContext<TCommand> context)
{
_logger.LogError(new { CorrelationId = context.CorrelationId, MessageId = message.MessageId, CommandType = typeof(TCommand).Name, Exception = exception, Message = message }, s => $"An error occurred while handling {s.CommandType}. {s.Exception.Message}");
await _configuration.ErrorPolicy.HandleError(_engine, exception, message).ConfigureAwait(false);
return _configuration.ErrorPolicy.HandleErrorAsync(_engine, exception, message);
}
}

Expand Down Expand Up @@ -175,21 +175,21 @@ async Task ExecuteHandler(IEventContext<TEvent> context)

async Task NotifySuccess(EventMessage<TEvent> message)
{
await _engine.NotifySuccess(message).ConfigureAwait(false);
await _engine.NotifySuccessAsync(message).ConfigureAwait(false);
}

async Task HandleError(Exception exception, EventMessage<TEvent> message, IEventContext<TEvent> context)
Task HandleError(Exception exception, EventMessage<TEvent> message, IEventContext<TEvent> context)
{
_logger.LogError(new { CorrelationId = context.CorrelationId, MessageId = message.MessageId, EventType = typeof(TEvent).Name, Exception = exception, Message = message }, s => $"An error occurred while handling {s.EventType}. {s.Exception.Message}");
await _configuration.ErrorPolicy.HandleError(_engine, exception, message).ConfigureAwait(false);
return _configuration.ErrorPolicy.HandleErrorAsync(_engine, exception, message);
}
}

private delegate Task MessagePipeline(Message message);

public IBusExecutionEnvironment ExecutionEnvironment => this;

public async Task ExecuteCommandHandler<TCommand>(IDispatcher dispatcher, ICommandContext<TCommand> context, Type handlerType)
public async Task ExecuteCommandHandlerAsync<TCommand>(IDispatcher dispatcher, ICommandContext<TCommand> context, Type handlerType)
where TCommand : class, ICommand
{
using (var scope = _serviceProvider.CreateScope())
Expand All @@ -199,7 +199,7 @@ public async Task ExecuteCommandHandler<TCommand>(IDispatcher dispatcher, IComma
}
}

public async Task ExecuteEventHandler<TEvent>(IDispatcher dispatcher, IEventContext<TEvent> context, Type handlerType)
public async Task ExecuteEventHandlerAsync<TEvent>(IDispatcher dispatcher, IEventContext<TEvent> context, Type handlerType)
where TEvent : class, IEvent
{
using (var scope = _serviceProvider.CreateScope())
Expand Down
4 changes: 2 additions & 2 deletions src/Nybus/NybusHostBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void SubscribeToCommand<TCommand>(Type commandHandlerType)

_subscriptions.Add(host =>
{
host.SubscribeToCommand<TCommand>((dispatcher, context) => host.ExecutionEnvironment.ExecuteCommandHandler(dispatcher,context,commandHandlerType));
host.SubscribeToCommand<TCommand>((dispatcher, context) => host.ExecutionEnvironment.ExecuteCommandHandlerAsync(dispatcher,context,commandHandlerType));
});
}

Expand All @@ -52,7 +52,7 @@ public void SubscribeToEvent<TEvent>(Type eventHandlerType)

_subscriptions.Add(host =>
{
host.SubscribeToEvent<TEvent>((dispatcher, context) => host.ExecutionEnvironment.ExecuteEventHandler(dispatcher, context, eventHandlerType));
host.SubscribeToEvent<TEvent>((dispatcher, context) => host.ExecutionEnvironment.ExecuteEventHandlerAsync(dispatcher, context, eventHandlerType));
});
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/Nybus/Policies/NoopErrorPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ public IErrorPolicy CreatePolicy(IConfigurationSection configuration)

public class NoopErrorPolicy : IErrorPolicy
{
public async Task HandleError<TCommand>(IBusEngine engine, Exception exception, CommandMessage<TCommand> message)
public Task HandleErrorAsync<TCommand>(IBusEngine engine, Exception exception, CommandMessage<TCommand> message)
where TCommand : class, ICommand
{
await engine.NotifyFail(message).ConfigureAwait(false);
return engine.NotifyFailAsync(message);
}

public async Task HandleError<TEvent>(IBusEngine engine, Exception exception, EventMessage<TEvent> message)
public Task HandleErrorAsync<TEvent>(IBusEngine engine, Exception exception, EventMessage<TEvent> message)
where TEvent : class, IEvent
{
await engine.NotifyFail(message).ConfigureAwait(false);
return engine.NotifyFailAsync(message);
}
}
}
8 changes: 4 additions & 4 deletions src/Nybus/Policies/RetryErrorPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public RetryErrorPolicy(RetryErrorPolicyOptions options, ILogger<RetryErrorPolic
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public async Task HandleError<TCommand>(IBusEngine engine, Exception exception, CommandMessage<TCommand> message)
public async Task HandleErrorAsync<TCommand>(IBusEngine engine, Exception exception, CommandMessage<TCommand> message)
where TCommand : class, ICommand
{
var retryCount = message.Headers.TryGetValue(Headers.RetryCount, out var str) && int.TryParse(str, out var i) ? i : 0;
Expand All @@ -61,12 +61,12 @@ public async Task HandleError<TCommand>(IBusEngine engine, Exception exception,
{
_logger.LogTrace($"Error {retryCount}/{_options.MaxRetries}: will not retry");

await engine.NotifyFail(message).ConfigureAwait(false);
await engine.NotifyFailAsync(message).ConfigureAwait(false);
}

}

public async Task HandleError<TEvent>(IBusEngine engine, Exception exception, EventMessage<TEvent> message)
public async Task HandleErrorAsync<TEvent>(IBusEngine engine, Exception exception, EventMessage<TEvent> message)
where TEvent : class, IEvent
{
var retryCount = message.Headers.TryGetValue(Headers.RetryCount, out string str) && int.TryParse(str, out int i) ? i : 0;
Expand All @@ -81,7 +81,7 @@ public async Task HandleError<TEvent>(IBusEngine engine, Exception exception, Ev
}
else
{
await engine.NotifyFail(message).ConfigureAwait(false);
await engine.NotifyFailAsync(message).ConfigureAwait(false);
}
}

Expand Down
24 changes: 14 additions & 10 deletions src/engines/Nybus.Engine.RabbitMq/RabbitMqBusEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public RabbitMqBusEngine(IRabbitMqConfiguration configuration, ILogger<RabbitMqB

public IReadOnlyDictionary<string, ObservableConsumer> Consumers => _consumers;

public IObservable<Message> Start()
public Task<IObservable<Message>> StartAsync()
{
_connection = _configuration.ConnectionFactory.CreateConnection();
_channel = _connection.CreateModel();
Expand All @@ -45,7 +45,7 @@ public IObservable<Message> Start()

if (!hasEvents && !hasCommands)
{
return Observable.Never<Message>();
return Task.FromResult(Observable.Never<Message>());
}

var queueToConsume = new List<string>();
Expand Down Expand Up @@ -83,11 +83,13 @@ public IObservable<Message> Start()
}


return Observable.Defer(() => from queue in queueToConsume.ToObservable()
from args in SubscribeMessages(_channel, queue)
let message = GetMessage(args)
where message != null
select message);
var sequence = Observable.Defer(() => from queue in queueToConsume.ToObservable()
from args in SubscribeMessages(_channel, queue)
let message = GetMessage(args)
where message != null
select message);

return Task.FromResult(sequence);

IObservable<BasicDeliverEventArgs> SubscribeMessages(IModel channel, string queueName)
{
Expand Down Expand Up @@ -185,10 +187,12 @@ string GetHeader(IBasicProperties properties, string headerName, Encoding encodi
}
}

public void Stop()
public Task StopAsync()
{
_channel.Dispose();
_connection.Dispose();

return Task.CompletedTask;
}

public Task SendCommandAsync<TCommand>(CommandMessage<TCommand> message) where TCommand : class, ICommand
Expand Down Expand Up @@ -239,7 +243,7 @@ public void SubscribeToEvent<TEvent>()
AcceptedEventTypes.Add(typeof(TEvent));
}

public Task NotifySuccess(Message message)
public Task NotifySuccessAsync(Message message)
{
if (message.Headers.TryGetValue(RabbitMqHeaders.DeliveryTag, out var headerValue) && ulong.TryParse(headerValue, out var deliveryTag))
{
Expand All @@ -263,7 +267,7 @@ public Task NotifySuccess(Message message)
return Task.CompletedTask;
}

public Task NotifyFail(Message message)
public Task NotifyFailAsync(Message message)
{
if (message.Headers.TryGetValue(RabbitMqHeaders.DeliveryTag, out var headerValue) && ulong.TryParse(headerValue, out var deliveryTag))
{
Expand Down
8 changes: 4 additions & 4 deletions tests/TestUtils/TestBusEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ public Task SendEventAsync<TEvent>(EventMessage<TEvent> message)
throw new NotImplementedException();
}

public IObservable<Message> Start()
public Task<IObservable<Message>> StartAsync()
{
throw new NotImplementedException();
}

public void Stop()
public Task StopAsync()
{
throw new NotImplementedException();
}
Expand All @@ -42,12 +42,12 @@ public void SubscribeToEvent<TEvent>()
throw new NotImplementedException();
}

public Task NotifySuccess(Message message)
public Task NotifySuccessAsync(Message message)
{
throw new NotImplementedException();
}

public Task NotifyFail(Message message)
public Task NotifyFailAsync(Message message)
{
throw new NotImplementedException();
}
Expand Down
Loading

0 comments on commit 73024f0

Please sign in to comment.