/
When_sending_databus_properties_with_unobtrusive.cs
102 lines (87 loc) · 3.87 KB
/
When_sending_databus_properties_with_unobtrusive.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
namespace NServiceBus.AcceptanceTests.DataBus
{
using System;
using System.IO;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using EndpointTemplates;
using MessageMutator;
using NUnit.Framework;
public class When_sending_databus_properties_with_unobtrusive : NServiceBusAcceptanceTest
{
/// <summary>
/// Sends a <see cref="MyMessageWithLargePayload"/> from the Sender endpoint to the Receiver endpoint
/// and verifies that the payload seen by the receiving handler equals the payload that was sent.
/// </summary>
[Test]
public async Task Should_receive_messages_with_largepayload_correctly()
{
    var expectedPayload = new byte[PayloadSize];

    // Describe the scenario first; it completes as soon as the handler has stored a payload on the context.
    var scenario = Scenario.Define<Context>()
        .WithEndpoint<Sender>(sender => sender.When(session =>
        {
            var outgoing = new MyMessageWithLargePayload
            {
                Payload = expectedPayload
            };
            return session.Send(outgoing);
        }))
        .WithEndpoint<Receiver>()
        .Done(ctx => ctx.ReceivedPayload != null);

    var result = await scenario.Run();

    Assert.AreEqual(expectedPayload, result.ReceivedPayload, "The large payload should be marshalled correctly using the databus");
}
// Length, in bytes, of the payload array created by the test. The Mutator also uses this value
// as the upper bound for the incoming transport message body length.
const int PayloadSize = 500;
/// <summary>
/// Scenario context used by the test to observe what the receiving endpoint handled.
/// </summary>
public class Context : ScenarioContext
{
// Assigned by the Receiver's message handler; the scenario's Done condition waits for it to become non-null.
public byte[] ReceivedPayload { get; set; }
}
/// <summary>
/// Sending endpoint. Message and databus-property detection is configured purely through conventions,
/// the file share databus is enabled, and <see cref="MyMessageWithLargePayload"/> is routed to the Receiver.
/// </summary>
public class Sender : EndpointConfigurationBuilder
{
    public Sender()
    {
        EndpointSetup<DefaultServer>(config =>
        {
            // Only MyMessageWithLargePayload counts as a command; any property whose name contains "Payload" goes via the databus.
            config.Conventions()
                .DefiningCommandsAs(type => type.Namespace != null && type.FullName == typeof(MyMessageWithLargePayload).FullName)
                .DefiningDataBusPropertiesAs(property => property.Name.Contains("Payload"));

            var dataBusDirectory = Path.Combine(TestContext.CurrentContext.TestDirectory, "databus", "sender");
            var dataBus = config.UseDataBus<FileShareDataBus>();
            dataBus.BasePath(dataBusDirectory);

            var routing = config.ConfigureTransport().Routing();
            routing.RouteToEndpoint(typeof(MyMessageWithLargePayload), typeof(Receiver));
        })
        // Keep the message type out of assembly scanning to simulate what would happen with true unobtrusive mode.
        .ExcludeType<MyMessageWithLargePayload>();
    }
}
/// <summary>
/// Receiving endpoint. It applies the same conventions as the Sender, points the file share databus
/// at the same directory the Sender uses, and registers the <see cref="Mutator"/> for incoming transport messages.
/// </summary>
public class Receiver : EndpointConfigurationBuilder
{
    public Receiver()
    {
        EndpointSetup<DefaultServer>(config =>
        {
            config.Conventions()
                .DefiningCommandsAs(type => type.Namespace != null && type.FullName == typeof(MyMessageWithLargePayload).FullName)
                .DefiningDataBusPropertiesAs(property => property.Name.Contains("Payload"));

            // Same "databus/sender" directory as the Sender endpoint — presumably so the receiver can read what the sender stored.
            var dataBusDirectory = Path.Combine(TestContext.CurrentContext.TestDirectory, "databus", "sender");
            var dataBus = config.UseDataBus<FileShareDataBus>();
            dataBus.BasePath(dataBusDirectory);

            var bodySizeGuard = new Mutator();
            config.RegisterMessageMutator(bodySizeGuard);
        });
    }

    /// <summary>
    /// Copies the payload of the handled message onto the scenario context.
    /// </summary>
    public class MyMessageHandler : IHandleMessages<MyMessageWithLargePayload>
    {
        // Scenario context; exposed as a settable property for the test infrastructure to populate.
        public Context Context { get; set; }

        public Task Handle(MyMessageWithLargePayload messageWithLargePayload, IMessageHandlerContext context)
        {
            var receivedBytes = messageWithLargePayload.Payload;
            Context.ReceivedPayload = receivedBytes;

            return Task.FromResult(0);
        }
    }
}
/// <summary>
/// Incoming transport message mutator that fails processing when the message body is longer than
/// <c>PayloadSize</c> bytes. The test uses this as evidence that the payload did not travel inside
/// the message body itself.
/// </summary>
public class Mutator : IMutateIncomingTransportMessages
{
    /// <summary>
    /// Checks the length of the incoming message body.
    /// </summary>
    /// <param name="context">Mutation context exposing the incoming message body.</param>
    /// <returns>A completed task when the body is within the limit.</returns>
    /// <exception cref="Exception">Thrown when the body length exceeds <c>PayloadSize</c>.</exception>
    public Task MutateIncoming(MutateIncomingTransportMessageContext context)
    {
        // A body longer than the raw payload is taken to mean the payload was serialized inline.
        if (context.Body.Length > PayloadSize)
        {
            throw new Exception("The message body is too large, which means the DataBus was not used to transfer the payload.");
        }

        return Task.FromResult(0);
    }
}
/// <summary>
/// Message type used by the test. It declares no base type or marker interface here; the endpoints
/// identify it as a command through their DefiningCommandsAs convention.
/// </summary>
public class MyMessageWithLargePayload
{
// Matched by the endpoints' DefiningDataBusPropertiesAs convention because its name contains "Payload".
public byte[] Payload { get; set; }
}
}
}