-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
sender.ts
279 lines (261 loc) · 10.7 KB
/
sender.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
import { EventData } from "./eventData";
import { EventHubSender } from "./eventHubSender";
import { EventHubProducerOptions, SendOptions, CreateBatchOptions } from "./impl/eventHubClient";
import { ConnectionContext } from "./connectionContext";
import { logger, logErrorStackTrace } from "./log";
import { throwErrorIfConnectionClosed, throwTypeErrorIfParameterMissing } from "./util/error";
import { EventDataBatch, isEventDataBatch, EventDataBatchImpl } from "./eventDataBatch";
import { getTracer } from "@azure/core-tracing";
import { SpanContext, Span, SpanKind, CanonicalCode, Link } from "@opentelemetry/types";
import { instrumentEventData, TRACEPARENT_PROPERTY } from "./diagnostics/instrumentEventData";
import { createMessageSpan } from "./diagnostics/messageSpan";
import { getParentSpan } from "./util/operationOptions";
/**
 * A producer responsible for sending events to an Event Hub.
 * To create a producer use the `createProducer()` method on your `EventHubClient`.
 * You can pass the below in the `options` when creating a producer.
 * - `partitionId`  : The identifier of the partition that the producer can be bound to.
 * - `retryOptions` : The retry options used to govern retry attempts when an issue is encountered while sending events.
 * A simple usage can be `{ "maxRetries": 4 }`.
 *
 * If `partitionId` is specified when creating a producer, all event data sent using the producer
 * will be sent to the specified partition.
 * Otherwise, they are automatically routed to an available partition by the Event Hubs service.
 *
 * Automatic routing of partitions is recommended because:
 *  - The sending of events will be highly available.
 *  - The event data will be evenly distributed among all available partitions.
 *
 * @class
 */
export class EventHubProducer {
  /**
   * @property Describes the amqp connection context for the Client.
   */
  private readonly _context: ConnectionContext;
  /**
   * @property Denotes if close() was called on this sender
   */
  private _isClosed = false;
  /**
   * @property Options the producer was created with; they are also merged into every send call.
   */
  private readonly _senderOptions: EventHubProducerOptions;
  /**
   * @property The underlying sender. Set to `undefined` once it has been closed via `close()`.
   */
  private _eventHubSender: EventHubSender | undefined;
  /**
   * @property Event Hub name, recorded on send spans as `message_bus.destination`.
   */
  private readonly _eventHubName: string;
  /**
   * @property Endpoint, recorded on send spans as `peer.address`.
   */
  private readonly _endpoint: string;

  /**
   * @property Returns `true` if either the producer or the client that created it has been closed.
   * @readonly
   */
  public get isClosed(): boolean {
    return this._isClosed || this._context.wasConnectionCloseCalled;
  }

  /**
   * EventHubProducer should not be constructed using `new EventHubProducer()`
   * Use the `createProducer()` method on your `EventHubClient` instead.
   * @private
   * @constructor
   * @internal
   * @ignore
   */
  constructor(
    eventHubName: string,
    endpoint: string,
    context: ConnectionContext,
    options?: EventHubProducerOptions
  ) {
    this._context = context;
    this._senderOptions = options || {};
    // Any partition id that is not null/undefined counts as "bound"; it is coerced to a string
    // so that non-string ids (e.g. numbers passed from JavaScript callers) are tolerated.
    const partitionId =
      this._senderOptions.partitionId != undefined
        ? String(this._senderOptions.partitionId)
        : undefined;
    this._eventHubSender = EventHubSender.create(this._context, partitionId);
    this._eventHubName = eventHubName;
    this._endpoint = endpoint;
  }

