Skip to content

Commit

Permalink
Add publish in batch for event bus and use it in refresh data in repl…
Browse files Browse the repository at this point in the history
…ication.
  • Loading branch information
hamidmayeli committed Oct 21, 2019
1 parent 58cf1a2 commit 632a517
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 14 deletions.
31 changes: 31 additions & 0 deletions Olive.Aws.EventBus/EventBusQueue.cs
Expand Up @@ -3,6 +3,8 @@
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Olive.Aws
Expand Down Expand Up @@ -52,6 +54,35 @@ public async Task<string> Publish(string message)
return response.MessageId;
}

public async Task<IEnumerable<string>> PublishBatch(IEnumerable<string> messages)
{

var request = new SendMessageBatchRequest
{
QueueUrl = QueueUrl,
};

messages.Do(message =>
request.Entries.Add(new SendMessageBatchRequestEntry
{
MessageBody = message,
}));

if (IsFifo)
{
request.Entries.ForEach(message =>
{
message.MessageDeduplicationId =
JsonConvert.DeserializeObject<JObject>(message.MessageBody)["DeduplicationId"]?.ToString();
message.MessageGroupId = "Default";
});
}

var response = await Client.SendMessageBatchAsync(request);

return response.Successful.Select(m => m.MessageId);
}

public void Subscribe(Func<string, Task> handler) => new Subscriber(this, handler).Start();

public async Task<QueueMessageHandle> Pull(int timeoutSeconds = 10)
Expand Down
60 changes: 46 additions & 14 deletions Olive.Entities.Data.Replication/Publish/ExposedType-TDomain.cs
Expand Up @@ -168,31 +168,63 @@ public void ExposeEverything()

internal override async Task UploadAll()
{
var toUpload = await Context.Current.Database().GetList<TDomain>().ToArray();
var database = Context.Current.Database();
var totalCount = await database.Of(typeof(TDomain)).Count();

var log = Log.For(this);

log.Warning("Uploading " + toUpload.Count() + " records of " + typeof(TDomain).FullName + " to the queue...");
log.Warning($"Uploading {totalCount} records of {typeof(TDomain).FullName} to the queue...");

foreach (var item in toUpload)

var pageSize = 10000;
var allPages = Enumerable.Range(0, (int)Math.Ceiling(totalCount / (decimal)pageSize));

//using (var scope = database.CreateTransactionScope())
//{
await allPages.ForEachAsync(10, async (pageIndex) =>
{
IEventBusMessage message;
try { message = await ToMessage(item); }
catch (Exception ex)
{
log.Error(ex, "Failed to create an event bus message for " + item.GetType().FullName + " with ID of " + item.GetId());
continue;
}
var query = database.Of(typeof(TDomain));
try { await Queue.Publish(message); }
query.OrderBy("ID");
query.PageSize = pageSize;
query.PageStartIndex = pageIndex * pageSize;
await UploadPage(await query.GetList());
});

// scope.Complete();
//}


log.Warning($"Finished uploading {totalCount} records of {typeof(TDomain).FullName} to the queue.");
}

async Task UploadPage(IEnumerable<IEntity> toUpload)
{
var list = await toUpload.SelectAsync(GetMessage)
.ExceptNull();

await list.Chop(10).DoAsync(async (messages, _) =>
{
try { await Queue.PublishBatch(messages); }
catch (Exception ex)
{
log.Error(ex, "Failed to publish an event bus message for " + item.GetType().FullName + " with ID of " + item.GetId());
continue;
Log.For(this).Error(ex,
$"Failed to publish the event bus messages for a batch of {typeof(TDomain).FullName}");
}
});
}

async Task<IEventBusMessage> GetMessage(IEntity item)
{
IEventBusMessage result = null;
try { result = await ToMessage(item); }
catch (Exception ex)
{
Log.For(this).Error(ex, $"Failed to create an event bus message for {item.GetType().FullName} with ID of {item.GetId()}");
}

log.Warning("Finished uploading " + toUpload.Count() + " records of " + typeof(TDomain).FullName + " to the queue.");
return result;
}

/// <summary>
Expand Down
13 changes: 13 additions & 0 deletions Olive.EventBus/IOQueue/EventBusExtensions.cs
Expand Up @@ -18,6 +18,19 @@ public static Task<string> Publish(this IEventBusQueue queue, IEventBusMessage m
return queue.Publish(textMessage);
}

/// <summary>
/// Publishes the specified events to the current event bus provider.
/// </summary>
/// <returns>The unique id of the queue item.</returns>
public static Task<IEnumerable<string>> PublishBatch(this IEventBusQueue queue, IEnumerable<IEventBusMessage> messages)
{
var stringMessegas = new List<string>();

messages.Do(message => stringMessegas.Add(JsonConvert.SerializeObject(message)));

return queue.PublishBatch(stringMessegas);
}

/// <summary>
/// Attaches an event handler to the specified queue message type.
/// </summary>
Expand Down
10 changes: 10 additions & 0 deletions Olive.EventBus/IOQueue/IoEventBusQueue.cs
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
Expand Down Expand Up @@ -30,6 +31,15 @@ public async Task<string> Publish(string message)
return path.Name;
}

public async Task<IEnumerable<string>> PublishBatch(IEnumerable<string> messages)
{
var result = new List<string>();

await messages.DoAsync(async (m, _) => result.Add(await Publish(m)));

return result;
}

public async Task<QueueMessageHandle> Pull(int timeoutSeconds = 10)
{
var item = await IOSubscriber.FetchOnce(Folder);
Expand Down
13 changes: 13 additions & 0 deletions Olive.EventBus/Queue/EventBusQueue.TMessage.cs
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Olive
Expand All @@ -13,6 +14,8 @@ public class EventBusQueue<TMessage> : IEventBusQueue where TMessage : IEventBus
public EventBusQueue(IEventBusQueue queue) => Queue = queue;

Task<string> IEventBusQueue.Publish(string message) => Queue.Publish(message);
Task<IEnumerable<string>> IEventBusQueue.PublishBatch(IEnumerable<string> messages)
=> Queue.PublishBatch(messages);
void IEventBusQueue.Subscribe(Func<string, Task> @handler) => Queue.Subscribe(handler);
Task<QueueMessageHandle> IEventBusQueue.Pull(int timeout) => Queue.Pull(timeout);
Task IEventBusQueue.Purge() => Queue.Purge();
Expand All @@ -34,6 +37,16 @@ public Task<string> Publish(TMessage message)
return this.Publish((IEventBusMessage)message);
}


/// <summary>
/// Publishes the specified events to the current event bus provider.
/// </summary>
/// <returns>The unique id of the queue item.</returns>
public Task<IEnumerable<string>> PublishBatch(IEnumerable<TMessage> messages)
{
return this.PublishBatch((dynamic) messages as IEnumerable<IEventBusMessage>);
}

/// <summary>
/// Attaches an event handler to the specified queue message type.
/// </summary>
Expand Down
7 changes: 7 additions & 0 deletions Olive.EventBus/Queue/IEventBusQueue.cs
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Olive
Expand All @@ -11,6 +12,12 @@ public interface IEventBusQueue
/// <returns>The unique id of the queue item.</returns>
Task<string> Publish(string message);

/// <summary>
/// Publishes the specified events to the current event bus provider.
/// </summary>
/// <returns>The unique id of the queue item.</returns>
Task<IEnumerable<string>> PublishBatch(IEnumerable<string> messages);

/// <summary>
/// Attaches an event handler to the specified queue message type.
/// </summary>
Expand Down

0 comments on commit 632a517

Please sign in to comment.