diff --git a/Frank.PulseFlow.Logging/PulseFlowLogger.cs b/Frank.PulseFlow.Logging/PulseFlowLogger.cs index 22b67e6..79f8f22 100644 --- a/Frank.PulseFlow.Logging/PulseFlowLogger.cs +++ b/Frank.PulseFlow.Logging/PulseFlowLogger.cs @@ -35,7 +35,7 @@ public PulseFlowLogger(string categoryName, IConduit conduit, IOptionsMonitorThe exception associated with the log entry. /// A function that formats the log message. public void Log(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func formatter) - => _conduit.SendAsync(new LogPulse(logLevel, eventId, exception, _categoryName, formatter.Invoke(state, exception), state as IReadOnlyList>)).GetAwaiter().GetResult(); + => _conduit.SendAsync(new LogPulse(logLevel, eventId, exception, _categoryName, formatter.Invoke(state, exception), (IReadOnlyList>)state!), CancellationToken.None).GetAwaiter().GetResult(); /// /// Checks if logging is enabled for the specified log level. diff --git a/Frank.PulseFlow.Logging/README.md b/Frank.PulseFlow.Logging/README.md new file mode 100644 index 0000000..3091976 --- /dev/null +++ b/Frank.PulseFlow.Logging/README.md @@ -0,0 +1,64 @@ +# Frank.PulseFlow.Logging + +This library provides a simple logger for use in .NET applications. It uses the `Microsoft.Extensions.Logging` library +backed by `Frank.PulseFlow` for logging. It will log to the console by default, but can add one or more `IFlow`'s to do +whatever you want with the log messages. A common use case is to log to a file or a database, and because `Frank.PulseFlow` +is thread-safe, you can do so without worrying about concurrency issues like file locks, or the overhead of waiting for a lock. + +## Usage + +```csharp +using Frank.PulseFlow.Logging; + +public class Program +{ + public static async Task Main(string[] args) + { + var builder = new HostBuilder() + .ConfigureLogging((hostContext, logging) => + { + logging.AddPulseFlow(); + }) + .ConfigureServices((hostContext, services) => + { + services.AddPulseFlow(); + }); + .Build(); + + await builder.RunAsync(); + } +} + +public class FileLoggerFlow(IOptions options) : IFlow +{ + private readonly FileLoggerSettings _settings = options.Value; + + public async Task HandleAsync(IPulse pulse, CancellationToken cancellationToken) + { + var thing = pulse as LogPulse; + await File.AppendAllTextAsync(_settings.LogPath, thing! + Environment.NewLine, cancellationToken); + } + + public bool CanHandle(Type pulseType) => pulseType == typeof(LogPulse); +} + +public class FileLoggerSettings +{ + public string LogPath { get; set; } = "../../../../logs.log"; +} +``` + +## Configuration + +The `AddPulseFlow` method has a few overloads that allow you to configure the logger. The default configuration is to log +to the console, but you can add one or more `IFlow`'s to the logger to do whatever you want with the log messages. A common +use case is to log to a file or a database, and because `Frank.PulseFlow` is thread-safe, you can do so without worrying +about concurrency issues like file locks, or the overhead of waiting for a lock. + +## Contributing + +Contributions are welcome! Please see create an issue before submitting a pull request to discuss the changes you would like to make. + +## License + +This library is licensed under the MIT license. See the [LICENSE](../LICENSE) file for more information. diff --git a/Frank.PulseFlow.Tests/Frank.PulseFlow.Tests.csproj b/Frank.PulseFlow.Tests/Frank.PulseFlow.Tests.csproj index 28fb93f..06fc01e 100644 --- a/Frank.PulseFlow.Tests/Frank.PulseFlow.Tests.csproj +++ b/Frank.PulseFlow.Tests/Frank.PulseFlow.Tests.csproj @@ -7,7 +7,9 @@ - + + + diff --git a/Frank.PulseFlow.Tests/PulseFlowTests.cs b/Frank.PulseFlow.Tests/PulseFlowTests.cs index dca98cd..3e6ecc1 100644 --- a/Frank.PulseFlow.Tests/PulseFlowTests.cs +++ b/Frank.PulseFlow.Tests/PulseFlowTests.cs @@ -1,93 +1,197 @@ +using System.Diagnostics; + +using FluentAssertions; + using Frank.PulseFlow.Logging; +using Frank.Reflection; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; using Xunit.Abstractions; -using Frank.Reflection; +using Frank.Testing.TestBases; + +using Microsoft.Extensions.Options; namespace Frank.PulseFlow.Tests; -public class PulseFlowTests +public class PulseFlowTests(ITestOutputHelper outputHelper) : HostApplicationTestBase(outputHelper) { - private readonly ITestOutputHelper _outputHelper; + private readonly ITestOutputHelper _outputHelper = outputHelper; + private readonly TestPulseContainer _container = new(); - public PulseFlowTests(ITestOutputHelper outputHelper) + /// + protected override Task SetupAsync(HostApplicationBuilder builder) { - _outputHelper = outputHelper; + builder.Logging.AddPulseFlow(); + + builder.Services.AddPulseFlow(); + builder.Services.AddPulseFlow(x => x.AddFlow().AddFlow()); + builder.Services.AddPulseFlow(); + builder.Services.AddPulseFlow(); + builder.Services.AddPulseFlow(); + builder.Services.AddHostedService(); + builder.Services.AddSingleton(_container); + + builder.Services.Configure(x => x.LogPath = "logs.log"); + _outputHelper.WriteTable(builder.Services.Select(x => new { Service = x.ServiceType.GetFriendlyName(), Implementation = x.ImplementationType?.GetFriendlyName(), x.Lifetime }).OrderBy(x => x.Service)); + return Task.CompletedTask; } [Fact] - public void Test1() + public async Task Test1() { - var host = CreateHostBuilder().Build(); + await Task.Delay(500); + var overview = new [] + { + new + { + Name = "Blue", Count = _container.BlueMessages.Count, + }, + new + { + Name = "Red", Count = _container.RedMessages.Count, + }, + new + { + Name = "Log", Count = _container.LogMessages.Count, + }, + new + { + Name = "Timer", Count = _container.TimerPulses.Count, + }, + new + { + Name = "Timer2", Count = _container.TimerPulses2.Count, + }, + }; + + _outputHelper.WriteTable(overview); - host.Start(); + await Task.Delay(500); + + _container.BlueMessages.Should().NotBeEmpty(); + _container.RedMessages.Should().NotBeEmpty(); + _container.LogMessages.Should().NotBeEmpty(); + _container.TimerPulses.Should().NotBeEmpty(); + _container.TimerPulses2.Should().NotBeEmpty(); } - private class MyService : BackgroundService + private class TestOutputHelperFlow(TestPulseContainer container) : IFlow { - private readonly ILogger _logger; + public async Task HandleAsync(IPulse pulse, CancellationToken cancellationToken) + { + var thing = pulse as LogPulse; + container.LogMessages.Add(thing!); + await Task.CompletedTask; + } - public MyService(ILogger logger) => _logger = logger; + public bool CanHandle(Type pulseType) + { + return pulseType == typeof(LogPulse); + } + } + + private class BlueOutputFlow(TestPulseContainer container) : IFlow + { + public async Task HandleAsync(IPulse pulse, CancellationToken cancellationToken) + { + var thing = pulse as MyMessage; + container.BlueMessages.Add(thing!); + await Task.CompletedTask; + } + + public bool CanHandle(Type pulseType) + { + return pulseType == typeof(MyMessage); + } + } + + private class RedOutputFlow(TestPulseContainer container) : IFlow + { + public async Task HandleAsync(IPulse pulse, CancellationToken cancellationToken) + { + var thing = pulse as MyMessage; + container.RedMessages.Add(thing!); + await Task.CompletedTask; + } + public bool CanHandle(Type pulseType) + { + return pulseType == typeof(MyMessage); + } + } + + private class MyService(IConduit conduit) : BackgroundService + { protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - _logger.LogInformation("Hello from {ServiceName}", nameof(MyService)); - try - { - throw new Exception("This is an exception"); - } - catch (Exception e) + var stopWatch = Stopwatch.StartNew(); + while (!stoppingToken.IsCancellationRequested && stopWatch.Elapsed < TimeSpan.FromSeconds(1)) { - _logger.LogError(e, "This is an exception in {ServiceName}", nameof(MyService)); + await conduit.SendAsync(new MyMessage("Hello, World! " + stopWatch.Elapsed.ToString("c")), stoppingToken); + await conduit.SendAsync(new TimerPulse() {Elapsed = stopWatch.Elapsed}, stoppingToken); } - await Task.Delay(1000, stoppingToken); } + } - private IHostBuilder CreateHostBuilder() + private class TimerHandler(TestPulseContainer container) : IPulseHandler { - return Host.CreateDefaultBuilder() - .ConfigureLogging(logging => - { - logging.ClearProviders(); - logging.AddPulseFlow(); - }) - .ConfigureServices((context, services) => - { - services.AddSingleton(_outputHelper); - services.AddPulseFlow(builder => - { - builder.AddFlow(); - }); - - services.AddHostedService(); - }); + public async Task HandleAsync(TimerPulse pulse, CancellationToken cancellationToken) + { + container.TimerPulses.Add(pulse); + await Task.CompletedTask; + } } - - private class TestOutputFlow : IFlow + + private class TimerHandler2(TestPulseContainer container) : IPulseHandler { - private readonly ITestOutputHelper _outputHelper; - - public TestOutputFlow(ITestOutputHelper outputHelper) + public async Task HandleAsync(TimerPulse pulse, CancellationToken cancellationToken) { - _outputHelper = outputHelper; + container.TimerPulses2.Add(pulse); + await Task.CompletedTask; } - + } + + private class MyMessage(string message) : BasePulse + { + public string Message { get; set; } = message; + + public override string ToString() => $"MyMessage: {Message}"; + } + + private class TimerPulse : BasePulse + { + public TimeSpan Elapsed { get; set; } + } + + private class TestPulseContainer + { + public List BlueMessages { get; } = new(); + public List RedMessages { get; } = new(); + public List LogMessages { get; } = new(); + public List TimerPulses { get; } = new(); + public List TimerPulses2 { get; } = new(); + + } + + public class FileLoggerFlow(IOptions options) : IFlow + { + private readonly FileLoggerSettings _settings = options.Value; + public async Task HandleAsync(IPulse pulse, CancellationToken cancellationToken) { var thing = pulse as LogPulse; - var message = thing!.ToString(); - _outputHelper.WriteLine(message); + await File.AppendAllTextAsync(_settings.LogPath!, thing! + Environment.NewLine, cancellationToken); await Task.CompletedTask; } - public bool CanHandle(Type pulseType) - { - _outputHelper.WriteLine($"CanHandle: {pulseType.GetFriendlyName()}"); - return pulseType.BaseType == typeof(LogPulse); - } + public bool CanHandle(Type pulseType) => pulseType == typeof(LogPulse); + } + + public class FileLoggerSettings + { + public string? LogPath { get; set; } } -} \ No newline at end of file +} diff --git a/Frank.PulseFlow/Frank.PulseFlow.csproj b/Frank.PulseFlow/Frank.PulseFlow.csproj index 7de67d9..4eb826d 100644 --- a/Frank.PulseFlow/Frank.PulseFlow.csproj +++ b/Frank.PulseFlow/Frank.PulseFlow.csproj @@ -8,7 +8,8 @@ - + + diff --git a/Frank.PulseFlow/GlobalUsings.cs b/Frank.PulseFlow/GlobalUsings.cs new file mode 100644 index 0000000..fe80242 --- /dev/null +++ b/Frank.PulseFlow/GlobalUsings.cs @@ -0,0 +1,9 @@ +// Global using directives + +global using System.Threading.Channels; + +global using Frank.Channels.DependencyInjection; +global using Frank.PulseFlow.Internal; + +global using Microsoft.Extensions.DependencyInjection; +global using Microsoft.Extensions.Hosting; \ No newline at end of file diff --git a/Frank.PulseFlow/IConduit.cs b/Frank.PulseFlow/IConduit.cs index 83d3efd..81126a5 100644 --- a/Frank.PulseFlow/IConduit.cs +++ b/Frank.PulseFlow/IConduit.cs @@ -9,6 +9,6 @@ public interface IConduit /// Sends a pulse to the underlying infrastructure. /// /// - /// - Task SendAsync(IPulse message); + /// + Task SendAsync(IPulse message, CancellationToken cancellationToken); } \ No newline at end of file diff --git a/Frank.PulseFlow/IPulseHandler.cs b/Frank.PulseFlow/IPulseHandler.cs new file mode 100644 index 0000000..362783a --- /dev/null +++ b/Frank.PulseFlow/IPulseHandler.cs @@ -0,0 +1,15 @@ +namespace Frank.PulseFlow; + +/// IPulseHandler Interface +/// Represents a handler for processing pulses. +/// /// The type of pulse to handle. +public interface IPulseHandler where T : IPulse +{ + /// + /// Handles the pulse asynchronously. + /// + /// The pulse to be handled. + /// The cancellation token that can be used to cancel the operation. + /// A task representing the asynchronous operation. + Task HandleAsync(T pulse, CancellationToken cancellationToken); +} \ No newline at end of file diff --git a/Frank.PulseFlow/IncompatibleFlowException.cs b/Frank.PulseFlow/IncompatibleFlowException.cs new file mode 100644 index 0000000..620a760 --- /dev/null +++ b/Frank.PulseFlow/IncompatibleFlowException.cs @@ -0,0 +1,3 @@ +namespace Frank.PulseFlow; + +public class IncompatibleFlowException(string s) : Exception(s); \ No newline at end of file diff --git a/Frank.PulseFlow/Internal/Channel.cs b/Frank.PulseFlow/Internal/Channel.cs deleted file mode 100644 index 4a8a904..0000000 --- a/Frank.PulseFlow/Internal/Channel.cs +++ /dev/null @@ -1,25 +0,0 @@ -using System.Threading.Channels; - -namespace Frank.PulseFlow.Internal; - -/// -/// A wrapper around a that sends and receives objects. -/// -internal class Channel : IChannel -{ - private readonly Channel _channel = System.Threading.Channels.Channel.CreateUnbounded(); - - /// - /// Sends the provided IPulse message asynchronously. - /// - /// The IPulse message to send. - /// A Task representing the asynchronous operation. - public async Task SendAsync(IPulse message) => await _channel.Writer.WriteAsync(message); - - /// - /// Reads all available pulses asynchronously from the channel. - /// - /// The cancellation token to cancel the operation. - /// An asynchronous enumerable of IPulse objects. - public IAsyncEnumerable ReadAllAsync(CancellationToken cancellationToken) => _channel.Reader.ReadAllAsync(cancellationToken); -} \ No newline at end of file diff --git a/Frank.PulseFlow/Internal/Conduit.cs b/Frank.PulseFlow/Internal/Conduit.cs index 476dfa3..9b70ce9 100644 --- a/Frank.PulseFlow/Internal/Conduit.cs +++ b/Frank.PulseFlow/Internal/Conduit.cs @@ -1,22 +1,6 @@ namespace Frank.PulseFlow.Internal; -/// -/// A wrapper around a that sends and receives objects. -/// -internal class Conduit : IConduit +internal class Conduit(ChannelWriter writer) : IConduit { - private readonly IChannel _messageChannel; - - /// - /// Represents a conduit for message communication between channels. - /// - /// The channel used for message communication. - public Conduit(IChannel messageChannel) => _messageChannel = messageChannel; - - /// - /// Sends a pulse message asynchronously. - /// - /// The pulse message to be sent. - /// A task representing the asynchronous operation. - public Task SendAsync(IPulse message) => _messageChannel.SendAsync(message); + public async Task SendAsync(IPulse message, CancellationToken cancellationToken) => await writer.WriteAsync(message, cancellationToken); } \ No newline at end of file diff --git a/Frank.PulseFlow/Internal/FlowBuilder.cs b/Frank.PulseFlow/Internal/FlowBuilder.cs index 7d161bb..22a7f61 100644 --- a/Frank.PulseFlow/Internal/FlowBuilder.cs +++ b/Frank.PulseFlow/Internal/FlowBuilder.cs @@ -1,28 +1,10 @@ -using Microsoft.Extensions.DependencyInjection; - namespace Frank.PulseFlow.Internal; -/// -/// Represents a builder for configuring the pulse flow. -/// -internal class FlowBuilder : IFlowBuilder +internal class FlowBuilder(IServiceCollection services) : IFlowBuilder { - private readonly IServiceCollection _services; - - /// - /// Creates a new instance of the FlowBuilder class with the specified services. - /// - /// The services collection used for dependency injection. - public FlowBuilder(IServiceCollection services) => _services = services; - - /// - /// Adds a flow of type T to the pulse flow builder. - /// - /// The type of the flow to be added. - /// The pulse flow builder instance. public IFlowBuilder AddFlow() where T : class, IFlow { - _services.AddTransient(); + services.AddPulseFlow(); return this; } } \ No newline at end of file diff --git a/Frank.PulseFlow/Internal/GenericFlow.cs b/Frank.PulseFlow/Internal/GenericFlow.cs new file mode 100644 index 0000000..1cc4da5 --- /dev/null +++ b/Frank.PulseFlow/Internal/GenericFlow.cs @@ -0,0 +1,15 @@ +namespace Frank.PulseFlow.Internal; + +internal class GenericFlow(THandler handler) : IFlow + where TPulse : IPulse + where THandler : IPulseHandler +{ + public Task HandleAsync(IPulse pulse, CancellationToken cancellationToken) + { + if (pulse is TPulse t) + return handler.HandleAsync(t, cancellationToken); + throw new IncompatibleFlowException($"The pulse is not of type {typeof(TPulse).Name}. This is impossible and should never happen, so please report this as a bug on GitHub ASAP. Thank you!"); + } + + public bool CanHandle(Type pulseType) => pulseType == typeof(TPulse); +} \ No newline at end of file diff --git a/Frank.PulseFlow/Internal/IChannel.cs b/Frank.PulseFlow/Internal/IChannel.cs deleted file mode 100644 index 68e78a8..0000000 --- a/Frank.PulseFlow/Internal/IChannel.cs +++ /dev/null @@ -1,21 +0,0 @@ -namespace Frank.PulseFlow.Internal; - -/// -/// Represents a channel for sending and receiving pulses. -/// -internal interface IChannel -{ - /// - /// Sends an asynchronous pulse message. - /// - /// The pulse message to send. - /// A task representing the asynchronous operation. - Task SendAsync(IPulse message); - - /// - /// Reads all available pulses asynchronously. - /// - /// A cancellation token used to cancel the operation. - /// An asynchronous enumerable of IPulse objects. - IAsyncEnumerable ReadAllAsync(CancellationToken cancellationToken); -} \ No newline at end of file diff --git a/Frank.PulseFlow/Internal/PulseNexus.cs b/Frank.PulseFlow/Internal/PulseNexus.cs index 01bd93e..0a11db6 100644 --- a/Frank.PulseFlow/Internal/PulseNexus.cs +++ b/Frank.PulseFlow/Internal/PulseNexus.cs @@ -1,33 +1,11 @@ -using Microsoft.Extensions.Hosting; +namespace Frank.PulseFlow.Internal; -namespace Frank.PulseFlow.Internal; - -/// -/// Internal class representing the PulseNexus. -/// -internal class PulseNexus : BackgroundService +internal class PulseNexus(ChannelReader reader, IEnumerable pulseFlows) : BackgroundService { - private readonly IChannel _channel; - private readonly IEnumerable _pulseFlows; - - /// - /// Represents a class that provides a nexus for pulse flows. - /// - public PulseNexus(IChannel channel, IEnumerable pulseFlows) - { - _channel = channel; - _pulseFlows = pulseFlows; - } - - /// - /// Executes asynchronous tasks for handling pulses. - /// - /// The cancellation token that can be used to stop the execution. - /// A task representing the asynchronous execution. protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - await foreach (IPulse pulse in _channel.ReadAllAsync(stoppingToken)) - await Task.WhenAll(_pulseFlows + await foreach (IPulse pulse in reader.ReadAllAsync(stoppingToken)) + await Task.WhenAll(pulseFlows .Where(x => x.CanHandle(pulse.GetType())) .Select(flow => flow.HandleAsync(pulse, stoppingToken))); } diff --git a/Frank.PulseFlow/ServiceCollectionExtensions.cs b/Frank.PulseFlow/ServiceCollectionExtensions.cs index ed1d7fd..56160fe 100644 --- a/Frank.PulseFlow/ServiceCollectionExtensions.cs +++ b/Frank.PulseFlow/ServiceCollectionExtensions.cs @@ -1,7 +1,3 @@ -using Frank.PulseFlow.Internal; - -using Microsoft.Extensions.DependencyInjection; - namespace Frank.PulseFlow; /// @@ -12,15 +8,6 @@ public static class ServiceCollectionExtensions /// /// Adds the PulseFlow to the service collection. /// - /// - /// - /// services.AddPulseFlow(builder => - /// { - /// builder.AddFlow[Flow](); - /// builder.AddFlow[AnotherFlow](); - /// }); - /// - /// /// The service collection to which the PulseFlow components will be added. /// An action delegate to configure the PulseFlow builder. /// The updated service collection. @@ -28,10 +15,52 @@ public static IServiceCollection AddPulseFlow(this IServiceCollection services, { FlowBuilder builder = new(services); configure(builder); + return services; + } + + /// + /// Adds the specified pulse handler to the service collection for the PulseFlow. + /// + /// The type of pulse that the handler can handle. + /// The type of the pulse handler. + /// The service collection to which the pulse handler will be added. + /// The updated service collection. + /// + /// This method adds a singleton instance of the specified pulse handler type to the service collection. + /// The pulse handler should implement the interface, where T is the type of the pulse. + /// + public static IServiceCollection AddPulseFlow(this IServiceCollection services) where THandler : class, IPulseHandler where TPulse : IPulse + { + if (!services.Any(service => service.ServiceType == typeof(IPulseHandler) && service.ImplementationType == typeof(THandler))) + { + services.AddSingleton, THandler>(); + services.AddSingleton(); + } - services.AddHostedService(); - services.AddSingleton(); - services.AddSingleton(); + if (!services.Any(service => service.ServiceType == typeof(IFlow) && service.ImplementationType == typeof(GenericFlow))) + services.AddSingleton>(); + + return services.AddPulseFlow>(); + } + + /// + /// Adds PulseFlow services to the with the specified configuration. + /// + /// The to add the services to. + /// The after the services have been added. + public static IServiceCollection AddPulseFlow(this IServiceCollection services) where TFlow : class, IFlow + { + if (!services.Any(service => service.ServiceType == typeof(IFlow) && service.ImplementationType == typeof(TFlow))) + services.AddSingleton(); + + if (!services.Any(service => service.ServiceType == typeof(BackgroundService) && service.ImplementationType == typeof(PulseNexus))) + services.AddHostedService(); + + if (services.All(service => service.ServiceType != typeof(IConduit))) + services.AddSingleton(); + + if (services.All(service => service.ServiceType != typeof(Channel))) + services.AddChannel(); return services; } diff --git a/README.md b/README.md index 1f0510c..91218d1 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# PulseFlow (Local Messaging) +# PulseFlow (Local Messaging With Channels) PulseFlow Local Messaging is a lightweight, high-performance messaging system that enables seamless communication, and thread-safe data transfer between different parts of an application. It's designed to be simple, flexible, and scalable, allowing for easy integration into any system architecture. @@ -46,6 +46,15 @@ PulseFlow Local Messaging is a lightweight, high-performance messaging system th between different parts of an application. It's designed to be simple, flexible, and scalable, allowing for easy integration into any system architecture. +This library does have a dependency on `Frank.Channels.DependencyInjection`, which is a simple registration of `System. +Threading.Channels.Channel` in the Dependency Injection container. This is done to make it easier to use `System. +Threading.Channels` in a Dependency Injection scenario, and to make it easier to use `System.Threading.Channels` in a +thread-safe manner. This includes a `ChannelWriter` and a `ChannelReader` that are a thread-safe singlteton, and can +be called directly from the Dependency Injection container without having to call `Channel`, and then `Channel.Writer` or +`Channel.Reader` directly. This is done to make it easier to use `System.Threading.Channels` in a Dependency Injection +behind the scenes in PulseFlow. This will also make it possible to "intercept" the `ChannelWriter` and `ChannelReader` +for debugging, logging, or other purposes. + ### Key Features - **Lightweight**: PulseFlow is a lightweight messaging system, with a small footprint and minimal resource @@ -74,10 +83,10 @@ graph TB ApiPulse[API : IPulse] -->|transmitted via| Conduit[IConduit] end subgraph "Delivery" - Conduit -->|delivered to| IChannel[IChannel] + Conduit -->|delivered to| Channel[Channel>] end subgraph "Consumption and Routing" - IChannel -->|consumed and routed by| Nexus[Nexus] + Channel -->|consumed and routed by| Nexus[Nexus] Nexus -->|typeof==Email| EmailFlow[EmailFlow : IFlow] Nexus -->|typeof==API| Flow[FtpAndApiFlow : IFlow] Nexus -->|typeof==FTP| Flow[FtpAndApiFlow : IFlow] @@ -89,8 +98,7 @@ graph TB In this Mermaid diagram: - **IPulse** is the interface for the Pulse. - **IConduit** is the interface for the Conduit, which is the pathway through which messages are transmitted. -- **IChannel** is the interface for the Channel, which is a wrapper around the `System.Threading.Channels.Channel` class, which is a thread-safe data structure for passing data between producers and consumers located in different threads. -- **Nexus** is the central processing service, which handles the pulse messages. +- **Nexus** is the central processing service, which handles the pulse messages and routes them to their respective destinations. - **IFlow** is the interface for the a flow, which is the mechanism that handles/consumes the pulse messages. - **ILogger** is the interface for the generic logger in dotnet. @@ -117,7 +125,7 @@ graph TB ApiPulse6[ApiMessage : IPulse] -->|transmitted via| Conduit[IConduit] end subgraph Delivery - Conduit -->|delivered to| IChannel[IChannel] + Conduit -->|delivered to| IChannel[Channel] end subgraph Consumption and Routing IChannel -->|consumed and routed by| Nexus[Nexus] @@ -136,7 +144,7 @@ graph LR PrioritizedWork[StandardMessage : IPulse] -->|standard| Conduit[IConduit] PrioritizedWork[StandardMessage : IPulse] --> Conduit[IConduit] PrioritizedWork[PremiumMessage : IPulse] -->|premium| Conduit[IConduit] - Conduit -->|delivered to| IChannel[IChannel] + Conduit -->|delivered to| IChannel[Channel] IChannel --> Nexus[Nexus] subgraph parallel processing @@ -184,7 +192,7 @@ graph TB PulseB -.->|Send| IConduit[IConduit] PulseC -.->|Send| IConduit[IConduit] end - IConduit --> IChannel[IChannel] + IConduit --> IChannel[Channel] IChannel --> Nexus[Nexus] Nexus --> FlowX[IFlow] Nexus --> FlowY[IFlow]