  /**
   * Creates an instance of `EventDataBatch` to which one can add events until the maximum supported size is reached.
   * The batch can be passed to the `send()` method of the `EventHubProducer` to be sent to Azure Event Hubs.
   * @param options A set of options to configure the behavior of the batch.
   * - `partitionKey`  : A value that is hashed to produce a partition assignment.
   * Not applicable if the `EventHubProducer` was created using a `partitionId`.
   * - `maxSizeInBytes`: The upper limit for the size of batch. The `tryAdd` function will return `false` after this limit is reached.
   * - `abortSignal`   : A signal used to request cancellation of the operation.
   * @returns Promise<EventDataBatch>
   * @throws {Error} Thrown if the underlying connection or producer has been closed.
   * @throws {Error} Thrown if a partitionKey is provided when the producer was created with a partitionId.
   * @throws {Error} Thrown if `maxSizeInBytes` exceeds the maximum message size reported by the sender.
   */
  async createBatch(options?: CreateBatchOptions): Promise<EventDataBatch> {
    this._throwIfSenderOrConnectionClosed();
    const batchOptions: CreateBatchOptions = options || {};

    // A partition key cannot be honoured by a producer that is bound to a partition.
    // "Bound" is decided exactly as in the constructor (any non-null/undefined partition id),
    // so a non-string id cannot slip past this check.
    if (
      typeof batchOptions.partitionKey === "string" &&
      this._senderOptions.partitionId != undefined
    ) {
      const error = new Error(
        "Creating a batch with partition key is not supported when using producers that were created using a partition id."
      );
      logger.warning(
        "[%s] Creating a batch with partition key is not supported when using producers that were created using a partition id. %O",
        this._context.connectionId,
        error
      );
      logErrorStackTrace(error);
      throw error;
    }

    const linkMaxMessageSize = await this._eventHubSender!.getMaxMessageSize({
      retryOptions: this._senderOptions.retryOptions,
      abortSignal: batchOptions.abortSignal
    });

    // A caller supplied limit may shrink the batch, but never grow it beyond what the sender reports.
    let maxMessageSize = linkMaxMessageSize;
    if (batchOptions.maxSizeInBytes) {
      if (batchOptions.maxSizeInBytes > linkMaxMessageSize) {
        const error = new Error(
          `Max message size (${batchOptions.maxSizeInBytes} bytes) is greater than maximum message size (${linkMaxMessageSize} bytes) on the AMQP sender link.`
        );
        logger.warning(
          `[${this._context.connectionId}] Max message size (${batchOptions.maxSizeInBytes} bytes) is greater than maximum message size (${linkMaxMessageSize} bytes) on the AMQP sender link. ${error}`
        );
        logErrorStackTrace(error);
        throw error;
      }
      maxMessageSize = batchOptions.maxSizeInBytes;
    }

    return new EventDataBatchImpl(
      this._context,
      maxMessageSize,
      batchOptions.partitionKey,
      batchOptions.partitionId
    );
  }

