-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
deferral.ts
150 lines (137 loc) · 5.26 KB
/
deferral.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
/**
* This sample demonstrates how the deferMessage() function can be used to defer a message for later processing.
*
* In this sample, we have an application that gets cooking instructions out of order. It uses
* message deferral to defer the instruction that is out of order, and then processes it in order.
*
* See https://docs.microsoft.com/azure/service-bus-messaging/message-deferral to learn about
* message deferral.
*
* @summary Demonstrates how to defer a message for later processing.
*/
import {
ServiceBusClient,
delay,
ProcessErrorArgs,
ServiceBusReceivedMessage,
ServiceBusMessage,
} from "@azure/service-bus";
// Load the .env file if it exists
import * as dotenv from "dotenv";
dotenv.config();
// Define connection string and related Service Bus entity names here
const connectionString = process.env.SERVICEBUS_CONNECTION_STRING || "<connection string>";
const queueName = process.env.QUEUE_NAME || "<queue name>";
export async function main() {
await sendMessages();
await receiveMessage();
}
// Shuffle and send messages
async function sendMessages() {
const sbClient = new ServiceBusClient(connectionString);
// createSender() can also be used to create a sender for a topic.
const sender = sbClient.createSender(queueName);
const data = [
{ step: 1, title: "Shop" },
{ step: 2, title: "Unpack" },
{ step: 3, title: "Prepare" },
{ step: 4, title: "Cook" },
{ step: 5, title: "Eat" },
];
const promises = new Array();
for (let index = 0; index < data.length; index++) {
const message: ServiceBusMessage = {
body: data[index],
subject: "RecipeStep",
contentType: "application/json",
};
// the way we shuffle the message order is to introduce a tiny random delay before each of the messages is sent
promises.push(
delay(Math.random() * 30).then(async () => {
try {
await sender.sendMessages(message);
console.log("Sent message step:", data[index].step);
} catch (err: any) {
console.log("Error while sending message", err);
}
}),
);
}
// wait until all the send tasks are complete
await Promise.all(promises);
await sender.close();
await sbClient.close();
}
async function receiveMessage() {
const sbClient = new ServiceBusClient(connectionString);
// If receiving from a subscription, you can use the createReceiver(topicName, subscriptionName) overload
let receiver = sbClient.createReceiver(queueName);
const deferredSteps = new Map();
let lastProcessedRecipeStep = 0;
try {
const processMessage = async (brokeredMessage: ServiceBusReceivedMessage) => {
if (
brokeredMessage.subject === "RecipeStep" &&
brokeredMessage.contentType === "application/json"
) {
const message = brokeredMessage.body;
// now let's check whether the step we received is the step we expect at this stage of the workflow
if (message.step === lastProcessedRecipeStep + 1) {
console.log("Process received message:", message);
lastProcessedRecipeStep++;
await receiver.completeMessage(brokeredMessage);
} else {
// if this is not the step we expected, we defer the message, meaning that we leave it in the queue but take it out of
// the delivery order. We put it aside. To retrieve it later, we remeber its sequence number
const sequenceNumber = brokeredMessage.sequenceNumber;
deferredSteps.set(message.step, sequenceNumber);
console.log("Defer received message:", message);
await receiver.deferMessage(brokeredMessage);
}
} else {
// we dead-letter the message if we don't know what to do with it.
console.log(
"Unknown message received, moving it to dead-letter queue ",
brokeredMessage.body,
);
await receiver.deadLetterMessage(brokeredMessage);
}
};
const processError = async (args: ProcessErrorArgs) => {
console.log(`>>>>> Error from error source ${args.errorSource} occurred: `, args.error);
};
receiver.subscribe(
{ processMessage, processError },
{
autoCompleteMessages: false,
},
); // Disabling autoCompleteMessages so we can control when message can be completed, deferred or deadlettered
await delay(10000);
await receiver.close();
console.log("Total number of deferred messages:", deferredSteps.size);
receiver = sbClient.createReceiver(queueName);
// Now we process the deferred messages
while (deferredSteps.size > 0) {
const step = lastProcessedRecipeStep + 1;
const sequenceNumber = deferredSteps.get(step);
const [message] = await receiver.receiveDeferredMessages(sequenceNumber);
if (message) {
console.log("Process deferred message:", message.body);
await receiver.completeMessage(message);
} else {
console.log("No message found for step number ", step);
}
deferredSteps.delete(step);
lastProcessedRecipeStep++;
}
await receiver.close();
} finally {
await sbClient.close();
}
}
main().catch((err) => {
console.log("Deferral Sample - Error occurred: ", err);
process.exit(1);
});