/
streamingReceiver.ts
117 lines (107 loc) · 4.22 KB
/
streamingReceiver.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
import {
MessageReceiver,
ReceiveOptions,
OnMessage,
OnError,
ReceiverType
} from "./messageReceiver";
import { ClientEntityContext } from "../clientEntityContext";
import * as log from "../log";
import { throwErrorIfConnectionClosed } from "../util/errors";
/**
* Describes the options passed to `registerMessageHandler` method when receiving messages from a
* Queue/Subscription which does not have sessions enabled.
* All properties are optional.
*/
export interface MessageHandlerOptions {
/**
* @property Indicates whether the `complete()` method on the message should automatically be
* called by the sdk after the user provided onMessage handler has been executed.
* Calling `complete()` on a message removes it from the Queue/Subscription.
* - **Default**: `true`.
*/
autoComplete?: boolean;
/**
* @property The maximum duration in seconds until which the lock on the message will be renewed
* by the sdk automatically. This auto renewal stops once the message is settled or once the user
* provided onMessage handler completes its execution.
*
* - **Default**: `300` seconds (5 minutes).
* - **To disable autolock renewal**, set this to `0`.
*/
maxMessageAutoRenewLockDurationInSeconds?: number;
/**
* @property The maximum number of concurrent calls that the sdk can make to the user's message
* handler. Once this limit has been reached, further messages will not be received until at least
* one of the calls to the user's message handler has completed.
* - **Default**: `1`.
*/
maxConcurrentCalls?: number;
}
/**
 * @internal
 * Describes the streaming receiver where the user can receive the message
 * by providing handler functions.
 * @class StreamingReceiver
 * @extends MessageReceiver
 */
export class StreamingReceiver extends MessageReceiver {
  /**
   * Instantiate a new Streaming receiver for receiving messages with handlers.
   *
   * Besides delegating to the base class, the constructor installs
   * `resetTimerOnNewMessageReceived`. Each call to that function cancels any pending idle timer
   * and, when `newMessageWaitTimeoutInSeconds` is truthy, arms a new one. If the timer fires,
   * the receiver logs the timeout and closes itself.
   *
   * @constructor
   * @param {ClientEntityContext} context The client entity context.
   * @param {ReceiveOptions} [options] Options for how you'd like to connect.
   */
  constructor(context: ClientEntityContext, options?: ReceiveOptions) {
    super(context, ReceiverType.streaming, options);
    this.resetTimerOnNewMessageReceived = () => {
      // Always drop the previous timer first so that at most one idle timer is pending.
      if (this._newMessageReceivedTimer) clearTimeout(this._newMessageReceivedTimer);
      // A timeout of 0/undefined means "never end the receive operation due to inactivity".
      if (this.newMessageWaitTimeoutInSeconds) {
        this._newMessageReceivedTimer = setTimeout(async () => {
          const msg =
            `StreamingReceiver '${this.name}' did not receive any messages in ` +
            `the last ${this.newMessageWaitTimeoutInSeconds} seconds. ` +
            `Hence ending this receive operation.`;
          log.error("[%s] %s", this._context.namespace.connectionId, msg);
          // Nothing awaits the promise returned by a timer callback, so a failure in close()
          // would otherwise surface as an unhandled promise rejection. Log it instead.
          try {
            await this.close();
          } catch (err) {
            log.error(
              "[%s] An error occurred while closing StreamingReceiver '%s' after it timed out " +
                "waiting for new messages: %O",
              this._context.namespace.connectionId,
              this.name,
              err
            );
          }
        }, this.newMessageWaitTimeoutInSeconds * 1000);
      }
    };
  }

  /**
   * Registers the user's handlers and starts the flow of messages by adding credit to the
   * underlying receiver link.
   *
   * Credit equal to `maxConcurrentCalls` is added only when the receiver link (`_receiver`)
   * already exists; otherwise the handlers are stored and no credit is added by this method.
   *
   * @param {OnMessage} onMessage The message handler to receive servicebus messages.
   * @param {OnError} onError The error handler to receive an error that occurs while receiving messages.
   * @throws Whatever `throwErrorIfConnectionClosed` throws for this namespace
   * (presumably when the connection has been closed - confirm in `../util/errors`).
   */
  receive(onMessage: OnMessage, onError: OnError): void {
    throwErrorIfConnectionClosed(this._context.namespace);
    this._onMessage = onMessage;
    this._onError = onError;
    if (this._receiver) {
      this._receiver.addCredit(this.maxConcurrentCalls);
    }
  }

  /**
   * Creates a streaming receiver, initializes it and registers it on the context as
   * `context.streamingReceiver`.
   * @static
   *
   * `autoComplete` defaults to `true` when it is `null`/`undefined`. The default is applied to a
   * shallow copy of `options`, so the object passed in by the caller is never modified.
   *
   * @param {ClientEntityContext} context The connection context.
   * @param {ReceiveOptions} [options] Receive options.
   * @return {Promise<StreamingReceiver>} A promise that resolves with an instance of StreamingReceiver.
   * @throws Whatever `throwErrorIfConnectionClosed` throws for `context.namespace`
   * (presumably when the connection has been closed - confirm in `../util/errors`).
   */
  static async create(
    context: ClientEntityContext,
    options?: ReceiveOptions
  ): Promise<StreamingReceiver> {
    throwErrorIfConnectionClosed(context.namespace);
    // Copy before defaulting: callers may reuse their options object for other receivers.
    const receiveOptions: ReceiveOptions = options ? { ...options } : {};
    if (receiveOptions.autoComplete == null) receiveOptions.autoComplete = true;
    const sReceiver = new StreamingReceiver(context, receiveOptions);
    await sReceiver._init();
    context.streamingReceiver = sReceiver;
    return sReceiver;
  }
}