Skip to content

Commit

Permalink
2459 multi type bulk (#2497)
Browse files Browse the repository at this point in the history
* Add Failing Test

  Issue #2495

* Add Reflections to Handle multiple request types
  • Loading branch information
preardon committed Feb 6, 2023
1 parent 8649f9b commit dc24faf
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 13 deletions.
31 changes: 25 additions & 6 deletions src/Paramore.Brighter/CommandProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
#endregion

using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -580,7 +582,7 @@ public async Task PostAsync<T>(T request, bool continueOnCapturedContext = false

foreach (var batch in SplitRequestBatchIntoTypes(requests))
{
var messages = BulkMapMessages(requests);
var messages = MapMessages(batch.Key, batch);

s_logger.LogInformation("Save requests: {RequestType} {AmountOfMessages}", batch.Key, messages.Count());

Expand Down Expand Up @@ -655,7 +657,7 @@ public async Task PostAsync<T>(T request, bool continueOnCapturedContext = false

foreach (var batch in SplitRequestBatchIntoTypes(requests))
{
var messages = BulkMapMessages(batch.ToArray());
var messages = await MapMessagesAsync(batch.Key, batch.ToArray(), cancellationToken);

s_logger.LogInformation("Save requests: {RequestType} {AmountOfMessages}", batch.Key, messages.Count());

Expand All @@ -672,24 +674,41 @@ public async Task PostAsync<T>(T request, bool continueOnCapturedContext = false
return requests.GroupBy(r => r.GetType());
}

private List<Message> BulkMapMessages<T>(IEnumerable<T> requests) where T : class, IRequest
private List<Message> MapMessages(Type requestType, IEnumerable<IRequest> requests)
{
return (List<Message>)GetType()
.GetMethod(nameof(BulkMapMessages), BindingFlags.Instance | BindingFlags.NonPublic)
.MakeGenericMethod(requestType)
.Invoke(this, new[] { requests });
}

private Task<List<Message>> MapMessagesAsync(Type requestType, IEnumerable<IRequest> requests, CancellationToken cancellationToken)
{
var parameters = new object[] { requests, cancellationToken };
return (Task<List<Message>>)GetType()
.GetMethod(nameof(BulkMapMessagesAsync), BindingFlags.Instance | BindingFlags.NonPublic)
.MakeGenericMethod(requestType)
.Invoke(this, parameters);
}

private List<Message> BulkMapMessages<T>(IEnumerable<IRequest> requests) where T : class, IRequest
{
return requests.Select(r =>
{
var wrapPipeline = _transformPipelineBuilder.BuildWrapPipeline<T>();
var message = wrapPipeline.WrapAsync(r).GetAwaiter().GetResult();
var message = wrapPipeline.WrapAsync((T)r).GetAwaiter().GetResult();
AddTelemetryToMessage<T>(message);
return message;
}).ToList();
}

private async Task<List<Message>> BulkMapMessagesAsync<T>(IEnumerable<T> requests, CancellationToken cancellationToken = default) where T : class, IRequest
private async Task<List<Message>> BulkMapMessagesAsync<T>(IEnumerable<IRequest> requests, CancellationToken cancellationToken = default) where T : class, IRequest
{
var messages = new List<Message>();
foreach (var request in requests)
{
var wrapPipeline = _transformPipelineBuilder.BuildWrapPipeline<T>();
var message = await wrapPipeline.WrapAsync(request, cancellationToken);
var message = await wrapPipeline.WrapAsync((T)request, cancellationToken);
AddTelemetryToMessage<T>(message);
messages.Add(message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Threading.Tasks;
using FluentAssertions;
using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles;
using Paramore.Brighter.Core.Tests.MessageDispatch.TestDoubles;
using Polly;
using Polly.Registry;
using Xunit;
Expand All @@ -19,8 +20,10 @@ public class CommandProcessorBulkDepositPostTestsAsync: IDisposable
private readonly CommandProcessor _commandProcessor;
private readonly MyCommand _myCommand = new MyCommand();
private readonly MyCommand _myCommand2 = new MyCommand();
private readonly MyEvent _myEvent = new MyEvent();
private readonly Message _message;
private readonly Message _message2;
private readonly Message _message3;
private readonly FakeOutboxSync _fakeOutboxSync;
private readonly FakeMessageProducerWithPublishConfirmation _fakeMessageProducerWithPublishConfirmation;

Expand All @@ -33,6 +36,7 @@ public CommandProcessorBulkDepositPostTestsAsync()
_fakeMessageProducerWithPublishConfirmation = new FakeMessageProducerWithPublishConfirmation();

var topic = "MyCommand";
var eventTopic = "MyEvent";
_message = new Message(
new MessageHeader(_myCommand.Id, topic, MessageType.MT_COMMAND),
new MessageBody(JsonSerializer.Serialize(_myCommand, JsonSerialisationOptions.Options))
Expand All @@ -42,9 +46,23 @@ public CommandProcessorBulkDepositPostTestsAsync()
new MessageHeader(_myCommand2.Id, topic, MessageType.MT_COMMAND),
new MessageBody(JsonSerializer.Serialize(_myCommand2, JsonSerialisationOptions.Options))
);

_message3 = new Message(
new MessageHeader(_myEvent.Id, eventTopic, MessageType.MT_EVENT),
new MessageBody(JsonSerializer.Serialize(_myEvent, JsonSerialisationOptions.Options))
);

var messageMapperRegistry = new MessageMapperRegistry(new SimpleMessageMapperFactory((_) => new MyCommandMessageMapper()));
var messageMapperRegistry = new MessageMapperRegistry(new SimpleMessageMapperFactory((type) =>
{
if (type.Equals(typeof(MyCommandMessageMapper)))
return new MyCommandMessageMapper();
else
{
return new MyEventMessageMapper();
}
}));
messageMapperRegistry.Register<MyCommand, MyCommandMessageMapper>();
messageMapperRegistry.Register<MyEvent, MyEventMessageMapper>();

var retryPolicy = Policy
.Handle<Exception>()
Expand All @@ -68,7 +86,8 @@ public CommandProcessorBulkDepositPostTestsAsync()
public async Task When_depositing_a_message_in_the_outbox()
{
//act
var postedMessageIds = await _commandProcessor.DepositPostAsync(new []{_myCommand, _myCommand2});
var requests = new List<IRequest> {_myCommand, _myCommand2, _myEvent } ;
var postedMessageIds = await _commandProcessor.DepositPostAsync(requests);

//assert
//message should not be posted
Expand All @@ -83,6 +102,11 @@ public async Task When_depositing_a_message_in_the_outbox()
var depositedPost2 = _fakeOutboxSync
.OutstandingMessages(0)
.SingleOrDefault(msg => msg.Id == _message2.Id);

//message should be in the store
var depositedPost3 = _fakeOutboxSync
.OutstandingMessages(0)
.SingleOrDefault(msg => msg.Id == _message3.Id);

depositedPost.Should().NotBeNull();

Expand All @@ -97,6 +121,12 @@ public async Task When_depositing_a_message_in_the_outbox()
depositedPost2.Body.Value.Should().Be(_message2.Body.Value);
depositedPost2.Header.Topic.Should().Be(_message2.Header.Topic);
depositedPost2.Header.MessageType.Should().Be(_message2.Header.MessageType);

//message should correspond to the command
depositedPost3.Id.Should().Be(_message3.Id);
depositedPost3.Body.Value.Should().Be(_message3.Body.Value);
depositedPost3.Header.Topic.Should().Be(_message3.Header.Topic);
depositedPost3.Header.MessageType.Should().Be(_message3.Header.MessageType);
}

public void Dispose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles;
using Paramore.Brighter.Core.Tests.MessageDispatch.TestDoubles;
using Polly;
using Polly.Registry;
using Xunit;
Expand All @@ -19,8 +18,10 @@ public class CommandProcessorBulkDepositPostTests : IDisposable
private readonly CommandProcessor _commandProcessor;
private readonly MyCommand _myCommand = new MyCommand();
private readonly MyCommand _myCommand2 = new MyCommand();
private readonly MyEvent _myEvent = new MyEvent();
private readonly Message _message;
private readonly Message _message2;
private readonly Message _message3;
private readonly FakeOutboxSync _fakeOutbox;
private readonly FakeMessageProducerWithPublishConfirmation _fakeMessageProducerWithPublishConfirmation;

Expand All @@ -32,6 +33,7 @@ public CommandProcessorBulkDepositPostTests()
_fakeMessageProducerWithPublishConfirmation = new FakeMessageProducerWithPublishConfirmation();

const string topic = "MyCommand";
var eventTopic = "MyEvent";
_message = new Message(
new MessageHeader(_myCommand.Id, topic, MessageType.MT_COMMAND),
new MessageBody(JsonSerializer.Serialize(_myCommand, JsonSerialisationOptions.Options))
Expand All @@ -41,9 +43,23 @@ public CommandProcessorBulkDepositPostTests()
new MessageHeader(_myCommand2.Id, topic, MessageType.MT_COMMAND),
new MessageBody(JsonSerializer.Serialize(_myCommand2, JsonSerialisationOptions.Options))
);

_message3 = new Message(
new MessageHeader(_myEvent.Id, eventTopic, MessageType.MT_EVENT),
new MessageBody(JsonSerializer.Serialize(_myEvent, JsonSerialisationOptions.Options))
);

var messageMapperRegistry = new MessageMapperRegistry(new SimpleMessageMapperFactory((_) => new MyCommandMessageMapper()));
var messageMapperRegistry = new MessageMapperRegistry(new SimpleMessageMapperFactory((type) =>
{
if (type.Equals(typeof(MyCommandMessageMapper)))
return new MyCommandMessageMapper();
else
{
return new MyEventMessageMapper();
}
}));
messageMapperRegistry.Register<MyCommand, MyCommandMessageMapper>();
messageMapperRegistry.Register<MyEvent, MyEventMessageMapper>();

var retryPolicy = Policy
.Handle<Exception>()
Expand All @@ -66,7 +82,8 @@ public CommandProcessorBulkDepositPostTests()
public void When_depositing_a_message_in_the_outbox()
{
//act
var postedMessageId = _commandProcessor.DepositPost(new []{_myCommand, _myCommand2});
var requests = new List<IRequest> {_myCommand, _myCommand2, _myEvent } ;
var postedMessageId = _commandProcessor.DepositPost(requests);

//assert

Expand All @@ -86,9 +103,19 @@ public void When_depositing_a_message_in_the_outbox()
depositedPost2.Header.Topic.Should().Be(_message2.Header.Topic);
depositedPost2.Header.MessageType.Should().Be(_message2.Header.MessageType);


var depositedPost3 = _fakeOutbox
.OutstandingMessages(0)
.SingleOrDefault(msg => msg.Id == _message3.Id);
//message should correspond to the command
depositedPost3.Id.Should().Be(_message3.Id);
depositedPost3.Body.Value.Should().Be(_message3.Body.Value);
depositedPost3.Header.Topic.Should().Be(_message3.Header.Topic);
depositedPost3.Header.MessageType.Should().Be(_message3.Header.MessageType);

//message should be marked as outstanding if not sent
var outstandingMessages = _fakeOutbox.OutstandingMessages(0);
outstandingMessages.Count().Should().Be(2);
outstandingMessages.Count().Should().Be(3);
}

public void Dispose()
Expand Down

0 comments on commit dc24faf

Please sign in to comment.