-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
batchingReceiver.ts
432 lines (399 loc) · 16.8 KB
/
batchingReceiver.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
import * as log from "../log";
import { Constants, translate, MessagingError } from "@azure/amqp-common";
import { ReceiverEvents, EventContext, OnAmqpEvent, SessionEvents, AmqpError } from "rhea-promise";
import { ServiceBusMessage } from "../serviceBusMessage";
import {
MessageReceiver,
ReceiveOptions,
ReceiverType,
PromiseLike,
OnAmqpEventAsPromise
} from "./messageReceiver";
import { ClientEntityContext } from "../clientEntityContext";
import { throwErrorIfConnectionClosed } from "../util/errors";
/**
 * Describes the batching receiver where the user can receive a specified number of messages for
 * a predefined time.
 *
 * Each `receive()` call adds link credit for the requested number of messages, collects the
 * messages that arrive, and settles its promise once the batch is full, a timer expires or an
 * error is reported. Leftover credit is drained before the promise is resolved.
 * @internal
 * @class BatchingReceiver
 * @extends MessageReceiver
 */
export class BatchingReceiver extends MessageReceiver {
  /**
   * @property {boolean} isReceivingMessages Indicates whether the link is actively receiving
   * messages. Set to true at the start of `receive()` and reset to false on every path that
   * settles the returned promise. Default: false.
   */
  isReceivingMessages: boolean = false;

  /**
   * @property {AmqpError | Error | undefined} detachedError Error that occurred when receiver
   * got detached. Not applicable when onReceiveError is called.
   * It is recorded by `onDetached()` and cleared at the start of each `receive()` call.
   * Default: undefined.
   */
  private detachedError: AmqpError | Error | undefined = undefined;

  /**
   * Instantiate a new BatchingReceiver.
   *
   * @constructor
   * @param {ClientEntityContext} context The client entity context.
   * @param {ReceiveOptions} [options] Options for how you'd like to connect.
   */
  constructor(context: ClientEntityContext, options?: ReceiveOptions) {
    super(context, ReceiverType.batching, options);
    // Once a message has arrived, end the batch if no further message shows up within 1 second.
    this.newMessageWaitTimeoutInSeconds = 1;
  }

  /**
   * Clear the token renewal timer and set the `detachedError` property.
   * @param {AmqpError | Error} [receiverError] The receiver error if any.
   * @returns {Promise<void>} Promise<void>.
   */
  async onDetached(receiverError?: AmqpError | Error): Promise<void> {
    // Clears the token renewal timer. Closes the link and its session if they are open.
    await this._closeLink(this._receiver);
    // Remembered so that an in-flight receive() rejects with it from its final action.
    this.detachedError = receiverError;
  }