  /**
   * Send one or more of events to the associated Event Hub.
   *
   * The `eventData` argument is never modified: when events need tracing information added,
   * the instrumented events are placed in a new array and the caller's array keeps its
   * original elements.
   *
   * @param eventData  An individual `EventData` object, or an array of `EventData` objects or an
   * instance of `EventDataBatch`.
   * @param options The set of options that can be specified to influence the way in which
   * events are sent to the associated Event Hub.
   * - `partitionKey` : A value that is hashed to produce a partition assignment.
   * Not applicable if the `EventHubProducer` was created using a `partitionId`.
   * - `abortSignal`  : A signal used to request cancellation of the send operation.
   *
   * @returns Promise<void>
   * @throws {AbortError} Thrown if the operation is cancelled via the abortSignal.
   * @throws {MessagingError} Thrown if an error is encountered while sending a message.
   * @throws {TypeError} Thrown if a required parameter is missing.
   * @throws {Error} Thrown if the underlying connection or sender has been closed.
   * Create a new producer using the EventHubClient createProducer method.
   * @throws {Error} Thrown if a partitionKey is provided when the producer was created with a partitionId.
   * (This is not validated in this method itself; presumably enforced by the underlying sender.)
   * @throws {Error} Thrown if batch was created with partitionKey different than the one provided in the options.
   * (This is not validated in this method itself; presumably enforced by the underlying sender.)
   */
  async send(
    eventData: EventData | EventData[] | EventDataBatch,
    options: SendOptions = {}
  ): Promise<void> {
    this._throwIfSenderOrConnectionClosed();
    throwTypeErrorIfParameterMissing(this._context.connectionId, "send", "eventData", eventData);

    // Nothing to do for empty input; return before any span is created.
    if (Array.isArray(eventData) && eventData.length === 0) {
      logger.info(`[${this._context.connectionId}] Empty array was passed. No events to send.`);
      return;
    }
    if (isEventDataBatch(eventData) && eventData.count === 0) {
      logger.info(`[${this._context.connectionId}] Empty batch was passsed. No events to send.`);
      return;
    }

    const parentSpan = getParentSpan(options);
    let payload: EventData[] | EventDataBatch;
    // link message span contexts
    let spanContextsToLink: SpanContext[] = [];

    if (isEventDataBatch(eventData)) {
      // The batch carries the span contexts of its events; link them to the send span.
      payload = eventData;
      spanContextsToLink = eventData._messageSpanContexts;
    } else {
      const events: EventData[] = Array.isArray(eventData) ? eventData : [eventData];
      // Build a fresh array instead of overwriting elements of the caller's array.
      payload = events.map((event) => {
        if (event.properties && event.properties[TRACEPARENT_PROPERTY]) {
          // Event already carries tracing information; send it untouched.
          return event;
        }
        // since these message spans are created from same context as the send span,
        // these message spans don't need to be linked.
        const messageSpan = createMessageSpan(parentSpan);
        try {
          return instrumentEventData(event, messageSpan);
        } finally {
          // End the span even if instrumenting the event fails.
          messageSpan.end();
        }
      });
    }

    const sendSpan = this._createSendSpan(parentSpan, spanContextsToLink);
    try {
      // Per-call options take precedence over the options the producer was created with.
      await this._eventHubSender!.send(payload, {
        ...this._senderOptions,
        ...options
      });
      sendSpan.setStatus({ code: CanonicalCode.OK });
    } catch (err) {
      sendSpan.setStatus({
        code: CanonicalCode.UNKNOWN,
        message: err.message
      });
      throw err;
    } finally {
      sendSpan.end();
    }
  }

  /**
   * Closes the underlying AMQP sender link.
   * Once closed, the producer cannot be used for any further operations.
   * Use the `createProducer` function on the EventHubClient to instantiate a new EventHubProducer.
   *
   * @returns
   * @throws {Error} Thrown if the underlying connection encounters an error while closing.
   */
  async close(): Promise<void> {
    try {
      // The sender is only closed (and released) while the connection is still open.
      if (this._context.connection && this._context.connection.isOpen() && this._eventHubSender) {
        await this._eventHubSender.close();
        this._eventHubSender = undefined;
      }
      // Only reached when closing did not throw.
      this._isClosed = true;
    } catch (err) {
      logger.warning(
        "[%s] An error occurred while closing the Sender for %s: %O",
        this._context.connectionId,
        this._context.config.entityPath,
        err
      );
      logErrorStackTrace(err);
      throw err;
    }
  }

  /**
   * Starts the client span that represents a single send operation.
   * @param parentSpan Optional parent span (or span context) for the new span.
   * @param spanContextsToLink Span contexts that are attached to the new span as links.
   * @returns The started span, annotated with the Event Hub name and endpoint.
   */
  private _createSendSpan(
    parentSpan?: Span | SpanContext,
    spanContextsToLink: SpanContext[] = []
  ): Span {
    const links: Link[] = spanContextsToLink.map((spanContext) => ({ spanContext }));
    const span = getTracer().startSpan("Azure.EventHubs.send", {
      kind: SpanKind.CLIENT,
      parent: parentSpan,
      links
    });
    span.setAttribute("message_bus.destination", this._eventHubName);
    span.setAttribute("peer.address", this._endpoint);
    return span;
  }

  /**
   * Guards public operations: runs the connection-closed check first, then throws (after logging)
   * if this producer, or the client that created it, has been closed.
   */
  private _throwIfSenderOrConnectionClosed(): void {
    throwErrorIfConnectionClosed(this._context);
    if (this.isClosed) {
      const errorMessage =
        `The EventHubProducer for "${this._context.config.entityPath}" has been closed and can no longer be used. ` +
        `Please create a new EventHubProducer using the "createProducer" function on the EventHubClient.`;
      const error = new Error(errorMessage);
      logger.warning(`[${this._context.connectionId}] %O`, error);
      logErrorStackTrace(error);
      throw error;
    }
  }
}