From 62b2207c897aef6d01d5d2ba22d59b62b13e0306 Mon Sep 17 00:00:00 2001 From: "yury.petrov" Date: Fri, 20 Jul 2018 14:40:12 +1000 Subject: [PATCH] adding AbstractDispatchingMessageHandler to process messages in parallel as long as a message-based key is not already being processed --- .../AbstractDispatchingMessageHandler.cs | 95 +++++++++++++++++++ .../Model/ModelDetails.cs | 10 ++ .../SimpleRabbit.NetCore.Dispatcher.csproj | 11 +++ SimpleRabbit.NetCore.sln | 8 +- 4 files changed, 123 insertions(+), 1 deletion(-) create mode 100644 SimpleRabbit.NetCore.Dispatcher/AbstractDispatchingMessageHandler.cs create mode 100644 SimpleRabbit.NetCore.Dispatcher/Model/ModelDetails.cs create mode 100644 SimpleRabbit.NetCore.Dispatcher/SimpleRabbit.NetCore.Dispatcher.csproj diff --git a/SimpleRabbit.NetCore.Dispatcher/AbstractDispatchingMessageHandler.cs b/SimpleRabbit.NetCore.Dispatcher/AbstractDispatchingMessageHandler.cs new file mode 100644 index 0000000..0c2f0a5 --- /dev/null +++ b/SimpleRabbit.NetCore.Dispatcher/AbstractDispatchingMessageHandler.cs @@ -0,0 +1,95 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; + +namespace SimpleRabbit.NetCore.Dispatcher +{ + public abstract class AbstractDispatchingMessageHandler : IMessageHandler, IChannelOptions + { + private readonly ILogger> _logger; + private readonly Dictionary>> _queues = new Dictionary>>(); + private readonly object _semaphore = new object(); + + private IModel _model; + private Action _onError; + + protected AbstractDispatchingMessageHandler(ILogger> logger) + { + _logger = logger; + } + + public abstract bool CanProcess(string tag); + protected abstract void ProcessMessage(T msg); + public abstract T Get(BasicDeliverEventArgs args); + public abstract string GetKey(T msg); + + public bool Process(BasicDeliverEventArgs args) + { + var msg = Get(args); + var key = GetKey(msg); + lock (_semaphore) + { + if (!_queues.TryGetValue(key, out var queue)) + { + queue = new List>(); + _queues.Add(key, queue); + + Task.Run(() => ProcessQueue(queue, key)); + } + queue.Add(new ModelDetails + { + Args = args, + Message = msg + }); + } + + return false; + } + + private void ProcessQueue(List> queue, string key) + { + try + { + while (true) + { + ModelDetails details; + lock (_semaphore) + { + if (queue.Count == 0) + { + _queues.Remove(key); + return; + } + + details = queue[0]; + } + + ProcessMessage(details.Message); + + lock (_semaphore) + { + queue.Remove(details); + } + + _model?.BasicAck(details.Args.DeliveryTag, false); + } + } + catch (Exception e) + { + _logger.LogError(e, "An error occured while processing a message queue"); + queue.Clear(); + _queues.Remove(key); + _onError?.Invoke(); + } + } + + public void SetChannelOptions(IModel model, Action onError) + { + _model = model; + _onError = onError; + } + } +} diff --git a/SimpleRabbit.NetCore.Dispatcher/Model/ModelDetails.cs b/SimpleRabbit.NetCore.Dispatcher/Model/ModelDetails.cs new file mode 100644 index 0000000..8eac2c9 --- /dev/null +++ b/SimpleRabbit.NetCore.Dispatcher/Model/ModelDetails.cs @@ -0,0 +1,10 @@ +using RabbitMQ.Client.Events; + +namespace SimpleRabbit.NetCore.Dispatcher +{ + public class ModelDetails + { + public T Message; + public BasicDeliverEventArgs Args; + } +} \ No newline at end of file diff --git a/SimpleRabbit.NetCore.Dispatcher/SimpleRabbit.NetCore.Dispatcher.csproj b/SimpleRabbit.NetCore.Dispatcher/SimpleRabbit.NetCore.Dispatcher.csproj new file mode 100644 index 0000000..f2759bd --- /dev/null +++ b/SimpleRabbit.NetCore.Dispatcher/SimpleRabbit.NetCore.Dispatcher.csproj @@ -0,0 +1,11 @@ + + + + netcoreapp2.1 + + + + + + + diff --git a/SimpleRabbit.NetCore.sln b/SimpleRabbit.NetCore.sln index 8b7b9e8..40a7a8c 100644 --- a/SimpleRabbit.NetCore.sln +++ b/SimpleRabbit.NetCore.sln @@ -5,7 +5,9 @@ VisualStudioVersion = 15.0.27703.2035 MinimumVisualStudioVersion = 10.0.40219.1 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SimpleRabbit.NetCore", "SimpleRabbit.NetCore\SimpleRabbit.NetCore.csproj", "{005D39F6-9FA3-4321-8F50-1B5967EBADBE}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SimpleRabbit.NetCore.Service", "SimpleRabbit.NetCore.Service\SimpleRabbit.NetCore.Service.csproj", "{48551957-A3C9-4FF0-BDAE-66AA85103D48}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SimpleRabbit.NetCore.Service", "SimpleRabbit.NetCore.Service\SimpleRabbit.NetCore.Service.csproj", "{48551957-A3C9-4FF0-BDAE-66AA85103D48}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SimpleRabbit.NetCore.Dispatcher", "SimpleRabbit.NetCore.Dispatcher\SimpleRabbit.NetCore.Dispatcher.csproj", "{B0F9E58D-8E5B-48DF-A905-DB3FE8952FF6}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -21,6 +23,10 @@ Global {48551957-A3C9-4FF0-BDAE-66AA85103D48}.Debug|Any CPU.Build.0 = Debug|Any CPU {48551957-A3C9-4FF0-BDAE-66AA85103D48}.Release|Any CPU.ActiveCfg = Release|Any CPU {48551957-A3C9-4FF0-BDAE-66AA85103D48}.Release|Any CPU.Build.0 = Release|Any CPU + {B0F9E58D-8E5B-48DF-A905-DB3FE8952FF6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B0F9E58D-8E5B-48DF-A905-DB3FE8952FF6}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B0F9E58D-8E5B-48DF-A905-DB3FE8952FF6}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B0F9E58D-8E5B-48DF-A905-DB3FE8952FF6}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE