-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
serviceBusMessageBatch.ts
303 lines (278 loc) · 9.64 KB
/
serviceBusMessageBatch.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import { ServiceBusMessage, toRheaMessage } from "./serviceBusMessage";
import {
errorInvalidMessageTypeSingle,
throwIfNotValidServiceBusMessage,
throwTypeErrorIfParameterMissing,
} from "./util/errors";
import { ConnectionContext } from "./connectionContext";
import {
MessageAnnotations,
messageProperties as RheaMessagePropertiesList,
message as RheaMessageUtil,
Message as RheaMessage,
} from "rhea-promise";
import { SpanContext } from "@azure/core-tracing";
import { convertTryAddOptionsForCompatibility, instrumentMessage } from "./diagnostics/tracing";
import { TryAddOptions } from "./modelsToBeSharedWithEventHubs";
import { AmqpAnnotatedMessage } from "@azure/core-amqp";
import { defaultDataTransformer } from "./dataTransformer";
/**
* @internal
* The number of bytes to reserve as per-message overhead when the encoded
* message is "small" (its encoded length is at most `smallMessageMaxBytes`).
* Added to the running batch size for every such message in `tryAddMessage`.
* NOTE(review): presumably the framing cost of an AMQP data section that uses
* a 1-byte length prefix - confirm against the AMQP 1.0 encoding rules.
*/
const smallMessageOverhead = 5;
/**
* @internal
* The number of bytes to reserve as per-message overhead when the encoded
* message is "large" (its encoded length exceeds `smallMessageMaxBytes`).
* Added to the running batch size for every such message in `tryAddMessage`.
* NOTE(review): presumably the framing cost of an AMQP data section that uses
* a 4-byte length prefix - confirm against the AMQP 1.0 encoding rules.
*/
const largeMessageOverhead = 8;
/**
* @internal
* The maximum encoded length, in bytes, for a message to be considered small.
* The comparison in `tryAddMessage` is inclusive: a message of exactly this
* many bytes is still charged `smallMessageOverhead`.
*/
const smallMessageMaxBytes = 255;
/**
* A batch of messages that you can create using the {@link createBatch} method.
*
* Messages are added one at a time with `tryAddMessage`, which reports whether
* the message was accepted; the batch tracks its own encoded size against
* `maxSizeInBytes`.
*/
export interface ServiceBusMessageBatch {
/**
* Size of the batch in bytes after the messages added to it have been encoded into a single AMQP
* message.
* @readonly
*/
readonly sizeInBytes: number;
/**
* Number of messages added to the batch.
* @readonly
*/
readonly count: number;
/**
* The maximum size of the batch, in bytes. The `tryAddMessage` function on the batch will return `false`
* if the message being added causes the size of the batch to exceed this limit. Use the `createMessageBatch()` method on
* the `Sender` to set the maxSizeInBytes.
* @readonly
*/
readonly maxSizeInBytes: number;
/**
* Adds a message to the batch if permitted by the batch's size limit.
* **NOTE**: Always remember to check the return value of this method, before calling it again
* for the next message.
*
* @param message - The message to add to the batch.
* @param options - Optional settings for this add operation. In the implementation in this
* file they are passed on to the tracing instrumentation applied to the message.
* @returns A boolean value indicating if the message has been added to the batch or not.
*/
tryAddMessage(
message: ServiceBusMessage | AmqpAnnotatedMessage,
options?: TryAddOptions
): boolean;
/**
* Produces the AMQP message containing the encoded messages that were added to the batch.
* Used internally by the `sendBatch()` method on the `Sender`.
* This is not meant for the user to use directly.
*
* @returns The encoded batch as a `Buffer`.
* @internal
* @hidden
*/
_generateMessage(): Buffer;
/**
* Gets the "message" span contexts that were created when adding messages to the batch.
* Used internally by the `sendBatch()` method to set up the right spans in traces if tracing is enabled.
* @internal
* @hidden
*/
readonly _messageSpanContexts: SpanContext[];
}
/**
 * An internal class representing a batch of messages which can be used to send messages to Service Bus.
 *
 * Each message is encoded when it is added; the batch keeps the encoded buffers
 * and a running size estimate that includes the batch envelope and a
 * per-message overhead, so that `tryAddMessage` can refuse a message that would
 * push the batch past `maxSizeInBytes`.
 *
 * @internal
 */
export class ServiceBusMessageBatchImpl implements ServiceBusMessageBatch {
  /**
   * Current size of the batch in bytes.
   */
  private _sizeInBytes: number;
  /**
   * Encoded amqp messages.
   */
  private _encodedMessages: Buffer[] = [];
  /**
   * List of 'message' span contexts.
   */
  private _spanContexts: SpanContext[] = [];
  /**
   * The message annotations to apply on the batch envelope.
   * This will reflect the message annotations on the first message
   * that was successfully added to the batch.
   */
  private _batchAnnotations?: MessageAnnotations;
  /**
   * The message properties to apply on the batch envelope.
   * This will reflect the message properties on the first message
   * that was successfully added to the batch.
   */
  private _batchMessageProperties?: { [key: string]: string };
  /**
   * The application properties to apply on the batch envelope.
   * This will reflect the application properties on the first message
   * that was successfully added to the batch.
   */
  private _batchApplicationProperties?: { [key: string]: any };

  /**
   * ServiceBusMessageBatch should not be constructed using `new ServiceBusMessageBatch()`
   * Use the `createBatch()` method on your `Sender` instead.
   *
   * @param _context - The connection context; used for the connection id, entity path and host.
   * @param _maxSizeInBytes - The upper bound on the batch size, in bytes.
   * @internal
   * @hidden
   */
  constructor(private _context: ConnectionContext, private _maxSizeInBytes: number) {
    this._sizeInBytes = 0;
    this._batchMessageProperties = {};
  }

  /**
   * The maximum size of the batch, in bytes.
   * @readonly
   */
  get maxSizeInBytes(): number {
    return this._maxSizeInBytes;
  }

  /**
   * Size of the `ServiceBusMessageBatch` instance after the messages added to it have been
   * encoded into a single AMQP message.
   * @readonly
   */
  get sizeInBytes(): number {
    return this._sizeInBytes;
  }

  /**
   * Number of messages in the `ServiceBusMessageBatch` instance.
   * @readonly
   */
  get count(): number {
    return this._encodedMessages.length;
  }

  /**
   * Gets the "message" span contexts that were created when adding messages to the batch.
   * @internal
   * @hidden
   */
  get _messageSpanContexts(): SpanContext[] {
    return this._spanContexts;
  }

  /**
   * Generates an AMQP message that contains the provided encoded messages and annotations.
   *
   * @param encodedMessages - The already encoded messages to include in the AMQP batch.
   * @param annotations - The message annotations to set on the batch.
   * @param applicationProperties - The application properties to set on the batch.
   * @param messageProperties - The message properties to set on the batch.
   * @returns The encoded batch envelope.
   */
  private _generateBatch(
    encodedMessages: Buffer[],
    annotations?: MessageAnnotations,
    applicationProperties?: { [key: string]: any },
    messageProperties?: { [key: string]: string }
  ): Buffer {
    const batchEnvelope: RheaMessage = {
      body: RheaMessageUtil.data_sections(encodedMessages),
      message_annotations: annotations,
      application_properties: applicationProperties,
    };
    if (messageProperties) {
      // Only the property names known to rhea are copied, and only when truthy.
      for (const prop of RheaMessagePropertiesList) {
        if ((messageProperties as any)[prop]) {
          (batchEnvelope as any)[prop] = (messageProperties as any)[prop];
        }
      }
    }
    return RheaMessageUtil.encode(batchEnvelope);
  }

  /**
   * Represents the single AMQP message which is the result of encoding all the messages
   * added into the `ServiceBusMessageBatch` instance.
   *
   * This is not meant for the user to use directly.
   *
   * When the `ServiceBusMessageBatch` instance is passed to the `sendBatch()` method on the `Sender`,
   * this single batched AMQP message is what gets sent over the wire to the service.
   *
   * @returns The encoded batch, built from the stored messages and envelope metadata.
   */
  _generateMessage(): Buffer {
    return this._generateBatch(
      this._encodedMessages,
      this._batchAnnotations,
      this._batchApplicationProperties,
      this._batchMessageProperties
    );
  }

