From 632a517fbdf251cac5c682c1c663376fd9c74944 Mon Sep 17 00:00:00 2001 From: HamidTheGeek Date: Mon, 21 Oct 2019 14:37:33 +0100 Subject: [PATCH] Add publish in batch for event bus and use it in refresh data in replication. --- Olive.Aws.EventBus/EventBusQueue.cs | 31 ++++++++++ .../Publish/ExposedType-TDomain.cs | 60 ++++++++++++++----- Olive.EventBus/IOQueue/EventBusExtensions.cs | 13 ++++ Olive.EventBus/IOQueue/IoEventBusQueue.cs | 10 ++++ .../Queue/EventBusQueue.TMessage.cs | 13 ++++ Olive.EventBus/Queue/IEventBusQueue.cs | 7 +++ 6 files changed, 120 insertions(+), 14 deletions(-) diff --git a/Olive.Aws.EventBus/EventBusQueue.cs b/Olive.Aws.EventBus/EventBusQueue.cs index 3484d5a25..9de354942 100644 --- a/Olive.Aws.EventBus/EventBusQueue.cs +++ b/Olive.Aws.EventBus/EventBusQueue.cs @@ -3,6 +3,8 @@ using Newtonsoft.Json; using Newtonsoft.Json.Linq; using System; +using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; namespace Olive.Aws @@ -52,6 +54,35 @@ public async Task Publish(string message) return response.MessageId; } + public async Task> PublishBatch(IEnumerable messages) + { + + var request = new SendMessageBatchRequest + { + QueueUrl = QueueUrl, + }; + + messages.Do(message => + request.Entries.Add(new SendMessageBatchRequestEntry + { + MessageBody = message, + })); + + if (IsFifo) + { + request.Entries.ForEach(message => + { + message.MessageDeduplicationId = + JsonConvert.DeserializeObject(message.MessageBody)["DeduplicationId"]?.ToString(); + message.MessageGroupId = "Default"; + }); + } + + var response = await Client.SendMessageBatchAsync(request); + + return response.Successful.Select(m => m.MessageId); + } + public void Subscribe(Func handler) => new Subscriber(this, handler).Start(); public async Task Pull(int timeoutSeconds = 10) diff --git a/Olive.Entities.Data.Replication/Publish/ExposedType-TDomain.cs b/Olive.Entities.Data.Replication/Publish/ExposedType-TDomain.cs index e3194c7cf..35a710e3a 100644 --- a/Olive.Entities.Data.Replication/Publish/ExposedType-TDomain.cs +++ b/Olive.Entities.Data.Replication/Publish/ExposedType-TDomain.cs @@ -168,31 +168,63 @@ public void ExposeEverything() internal override async Task UploadAll() { - var toUpload = await Context.Current.Database().GetList().ToArray(); + var database = Context.Current.Database(); + var totalCount = await database.Of(typeof(TDomain)).Count(); var log = Log.For(this); - log.Warning("Uploading " + toUpload.Count() + " records of " + typeof(TDomain).FullName + " to the queue..."); + log.Warning($"Uploading {totalCount} records of {typeof(TDomain).FullName} to the queue..."); - foreach (var item in toUpload) + + var pageSize = 10000; + var allPages = Enumerable.Range(0, (int)Math.Ceiling(totalCount / (decimal)pageSize)); + + //using (var scope = database.CreateTransactionScope()) + //{ + await allPages.ForEachAsync(10, async (pageIndex) => { - IEventBusMessage message; - try { message = await ToMessage(item); } - catch (Exception ex) - { - log.Error(ex, "Failed to create an event bus message for " + item.GetType().FullName + " with ID of " + item.GetId()); - continue; - } + var query = database.Of(typeof(TDomain)); - try { await Queue.Publish(message); } + query.OrderBy("ID"); + query.PageSize = pageSize; + query.PageStartIndex = pageIndex * pageSize; + + await UploadPage(await query.GetList()); + }); + + // scope.Complete(); + //} + + + log.Warning($"Finished uploading {totalCount} records of {typeof(TDomain).FullName} to the queue."); + } + + async Task UploadPage(IEnumerable toUpload) + { + var list = await toUpload.SelectAsync(GetMessage) + .ExceptNull(); + + await list.Chop(10).DoAsync(async (messages, _) => + { + try { await Queue.PublishBatch(messages); } catch (Exception ex) { - log.Error(ex, "Failed to publish an event bus message for " + item.GetType().FullName + " with ID of " + item.GetId()); - continue; + Log.For(this).Error(ex, + $"Failed to publish the event bus messages for a batch of {typeof(TDomain).FullName}"); } + }); + } + + async Task GetMessage(IEntity item) + { + IEventBusMessage result = null; + try { result = await ToMessage(item); } + catch (Exception ex) + { + Log.For(this).Error(ex, $"Failed to create an event bus message for {item.GetType().FullName} with ID of {item.GetId()}"); } - log.Warning("Finished uploading " + toUpload.Count() + " records of " + typeof(TDomain).FullName + " to the queue."); + return result; } /// diff --git a/Olive.EventBus/IOQueue/EventBusExtensions.cs b/Olive.EventBus/IOQueue/EventBusExtensions.cs index 878af5008..b20f57a50 100644 --- a/Olive.EventBus/IOQueue/EventBusExtensions.cs +++ b/Olive.EventBus/IOQueue/EventBusExtensions.cs @@ -18,6 +18,19 @@ public static Task Publish(this IEventBusQueue queue, IEventBusMessage m return queue.Publish(textMessage); } + /// + /// Publishes the specified events to the current event bus provider. + /// + /// The unique id of the queue item. + public static Task> PublishBatch(this IEventBusQueue queue, IEnumerable messages) + { + var stringMessegas = new List(); + + messages.Do(message => stringMessegas.Add(JsonConvert.SerializeObject(message))); + + return queue.PublishBatch(stringMessegas); + } + /// /// Attaches an event handler to the specified queue message type. /// diff --git a/Olive.EventBus/IOQueue/IoEventBusQueue.cs b/Olive.EventBus/IOQueue/IoEventBusQueue.cs index c27ff9f3b..1fce8ad1a 100644 --- a/Olive.EventBus/IOQueue/IoEventBusQueue.cs +++ b/Olive.EventBus/IOQueue/IoEventBusQueue.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.IO; using System.Linq; using System.Threading.Tasks; @@ -30,6 +31,15 @@ public async Task Publish(string message) return path.Name; } + public async Task> PublishBatch(IEnumerable messages) + { + var result = new List(); + + await messages.DoAsync(async (m, _) => result.Add(await Publish(m))); + + return result; + } + public async Task Pull(int timeoutSeconds = 10) { var item = await IOSubscriber.FetchOnce(Folder); diff --git a/Olive.EventBus/Queue/EventBusQueue.TMessage.cs b/Olive.EventBus/Queue/EventBusQueue.TMessage.cs index 9f68e5888..b4a2257f8 100644 --- a/Olive.EventBus/Queue/EventBusQueue.TMessage.cs +++ b/Olive.EventBus/Queue/EventBusQueue.TMessage.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; namespace Olive @@ -13,6 +14,8 @@ public class EventBusQueue : IEventBusQueue where TMessage : IEventBus public EventBusQueue(IEventBusQueue queue) => Queue = queue; Task IEventBusQueue.Publish(string message) => Queue.Publish(message); + Task> IEventBusQueue.PublishBatch(IEnumerable messages) + => Queue.PublishBatch(messages); void IEventBusQueue.Subscribe(Func @handler) => Queue.Subscribe(handler); Task IEventBusQueue.Pull(int timeout) => Queue.Pull(timeout); Task IEventBusQueue.Purge() => Queue.Purge(); @@ -34,6 +37,16 @@ public Task Publish(TMessage message) return this.Publish((IEventBusMessage)message); } + + /// + /// Publishes the specified events to the current event bus provider. + /// + /// The unique id of the queue item. + public Task> PublishBatch(IEnumerable messages) + { + return this.PublishBatch((dynamic) messages as IEnumerable); + } + /// /// Attaches an event handler to the specified queue message type. /// diff --git a/Olive.EventBus/Queue/IEventBusQueue.cs b/Olive.EventBus/Queue/IEventBusQueue.cs index 79050b4ae..42e8d3298 100644 --- a/Olive.EventBus/Queue/IEventBusQueue.cs +++ b/Olive.EventBus/Queue/IEventBusQueue.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; namespace Olive @@ -11,6 +12,12 @@ public interface IEventBusQueue /// The unique id of the queue item. Task Publish(string message); + /// + /// Publishes the specified events to the current event bus provider. + /// + /// The unique id of the queue item. + Task> PublishBatch(IEnumerable messages); + /// /// Attaches an event handler to the specified queue message type. ///