diff --git a/Nybus.sln b/Nybus.sln
index 95804e3..21b0dea 100644
--- a/Nybus.sln
+++ b/Nybus.sln
@@ -43,10 +43,14 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SenderFeedback", "samples\R
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Types", "samples\RabbitMQ\Types\Types.csproj", "{5EADDCA9-0316-4596-85B1-6C91EDFE8460}"
EndProject
-Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tests.Integration.Nybus", "tests\Tests.Integration.Nybus\Tests.Integration.Nybus.csproj", "{C498B46D-26EF-4AAE-ADFA-5763609A688B}"
-EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TestUtils", "tests\TestUtils\TestUtils.csproj", "{6DAC88A4-9DCF-490B-81D3-B0AA72993EA4}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Nybus.Engine.InMemory", "src\engines\Nybus.Engine.InMemory\Nybus.Engine.InMemory.csproj", "{C59D94B8-9869-4E12-B393-2645E9C38BF3}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tests.Nybus.Engine.InMemory", "tests\Tests.Nybus.Engine.InMemory\Tests.Nybus.Engine.InMemory.csproj", "{8AA01AE0-1F81-41E4-917C-682E22B645D2}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tests.Integration.Nybus.Engine.InMemory", "tests\Tests.Integration.Nybus.Engine.InMemory\Tests.Integration.Nybus.Engine.InMemory.csproj", "{6B36CB22-F186-4AD9-8A6D-13E7A3E6D3A2}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -225,18 +229,6 @@ Global
{5EADDCA9-0316-4596-85B1-6C91EDFE8460}.Release|x64.Build.0 = Release|Any CPU
{5EADDCA9-0316-4596-85B1-6C91EDFE8460}.Release|x86.ActiveCfg = Release|Any CPU
{5EADDCA9-0316-4596-85B1-6C91EDFE8460}.Release|x86.Build.0 = Release|Any CPU
- {C498B46D-26EF-4AAE-ADFA-5763609A688B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {C498B46D-26EF-4AAE-ADFA-5763609A688B}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {C498B46D-26EF-4AAE-ADFA-5763609A688B}.Debug|x64.ActiveCfg = Debug|Any CPU
- {C498B46D-26EF-4AAE-ADFA-5763609A688B}.Debug|x64.Build.0 = Debug|Any CPU
- {C498B46D-26EF-4AAE-ADFA-5763609A688B}.Debug|x86.ActiveCfg = Debug|Any CPU
- {C498B46D-26EF-4AAE-ADFA-5763609A688B}.Debug|x86.Build.0 = Debug|Any CPU
- {C498B46D-26EF-4AAE-ADFA-5763609A688B}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {C498B46D-26EF-4AAE-ADFA-5763609A688B}.Release|Any CPU.Build.0 = Release|Any CPU
- {C498B46D-26EF-4AAE-ADFA-5763609A688B}.Release|x64.ActiveCfg = Release|Any CPU
- {C498B46D-26EF-4AAE-ADFA-5763609A688B}.Release|x64.Build.0 = Release|Any CPU
- {C498B46D-26EF-4AAE-ADFA-5763609A688B}.Release|x86.ActiveCfg = Release|Any CPU
- {C498B46D-26EF-4AAE-ADFA-5763609A688B}.Release|x86.Build.0 = Release|Any CPU
{6DAC88A4-9DCF-490B-81D3-B0AA72993EA4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6DAC88A4-9DCF-490B-81D3-B0AA72993EA4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6DAC88A4-9DCF-490B-81D3-B0AA72993EA4}.Debug|x64.ActiveCfg = Debug|Any CPU
@@ -249,6 +241,42 @@ Global
{6DAC88A4-9DCF-490B-81D3-B0AA72993EA4}.Release|x64.Build.0 = Release|Any CPU
{6DAC88A4-9DCF-490B-81D3-B0AA72993EA4}.Release|x86.ActiveCfg = Release|Any CPU
{6DAC88A4-9DCF-490B-81D3-B0AA72993EA4}.Release|x86.Build.0 = Release|Any CPU
+ {C59D94B8-9869-4E12-B393-2645E9C38BF3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {C59D94B8-9869-4E12-B393-2645E9C38BF3}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {C59D94B8-9869-4E12-B393-2645E9C38BF3}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {C59D94B8-9869-4E12-B393-2645E9C38BF3}.Debug|x64.Build.0 = Debug|Any CPU
+ {C59D94B8-9869-4E12-B393-2645E9C38BF3}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {C59D94B8-9869-4E12-B393-2645E9C38BF3}.Debug|x86.Build.0 = Debug|Any CPU
+ {C59D94B8-9869-4E12-B393-2645E9C38BF3}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {C59D94B8-9869-4E12-B393-2645E9C38BF3}.Release|Any CPU.Build.0 = Release|Any CPU
+ {C59D94B8-9869-4E12-B393-2645E9C38BF3}.Release|x64.ActiveCfg = Release|Any CPU
+ {C59D94B8-9869-4E12-B393-2645E9C38BF3}.Release|x64.Build.0 = Release|Any CPU
+ {C59D94B8-9869-4E12-B393-2645E9C38BF3}.Release|x86.ActiveCfg = Release|Any CPU
+ {C59D94B8-9869-4E12-B393-2645E9C38BF3}.Release|x86.Build.0 = Release|Any CPU
+ {8AA01AE0-1F81-41E4-917C-682E22B645D2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {8AA01AE0-1F81-41E4-917C-682E22B645D2}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {8AA01AE0-1F81-41E4-917C-682E22B645D2}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {8AA01AE0-1F81-41E4-917C-682E22B645D2}.Debug|x64.Build.0 = Debug|Any CPU
+ {8AA01AE0-1F81-41E4-917C-682E22B645D2}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {8AA01AE0-1F81-41E4-917C-682E22B645D2}.Debug|x86.Build.0 = Debug|Any CPU
+ {8AA01AE0-1F81-41E4-917C-682E22B645D2}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {8AA01AE0-1F81-41E4-917C-682E22B645D2}.Release|Any CPU.Build.0 = Release|Any CPU
+ {8AA01AE0-1F81-41E4-917C-682E22B645D2}.Release|x64.ActiveCfg = Release|Any CPU
+ {8AA01AE0-1F81-41E4-917C-682E22B645D2}.Release|x64.Build.0 = Release|Any CPU
+ {8AA01AE0-1F81-41E4-917C-682E22B645D2}.Release|x86.ActiveCfg = Release|Any CPU
+ {8AA01AE0-1F81-41E4-917C-682E22B645D2}.Release|x86.Build.0 = Release|Any CPU
+ {6B36CB22-F186-4AD9-8A6D-13E7A3E6D3A2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {6B36CB22-F186-4AD9-8A6D-13E7A3E6D3A2}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {6B36CB22-F186-4AD9-8A6D-13E7A3E6D3A2}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {6B36CB22-F186-4AD9-8A6D-13E7A3E6D3A2}.Debug|x64.Build.0 = Debug|Any CPU
+ {6B36CB22-F186-4AD9-8A6D-13E7A3E6D3A2}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {6B36CB22-F186-4AD9-8A6D-13E7A3E6D3A2}.Debug|x86.Build.0 = Debug|Any CPU
+ {6B36CB22-F186-4AD9-8A6D-13E7A3E6D3A2}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {6B36CB22-F186-4AD9-8A6D-13E7A3E6D3A2}.Release|Any CPU.Build.0 = Release|Any CPU
+ {6B36CB22-F186-4AD9-8A6D-13E7A3E6D3A2}.Release|x64.ActiveCfg = Release|Any CPU
+ {6B36CB22-F186-4AD9-8A6D-13E7A3E6D3A2}.Release|x64.Build.0 = Release|Any CPU
+ {6B36CB22-F186-4AD9-8A6D-13E7A3E6D3A2}.Release|x86.ActiveCfg = Release|Any CPU
+ {6B36CB22-F186-4AD9-8A6D-13E7A3E6D3A2}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -271,8 +299,10 @@ Global
{05B81794-040C-49B4-A248-C72558582953} = {18B7CFF6-C18A-48FF-ADB2-557333ABCD6D}
{E2016F0E-AEB0-4D33-9456-33DCCBD41F09} = {18B7CFF6-C18A-48FF-ADB2-557333ABCD6D}
{5EADDCA9-0316-4596-85B1-6C91EDFE8460} = {18B7CFF6-C18A-48FF-ADB2-557333ABCD6D}
- {C498B46D-26EF-4AAE-ADFA-5763609A688B} = {3566B926-8E32-4B0D-8DC0-6DB1EB82D2CB}
{6DAC88A4-9DCF-490B-81D3-B0AA72993EA4} = {3566B926-8E32-4B0D-8DC0-6DB1EB82D2CB}
+ {C59D94B8-9869-4E12-B393-2645E9C38BF3} = {22CE8614-4057-44E0-8140-FFB46C642539}
+ {8AA01AE0-1F81-41E4-917C-682E22B645D2} = {3566B926-8E32-4B0D-8DC0-6DB1EB82D2CB}
+ {6B36CB22-F186-4AD9-8A6D-13E7A3E6D3A2} = {3566B926-8E32-4B0D-8DC0-6DB1EB82D2CB}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {087BE4A7-5329-4F83-84DF-7AA56A3944EE}
diff --git a/Nybus.sln.DotSettings b/Nybus.sln.DotSettings
index 0946073..899275e 100644
--- a/Nybus.sln.DotSettings
+++ b/Nybus.sln.DotSettings
@@ -1,4 +1,4 @@
<data />
- <data><IncludeFilters><Filter ModuleMask="Nybus*" ModuleVersionMask="*" ClassMask="*" FunctionMask="*" IsEnabled="True" /></IncludeFilters><ExcludeFilters><Filter ModuleMask="*" ModuleVersionMask="*" ClassMask="*" FunctionMask="*" IsEnabled="True" /></ExcludeFilters></data>
+ <data><IncludeFilters><Filter ModuleMask="Nybus*" ModuleVersionMask="*" ClassMask="*" FunctionMask="*" IsEnabled="False" /></IncludeFilters><ExcludeFilters><Filter ModuleMask="*" ModuleVersionMask="*" ClassMask="*" FunctionMask="*" IsEnabled="False" /><Filter ModuleMask="Test*" ModuleVersionMask="*" ClassMask="*" FunctionMask="*" IsEnabled="False" /></ExcludeFilters></data>
True
\ No newline at end of file
diff --git a/appveyor.yml b/appveyor.yml
index 24bd10a..11ccf41 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -32,6 +32,9 @@ artifacts:
name: packages
- path: outputs\*.snupkg
name: symbols
+ - path: outputs\tests\report
+ name: coverage report
+ type: zip
deploy:
provider: NuGet
@@ -40,12 +43,12 @@ deploy:
secure: yP12k1vrHJyValU1UWnW3NySjKfXlRomR8p4qqmYs26FDztsGMlCQeT5jdCVHlz6
artifact: packages
-for:
- -
- branches:
- only:
- - master
- only_commits:
- message: /build/
- files:
- - src/
\ No newline at end of file
+# for:
+# -
+# branches:
+# only:
+# - master
+# only_commits:
+# message: /build/
+# files:
+# - src/
\ No newline at end of file
diff --git a/samples/InMemory/NetCoreConsoleApp/NetCoreConsoleApp.csproj b/samples/InMemory/NetCoreConsoleApp/NetCoreConsoleApp.csproj
index 0a6290f..2c2dde3 100644
--- a/samples/InMemory/NetCoreConsoleApp/NetCoreConsoleApp.csproj
+++ b/samples/InMemory/NetCoreConsoleApp/NetCoreConsoleApp.csproj
@@ -12,6 +12,7 @@
+
diff --git a/samples/InMemory/NetCoreConsoleApp/Program.cs b/samples/InMemory/NetCoreConsoleApp/Program.cs
index 082d336..39ade74 100644
--- a/samples/InMemory/NetCoreConsoleApp/Program.cs
+++ b/samples/InMemory/NetCoreConsoleApp/Program.cs
@@ -6,7 +6,6 @@
using Microsoft.Extensions.Configuration;
using System.Collections.Generic;
using System.Threading;
-using Nybus.Policies;
using Nybus.Utils;
namespace NetCoreConsoleApp
diff --git a/src/Nybus.Abstractions/Message.cs b/src/Nybus.Abstractions/Message.cs
index dcc104a..0417ac7 100644
--- a/src/Nybus.Abstractions/Message.cs
+++ b/src/Nybus.Abstractions/Message.cs
@@ -13,6 +13,10 @@ public abstract class Message
public abstract MessageType MessageType { get; }
public abstract Type Type { get; }
+
+ public MessageDescriptor Descriptor => MessageDescriptor.CreateFromType(Type);
+
+ protected object Item { get; set; }
}
public enum MessageType
@@ -47,11 +51,17 @@ public static class Headers
public abstract class CommandMessage : Message
{
public override MessageType MessageType => MessageType.Command;
+
+ public void SetCommand(ICommand command) => Item = command ?? throw new ArgumentNullException(nameof(command));
}
- public class CommandMessage : CommandMessage where TCommand: class, ICommand
+ public sealed class CommandMessage : CommandMessage where TCommand: class, ICommand
{
- public TCommand Command { get; set; }
+ public TCommand Command
+ {
+ get => Item as TCommand;
+ set => Item = value;
+ }
public override Type Type => typeof(TCommand);
}
@@ -59,12 +69,32 @@ public class CommandMessage : CommandMessage where TCommand: class, IC
public abstract class EventMessage : Message
{
public override MessageType MessageType => MessageType.Event;
+
+ public void SetEvent(IEvent @event) => Item = @event ?? throw new ArgumentNullException(nameof(@event));
}
- public class EventMessage : EventMessage where TEvent : class, IEvent
+ public sealed class EventMessage : EventMessage where TEvent : class, IEvent
{
- public TEvent Event { get; set; }
+ public TEvent Event
+ {
+ get => Item as TEvent;
+ set => Item = value;
+ }
public override Type Type => typeof(TEvent);
}
+
+ [AttributeUsage(AttributeTargets.Class)]
+ public class MessageAttribute : Attribute
+ {
+ public MessageAttribute(string name, string @namespace)
+ {
+ Name = name ?? throw new ArgumentNullException(nameof(name));
+ Namespace = @namespace ?? throw new ArgumentNullException(nameof(@namespace));
+ }
+
+ public string Name { get; }
+
+ public string Namespace { get; }
+ }
}
diff --git a/src/Nybus.Abstractions/Utils/IMessageDescriptorStore.cs b/src/Nybus.Abstractions/Utils/IMessageDescriptorStore.cs
new file mode 100644
index 0000000..5739c95
--- /dev/null
+++ b/src/Nybus.Abstractions/Utils/IMessageDescriptorStore.cs
@@ -0,0 +1,87 @@
+using System;
+using System.Collections.Generic;
+
+namespace Nybus.Utils
+{
+ public interface IMessageDescriptorStore
+ {
+ bool RegisterType(Type type);
+
+ bool TryGetDescriptorForType(Type type, out MessageDescriptor descriptor);
+
+ bool TryGetTypeForDescriptor(MessageDescriptor descriptor, out Type type);
+ }
+
+ public class MessageDescriptor
+ {
+ public static MessageDescriptor CreateFromType(Type type)
+ {
+ if (type == null)
+ {
+ throw new ArgumentNullException(nameof(type));
+ }
+
+ return new MessageDescriptor(type.Name, type.Namespace);
+ }
+
+ public static MessageDescriptor CreateFromAttribute (MessageAttribute attribute)
+ {
+ if (attribute == null)
+ {
+ throw new ArgumentNullException(nameof(attribute));
+ }
+
+ return new MessageDescriptor(attribute.Name, attribute.Namespace);
+ }
+
+
+ private static readonly char[] Separators = new []{':'};
+
+ public static bool TryParse(string descriptorName, out MessageDescriptor descriptor)
+ {
+ var strings = descriptorName.Split(':');
+
+ if (strings.Length != 2)
+ {
+ descriptor = null;
+ return false;
+ }
+
+ descriptor = new MessageDescriptor(strings[1], strings[0]);
+ return true;
+ }
+
+ public MessageDescriptor(string name, string @namespace)
+ {
+ Name = name ?? throw new ArgumentNullException(nameof(name));
+ Namespace = @namespace ?? throw new ArgumentNullException(nameof(@namespace));
+ }
+
+ public string Name { get; }
+
+ public string Namespace { get; }
+
+ public override string ToString() => $"{Namespace}:{Name}";
+
+ public static implicit operator MessageDescriptor(Type type) => CreateFromType(type);
+
+ public static readonly IEqualityComparer EqualityComparer = new MessageDescriptorEqualityComparer();
+
+ private class MessageDescriptorEqualityComparer : IEqualityComparer
+ {
+ public bool Equals(MessageDescriptor x, MessageDescriptor y)
+ {
+ if (x == null && y == null) return true;
+
+ if ((x != null) != (y != null)) return false;
+
+ return string.Equals(x.ToString(), y.ToString(), StringComparison.Ordinal);
+ }
+
+ public int GetHashCode(MessageDescriptor obj)
+ {
+ return obj.ToString().GetHashCode();
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Nybus/NybusConfiguratorExtensions.cs b/src/Nybus/NybusConfiguratorExtensions.cs
index 43f42fb..faabd3d 100644
--- a/src/Nybus/NybusConfiguratorExtensions.cs
+++ b/src/Nybus/NybusConfiguratorExtensions.cs
@@ -7,11 +7,6 @@ namespace Nybus
{
public static class NybusConfiguratorExtensions
{
- public static void UseInMemoryBusEngine(this INybusConfigurator configurator)
- {
- configurator.UseBusEngine();
- }
-
#region Subscribe to Command
public static void SubscribeToCommand(this INybusConfigurator configurator)
diff --git a/src/Nybus/NybusHost.cs b/src/Nybus/NybusHost.cs
index 77ed591..38f89cd 100644
--- a/src/Nybus/NybusHost.cs
+++ b/src/Nybus/NybusHost.cs
@@ -63,11 +63,11 @@ public Task RaiseEventAsync(TEvent @event, Guid correlationId) where TEv
private bool _isStarted;
private IDisposable _disposable;
- public Task StartAsync()
+ public async Task StartAsync()
{
_logger.LogTrace("Bus starting");
- var incomingMessages = _engine.StartAsync().Result;
+ var incomingMessages = await _engine.StartAsync().ConfigureAwait(false);
var observable = from message in incomingMessages
where message != null
@@ -80,23 +80,19 @@ from execution in pipeline(message).ToObservable()
_logger.LogTrace("Bus started");
_isStarted = true;
-
- return Task.CompletedTask;
}
- public Task StopAsync()
+ public async Task StopAsync()
{
if (_isStarted)
{
_logger.LogTrace("Bus stopping");
- _engine.StopAsync().Wait();
+ await _engine.StopAsync().ConfigureAwait(false);
_disposable.Dispose();
_logger.LogTrace("Bus stopped");
}
-
- return Task.CompletedTask;
}
private readonly List _messagePipelines = new List();
diff --git a/src/Nybus/NybusHostBuilder.cs b/src/Nybus/NybusHostBuilder.cs
index 541128f..81b62cf 100644
--- a/src/Nybus/NybusHostBuilder.cs
+++ b/src/Nybus/NybusHostBuilder.cs
@@ -38,7 +38,7 @@ public void SubscribeToCommand(Type commandHandlerType)
_subscriptions.Add(host =>
{
- host.SubscribeToCommand((dispatcher, context) => host.ExecutionEnvironment.ExecuteCommandHandlerAsync(dispatcher,context,commandHandlerType));
+ host.SubscribeToCommand((dispatcher, context) => host.ExecutionEnvironment.ExecuteCommandHandlerAsync(dispatcher, context, commandHandlerType));
});
}
diff --git a/src/Nybus/ServiceCollectionExtensions.cs b/src/Nybus/ServiceCollectionExtensions.cs
index ddaa9b9..0ea7a7d 100644
--- a/src/Nybus/ServiceCollectionExtensions.cs
+++ b/src/Nybus/ServiceCollectionExtensions.cs
@@ -4,6 +4,7 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Nybus.Policies;
+using Nybus.Utils;
namespace Nybus
{
@@ -49,6 +50,8 @@ public static IServiceCollection AddNybus(this IServiceCollection services, Acti
return options;
});
+ services.AddSingleton();
+
configurator.ConfigureServices(services);
services.AddSingleton(sp =>
diff --git a/src/Nybus/Utils/MessageDescriptorStore.cs b/src/Nybus/Utils/MessageDescriptorStore.cs
new file mode 100644
index 0000000..a6f9ce0
--- /dev/null
+++ b/src/Nybus/Utils/MessageDescriptorStore.cs
@@ -0,0 +1,51 @@
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+
+namespace Nybus.Utils
+{
+ public class MessageDescriptorStore : IMessageDescriptorStore
+ {
+ private readonly IDictionary _descriptorsByType = new Dictionary();
+ private readonly IDictionary _typesByDescriptor = new Dictionary(MessageDescriptor.EqualityComparer);
+
+ private readonly object _lock = new object();
+
+ public bool RegisterType(Type type)
+ {
+ lock (_lock)
+ {
+ if (!_descriptorsByType.ContainsKey(type))
+ {
+ var descriptor = GetDescriptorForType(type);
+
+ if (!_typesByDescriptor.ContainsKey(descriptor))
+ {
+ _descriptorsByType.Add(type, descriptor);
+ _typesByDescriptor.Add(descriptor, type);
+
+ return true;
+ }
+ }
+
+ return false;
+ }
+ }
+
+ private MessageDescriptor GetDescriptorForType(Type type)
+ {
+ var attribute = type.GetCustomAttribute();
+
+ if (attribute == null)
+ {
+ return type;
+ }
+
+ return MessageDescriptor.CreateFromAttribute(attribute);
+ }
+
+ public bool TryGetDescriptorForType(Type type, out MessageDescriptor descriptor) => _descriptorsByType.TryGetValue(type, out descriptor);
+
+ public bool TryGetTypeForDescriptor(MessageDescriptor descriptor, out Type type) => _typesByDescriptor.TryGetValue(descriptor, out type);
+ }
+}
\ No newline at end of file
diff --git a/src/engines/Nybus.Engine.InMemory/Configuration/ISerializer.cs b/src/engines/Nybus.Engine.InMemory/Configuration/ISerializer.cs
new file mode 100644
index 0000000..d3a9adf
--- /dev/null
+++ b/src/engines/Nybus.Engine.InMemory/Configuration/ISerializer.cs
@@ -0,0 +1,37 @@
+using System;
+using Newtonsoft.Json;
+
+namespace Nybus.Configuration
+{
+ public interface ISerializer
+ {
+ string SerializeObject(object item);
+
+ object DeserializeObject(string item, Type type);
+ }
+
+ public class JsonSerializer : ISerializer
+ {
+ private readonly JsonSerializerSettings _settings;
+
+ public JsonSerializer() : this(new JsonSerializerSettings())
+ {
+
+ }
+
+ public JsonSerializer(JsonSerializerSettings settings)
+ {
+ _settings = settings ?? throw new ArgumentNullException(nameof(settings));
+ }
+
+ public string SerializeObject(object item)
+ {
+ return JsonConvert.SerializeObject(item, _settings);
+ }
+
+ public object DeserializeObject(string item, Type type)
+ {
+ return JsonConvert.DeserializeObject(item, type, _settings);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/engines/Nybus.Engine.InMemory/InMemory/IEnvelopeService.cs b/src/engines/Nybus.Engine.InMemory/InMemory/IEnvelopeService.cs
new file mode 100644
index 0000000..fe1d50d
--- /dev/null
+++ b/src/engines/Nybus.Engine.InMemory/InMemory/IEnvelopeService.cs
@@ -0,0 +1,126 @@
+using System;
+using Nybus.Configuration;
+using Nybus.Utils;
+
+namespace Nybus.InMemory
+{
+ public interface IEnvelopeService
+ {
+ Envelope CreateEnvelope(CommandMessage message) where T : class, ICommand;
+
+ Envelope CreateEnvelope(EventMessage message) where T : class, IEvent;
+
+ CommandMessage CreateCommandMessage(Envelope envelope, Type commandType);
+
+ EventMessage CreateEventMessage(Envelope envelope, Type eventType);
+ }
+
+ public class EnvelopeService : IEnvelopeService
+ {
+ private readonly ISerializer _serializer;
+
+ public EnvelopeService(ISerializer serializer)
+ {
+ _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
+ }
+
+ public Envelope CreateEnvelope(CommandMessage message)
+ where T : class, ICommand
+ {
+ if (message == null)
+ {
+ throw new ArgumentNullException(nameof(message));
+ }
+
+ return new Envelope
+ {
+ Headers = message.Headers,
+ Content = _serializer.SerializeObject(message.Command),
+ MessageType = MessageType.Command,
+ MessageId = message.MessageId,
+ Type = message.Type
+ };
+ }
+
+ public Envelope CreateEnvelope(EventMessage message)
+ where T : class, IEvent
+ {
+ if (message == null)
+ {
+ throw new ArgumentNullException(nameof(message));
+ }
+
+ return new Envelope
+ {
+ Headers = message.Headers,
+ Content = _serializer.SerializeObject(message.Event),
+ MessageType = MessageType.Event,
+ MessageId = message.MessageId,
+ Type = message.Type
+ };
+ }
+
+ public CommandMessage CreateCommandMessage(Envelope envelope, Type commandType)
+ {
+ if (envelope == null)
+ {
+ throw new ArgumentNullException(nameof(envelope));
+ }
+
+ if (commandType == null)
+ {
+ throw new ArgumentNullException(nameof(commandType));
+ }
+
+ var command = _serializer.DeserializeObject(envelope.Content, commandType) as ICommand;
+
+ var commandMessageType = typeof(CommandMessage<>).MakeGenericType(commandType);
+ var commandMessage = (CommandMessage)Activator.CreateInstance(commandMessageType);
+
+ commandMessage.SetCommand(command);
+ commandMessage.Headers = envelope.Headers;
+ commandMessage.MessageId = envelope.MessageId;
+
+ return commandMessage;
+ }
+
+ public EventMessage CreateEventMessage(Envelope envelope, Type eventType)
+ {
+ if (envelope == null)
+ {
+ throw new ArgumentNullException(nameof(envelope));
+ }
+
+ if (eventType == null)
+ {
+ throw new ArgumentNullException(nameof(eventType));
+ }
+
+ var @event = _serializer.DeserializeObject(envelope.Content, eventType) as IEvent;
+
+ var eventMessageType = typeof(EventMessage<>).MakeGenericType(eventType);
+ var eventMessage = (EventMessage)Activator.CreateInstance(eventMessageType);
+
+ eventMessage.SetEvent(@event);
+ eventMessage.Headers = envelope.Headers;
+ eventMessage.MessageId = envelope.MessageId;
+
+ return eventMessage;
+ }
+ }
+
+ public class Envelope
+ {
+ public string MessageId { get; set; }
+
+ public HeaderBag Headers { get; set; }
+
+ public MessageType MessageType { get; set; }
+
+ public Type Type { get; set; }
+
+ public MessageDescriptor Descriptor => MessageDescriptor.CreateFromType(Type);
+
+ public string Content { get; set; }
+ }
+}
\ No newline at end of file
diff --git a/src/Nybus/InMemoryBusEngine.cs b/src/engines/Nybus.Engine.InMemory/InMemory/InMemoryBusEngine.cs
similarity index 53%
rename from src/Nybus/InMemoryBusEngine.cs
rename to src/engines/Nybus.Engine.InMemory/InMemory/InMemoryBusEngine.cs
index cdcc573..e539b4b 100644
--- a/src/Nybus/InMemoryBusEngine.cs
+++ b/src/engines/Nybus.Engine.InMemory/InMemory/InMemoryBusEngine.cs
@@ -3,37 +3,81 @@
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
+using Nybus.Utils;
+
// ReSharper disable InvokeAsExtensionMethod
-namespace Nybus
+namespace Nybus.InMemory
{
public class InMemoryBusEngine : IBusEngine
{
- private ISubject _sequenceOfMessages;
+ private readonly IMessageDescriptorStore _messageDescriptorStore;
+ private readonly IEnvelopeService _envelopeService;
+ private ISubject _sequenceOfMessages;
private bool _isStarted;
private readonly ISet _acceptedTypes = new HashSet();
+ public InMemoryBusEngine(IMessageDescriptorStore messageDescriptorStore, IEnvelopeService envelopeService)
+ {
+ _messageDescriptorStore = messageDescriptorStore ?? throw new ArgumentNullException(nameof(messageDescriptorStore));
+ _envelopeService = envelopeService ?? throw new ArgumentNullException(nameof(envelopeService));
+ }
+
public Task> StartAsync()
{
- _sequenceOfMessages = new Subject();
+ _sequenceOfMessages = new Subject();
var commands = _sequenceOfMessages
.Where(m => m != null)
.Where(m => m.MessageType == MessageType.Command)
- .Cast()
- .Where(m => _acceptedTypes.Contains(m.Type))
+ .Select(GetCommandMessage)
+ .Where(m => m != null)
.Cast();
var events = _sequenceOfMessages
.Where(m => m != null)
.Where(m => m.MessageType == MessageType.Event)
- .Cast()
- .Where(m => _acceptedTypes.Contains(m.Type))
+ .Select(GetEventMessage)
+ .Where(m => m != null)
.Cast();
_isStarted = true;
return Task.FromResult(Observable.Merge(commands, events));
+
+ CommandMessage GetCommandMessage(Envelope incoming)
+ {
+ var incomingType = incoming.Type;
+
+ if (_acceptedTypes.Contains(incomingType))
+ {
+ return _envelopeService.CreateCommandMessage(incoming, incomingType);
+ }
+
+ if (_messageDescriptorStore.TryGetTypeForDescriptor(incoming.Descriptor, out var outgoingType))
+ {
+ return _envelopeService.CreateCommandMessage(incoming, outgoingType);
+ }
+
+ return null;
+ }
+
+ EventMessage GetEventMessage(Envelope incoming)
+ {
+ var incomingType = incoming.Type;
+
+ if (_acceptedTypes.Contains(incomingType))
+ {
+ return _envelopeService.CreateEventMessage(incoming, incomingType);
+ }
+
+ if (_messageDescriptorStore.TryGetTypeForDescriptor(incoming.Descriptor, out var outgoingType))
+ {
+ return _envelopeService.CreateEventMessage(incoming, outgoingType);
+ }
+
+ return null;
+ }
}
public Task StopAsync()
@@ -51,7 +95,8 @@ public Task SendCommandAsync(CommandMessage message) where T
{
if (_isStarted)
{
- _sequenceOfMessages.OnNext(message);
+ var envelope = _envelopeService.CreateEnvelope(message);
+ _sequenceOfMessages.OnNext(envelope);
}
return Task.CompletedTask;
@@ -61,7 +106,8 @@ public Task SendEventAsync(EventMessage message) where TEvent :
{
if (_isStarted)
{
- _sequenceOfMessages.OnNext(message);
+ var envelope = _envelopeService.CreateEnvelope(message);
+ _sequenceOfMessages.OnNext(envelope);
}
return Task.CompletedTask;
@@ -69,11 +115,13 @@ public Task SendEventAsync(EventMessage message) where TEvent :
public void SubscribeToCommand() where TCommand : class, ICommand
{
+ _messageDescriptorStore.RegisterType(typeof(TCommand));
_acceptedTypes.Add(typeof(TCommand));
}
public void SubscribeToEvent() where TEvent : class, IEvent
{
+ _messageDescriptorStore.RegisterType(typeof(TEvent));
_acceptedTypes.Add(typeof(TEvent));
}
diff --git a/src/engines/Nybus.Engine.InMemory/InMemoryConfiguratorExtensions.cs b/src/engines/Nybus.Engine.InMemory/InMemoryConfiguratorExtensions.cs
new file mode 100644
index 0000000..e84999c
--- /dev/null
+++ b/src/engines/Nybus.Engine.InMemory/InMemoryConfiguratorExtensions.cs
@@ -0,0 +1,18 @@
+using Microsoft.Extensions.DependencyInjection;
+using Nybus.Configuration;
+using Nybus.InMemory;
+
+namespace Nybus
+{
+ public static class InMemoryConfiguratorExtensions
+ {
+ public static void UseInMemoryBusEngine(this INybusConfigurator configurator)
+ {
+ configurator.AddServiceConfiguration(svc => svc.AddSingleton());
+
+ configurator.AddServiceConfiguration(svc => svc.AddSingleton());
+
+ configurator.UseBusEngine();
+ }
+ }
+}
diff --git a/src/engines/Nybus.Engine.InMemory/Nybus.Engine.InMemory.csproj b/src/engines/Nybus.Engine.InMemory/Nybus.Engine.InMemory.csproj
new file mode 100644
index 0000000..de09254
--- /dev/null
+++ b/src/engines/Nybus.Engine.InMemory/Nybus.Engine.InMemory.csproj
@@ -0,0 +1,20 @@
+
+
+
+ netstandard2.0
+ Nybus
+ Nybus.Engine.InMemory
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/engines/Nybus.Engine.RabbitMq/Configuration/ISerializer.cs b/src/engines/Nybus.Engine.RabbitMq/Configuration/ISerializer.cs
index b9e388a..2abed5e 100644
--- a/src/engines/Nybus.Engine.RabbitMq/Configuration/ISerializer.cs
+++ b/src/engines/Nybus.Engine.RabbitMq/Configuration/ISerializer.cs
@@ -37,6 +37,4 @@ public object DeserializeObject(byte[] bytes, Type type, Encoding encoding)
return JsonConvert.DeserializeObject(json, type, _settings);
}
}
-
-
}
\ No newline at end of file
diff --git a/src/engines/Nybus.Engine.RabbitMq/ObservableConsumer.cs b/src/engines/Nybus.Engine.RabbitMq/RabbitMq/ObservableConsumer.cs
similarity index 99%
rename from src/engines/Nybus.Engine.RabbitMq/ObservableConsumer.cs
rename to src/engines/Nybus.Engine.RabbitMq/RabbitMq/ObservableConsumer.cs
index fdfeb08..e750015 100644
--- a/src/engines/Nybus.Engine.RabbitMq/ObservableConsumer.cs
+++ b/src/engines/Nybus.Engine.RabbitMq/RabbitMq/ObservableConsumer.cs
@@ -5,7 +5,7 @@
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
-namespace Nybus
+namespace Nybus.RabbitMq
{
public class ObservableConsumer : IBasicConsumer, IObservable
{
diff --git a/src/engines/Nybus.Engine.RabbitMq/RabbitMqBusEngine.cs b/src/engines/Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngine.cs
similarity index 82%
rename from src/engines/Nybus.Engine.RabbitMq/RabbitMqBusEngine.cs
rename to src/engines/Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngine.cs
index 7eb94a4..4c69a23 100644
--- a/src/engines/Nybus.Engine.RabbitMq/RabbitMqBusEngine.cs
+++ b/src/engines/Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngine.cs
@@ -11,7 +11,7 @@
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
-namespace Nybus
+namespace Nybus.RabbitMq
{
public class RabbitMqBusEngine : IBusEngine
{
@@ -20,10 +20,12 @@ public class RabbitMqBusEngine : IBusEngine
private readonly ILogger _logger;
private readonly Dictionary _consumers = new Dictionary(StringComparer.OrdinalIgnoreCase);
private readonly IRabbitMqConfiguration _configuration;
+ private readonly IMessageDescriptorStore _messageDescriptorStore;
- public RabbitMqBusEngine(IRabbitMqConfiguration configuration, ILogger logger)
+ public RabbitMqBusEngine(IRabbitMqConfiguration configuration, IMessageDescriptorStore messageDescriptorStore, ILogger logger)
{
_configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
+ _messageDescriptorStore = messageDescriptorStore ?? throw new ArgumentNullException(nameof(messageDescriptorStore));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
@@ -112,20 +114,25 @@ Message GetMessage(BasicDeliverEventArgs args)
Message message = null;
- if (TryFindCommandByName(messageTypeName, out var commandType))
+ if (!MessageDescriptor.TryParse(messageTypeName, out var descriptor))
+ {
+ NackMessage(args.DeliveryTag);
+ return null;
+ }
+
+ if (TryFindCommandByName(descriptor, out var commandType) || _messageDescriptorStore.TryGetTypeForDescriptor(descriptor, out commandType))
{
var command = _configuration.Serializer.DeserializeObject(args.Body, commandType, encoding) as ICommand;
message = CreateCommandMessage(command);
}
- else if (TryFindEventByName(messageTypeName, out var eventType))
+ else if (TryFindEventByName(descriptor, out var eventType) || _messageDescriptorStore.TryGetTypeForDescriptor(descriptor, out eventType))
{
var @event = _configuration.Serializer.DeserializeObject(args.Body, eventType, encoding) as IEvent;
message = CreateEventMessage(@event);
}
else
{
- _channel.BasicNack(args.DeliveryTag, false, true);
- _processingMessages.TryRemoveItem(args.DeliveryTag);
+ NackMessage(args.DeliveryTag);
return null;
}
@@ -144,35 +151,31 @@ Message GetMessage(BasicDeliverEventArgs args)
CommandMessage CreateCommandMessage(ICommand command)
{
- const string propertyName = "Command";
-
var messageType = typeof(CommandMessage<>).MakeGenericType(command.GetType());
- var message = Activator.CreateInstance(messageType);
+ var message = Activator.CreateInstance(messageType) as CommandMessage;
- messageType.GetProperty(propertyName).SetValue(message, command);
+ message?.SetCommand(command);
- return message as CommandMessage;
+ return message;
}
EventMessage CreateEventMessage(IEvent @event)
{
- const string propertyName = "Event";
-
var messageType = typeof(EventMessage<>).MakeGenericType(@event.GetType());
- var message = Activator.CreateInstance(messageType);
+ var message = Activator.CreateInstance(messageType) as EventMessage;
- messageType.GetProperty(propertyName).SetValue(message, @event);
+ message?.SetEvent(@event);
- return message as EventMessage;
+ return message;
}
- bool TryFindCommandByName(string commandTypeName, out Type type) => TryFindTypeByName(AcceptedCommandTypes, commandTypeName, out type);
+ bool TryFindCommandByName(MessageDescriptor descriptor, out Type type) => TryFindTypeByName(AcceptedCommandTypes, descriptor, out type);
- bool TryFindEventByName(string eventTypeName, out Type type) => TryFindTypeByName(AcceptedEventTypes, eventTypeName, out type);
+ bool TryFindEventByName(MessageDescriptor descriptor, out Type type) => TryFindTypeByName(AcceptedEventTypes, descriptor, out type);
- bool TryFindTypeByName(ISet typeList, string typeName, out Type type)
+ bool TryFindTypeByName(ISet typeList, MessageDescriptor descriptor, out Type type)
{
- type = typeList.FirstOrDefault(o => o.FullName == typeName);
+ type = typeList.FirstOrDefault(o => string.Equals(o.Name, descriptor.Name, StringComparison.OrdinalIgnoreCase) && string.Equals(o.Namespace, descriptor.Namespace, StringComparison.OrdinalIgnoreCase));
return type != null;
}
@@ -213,7 +216,7 @@ private Task SendItemAsync(Message message, T item)
properties.Headers = new Dictionary
{
[Nybus(Headers.MessageId)] = message.MessageId,
- [Nybus(Headers.MessageType)] = type.FullName
+ [Nybus(Headers.MessageType)] = message.Descriptor.ToString()
};
foreach (var header in message.Headers)
@@ -234,12 +237,14 @@ private Task SendItemAsync(Message message, T item)
public void SubscribeToCommand()
where TCommand : class, ICommand
{
+ _messageDescriptorStore.RegisterType(typeof(TCommand));
AcceptedCommandTypes.Add(typeof(TCommand));
}
public void SubscribeToEvent()
where TEvent : class, IEvent
{
+ _messageDescriptorStore.RegisterType(typeof(TEvent));
AcceptedEventTypes.Add(typeof(TEvent));
}
@@ -273,8 +278,7 @@ public Task NotifyFailAsync(Message message)
{
try
{
- _channel.BasicNack(deliveryTag, false, true);
- _processingMessages.TryRemoveItem(deliveryTag);
+ NackMessage(deliveryTag);
}
catch (AlreadyClosedException ex)
{
@@ -291,7 +295,13 @@ public Task NotifyFailAsync(Message message)
return Task.CompletedTask;
}
- private static string GetExchangeNameForType(Type type) => type.FullName;
+ private void NackMessage(ulong deliveryTag)
+ {
+ _channel.BasicNack(deliveryTag, false, true);
+ _processingMessages.TryRemoveItem(deliveryTag);
+ }
+
+ private static string GetExchangeNameForType(Type type) => type.FullName; //$"{type.Namespace}:{type.Name}";
private static string Nybus(string key) => $"Nybus:{key}";
}
diff --git a/src/engines/Nybus.Engine.RabbitMq/RabbitMqHeaders.cs b/src/engines/Nybus.Engine.RabbitMq/RabbitMq/RabbitMqHeaders.cs
similarity index 87%
rename from src/engines/Nybus.Engine.RabbitMq/RabbitMqHeaders.cs
rename to src/engines/Nybus.Engine.RabbitMq/RabbitMq/RabbitMqHeaders.cs
index 6c5745e..d994e50 100644
--- a/src/engines/Nybus.Engine.RabbitMq/RabbitMqHeaders.cs
+++ b/src/engines/Nybus.Engine.RabbitMq/RabbitMq/RabbitMqHeaders.cs
@@ -1,4 +1,4 @@
-namespace Nybus
+namespace Nybus.RabbitMq
{
public static class RabbitMqHeaders
{
diff --git a/src/engines/Nybus.Engine.RabbitMq/RabbitMqConfiguratorExtensions.cs b/src/engines/Nybus.Engine.RabbitMq/RabbitMqConfiguratorExtensions.cs
index 6d82bc8..dd56b1d 100644
--- a/src/engines/Nybus.Engine.RabbitMq/RabbitMqConfiguratorExtensions.cs
+++ b/src/engines/Nybus.Engine.RabbitMq/RabbitMqConfiguratorExtensions.cs
@@ -1,6 +1,7 @@
using System;
using Microsoft.Extensions.DependencyInjection;
using Nybus.Configuration;
+using Nybus.RabbitMq;
namespace Nybus
{
diff --git a/tests/Directory.Build.props b/tests/Directory.Build.props
index 0f6d197..5fad0f4 100644
--- a/tests/Directory.Build.props
+++ b/tests/Directory.Build.props
@@ -7,6 +7,7 @@
+
diff --git a/tests/TestUtils/Commands.cs b/tests/TestUtils/Commands.cs
index dde9ebf..f8cd11b 100644
--- a/tests/TestUtils/Commands.cs
+++ b/tests/TestUtils/Commands.cs
@@ -35,4 +35,19 @@ public virtual Task HandleAsync(IDispatcher dispatcher, ICommandContext>();
+
+ services.AddLogging(l => l.AddDebug());
+
+ services.AddNybus(nybus =>
+ {
+ nybus.UseInMemoryBusEngine();
+
+ nybus.SubscribeToCommand(commandReceived);
+ });
+
+ var serviceProvider = services.BuildServiceProvider();
+
+ var host = serviceProvider.GetRequiredService();
+
+ var bus = serviceProvider.GetRequiredService();
+
+ await host.StartAsync();
+
+ await bus.InvokeCommandAsync(testCommand);
+
+ await host.StopAsync();
+
+ Mock.Get(commandReceived).Verify(p => p(It.IsAny(), It.IsAny>()), Times.Once);
+ }
+
+ [Test, AutoMoqData]
+ public async Task Events_are_matched_via_MessageAttribute(ServiceCollection services, ThirdTestEvent testEvent)
+ {
+ var eventReceived = Mock.Of>();
+
+ services.AddLogging(l => l.AddDebug());
+
+ services.AddNybus(nybus =>
+ {
+ nybus.UseInMemoryBusEngine();
+
+ nybus.SubscribeToEvent(eventReceived);
+ });
+
+ var serviceProvider = services.BuildServiceProvider();
+
+ var host = serviceProvider.GetRequiredService();
+
+ var bus = serviceProvider.GetRequiredService();
+
+ await host.StartAsync();
+
+ await bus.RaiseEventAsync(testEvent);
+
+ await host.StopAsync();
+
+ Mock.Get(eventReceived).Verify(p => p(It.IsAny(), It.IsAny>()), Times.Once);
+ }
+
+ [Test, AutoMoqData]
+ public async Task Commands_are_correctly_converted(ServiceCollection services, ThirdTestCommand testCommand)
+ {
+ var commandReceived = Mock.Of>();
+
+ services.AddLogging(l => l.AddDebug());
+
+ services.AddNybus(nybus =>
+ {
+ nybus.UseInMemoryBusEngine();
+
+ nybus.SubscribeToCommand(commandReceived);
+ });
+
+ var serviceProvider = services.BuildServiceProvider();
+
+ var host = serviceProvider.GetRequiredService();
+
+ var bus = serviceProvider.GetRequiredService();
+
+ await host.StartAsync();
+
+ await bus.InvokeCommandAsync(testCommand);
+
+ await host.StopAsync();
+
+ Mock.Get(commandReceived).Verify(p => p(It.IsAny(), It.Is>(c => string.Equals(c.Command.Message, testCommand.Message))), Times.Once);
+ }
+
+ [Test, AutoMoqData]
+ public async Task Events_are_correctly_converted(ServiceCollection services, ThirdTestEvent testEvent)
+ {
+ var eventReceived = Mock.Of>();
+
+ services.AddLogging(l => l.AddDebug());
+
+ services.AddNybus(nybus =>
+ {
+ nybus.UseInMemoryBusEngine();
+
+ nybus.SubscribeToEvent(eventReceived);
+ });
+
+ var serviceProvider = services.BuildServiceProvider();
+
+ var host = serviceProvider.GetRequiredService();
+
+ var bus = serviceProvider.GetRequiredService();
+
+ await host.StartAsync();
+
+ await bus.RaiseEventAsync(testEvent);
+
+ await host.StopAsync();
+
+ Mock.Get(eventReceived).Verify(p => p(It.IsAny(), It.Is>(e => string.Equals(e.Event.Message, testEvent.Message))), Times.Once);
+ }
+ }
+}
diff --git a/tests/Tests.Integration.Nybus/RegisteredHandlerBareSetupTests.cs b/tests/Tests.Integration.Nybus.Engine.InMemory/RegisteredHandlerBareSetupTests.cs
similarity index 100%
rename from tests/Tests.Integration.Nybus/RegisteredHandlerBareSetupTests.cs
rename to tests/Tests.Integration.Nybus.Engine.InMemory/RegisteredHandlerBareSetupTests.cs
diff --git a/tests/Tests.Integration.Nybus/RegisteredHandlerConfigurationSetupTests.cs b/tests/Tests.Integration.Nybus.Engine.InMemory/RegisteredHandlerConfigurationSetupTests.cs
similarity index 100%
rename from tests/Tests.Integration.Nybus/RegisteredHandlerConfigurationSetupTests.cs
rename to tests/Tests.Integration.Nybus.Engine.InMemory/RegisteredHandlerConfigurationSetupTests.cs
diff --git a/tests/Tests.Integration.Nybus/SingletonHandlerBareSetupTests.cs b/tests/Tests.Integration.Nybus.Engine.InMemory/SingletonHandlerBareSetupTests.cs
similarity index 100%
rename from tests/Tests.Integration.Nybus/SingletonHandlerBareSetupTests.cs
rename to tests/Tests.Integration.Nybus.Engine.InMemory/SingletonHandlerBareSetupTests.cs
diff --git a/tests/Tests.Integration.Nybus/Tests.Integration.Nybus.csproj b/tests/Tests.Integration.Nybus.Engine.InMemory/Tests.Integration.Nybus.Engine.InMemory.csproj
similarity index 87%
rename from tests/Tests.Integration.Nybus/Tests.Integration.Nybus.csproj
rename to tests/Tests.Integration.Nybus.Engine.InMemory/Tests.Integration.Nybus.Engine.InMemory.csproj
index 2f34fbe..a494b36 100644
--- a/tests/Tests.Integration.Nybus/Tests.Integration.Nybus.csproj
+++ b/tests/Tests.Integration.Nybus.Engine.InMemory/Tests.Integration.Nybus.Engine.InMemory.csproj
@@ -15,6 +15,7 @@
+
diff --git a/tests/Tests.Nybus.Abstractions/AutoMoqDataAttribute.cs b/tests/Tests.Nybus.Abstractions/AutoMoqDataAttribute.cs
index 06b228b..9aa437b 100644
--- a/tests/Tests.Nybus.Abstractions/AutoMoqDataAttribute.cs
+++ b/tests/Tests.Nybus.Abstractions/AutoMoqDataAttribute.cs
@@ -1,9 +1,7 @@
using AutoFixture;
using AutoFixture.AutoMoq;
-using AutoFixture.Kernel;
using AutoFixture.NUnit3;
-using Nybus;
-using Nybus.Configuration;
+using Nybus.Utils;
namespace Tests
{
@@ -24,6 +22,8 @@ private static IFixture CreateFixture()
GenerateDelegates = true
});
+ fixture.Register(() => MessageDescriptor.EqualityComparer);
+
return fixture;
}
}
diff --git a/tests/Tests.Nybus.Abstractions/Utils/MessageDescriptorEqualityComparerTests.cs b/tests/Tests.Nybus.Abstractions/Utils/MessageDescriptorEqualityComparerTests.cs
new file mode 100644
index 0000000..0d78bc1
--- /dev/null
+++ b/tests/Tests.Nybus.Abstractions/Utils/MessageDescriptorEqualityComparerTests.cs
@@ -0,0 +1,63 @@
+using System.Collections.Generic;
+using AutoFixture.NUnit3;
+using NUnit.Framework;
+using Nybus.Utils;
+
+namespace Tests.Utils
+{
+ [TestFixture]
+ public class MessageDescriptorEqualityComparerTests
+ {
+ [Test, AutoMoqData]
+ public void Item_is_equal_to_itself(IEqualityComparer sut, MessageDescriptor descriptor)
+ {
+ Assert.That(sut.Equals(descriptor, descriptor), Is.True);
+ }
+
+ [Test, AutoMoqData]
+ public void Null_is_equal_to_itself(IEqualityComparer sut)
+ {
+ Assert.That(sut.Equals(null, null), Is.True);
+ }
+
+ [Test, AutoMoqData]
+ public void Items_with_same_values_are_equal(
+ IEqualityComparer sut,
+ [Frozen(Matching.ParameterName)] string name, // must have the same name as the MessageDescriptor.ctor parameter
+ [Frozen(Matching.ParameterName)] string @namespace, // must have the same name as the MessageDescriptor.ctor parameter
+ MessageDescriptor first,
+ MessageDescriptor second)
+ {
+ Assume.That(first.Name, Is.EqualTo(second.Name));
+ Assume.That(first.Namespace, Is.EqualTo(second.Namespace));
+
+ Assert.That(sut.Equals(first, second), Is.True);
+ }
+
+ [Test, AutoMoqData]
+ public void Item_is_not_equal_to_null(IEqualityComparer sut, MessageDescriptor descriptor)
+ {
+ Assert.That(sut.Equals(descriptor, null), Is.False);
+ }
+
+ [Test, AutoMoqData]
+ public void Null_is_not_equal_to_any_item(IEqualityComparer sut, MessageDescriptor descriptor)
+ {
+ Assert.That(sut.Equals(null, descriptor), Is.False);
+ }
+
+ [Test, AutoMoqData]
+ public void Items_with_same_values_have_same_hashcode(
+ IEqualityComparer sut,
+ [Frozen(Matching.ParameterName)] string name, // must have the same name as the MessageDescriptor.ctor parameter
+ [Frozen(Matching.ParameterName)] string @namespace, // must have the same name as the MessageDescriptor.ctor parameter
+ MessageDescriptor first,
+ MessageDescriptor second)
+ {
+ Assume.That(first.Name, Is.EqualTo(second.Name));
+ Assume.That(first.Namespace, Is.EqualTo(second.Namespace));
+
+ Assert.That(sut.GetHashCode(first), Is.EqualTo(sut.GetHashCode(second)));
+ }
+ }
+}
\ No newline at end of file
diff --git a/tests/Tests.Nybus.Abstractions/Utils/MessageDescriptorTests.cs b/tests/Tests.Nybus.Abstractions/Utils/MessageDescriptorTests.cs
new file mode 100644
index 0000000..ec62219
--- /dev/null
+++ b/tests/Tests.Nybus.Abstractions/Utils/MessageDescriptorTests.cs
@@ -0,0 +1,31 @@
+using System;
+using NUnit.Framework;
+using Nybus.Utils;
+
+namespace Tests.Utils
+{
+ [TestFixture]
+ public class MessageDescriptorTests
+ {
+ [Test]
+ public void CreateFromType_requires_type()
+ {
+ Assert.Throws(() => MessageDescriptor.CreateFromType(null));
+ }
+
+ [Test]
+ public void CreateFromAttribute_requires_attribute()
+ {
+ Assert.Throws(() => MessageDescriptor.CreateFromAttribute(null));
+ }
+
+ [Test, AutoMoqData]
+ public void TryParse_requires_correct_format(string descriptorName)
+ {
+ var result = MessageDescriptor.TryParse(descriptorName, out var descriptor);
+
+ Assert.That(result, Is.False);
+ Assert.That(descriptor, Is.Null);
+ }
+ }
+}
\ No newline at end of file
diff --git a/tests/Tests.Nybus.Engine.InMemory/AutoMoqDataAttribute.cs b/tests/Tests.Nybus.Engine.InMemory/AutoMoqDataAttribute.cs
new file mode 100644
index 0000000..fc55c33
--- /dev/null
+++ b/tests/Tests.Nybus.Engine.InMemory/AutoMoqDataAttribute.cs
@@ -0,0 +1,51 @@
+using System;
+using AutoFixture;
+using AutoFixture.AutoMoq;
+using AutoFixture.NUnit3;
+
+namespace Tests
+{
+ public class AutoMoqDataAttribute : AutoDataAttribute
+ {
+ public AutoMoqDataAttribute() : base(CreateFixture)
+ {
+
+ }
+
+ private static IFixture CreateFixture()
+ {
+ IFixture fixture = new Fixture();
+
+ fixture.Customize(new AutoMoqCustomization
+ {
+ ConfigureMembers = true,
+ GenerateDelegates = true
+ });
+
+ fixture.Freeze();
+
+ return fixture;
+ }
+ }
+
+ public class InlineAutoMoqDataAttribute : InlineAutoDataAttribute
+ {
+ public InlineAutoMoqDataAttribute(params object[] args) : base(CreateFixture, args)
+ {
+
+ }
+
+ private static IFixture CreateFixture()
+ {
+ IFixture fixture = new Fixture();
+
+ fixture.Customize(new AutoMoqCustomization
+ {
+ ConfigureMembers = true,
+ GenerateDelegates = true
+ });
+
+ return fixture;
+ }
+ }
+}
\ No newline at end of file
diff --git a/tests/Tests.Nybus.Engine.InMemory/InMemory/EnvelopeServiceTests.cs b/tests/Tests.Nybus.Engine.InMemory/InMemory/EnvelopeServiceTests.cs
new file mode 100644
index 0000000..6f9314e
--- /dev/null
+++ b/tests/Tests.Nybus.Engine.InMemory/InMemory/EnvelopeServiceTests.cs
@@ -0,0 +1,126 @@
+using System;
+using AutoFixture.NUnit3;
+using Moq;
+using NUnit.Framework;
+using Nybus;
+using Nybus.Configuration;
+using Nybus.InMemory;
+
+namespace Tests.InMemory
+{
+ [TestFixture]
+ public class EnvelopeServiceTests
+ {
+ [Test, AutoMoqData]
+ public void Create_from_CommandMessage_requires_valid_message(EnvelopeService sut)
+ {
+ Assert.Throws(() => sut.CreateEnvelope((CommandMessage)null));
+ }
+
+ [Test, AutoMoqData]
+ public void Create_from_EventMessage_requires_valid_message(EnvelopeService sut)
+ {
+ Assert.Throws(() => sut.CreateEnvelope((EventMessage)null));
+ }
+
+ [Test, AutoMoqData]
+ public void Create_from_CommandMessage_can_create_envelope(EnvelopeService sut, CommandMessage testMessage)
+ {
+ var envelope = sut.CreateEnvelope(testMessage);
+
+ Assert.That(envelope, Is.Not.Null);
+ Assert.That(envelope.Type, Is.EqualTo(testMessage.Type));
+ Assert.That(envelope.Headers, Is.SameAs(testMessage.Headers));
+ Assert.That(envelope.MessageId, Is.EqualTo(testMessage.MessageId));
+ Assert.That(envelope.MessageType, Is.EqualTo(testMessage.MessageType));
+ }
+
+ [Test, AutoMoqData]
+ public void Create_from_EventMessage_can_create_envelope(EnvelopeService sut, EventMessage testMessage)
+ {
+ var envelope = sut.CreateEnvelope(testMessage);
+
+ Assert.That(envelope, Is.Not.Null);
+ Assert.That(envelope.Type, Is.EqualTo(testMessage.Type));
+ Assert.That(envelope.Headers, Is.SameAs(testMessage.Headers));
+ Assert.That(envelope.MessageId, Is.EqualTo(testMessage.MessageId));
+ Assert.That(envelope.MessageType, Is.EqualTo(testMessage.MessageType));
+ }
+
+ [Test, AutoMoqData]
+ public void Create_from_CommandMessage_uses_serializer([Frozen] ISerializer serializer, EnvelopeService sut, CommandMessage testMessage)
+ {
+ var envelope = sut.CreateEnvelope(testMessage);
+
+ Assert.That(envelope, Is.Not.Null);
+
+ Mock.Get(serializer).Verify(p => p.SerializeObject(testMessage.Command), Times.Once);
+ }
+
+ [Test, AutoMoqData]
+ public void Create_from_EventMessage_uses_serializer([Frozen] ISerializer serializer, EnvelopeService sut, EventMessage testMessage)
+ {
+ var envelope = sut.CreateEnvelope(testMessage);
+
+ Assert.That(envelope, Is.Not.Null);
+
+ Mock.Get(serializer).Verify(p => p.SerializeObject(testMessage.Event), Times.Once);
+ }
+
+ [Test, AutoMoqData]
+ public void CreateCommandMessage_requires_valid_envelope(EnvelopeService sut, Type type)
+ {
+ Assert.Throws(() => sut.CreateCommandMessage(null, type));
+ }
+
+ [Test, AutoMoqData]
+ public void CreateCommandMessage_requires_valid_commandType(EnvelopeService sut, Envelope envelope)
+ {
+ Assert.Throws(() => sut.CreateCommandMessage(envelope, null));
+ }
+
+ [Test, AutoMoqData]
+ public void CreateCommandMessage_returns_message_from_Envelope([Frozen] ISerializer serializer, EnvelopeService sut, Envelope envelope, FirstTestCommand testCommand)
+ {
+ envelope.MessageType = MessageType.Command;
+
+ Mock.Get(serializer).Setup(p => p.DeserializeObject(It.IsAny(), typeof(FirstTestCommand))).Returns(testCommand);
+
+ var commandMessage = sut.CreateCommandMessage(envelope, typeof(FirstTestCommand)) as CommandMessage;
+
+ Assert.That(commandMessage, Is.Not.Null);
+ Assert.That(commandMessage.Command, Is.SameAs(testCommand));
+ Assert.That(commandMessage.Headers, Is.SameAs(envelope.Headers));
+ Assert.That(commandMessage.MessageId, Is.EqualTo(envelope.MessageId));
+ Assert.That(commandMessage.MessageType, Is.EqualTo(envelope.MessageType));
+ }
+
+ [Test, AutoMoqData]
+ public void CreateEventMessage_requires_valid_envelope(EnvelopeService sut, Type type)
+ {
+ Assert.Throws(() => sut.CreateEventMessage(null, type));
+ }
+
+ [Test, AutoMoqData]
+ public void CreateEventMessage_requires_valid_eventType(EnvelopeService sut, Envelope envelope)
+ {
+ Assert.Throws(() => sut.CreateEventMessage(envelope, null));
+ }
+
+ [Test, AutoMoqData]
+ public void CreateEventMessage_returns_message_from_Envelope([Frozen] ISerializer serializer, EnvelopeService sut, Envelope envelope, FirstTestEvent testEvent)
+ {
+ envelope.MessageType = MessageType.Event;
+
+ Mock.Get(serializer).Setup(p => p.DeserializeObject(It.IsAny(), typeof(FirstTestEvent))).Returns(testEvent);
+
+ var eventMessage = sut.CreateEventMessage(envelope, typeof(FirstTestEvent)) as EventMessage;
+
+ Assert.That(eventMessage, Is.Not.Null);
+ Assert.That(eventMessage.Event, Is.SameAs(testEvent));
+ Assert.That(eventMessage.Headers, Is.SameAs(envelope.Headers));
+ Assert.That(eventMessage.MessageId, Is.EqualTo(envelope.MessageId));
+ Assert.That(eventMessage.MessageType, Is.EqualTo(envelope.MessageType));
+ }
+ }
+}
diff --git a/tests/Tests.Nybus/InMemoryBusEngineTests.cs b/tests/Tests.Nybus.Engine.InMemory/InMemory/InMemoryBusEngineTests.cs
similarity index 64%
rename from tests/Tests.Nybus/InMemoryBusEngineTests.cs
rename to tests/Tests.Nybus.Engine.InMemory/InMemory/InMemoryBusEngineTests.cs
index 0174c04..abee7c7 100644
--- a/tests/Tests.Nybus/InMemoryBusEngineTests.cs
+++ b/tests/Tests.Nybus.Engine.InMemory/InMemory/InMemoryBusEngineTests.cs
@@ -1,16 +1,26 @@
using System;
using System.Linq;
using System.Threading.Tasks;
+using AutoFixture;
+using AutoFixture.AutoMoq;
+using AutoFixture.Idioms;
using AutoFixture.NUnit3;
using Moq;
using NUnit.Framework;
using Nybus;
+using Nybus.InMemory;
-namespace Tests
+namespace Tests.InMemory
{
[TestFixture]
public class InMemoryBusEngineTests
{
+ [Test, AutoMoqData]
+ public void Constructor_is_guarded(GuardClauseAssertion assertion)
+ {
+ assertion.Verify(typeof(InMemoryBusEngine).GetConstructors());
+ }
+
[Test, AutoMoqData]
public void SubscribeToCommand_adds_type_to_AcceptedTypes_list(InMemoryBusEngine sut)
{
@@ -28,8 +38,19 @@ public void SubscribeToEvent_adds_type_AcceptedTypes_list(InMemoryBusEngine sut)
}
[Test, AutoMoqData]
- public async Task Sent_commands_are_received(InMemoryBusEngine sut, CommandMessage testMessage)
- {
+ public async Task Sent_commands_are_received([Frozen] IEnvelopeService envelopeService, InMemoryBusEngine sut, CommandMessage testMessage, IFixture fixture)
+ {
+ fixture.Customize(c => c
+ .With(p => p.Type, testMessage.Type)
+ .With(p => p.Headers, testMessage.Headers)
+ .With(p => p.Content)
+ .With(p => p.MessageId, testMessage.MessageId)
+ .With(p => p.MessageType, testMessage.MessageType)
+ );
+
+ Mock.Get(envelopeService).Setup(p => p.CreateEnvelope(It.IsAny>())).ReturnsUsingFixture(fixture);
+ Mock.Get(envelopeService).Setup(p => p.CreateCommandMessage(It.IsAny(), It.IsAny())).Returns(testMessage);
+
sut.SubscribeToCommand();
var sequence = await sut.StartAsync().ConfigureAwait(false);
@@ -37,13 +58,24 @@ public async Task Sent_commands_are_received(InMemoryBusEngine sut, CommandMessa
var items = sequence.DumpInList();
await sut.SendCommandAsync(testMessage);
-
- Assert.That(items.First(), Is.SameAs(testMessage));
+
+ Assert.That(items.First(), Is.EqualTo(testMessage).Using>((x, y) => x.MessageId == y.MessageId));
}
[Test, AutoMoqData]
- public async Task Sent_events_are_received(InMemoryBusEngine sut, EventMessage testMessage)
- {
+ public async Task Sent_events_are_received([Frozen] IEnvelopeService envelopeService, InMemoryBusEngine sut, EventMessage testMessage, IFixture fixture)
+ {
+ fixture.Customize(c => c
+ .With(p => p.Type, testMessage.Type)
+ .With(p => p.Headers, testMessage.Headers)
+ .With(p => p.Content)
+ .With(p => p.MessageId, testMessage.MessageId)
+ .With(p => p.MessageType, testMessage.MessageType)
+ );
+
+ Mock.Get(envelopeService).Setup(p => p.CreateEnvelope(It.IsAny>())).ReturnsUsingFixture(fixture);
+ Mock.Get(envelopeService).Setup(p => p.CreateEventMessage(It.IsAny(), It.IsAny())).Returns(testMessage);
+
sut.SubscribeToEvent();
var sequence = await sut.StartAsync().ConfigureAwait(false);
@@ -52,13 +84,13 @@ public async Task Sent_events_are_received(InMemoryBusEngine sut, EventMessage>((x, y) => x.MessageId == y.MessageId));
}
[Test, AutoMoqData]
- public void Stop_completes_the_sequence_if_started(InMemoryBusEngine sut)
+ public async Task Stop_completes_the_sequence_if_started(InMemoryBusEngine sut)
{
- var sequence = sut.StartAsync().Result;
+ var sequence = await sut.StartAsync();
var isCompleted = false;
@@ -68,15 +100,15 @@ public void Stop_completes_the_sequence_if_started(InMemoryBusEngine sut)
onCompleted: () => isCompleted = true
);
- sut.StopAsync().Wait();
+ await sut.StopAsync();
Assert.That(isCompleted, Is.True);
}
[Test, AutoMoqData]
- public void Stop_is_ignored_if_not_started(InMemoryBusEngine sut)
+ public async Task Stop_is_ignored_if_not_started(InMemoryBusEngine sut)
{
- sut.StopAsync().Wait();
+ await sut.StopAsync();
}
[Test, AutoMoqData]
diff --git a/tests/Tests.Nybus.Engine.InMemory/InMemoryConfiguratorExtensionsTests.cs b/tests/Tests.Nybus.Engine.InMemory/InMemoryConfiguratorExtensionsTests.cs
new file mode 100644
index 0000000..379dce5
--- /dev/null
+++ b/tests/Tests.Nybus.Engine.InMemory/InMemoryConfiguratorExtensionsTests.cs
@@ -0,0 +1,27 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using Microsoft.Extensions.DependencyInjection;
+using Moq;
+using NUnit.Framework;
+using Nybus;
+using Nybus.InMemory;
+
+// ReSharper disable InvokeAsExtensionMethod
+
+namespace Tests
+{
+ [TestFixture]
+ public class InMemoryConfiguratorExtensionsTests
+ {
+ [Test, AutoMoqData]
+ public void UseInMemoryBusEngine_registers_InMemory_bus_engine(TestNybusConfigurator nybus, IServiceCollection services)
+ {
+ InMemoryConfiguratorExtensions.UseInMemoryBusEngine(nybus);
+
+ nybus.ApplyServiceConfigurations(services);
+
+ Mock.Get(services).Verify(p => p.Add(It.Is(sd => sd.ServiceType == typeof(IBusEngine) && sd.ImplementationType == typeof(InMemoryBusEngine))));
+ }
+ }
+}
diff --git a/tests/Tests.Nybus.Engine.InMemory/Tests.Nybus.Engine.InMemory.csproj b/tests/Tests.Nybus.Engine.InMemory/Tests.Nybus.Engine.InMemory.csproj
new file mode 100644
index 0000000..b97d2fa
--- /dev/null
+++ b/tests/Tests.Nybus.Engine.InMemory/Tests.Nybus.Engine.InMemory.csproj
@@ -0,0 +1,21 @@
+
+
+
+ netcoreapp2.0
+
+ false
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tests/Tests.Nybus.Engine.RabbitMq/ObservableConsumerTests.cs b/tests/Tests.Nybus.Engine.RabbitMq/RabbitMq/ObservableConsumerTests.cs
similarity index 99%
rename from tests/Tests.Nybus.Engine.RabbitMq/ObservableConsumerTests.cs
rename to tests/Tests.Nybus.Engine.RabbitMq/RabbitMq/ObservableConsumerTests.cs
index 8b2a1a0..56ff888 100644
--- a/tests/Tests.Nybus.Engine.RabbitMq/ObservableConsumerTests.cs
+++ b/tests/Tests.Nybus.Engine.RabbitMq/RabbitMq/ObservableConsumerTests.cs
@@ -1,14 +1,14 @@
using System;
using System.Collections.Generic;
using System.Reactive;
-using NUnit.Framework;
using AutoFixture.NUnit3;
using Moq;
-using Nybus;
+using NUnit.Framework;
+using Nybus.RabbitMq;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
-namespace Tests
+namespace Tests.RabbitMq
{
[TestFixture]
public class ObservableConsumerTests
diff --git a/tests/Tests.Nybus.Engine.RabbitMq/RabbitMqBusEngineTests.cs b/tests/Tests.Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngineTests.cs
similarity index 76%
rename from tests/Tests.Nybus.Engine.RabbitMq/RabbitMqBusEngineTests.cs
rename to tests/Tests.Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngineTests.cs
index 265ef45..ec45a5b 100644
--- a/tests/Tests.Nybus.Engine.RabbitMq/RabbitMqBusEngineTests.cs
+++ b/tests/Tests.Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngineTests.cs
@@ -3,31 +3,29 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
+using AutoFixture.Idioms;
using AutoFixture.NUnit3;
using Microsoft.Extensions.Logging;
using Moq;
using NUnit.Framework;
using Nybus;
using Nybus.Configuration;
+using Nybus.RabbitMq;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing;
-namespace Tests
+namespace Tests.RabbitMq
{
[TestFixture]
public class RabbitMqBusEngineTests
{
- [Test]
- public void Configuration_is_required()
- {
- Assert.Throws(() => new RabbitMqBusEngine(null, Mock.Of>()));
- }
+ private string DescriptorName(Type type) => $"{type.Namespace}:{type.Name}";
- [Test]
- public void Logger_is_required()
+ [Test, AutoMoqData]
+ public void Constructor_is_guarded(GuardClauseAssertion assertion)
{
- Assert.Throws(() => new RabbitMqBusEngine(Mock.Of(), null));
+ assertion.Verify(typeof(RabbitMqBusEngine).GetConstructors());
}
[Test, AutoMoqData]
@@ -87,9 +85,9 @@ public void SubscribeToEvent_ignores_multiple_registrations_of_same_event(Rabbit
}
[Test, AutoMoqData]
- public void Empty_sequence_is_returned_if_no_subscription(RabbitMqBusEngine sut)
+ public async Task Empty_sequence_is_returned_if_no_subscription(RabbitMqBusEngine sut)
{
- var sequence = sut.StartAsync().Result;
+ var sequence = await sut.StartAsync();
var incomingMessages = sequence.DumpInList();
@@ -97,11 +95,11 @@ public void Empty_sequence_is_returned_if_no_subscription(RabbitMqBusEngine sut)
}
[Test, AutoMoqData]
- public void Commands_can_be_subscribed(RabbitMqBusEngine sut)
+ public async Task Commands_can_be_subscribed(RabbitMqBusEngine sut)
{
sut.SubscribeToCommand();
- var sequence = sut.StartAsync().Result;
+ var sequence = await sut.StartAsync();
var incomingMessages = sequence.DumpInList();
@@ -109,11 +107,11 @@ public void Commands_can_be_subscribed(RabbitMqBusEngine sut)
}
[Test, AutoMoqData]
- public void Events_can_be_subscribed(RabbitMqBusEngine sut)
+ public async Task Events_can_be_subscribed(RabbitMqBusEngine sut)
{
sut.SubscribeToEvent();
- var sequence = sut.StartAsync().Result;
+ var sequence = await sut.StartAsync();
var incomingMessages = sequence.DumpInList();
@@ -121,13 +119,13 @@ public void Events_can_be_subscribed(RabbitMqBusEngine sut)
}
[Test, AutoMoqData]
- public void Commands_and_events_can_be_subscribed(RabbitMqBusEngine sut)
+ public async Task Commands_and_events_can_be_subscribed(RabbitMqBusEngine sut)
{
sut.SubscribeToEvent();
sut.SubscribeToCommand();
- var sequence = sut.StartAsync().Result;
+ var sequence = await sut.StartAsync();
var incomingMessages = sequence.DumpInList();
@@ -135,74 +133,74 @@ public void Commands_and_events_can_be_subscribed(RabbitMqBusEngine sut)
}
[Test, AutoMoqData]
- public void QueueFactory_is_invoked_when_a_event_is_registered([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut)
+ public async Task QueueFactory_is_invoked_when_a_event_is_registered([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut)
{
sut.SubscribeToEvent();
- var sequence = sut.StartAsync().Result;
+ var sequence = await sut.StartAsync();
Mock.Get(configuration.EventQueueFactory).Verify(p => p.CreateQueue(It.IsAny()));
}
[Test, AutoMoqData]
- public void QueueFactory_is_invoked_when_a_command_is_registered([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut)
+ public async Task QueueFactory_is_invoked_when_a_command_is_registered([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut)
{
sut.SubscribeToCommand();
- var sequence = sut.StartAsync().Result;
+ var sequence = await sut.StartAsync();
Mock.Get(configuration.CommandQueueFactory).Verify(p => p.CreateQueue(It.IsAny()));
}
[Test, AutoMoqData]
- public void Exchange_is_declared_when_a_event_is_registered([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut)
+ public async Task Exchange_is_declared_when_a_event_is_registered([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut)
{
sut.SubscribeToEvent();
- var sequence = sut.StartAsync().Result;
+ var sequence = await sut.StartAsync();
Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.ExchangeDeclare(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny>()));
}
[Test, AutoMoqData]
- public void Exchange_is_declared_when_a_command_is_registered([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut)
+ public async Task Exchange_is_declared_when_a_command_is_registered([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut)
{
sut.SubscribeToCommand();
- var sequence = sut.StartAsync().Result;
+ var sequence = await sut.StartAsync();
Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.ExchangeDeclare(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny>()));
}
[Test, AutoMoqData]
- public void Queue_is_bound_to_exchange_when_a_event_is_registered([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut)
+ public async Task Queue_is_bound_to_exchange_when_a_event_is_registered([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut)
{
sut.SubscribeToEvent();
- var sequence = sut.StartAsync().Result;
+ var sequence = await sut.StartAsync();
Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.QueueBind(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny>()));
}
[Test, AutoMoqData]
- public void Queue_is_bound_to_exchange_when_a_command_is_registered([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut)
+ public async Task Queue_is_bound_to_exchange_when_a_command_is_registered([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut)
{
sut.SubscribeToCommand();
- var sequence = sut.StartAsync().Result;
+ var sequence = await sut.StartAsync();
Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.QueueBind(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny>()));
}
[Test, AutoMoqData]
- public void Event_consumer_is_exposed_when_sequence_is_subscribed(RabbitMqBusEngine sut)
+ public async Task Event_consumer_is_exposed_when_sequence_is_subscribed(RabbitMqBusEngine sut)
{
sut.SubscribeToEvent();
- var sequence = sut.StartAsync().Result;
+ var sequence = await sut.StartAsync();
sequence.Subscribe(_ => { }); // subscribes to the sequence but takes no action when items are published
@@ -210,23 +208,24 @@ public void Event_consumer_is_exposed_when_sequence_is_subscribed(RabbitMqBusEng
}
[Test, AutoMoqData]
- public void Command_consumer_is_exposed_when_sequence_is_subscribed(RabbitMqBusEngine sut)
+ public async Task Command_consumer_is_exposed_when_sequence_is_subscribed(RabbitMqBusEngine sut)
{
sut.SubscribeToCommand();
- var sequence = sut.StartAsync().Result;
+ var sequence = await sut.StartAsync();
sequence.Subscribe(_ => { }); // subscribes to the sequence but takes no action when items are published
Assert.That(sut.Consumers.Count, Is.EqualTo(1));
}
+
[Test, AutoMoqData]
- public void Events_can_be_received([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, string messageId, Guid correlationId, FirstTestEvent @event)
+ public async Task Events_with_invalid_type_format_are_ignored_and_nacked([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, string messageId, Guid correlationId, FirstTestEvent @event)
{
sut.SubscribeToEvent();
- var sequence = sut.StartAsync().Result;
+ var sequence = await sut.StartAsync();
var encoding = Encoding.UTF8;
@@ -248,6 +247,38 @@ public void Events_can_be_received([Frozen] IRabbitMqConfiguration configuration
sut.Consumers.First().Value.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
+ Assert.That(incomingMessages, Is.Empty);
+
+ Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.BasicNack(deliveryTag, It.IsAny(), It.IsAny()));
+ }
+
+ [Test, AutoMoqData]
+ public async Task Events_can_be_received([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, string messageId, Guid correlationId, FirstTestEvent @event)
+ {
+ sut.SubscribeToEvent();
+
+ var sequence = await sut.StartAsync();
+
+ var encoding = Encoding.UTF8;
+
+ IBasicProperties properties = new BasicProperties
+ {
+ MessageId = messageId,
+ ContentEncoding = encoding.WebName,
+ Headers = new Dictionary
+ {
+ ["Nybus:MessageId"] = encoding.GetBytes(messageId),
+ ["Nybus:MessageType"] = encoding.GetBytes(DescriptorName(@event.GetType())),
+ ["Nybus:CorrelationId"] = correlationId.ToByteArray()
+ }
+ };
+
+ var body = configuration.Serializer.SerializeObject(@event, encoding);
+
+ var incomingMessages = sequence.DumpInList();
+
+ sut.Consumers.First().Value.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
+
Assert.That(incomingMessages, Has.Exactly(1).InstanceOf>());
var message = incomingMessages[0] as EventMessage;
@@ -260,11 +291,11 @@ public void Events_can_be_received([Frozen] IRabbitMqConfiguration configuration
}
[Test, AutoMoqData]
- public void Commands_can_be_received([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, string messageId, Guid correlationId, FirstTestCommand command)
+ public async Task Commands_with_invalid_type_format_are_ignored_and_nacked([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, string messageId, Guid correlationId, FirstTestCommand command)
{
sut.SubscribeToCommand();
- var sequence = sut.StartAsync().Result;
+ var sequence = await sut.StartAsync();
var encoding = Encoding.UTF8;
@@ -286,6 +317,39 @@ public void Commands_can_be_received([Frozen] IRabbitMqConfiguration configurati
sut.Consumers.First().Value.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
+ Assert.That(incomingMessages, Is.Empty);
+
+ Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.BasicNack(deliveryTag, It.IsAny(), It.IsAny()));
+ }
+
+
+ [Test, AutoMoqData]
+ public async Task Commands_can_be_received([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, string messageId, Guid correlationId, FirstTestCommand command)
+ {
+ sut.SubscribeToCommand();
+
+ var sequence = await sut.StartAsync();
+
+ var encoding = Encoding.UTF8;
+
+ IBasicProperties properties = new BasicProperties
+ {
+ MessageId = messageId,
+ ContentEncoding = encoding.WebName,
+ Headers = new Dictionary
+ {
+ ["Nybus:MessageId"] = encoding.GetBytes(messageId),
+ ["Nybus:MessageType"] = encoding.GetBytes(DescriptorName(command.GetType())),
+ ["Nybus:CorrelationId"] = correlationId.ToByteArray()
+ }
+ };
+
+ var body = configuration.Serializer.SerializeObject(command, encoding);
+
+ var incomingMessages = sequence.DumpInList();
+
+ sut.Consumers.First().Value.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
+
Assert.That(incomingMessages, Has.Exactly(1).InstanceOf>());
var message = incomingMessages[0] as CommandMessage;
@@ -298,12 +362,12 @@ public void Commands_can_be_received([Frozen] IRabbitMqConfiguration configurati
}
[Test, AutoMoqData]
- public void Invalid_events_are_discarded([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, string messageId, Guid correlationId, FirstTestEvent @event)
+ public async Task Invalid_events_are_discarded([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, string messageId, Guid correlationId, FirstTestEvent @event)
{
// At least one subscription is needed to inject invalid messages
sut.SubscribeToEvent();
- var sequence = sut.StartAsync().Result;
+ var sequence = await sut.StartAsync();
var encoding = Encoding.UTF8;
@@ -314,7 +378,7 @@ public void Invalid_events_are_discarded([Frozen] IRabbitMqConfiguration configu
Headers = new Dictionary
{
["Nybus:MessageId"] = encoding.GetBytes(messageId),
- ["Nybus:MessageType"] = encoding.GetBytes(@event.GetType().FullName),
+ ["Nybus:MessageType"] = encoding.GetBytes(DescriptorName(@event.GetType())),
["Nybus:CorrelationId"] = correlationId.ToByteArray()
}
};
@@ -331,12 +395,12 @@ public void Invalid_events_are_discarded([Frozen] IRabbitMqConfiguration configu
}
[Test, AutoMoqData]
- public void Invalid_commands_are_discarded([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, string messageId, Guid correlationId, FirstTestCommand command)
+ public async Task Invalid_commands_are_discarded([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, string messageId, Guid correlationId, FirstTestCommand command)
{
// At least one subscription is needed to inject invalid messages
sut.SubscribeToCommand();
- var sequence = sut.StartAsync().Result;
+ var sequence = await sut.StartAsync();
var encoding = Encoding.UTF8;
@@ -347,7 +411,7 @@ public void Invalid_commands_are_discarded([Frozen] IRabbitMqConfiguration confi
Headers = new Dictionary
{
["Nybus:MessageId"] = encoding.GetBytes(messageId),
- ["Nybus:MessageType"] = encoding.GetBytes(command.GetType().FullName),
+ ["Nybus:MessageType"] = encoding.GetBytes(DescriptorName(command.GetType())),
["Nybus:CorrelationId"] = correlationId.ToByteArray()
}
};
@@ -435,7 +499,7 @@ public async Task NotifySuccess_acks_command_messages([Frozen] IRabbitMqConfigur
Headers = new Dictionary
{
["Nybus:MessageId"] = encoding.GetBytes(messageId),
- ["Nybus:MessageType"] = encoding.GetBytes(command.GetType().FullName),
+ ["Nybus:MessageType"] = encoding.GetBytes(DescriptorName(command.GetType())),
["Nybus:CorrelationId"] = correlationId.ToByteArray()
}
};
@@ -467,7 +531,7 @@ public async Task NotifySuccess_acks_event_messages([Frozen] IRabbitMqConfigurat
Headers = new Dictionary
{
["Nybus:MessageId"] = encoding.GetBytes(messageId),
- ["Nybus:MessageType"] = encoding.GetBytes(@event.GetType().FullName),
+ ["Nybus:MessageType"] = encoding.GetBytes(DescriptorName(@event.GetType())),
["Nybus:CorrelationId"] = correlationId.ToByteArray()
}
};
@@ -499,7 +563,7 @@ public async Task NotifySuccess_can_handle_closed_connections([Frozen] IRabbitMq
Headers = new Dictionary
{
["Nybus:MessageId"] = encoding.GetBytes(messageId),
- ["Nybus:MessageType"] = encoding.GetBytes(command.GetType().FullName),
+ ["Nybus:MessageType"] = encoding.GetBytes(DescriptorName(command.GetType())),
["Nybus:CorrelationId"] = correlationId.ToByteArray()
}
};
@@ -533,7 +597,7 @@ public async Task NotifyFail_nacks_command_messages([Frozen] IRabbitMqConfigurat
Headers = new Dictionary
{
["Nybus:MessageId"] = encoding.GetBytes(messageId),
- ["Nybus:MessageType"] = encoding.GetBytes(command.GetType().FullName),
+ ["Nybus:MessageType"] = encoding.GetBytes(DescriptorName(command.GetType())),
["Nybus:CorrelationId"] = correlationId.ToByteArray()
}
};
@@ -565,7 +629,7 @@ public async Task NotifyFail_can_handle_closed_connections([Frozen] IRabbitMqCon
Headers = new Dictionary
{
["Nybus:MessageId"] = encoding.GetBytes(messageId),
- ["Nybus:MessageType"] = encoding.GetBytes(command.GetType().FullName),
+ ["Nybus:MessageType"] = encoding.GetBytes(DescriptorName(command.GetType())),
["Nybus:CorrelationId"] = correlationId.ToByteArray()
}
};
diff --git a/tests/Tests.Nybus.Engine.RabbitMq/RabbitMqConfiguratorExtensionsTests.cs b/tests/Tests.Nybus.Engine.RabbitMq/RabbitMqConfiguratorExtensionsTests.cs
index f691c0c..bc020ad 100644
--- a/tests/Tests.Nybus.Engine.RabbitMq/RabbitMqConfiguratorExtensionsTests.cs
+++ b/tests/Tests.Nybus.Engine.RabbitMq/RabbitMqConfiguratorExtensionsTests.cs
@@ -6,6 +6,7 @@
using NUnit.Framework;
using Nybus;
using Nybus.Configuration;
+using Nybus.RabbitMq;
using RabbitMQ.Client;
using Tests.Configuration;
// ReSharper disable InvokeAsExtensionMethod
diff --git a/tests/Tests.Nybus/NybusConfiguratorExtensionsTests.cs b/tests/Tests.Nybus/NybusConfiguratorExtensionsTests.cs
index fafa03e..666a97e 100644
--- a/tests/Tests.Nybus/NybusConfiguratorExtensionsTests.cs
+++ b/tests/Tests.Nybus/NybusConfiguratorExtensionsTests.cs
@@ -15,16 +15,6 @@ namespace Tests
[TestFixture]
public class NybusConfiguratorExtensionsTests
{
- [Test, AutoMoqData]
- public void UseInMemoryBusEngine_registers_InMemory_bus_engine(TestNybusConfigurator nybus, IServiceCollection services)
- {
- NybusConfiguratorExtensions.UseInMemoryBusEngine(nybus);
-
- nybus.ApplyServiceConfigurations(services);
-
- Mock.Get(services).Verify(p => p.Add(It.Is(sd => sd.ServiceType == typeof(IBusEngine) && sd.ImplementationType == typeof(InMemoryBusEngine))));
- }
-
[Test, AutoMoqData]
public void SubscribeToCommand_registers_handler_for_command(TestNybusConfigurator nybus, ISubscriptionBuilder subscriptionBuilder)
{
diff --git a/tests/Tests.Nybus/Utils/MessageDescriptorStoreTests.cs b/tests/Tests.Nybus/Utils/MessageDescriptorStoreTests.cs
new file mode 100644
index 0000000..3945c4a
--- /dev/null
+++ b/tests/Tests.Nybus/Utils/MessageDescriptorStoreTests.cs
@@ -0,0 +1,67 @@
+using System;
+using NUnit.Framework;
+using Nybus.Utils;
+using Samples;
+
+namespace Tests.Utils
+{
+ [TestFixture]
+ public class MessageDescriptorStoreTests
+ {
+ [Test]
+ [InlineAutoMoqData(typeof(FirstTestCommand))]
+ [InlineAutoMoqData(typeof(SecondTestCommand))]
+ [InlineAutoMoqData(typeof(ThirdTestCommand))]
+ [InlineAutoMoqData(typeof(AttributeTestCommand))]
+ [InlineAutoMoqData(typeof(FirstTestEvent))]
+ [InlineAutoMoqData(typeof(SecondTestEvent))]
+ [InlineAutoMoqData(typeof(ThirdTestEvent))]
+ [InlineAutoMoqData(typeof(AttributeTestEvent))]
+ [AutoMoqData]
+ public void Same_type_wont_be_registered_twice(Type type, MessageDescriptorStore sut)
+ {
+ Assume.That(sut.RegisterType(type), Is.True);
+
+ Assert.That(sut.RegisterType(type), Is.False);
+ }
+
+ [Test]
+ [InlineAutoMoqData(typeof(FirstTestCommand))]
+ [InlineAutoMoqData(typeof(SecondTestCommand))]
+ [InlineAutoMoqData(typeof(ThirdTestCommand))]
+ [InlineAutoMoqData(typeof(FirstTestEvent))]
+ [InlineAutoMoqData(typeof(SecondTestEvent))]
+ [InlineAutoMoqData(typeof(ThirdTestEvent))]
+ [AutoMoqData]
+ public void Type_can_be_found_by_its_descriptor(Type type, MessageDescriptorStore sut)
+ {
+ var descriptor = MessageDescriptor.CreateFromType(type);
+
+ sut.RegisterType(type);
+
+ var isFound = sut.TryGetTypeForDescriptor(descriptor, out var typeFound);
+
+ Assert.That(isFound, Is.True);
+ Assert.That(typeFound, Is.EqualTo(type).Using((first, second) => string.Equals(first.FullName, second.FullName, StringComparison.OrdinalIgnoreCase)));
+ }
+
+ [Test]
+ [InlineAutoMoqData(typeof(FirstTestCommand))]
+ [InlineAutoMoqData(typeof(SecondTestCommand))]
+ [InlineAutoMoqData(typeof(ThirdTestCommand))]
+ [InlineAutoMoqData(typeof(FirstTestEvent))]
+ [InlineAutoMoqData(typeof(SecondTestEvent))]
+ [InlineAutoMoqData(typeof(ThirdTestEvent))]
+ [AutoMoqData]
+ public void Descriptor_can_be_found_by_its_type(Type type, MessageDescriptorStore sut)
+ {
+ sut.RegisterType(type);
+
+ var isFound = sut.TryGetDescriptorForType(type, out var descriptorFound);
+
+ Assert.That(isFound, Is.True);
+ Assert.That(descriptorFound.Name, Is.EqualTo(type.Name));
+ Assert.That(descriptorFound.Namespace, Is.EqualTo(type.Namespace));
+ }
+ }
+}