forked from Azure/azure-sdk-for-js
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sender.ts
288 lines (272 loc) · 11.6 KB
/
sender.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
import Long from "long";
import * as log from "./log";
import { MessageSender } from "./core/messageSender";
import { SendableMessageInfo } from "./serviceBusMessage";
import { ClientEntityContext } from "./clientEntityContext";
import {
getSenderClosedErrorMsg,
throwErrorIfConnectionClosed,
throwTypeErrorIfParameterMissing,
throwTypeErrorIfParameterNotLong,
throwTypeErrorIfParameterNotLongArray
} from "./util/errors";
/**
* The Sender class can be used to send messages, schedule messages to be sent at a later time
* and cancel such scheduled messages.
* Use the `createSender` function on the QueueClient or TopicClient to instantiate a Sender.
* The Sender class is an abstraction over the underlying AMQP sender link.
* @class Sender
*/
export class Sender {
/**
* @property Describes the amqp connection context for the Client.
*/
private _context: ClientEntityContext;
/**
* @property Denotes if close() was called on this sender
*/
private _isClosed: boolean = false;
/**
* @internal
* @throws Error if the underlying connection is closed.
*/
constructor(context: ClientEntityContext) {
throwErrorIfConnectionClosed(context.namespace);
this._context = context;
}
private _throwIfSenderOrConnectionClosed(): void {
throwErrorIfConnectionClosed(this._context.namespace);
if (this.isClosed) {
const errorMessage = getSenderClosedErrorMsg(
this._context.entityPath,
this._context.clientType,
this._context.isClosed
);
const error = new Error(errorMessage);
log.error(`[${this._context.namespace.connectionId}] %O`, error);
throw error;
}
}
/**
* @property Returns `true` if either the sender or the client that created it has been closed
* @readonly
*/
public get isClosed(): boolean {
return this._isClosed || this._context.isClosed;
}
/**
* Sends the given message after creating an AMQP Sender link if it doesnt already exists.
*
* To send a message to a `session` and/or `partition` enabled Queue/Topic, set the `sessionId`
* and/or `partitionKey` properties respectively on the message.
*
* @param message - Message to send.
* @returns Promise<void>
* @throws Error if the underlying connection, client or sender is closed.
* @throws MessagingError if the service returns an error while sending messages to the service.
*/
async send(message: SendableMessageInfo): Promise<void> {
this._throwIfSenderOrConnectionClosed();
throwTypeErrorIfParameterMissing(this._context.namespace.connectionId, "message", message);
const sender = MessageSender.create(this._context);
return sender.send(message);
}
/**
* Sends the given messages in a single batch i.e. in a single AMQP message after creating an AMQP
* Sender link if it doesnt already exists.
*
* - To send messages to a `session` and/or `partition` enabled Queue/Topic, set the `sessionId`
* and/or `partitionKey` properties respectively on the messages.
* - When doing so, all
* messages in the batch should have the same `sessionId` (if using sessions) and the same
* `parititionKey` (if using paritions).
*
* @param messages - An array of SendableMessageInfo objects to be sent in a Batch message.
* @return Promise<void>
* @throws Error if the underlying connection, client or sender is closed.
* @throws MessagingError if the service returns an error while sending messages to the service.
*/
async sendBatch(messages: SendableMessageInfo[]): Promise<void> {
this._throwIfSenderOrConnectionClosed();
throwTypeErrorIfParameterMissing(this._context.namespace.connectionId, "messages", messages);
if (!Array.isArray(messages)) {
messages = [messages];
}
const sender = MessageSender.create(this._context);
return sender.sendBatch(messages);
}
/**
* Schedules given message to appear on Service Bus Queue/Subscription at a later time.
*
* Recently, a bug has been surfaced with the `scheduleMessage()` method.
*
* More reference and details on the bug - https://github.com/Azure/azure-sdk-for-js/issues/6816#issuecomment-574461068
*
* You are affected with the bug in case you are depending on version < 2.0.0 of `@azure/service-bus`.
*
* Version 2.0.0 is not released yet and the fix for this bug will be shipped along with the release.
*
* Until then, please make use of the following workaround in order to leverage `scheduleMessage` functionality.
*
* Workaround
* 1. Import DefaultDataTransformer from "@azure/amqp-common" library.
* NPM Link - https://www.npmjs.com/package/@azure/amqp-common
* - In typescript, `import { DefaultDataTransformer } from "@azure/amqp-common";`
* - In javascript, `const { DefaultDataTransformer } = require("@azure/amqp-common");`
* 2. Update the message body before calling the scheduleMessage() method to send the message as follows
* - Instantiate the data transformer used by the sdk:
* `const dt = new DefaultDataTransformer();`
* - When you need to schedule the message, encode the message body before sending:
* `message.body = dt.encode(message.body);`
*
* @param scheduledEnqueueTimeUtc - The UTC time at which the message should be enqueued.
* @param message - The message that needs to be scheduled.
* @returns Promise<Long> - The sequence number of the message that was scheduled.
* You will need the sequence number if you intend to cancel the scheduling of the message.
* Save the `Long` type as-is in your application without converting to number. Since JavaScript
* only supports 53 bit numbers, converting the `Long` to number will cause loss in precision.
* @throws Error if the underlying connection, client or sender is closed.
* @throws MessagingError if the service returns an error while scheduling a message.
*/
async scheduleMessage(
scheduledEnqueueTimeUtc: Date,
message: SendableMessageInfo
): Promise<Long> {
this._throwIfSenderOrConnectionClosed();
throwTypeErrorIfParameterMissing(
this._context.namespace.connectionId,
"scheduledEnqueueTimeUtc",
scheduledEnqueueTimeUtc
);
throwTypeErrorIfParameterMissing(this._context.namespace.connectionId, "message", message);
const messages = [message];
const result = await this._context.managementClient!.scheduleMessages(
scheduledEnqueueTimeUtc,
messages
);
return result[0];
}
/**
* Schedules given messages to appear on Service Bus Queue/Subscription at a later time.
*
* Recently, a bug has been surfaced with the `scheduleMessages()` method.
*
* More reference and details on the bug - https://github.com/Azure/azure-sdk-for-js/issues/6816#issuecomment-574461068
*
* You are affected with the bug in case you are depending on version < 2.0.0 of `@azure/service-bus`.
*
* Version 2.0.0 is not released yet and the fix for this bug will be shipped along with the release.
*
* Until then, please make use of the following workaround in order to leverage `scheduleMessages` functionality.
*
* Workaround
* 1. Import DefaultDataTransformer from "@azure/amqp-common" library.
* NPM Link - https://www.npmjs.com/package/@azure/amqp-common
* - In typescript, `import { DefaultDataTransformer } from "@azure/amqp-common";`
* - In javascript, `const { DefaultDataTransformer } = require("@azure/amqp-common");`
* 2. Update the message body before calling the scheduleMessages() method to send the message as follows
* - Instantiate the data transformer used by the sdk:
* `const dt = new DefaultDataTransformer();`
* - When you need to schedule the message, encode the message body before sending:
* `message.body = dt.encode(message.body);`
*
* @param scheduledEnqueueTimeUtc - The UTC time at which the messages should be enqueued.
* @param messages - Array of Messages that need to be scheduled.
* @returns Promise<Long[]> - The sequence numbers of messages that were scheduled.
* You will need the sequence number if you intend to cancel the scheduling of the messages.
* Save the `Long` type as-is in your application without converting to number. Since JavaScript
* only supports 53 bit numbers, converting the `Long` to number will cause loss in precision.
* @throws Error if the underlying connection, client or sender is closed.
* @throws MessagingError if the service returns an error while scheduling messages.
*/
async scheduleMessages(
scheduledEnqueueTimeUtc: Date,
messages: SendableMessageInfo[]
): Promise<Long[]> {
this._throwIfSenderOrConnectionClosed();
throwTypeErrorIfParameterMissing(
this._context.namespace.connectionId,
"scheduledEnqueueTimeUtc",
scheduledEnqueueTimeUtc
);
throwTypeErrorIfParameterMissing(this._context.namespace.connectionId, "messages", messages);
if (!Array.isArray(messages)) {
messages = [messages];
}
return this._context.managementClient!.scheduleMessages(scheduledEnqueueTimeUtc, messages);
}
/**
* Cancels a message that was scheduled to appear on a ServiceBus Queue/Subscription.
* @param sequenceNumber - The sequence number of the message to be cancelled.
* @returns Promise<void>
* @throws Error if the underlying connection, client or sender is closed.
* @throws MessagingError if the service returns an error while canceling a scheduled message.
*/
async cancelScheduledMessage(sequenceNumber: Long): Promise<void> {
this._throwIfSenderOrConnectionClosed();
throwTypeErrorIfParameterMissing(
this._context.namespace.connectionId,
"sequenceNumber",
sequenceNumber
);
throwTypeErrorIfParameterNotLong(
this._context.namespace.connectionId,
"sequenceNumber",
sequenceNumber
);
return this._context.managementClient!.cancelScheduledMessages([sequenceNumber]);
}
/**
* Cancels multiple messages that were scheduled to appear on a ServiceBus Queue/Subscription.
* @param sequenceNumbers - An Array of sequence numbers of the messages to be cancelled.
* @returns Promise<void>
* @throws Error if the underlying connection, client or sender is closed.
* @throws MessagingError if the service returns an error while canceling scheduled messages.
*/
async cancelScheduledMessages(sequenceNumbers: Long[]): Promise<void> {
this._throwIfSenderOrConnectionClosed();
throwTypeErrorIfParameterMissing(
this._context.namespace.connectionId,
"sequenceNumbers",
sequenceNumbers
);
if (!Array.isArray(sequenceNumbers)) {
sequenceNumbers = [sequenceNumbers];
}
throwTypeErrorIfParameterNotLongArray(
this._context.namespace.connectionId,
"sequenceNumbers",
sequenceNumbers
);
return this._context.managementClient!.cancelScheduledMessages(sequenceNumbers);
}
/**
* Closes the underlying AMQP sender link.
* Once closed, the sender cannot be used for any further operations.
* Use the `createSender` function on the QueueClient or TopicClient to instantiate a new Sender
*
* @returns {Promise<void>}
*/
async close(): Promise<void> {
try {
this._isClosed = true;
if (
this._context.namespace.connection &&
this._context.namespace.connection.isOpen() &&
this._context.sender
) {
await this._context.sender.close();
}
} catch (err) {
log.error(
"[%s] An error occurred while closing the Sender for %s: %O",
this._context.namespace.connectionId,
this._context.entityPath,
err
);
throw err;
}
}
}