  /**
   * Receives a batch of messages from a ServiceBus Queue/Topic.
   *
   * The returned promise:
   * - resolves with the messages collected so far when `maxMessageCount` messages have arrived,
   *   when `idleTimeoutInSeconds` elapses, or when no new message arrives within
   *   `newMessageWaitTimeoutInSeconds` after the previous one;
   * - rejects when the link cannot be opened, when the receiver or its session reports an error,
   *   when an incoming message cannot be converted, or when the receiver got detached while
   *   the call was in progress.
   *
   * @param maxMessageCount The maximum number of messages to receive.
   * @param idleTimeoutInSeconds The maximum time in seconds that this call waits for messages.
   * If no message is received by this time, the returned promise gets resolved to an empty
   * array. Defaults to `Constants.defaultOperationTimeoutInSeconds`.
   * @returns {Promise<ServiceBusMessage[]>} A promise that resolves with an array of Message objects.
   * @throws Synchronously, whatever `throwErrorIfConnectionClosed` throws for the namespace.
   */
  receive(maxMessageCount: number, idleTimeoutInSeconds?: number): Promise<ServiceBusMessage[]> {
    throwErrorIfConnectionClosed(this._context.namespace);

    if (idleTimeoutInSeconds == null) {
      idleTimeoutInSeconds = Constants.defaultOperationTimeoutInSeconds;
    }

    const brokeredMessages: ServiceBusMessage[] = [];

    // A detach error left over from an earlier call must not fail this one: `detachedError` is
    // only meant to reject the receive() that was in progress when the detach happened.
    // NOTE(review): assumes the link closed by onDetached() is re-created below - confirm.
    this.detachedError = undefined;
    this.isReceivingMessages = true;

    return new Promise<ServiceBusMessage[]>((resolve, reject) => {
      // Timer that caps the total duration of this receive() call.
      let totalWaitTimer: NodeJS.Timer | undefined;

      // Action to be taken when the session reports an error: detach all handlers registered
      // for this call, stop the timers and reject.
      const onSessionError: OnAmqpEvent = (context: EventContext) => {
        this.isReceivingMessages = false;
        const receiver = this._receiver || context.receiver!;
        receiver.removeListener(ReceiverEvents.receiverError, onReceiveError);
        receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
        receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
        receiver.session.removeListener(SessionEvents.sessionError, onSessionError);

        const sessionError = context.session && context.session.error;
        let error = new MessagingError("An error occuured while receiving messages.");
        if (sessionError) {
          error = translate(sessionError);
          log.error(
            "[%s] 'session_error' event occurred for Receiver '%s'. The associated error is:\n%O",
            this._context.namespace.connectionId,
            this.name,
            error
          );
        }
        if (totalWaitTimer) {
          clearTimeout(totalWaitTimer);
        }
        if (this._newMessageReceivedTimer) {
          clearTimeout(this._newMessageReceivedTimer);
        }
        reject(error);
      };

      // Final action to be performed after maxMessageCount is reached or the maxWaitTime is over.
      const finalAction = (): void => {
        if (this._newMessageReceivedTimer) {
          clearTimeout(this._newMessageReceivedTimer);
        }
        if (totalWaitTimer) {
          clearTimeout(totalWaitTimer);
        }

        // Removing listeners, so that the next receiveMessages() call can set them again.
        // The drain listener is kept for now; it is needed if leftover credit is drained below.
        if (this._receiver) {
          this._receiver.removeListener(ReceiverEvents.receiverError, onReceiveError);
          this._receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
          this._receiver.session.removeListener(SessionEvents.sessionError, onSessionError);
        }

        // The receiver got detached while this call was in progress: surface that error.
        if (this.detachedError) {
          if (this._receiver) {
            this._receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
          }
          this.isReceivingMessages = false;
          const err = translate(this.detachedError);
          return reject(err);
        }

        if (this._receiver && this._receiver.credit > 0) {
          log.batching(
            "[%s] Receiver '%s': Draining leftover credits(%d).",
            this._context.namespace.connectionId,
            this.name,
            this._receiver.credit
          );

          // Setting drain must be accompanied by a flow call (aliased to addCredit in this case).
          // The promise is resolved later, by `onReceiveDrain`.
          this._receiver.drain = true;
          this._receiver.addCredit(1);
        } else {
          // Nothing to drain, so the batch can be handed back right away.
          if (this._receiver) {
            this._receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
          }

          this.isReceivingMessages = false;
          log.batching(
            "[%s] Receiver '%s': Resolving receiveMessages() with %d messages.",
            this._context.namespace.connectionId,
            this.name,
            brokeredMessages.length
          );
          resolve(brokeredMessages);
        }
      };

      // Action to be performed on the "message" event.
      const onReceiveMessage: OnAmqpEventAsPromise = async (context: EventContext) => {
        // Every message restarts the "no new message" timer.
        this.resetTimerOnNewMessageReceived();
        try {
          const data: ServiceBusMessage = new ServiceBusMessage(
            this._context,
            context.message!,
            context.delivery!,
            true
          );
          // Messages arriving after the batch is already full are not added to it.
          if (brokeredMessages.length < maxMessageCount) {
            brokeredMessages.push(data);
          }
        } catch (err) {
          const errObj = err instanceof Error ? err : new Error(JSON.stringify(err));
          log.error(
            "[%s] Receiver '%s' received an error while converting AmqpMessage to ServiceBusMessage:\n%O",
            this._context.namespace.connectionId,
            this.name,
            errObj
          );
          reject(errObj);
        }
        if (brokeredMessages.length === maxMessageCount) {
          finalAction();
        }
      };

      // Action to be performed on the "session_close" event. Only logs and resets the flag.
      const onSessionClose: OnAmqpEventAsPromise = async (context: EventContext) => {
        try {
          this.isReceivingMessages = false;
          const sessionError = context.session && context.session.error;
          if (sessionError) {
            log.error(
              "[%s] 'session_close' event occurred for receiver '%s'. The associated error is: %O",
              this._context.namespace.connectionId,
              this.name,
              sessionError
            );
          }
        } catch (err) {
          log.error(
            "[%s] Receiver '%s' error in onSessionClose handler:\n%O",
            this._context.namespace.connectionId,
            this.name,
            translate(err)
          );
        }
      };

      // Action to be performed on the "receiver_drained" event.
      const onReceiveDrain: OnAmqpEvent = () => {
        if (this._receiver) {
          this._receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
          this._receiver.drain = false;
        }

        this.isReceivingMessages = false;

        log.batching(
          "[%s] Receiver '%s' drained. Resolving receiveMessages() with %d messages.",
          this._context.namespace.connectionId,
          this.name,
          brokeredMessages.length
        );

        resolve(brokeredMessages);
      };

      // Action to be performed on the "receiver_close" event. Only logs and resets the flag.
      const onReceiveClose: OnAmqpEventAsPromise = async (context: EventContext) => {
        try {
          this.isReceivingMessages = false;
          const receiverError = context.receiver && context.receiver.error;
          if (receiverError) {
            log.error(
              "[%s] 'receiver_close' event occurred. The associated error is: %O",
              this._context.namespace.connectionId,
              receiverError
            );
          }
        } catch (err) {
          log.error(
            "[%s] Receiver '%s' error in onClose handler:\n%O",
            this._context.namespace.connectionId,
            this.name,
            translate(err)
          );
        }
      };

      // Action to be taken when an error is received.
      const onReceiveError: OnAmqpEvent = (context: EventContext) => {
        this.isReceivingMessages = false;
        const receiver = this._receiver || context.receiver!;
        receiver.removeListener(ReceiverEvents.receiverError, onReceiveError);
        receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
        receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
        receiver.session.removeListener(SessionEvents.sessionError, onSessionError);

        const receiverError = context.receiver && context.receiver.error;
        let error = new MessagingError("An error occuured while receiving messages.");
        if (receiverError) {
          error = translate(receiverError);
          log.error(
            "[%s] Receiver '%s' received an error:\n%O",
            this._context.namespace.connectionId,
            this.name,
            error
          );
        }
        if (totalWaitTimer) {
          clearTimeout(totalWaitTimer);
        }
        if (this._newMessageReceivedTimer) {
          clearTimeout(this._newMessageReceivedTimer);
        }
        reject(error);
      };

      /**
       * Resets the timer when a new message is received. If no messages were received for
       * `newMessageWaitTimeoutInSeconds`, the messages received till now are returned. The
       * receiver link stays open for the next receive call, but doesn't receive messages until then
       */
      this.resetTimerOnNewMessageReceived = () => {
        if (this._newMessageReceivedTimer) clearTimeout(this._newMessageReceivedTimer);
        if (this.newMessageWaitTimeoutInSeconds) {
          this._newMessageReceivedTimer = setTimeout(async () => {
            const msg =
              `BatchingReceiver '${this.name}' did not receive any messages in the last ` +
              `${this.newMessageWaitTimeoutInSeconds} seconds. ` +
              `Hence ending this batch receive operation.`;
            // Ending a batch because the entity went quiet is expected behavior, not an error.
            log.batching("[%s] %s", this._context.namespace.connectionId, msg);
            finalAction();
          }, this.newMessageWaitTimeoutInSeconds * 1000);
        }
      };

      // Action to be performed after the max wait time is over.
      const actionAfterWaitTimeout = (): void => {
        log.batching(
          "[%s] Batching Receiver '%s' max wait time in seconds %d over.",
          this._context.namespace.connectionId,
          this.name,
          idleTimeoutInSeconds
        );
        return finalAction();
      };

      // Action to be performed on the "settled" event: settles the pending disposition promise
      // that was stored for the delivery in `_deliveryDispositionMap`.
      const onSettled: OnAmqpEvent = (context: EventContext) => {
        const connectionId = this._context.namespace.connectionId;
        const delivery = context.delivery;
        if (delivery) {
          const id = delivery.id;
          const state = delivery.remote_state;
          const settled = delivery.remote_settled;
          log.receiver(
            "[%s] Delivery with id %d, remote_settled: %s, remote_state: %o has been " +
              "received.",
            connectionId,
            id,
            settled,
            state && state.error ? state.error : state
          );
          if (settled && this._deliveryDispositionMap.has(id)) {
            const promise = this._deliveryDispositionMap.get(id) as PromiseLike;
            clearTimeout(promise.timer);
            log.receiver(
              "[%s] Found the delivery with id %d in the map and cleared the timer.",
              connectionId,
              id
            );
            const deleteResult = this._deliveryDispositionMap.delete(id);
            log.receiver(
              "[%s] Successfully deleted the delivery with id %d from the map.",
              connectionId,
              id,
              deleteResult
            );
            // A remote state carrying an error means the disposition was not accepted.
            if (state && state.error && (state.error.condition || state.error.description)) {
              const error = translate(state.error);
              return promise.reject(error);
            }

            return promise.resolve();
          }
        }
      };

      // Grants the link credit for this batch and starts the overall wait timer.
      const addCreditAndSetTimer = (reuse?: boolean): void => {
        log.batching(
          "[%s] Receiver '%s', adding credit for receiving %d messages.",
          this._context.namespace.connectionId,
          this.name,
          maxMessageCount
        );
        // By adding credit here, we let the service know that at max we can handle `maxMessageCount`
        // number of messages concurrently. We will return the user an array of messages that can
        // be of size upto maxMessageCount. Then the user needs to accordingly dispose
        // (complete,/abandon/defer/deadletter) the messages from the array.
        this._receiver!.addCredit(maxMessageCount);
        let msg: string = "[%s] Setting the wait timer for %d seconds for receiver '%s'.";
        if (reuse) msg += " Receiver link already present, hence reusing it.";
        log.batching(msg, this._context.namespace.connectionId, idleTimeoutInSeconds, this.name);
        totalWaitTimer = setTimeout(
          actionAfterWaitTimeout,
          (idleTimeoutInSeconds as number) * 1000
        );
        // TODO: Disabling this for now. We would want to give the user a decent chance to receive
        // the first message and only timeout faster if successive messages from there onwards are
        // not received quickly. However, it may be possible that there are no pending messages
        // currently on the queue. In that case waiting for idleTimeoutInSeconds would be
        // unnecessary.
        // There is a management plane API to get runtimeInfo of the Queue which provides
        // information about active messages on the Queue and it's sub Queues. However, this adds
        // a little complexity. If the first message was delayed due to network latency then there
        // are bright chances that the management plane api would receive the same fate.
        // It would be better to weigh all the options before making a decision.
        // resetTimerOnNewMessageReceived();
      };

      if (!this.isOpen()) {
        log.batching(
          "[%s] Receiver '%s', setting max concurrent calls to 0.",
          this._context.namespace.connectionId,
          this.name
        );
        // while creating the receiver link for batching receiver the max concurrent calls
        // i.e. the credit_window on the link is set to zero. After the link is created
        // successfully, we add credit which is the maxMessageCount specified by the user.
        this.maxConcurrentCalls = 0;
        const rcvrOptions = this._createReceiverOptions(false, {
          onMessage: onReceiveMessage,
          onError: onReceiveError,
          onSessionError: onSessionError,
          onSettled: onSettled,
          onClose: onReceiveClose,
          onSessionClose: onSessionClose
        });
        this._init(rcvrOptions)
          .then(() => {
            this._receiver!.on(ReceiverEvents.receiverDrained, onReceiveDrain);
            addCreditAndSetTimer();
            return;
          })
          .catch((err) => {
            // No timer has been armed on this path, so `finalAction` will never run to reset
            // the flag. Reset it here, otherwise the receiver would look busy forever.
            this.isReceivingMessages = false;
            reject(err);
          });
      } else {
        // The link is reused: credit is added first, then the per-call handlers are attached.
        addCreditAndSetTimer(true);
        this._receiver!.on(ReceiverEvents.message, onReceiveMessage);
        this._receiver!.on(ReceiverEvents.receiverError, onReceiveError);
        this._receiver!.on(ReceiverEvents.receiverDrained, onReceiveDrain);
        this._receiver!.session.on(SessionEvents.sessionError, onSessionError);
      }
    });
  }

  /**
   * Creates a batching receiver and stores it on the given context as `batchingReceiver`.
   * @static
   *
   * @param {ClientEntityContext} context The connection context.
   * @param {ReceiveOptions} [options] Receive options.
   * @returns {BatchingReceiver} The newly created receiver.
   */
  static create(context: ClientEntityContext, options?: ReceiveOptions): BatchingReceiver {
    throwErrorIfConnectionClosed(context.namespace);
    const bReceiver = new BatchingReceiver(context, options);
    context.batchingReceiver = bReceiver;
    return bReceiver;
  }
}