  /**
   * Tries to add a message to the batch if permitted by the batch's size limit.
   * **NOTE**: Always remember to check the return value of this method, before calling it again
   * for the next message.
   *
   * The batch is left completely unchanged when the message does not fit. In
   * particular, envelope metadata derived from a rejected first message is
   * discarded rather than being kept for the next message.
   *
   * @param originalMessage - An individual service bus message.
   * @param options - Optional settings passed on to the tracing instrumentation.
   * @returns A boolean value indicating if the message has been added to the batch or not.
   * @throws Whatever `throwTypeErrorIfParameterMissing` or `throwIfNotValidServiceBusMessage`
   * raise for a missing or invalid message.
   */
  public tryAddMessage(
    originalMessage: ServiceBusMessage | AmqpAnnotatedMessage,
    options: TryAddOptions = {}
  ): boolean {
    throwTypeErrorIfParameterMissing(this._context.connectionId, "message", originalMessage);
    throwIfNotValidServiceBusMessage(originalMessage, errorInvalidMessageTypeSingle);
    options = convertTryAddOptionsForCompatibility(options);
    const { message, spanContext } = instrumentMessage(
      originalMessage,
      options,
      this._context.config.entityPath!,
      this._context.config.host
    );

    // Convert ServiceBusMessage to AmqpMessage.
    const amqpMessage = toRheaMessage(message, defaultDataTransformer);
    const encodedMessage = RheaMessageUtil.encode(amqpMessage);

    let currentSize = this._sizeInBytes;
    const isFirstMessage = this.count === 0;

    // Envelope metadata taken from the first message. It is staged in locals
    // and committed only once the message is known to fit; otherwise a
    // rejected first message would leave stale metadata on the batch.
    let envelopeAnnotations: MessageAnnotations | undefined;
    let envelopeApplicationProperties: { [key: string]: any } | undefined;
    let envelopeMessageProperties: { [key: string]: string } | undefined;

    // The first time a message is added, we need to calculate
    // the overhead of creating an AMQP batch, including the
    // message_annotations, application_properties and message_properties
    // that are taken from the 1st message.
    if (isFirstMessage) {
      if (amqpMessage.message_annotations) {
        envelopeAnnotations = amqpMessage.message_annotations;
      }
      if (amqpMessage.application_properties) {
        envelopeApplicationProperties = amqpMessage.application_properties;
      }
      envelopeMessageProperties = {};
      for (const prop of RheaMessagePropertiesList) {
        if ((amqpMessage as any)[prop]) {
          (envelopeMessageProperties as any)[prop] = (amqpMessage as any)[prop];
        }
      }

      // Figure out the overhead of creating a batch by generating an empty batch
      // with the expected batch annotations.
      currentSize += this._generateBatch(
        [],
        envelopeAnnotations,
        envelopeApplicationProperties,
        envelopeMessageProperties
      ).length;
    }

    const messageSize = encodedMessage.length;
    const messageOverhead =
      messageSize <= smallMessageMaxBytes ? smallMessageOverhead : largeMessageOverhead;
    currentSize += messageSize + messageOverhead;

    // Check if the size of the batch exceeds the maximum allowed size
    // once we add the new message to it.
    if (currentSize > this._maxSizeInBytes) {
      return false;
    }

    // The message will fit in the batch, so it is now safe to store it
    // together with the envelope metadata it contributed (if it is the first).
    if (isFirstMessage) {
      this._batchAnnotations = envelopeAnnotations;
      this._batchApplicationProperties = envelopeApplicationProperties;
      this._batchMessageProperties = envelopeMessageProperties;
    }
    this._encodedMessages.push(encodedMessage);
    if (spanContext) {
      this._spanContexts.push(spanContext);
    }
    this._sizeInBytes = currentSize;
    return true;
  }
}