-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Program.cs
141 lines (118 loc) · 4.52 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
using HouseofCat.RabbitMQ;
using HouseofCat.RabbitMQ.Services;
using HouseofCat.Utilities.Extensions;
using HouseofCat.Utilities.Helpers;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using RabbitMQ.ConsumerDataflowService;
using System.Text;
var loggerFactory = LogHelpers.CreateConsoleLoggerFactory(LogLevel.Information);
LogHelpers.LoggerFactory = loggerFactory;
var logger = loggerFactory.CreateLogger<Program>();
var logMessage = false;
var builder = WebApplication.CreateBuilder(args);
var configuration = new ConfigurationBuilder()
.SetBasePath(AppContext.BaseDirectory)
.AddJsonFile("appsettings.json")
.Build();
builder.Services.AddOpenTelemetryExporter(configuration);
using var app = builder.Build();
var rabbitService = await Shared.SetupRabbitServiceAsync(loggerFactory, "RabbitMQ.ConsumerDataflows.json");
var dataflowService = new ConsumerDataflowService<CustomWorkState>(rabbitService, "TestConsumer");
// Manually modify the internal Dataflow.
dataflowService.Dataflow.WithCreateSendMessage(
async (state) =>
{
var message = new Message
{
Exchange = "",
RoutingKey = state.ReceivedMessage?.Message?.RoutingKey ?? "TestQueue",
Body = Encoding.UTF8.GetBytes("New Secret Message"),
Metadata = new Metadata
{
PayloadId = Guid.NewGuid().ToString(),
},
ParentSpanContext = state.WorkflowSpan?.Context,
};
await rabbitService.ComcryptAsync(message);
state.SendMessage = message;
return state;
});
// Add custom step to Dataflow using Service helper methods.
dataflowService.AddStep(
"write_message_to_log",
(state) =>
{
var message = Encoding.UTF8.GetString(state.ReceivedMessage.Body.Span);
if (message == "throw")
{
throw new Exception("Throwing an exception!");
}
if (logMessage)
{ logger.LogInformation(message); }
return state;
});
// Add finalization step to Dataflow using Service helper method.
dataflowService.AddFinalization(
(state) =>
{
if (logMessage)
{ logger.LogInformation("Finalization Step!"); }
state.ReceivedMessage?.AckMessage();
});
// Add error handling to Dataflow using Service helper method.
dataflowService.AddErrorHandling(
async (state) =>
{
logger.LogError(state?.EDI?.SourceException, "Error Step!");
// First, check if DLQ is configured in QueueArgs.
// Second, check if ErrorQueue is set in Options.
// Lastly, decide if you want to Nack with requeue, or anything else.
if (dataflowService.Options.RejectOnError())
{
state.ReceivedMessage?.RejectMessage(requeue: false);
}
else if (!string.IsNullOrEmpty(dataflowService.Options.ErrorQueueName))
{
// If type is currently an IMessage, republish with new RoutingKey.
if (state.ReceivedMessage.Message is not null)
{
state.ReceivedMessage.Message.RoutingKey = dataflowService.Options.ErrorQueueName;
await rabbitService.Publisher.QueueMessageAsync(state.ReceivedMessage.Message);
}
else
{
await rabbitService.Publisher.PublishAsync(
exchangeName: "",
routingKey: dataflowService.Options.ErrorQueueName,
body: state.ReceivedMessage.Body,
headers: state.ReceivedMessage.Properties.Headers,
messageId: Guid.NewGuid().ToString(),
deliveryMode: 2,
mandatory: false);
}
// Don't forget to Ack the original message when sending it to a different Queue.
state.ReceivedMessage?.AckMessage();
}
else
{
state.ReceivedMessage?.NackMessage(requeue: true);
}
});
await dataflowService.StartAsync();
app.Lifetime.ApplicationStarted.Register(
() =>
{
logger.LogInformation("Listening for Messages! Press CTRL+C to initiate graceful shutdown and stop consumer...");
});
app.Lifetime.ApplicationStopping.Register(
async () =>
{
logger.LogInformation("ConsumerDataflowService stopping...");
await dataflowService.StopAsync(
immediate: false,
shutdownService: true);
logger.LogInformation("All stopped! Press return to exit...");
});
await app.RunAsync();