// EventHubProducerClient.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.messaging.eventhubs;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.util.IterableStream;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import com.azure.messaging.eventhubs.models.SendOptions;
import java.io.Closeable;
import java.time.Duration;
import java.util.Objects;
/**
* A <b>synchronous</b> producer responsible for transmitting {@link EventData} to a specific Event Hub, grouped
* together in batches. Depending on the {@link CreateBatchOptions options} specified when creating an
* {@link EventDataBatch}, the events may be automatically routed to an available partition or sent to a specific
* partition.
*
* <p>
* Allowing automatic routing of partitions is recommended when:
* <ul>
* <li>The sending of events needs to be highly available.</li>
* <li>The event data should be evenly distributed among all available partitions.</li>
* </ul>
*
* <p>
* If no partition id is specified, the following rules are used for automatically selecting one:
*
* <ol>
* <li>Distribute the events equally amongst all available partitions using a round-robin approach.</li>
* <li>If a partition becomes unavailable, the Event Hubs service will automatically detect it and forward the
* message to another available partition.</li>
* </ol>
*
* <p><strong>Create a producer and publish events to any partition</strong></p>
* <!-- src_embed com.azure.messaging.eventhubs.eventhubproducerclient.createBatch -->
* <pre>
* // The required parameter is a way to authenticate with Event Hubs using credentials.
* // The connectionString provides a way to authenticate with Event Hub.
* EventHubProducerClient producer = new EventHubClientBuilder()
* .connectionString(
* "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}",
* "event-hub-name")
* .buildProducerClient();
* List&lt;EventData&gt; events = Arrays.asList(new EventData("test-event-1"), new EventData("test-event-2"));
*
* // Creating a batch without options set, will allow for automatic routing of events to any partition.
* EventDataBatch batch = producer.createBatch();
* for (EventData event : events) {
* if (batch.tryAdd(event)) {
* continue;
* }
*
* producer.send(batch);
* batch = producer.createBatch();
* if (!batch.tryAdd(event)) {
* throw new IllegalArgumentException("Event is too large for an empty batch.");
* }
* }
* </pre>
* <!-- end com.azure.messaging.eventhubs.eventhubproducerclient.createBatch -->
*
* <p><strong>Publish events to partition "foo"</strong></p>
* <!-- src_embed com.azure.messaging.eventhubs.eventhubproducerclient.createBatch#CreateBatchOptions-partitionId -->
* <pre>
* // Creating a batch with partitionId set will route all events in that batch to partition `foo`.
* CreateBatchOptions options = new CreateBatchOptions().setPartitionId("foo");
*
* EventDataBatch batch = producer.createBatch(options);
* batch.tryAdd(new EventData("data-to-partition-foo"));
* producer.send(batch);
* </pre>
* <!-- end com.azure.messaging.eventhubs.eventhubproducerclient.createBatch#CreateBatchOptions-partitionId -->
*
* <p><strong>Publish events to the same partition, grouped together using partition key</strong></p>
* <!-- src_embed com.azure.messaging.eventhubs.eventhubproducerclient.createBatch#CreateBatchOptions-partitionKey -->
* <pre>
* List&lt;EventData&gt; events = Arrays.asList(new EventData("sourdough"), new EventData("rye"),
* new EventData("wheat"));
*
* // Creating a batch with partitionKey set will tell the service to hash the partitionKey and decide which
* // partition to send the events to. Events with the same partitionKey are always routed to the same partition.
* CreateBatchOptions options = new CreateBatchOptions().setPartitionKey("bread");
* EventDataBatch batch = producer.createBatch(options);
*
* events.forEach(event -&gt; batch.tryAdd(event));
* producer.send(batch);
* </pre>
* <!-- end com.azure.messaging.eventhubs.eventhubproducerclient.createBatch#CreateBatchOptions-partitionKey -->
*
* <p><strong>Publish events using a size-limited {@link EventDataBatch}</strong></p>
* <!-- src_embed com.azure.messaging.eventhubs.eventhubproducerclient.createBatch#CreateBatchOptions-int -->
* <pre>
* List&lt;EventData&gt; telemetryEvents = Arrays.asList(firstEvent, secondEvent, thirdEvent);
*
* // Setting `setMaximumSizeInBytes` when creating a batch, limits the size of that batch.
* // In this case, all the batches created with these options are limited to 256 bytes.
* CreateBatchOptions options = new CreateBatchOptions().setMaximumSizeInBytes(256);
*
* EventDataBatch currentBatch = producer.createBatch(options);
*
* // For each telemetry event, we try to add it to the current batch.
* // When the batch is full, send it then create another batch to add more events to.
* for (EventData event : telemetryEvents) {
* if (!currentBatch.tryAdd(event)) {
* producer.send(currentBatch);
* currentBatch = producer.createBatch(options);
*
* // Add the event we couldn't before.
* if (!currentBatch.tryAdd(event)) {
* throw new IllegalArgumentException("Event is too large for an empty batch.");
* }
* }
* }
* </pre>
* <!-- end com.azure.messaging.eventhubs.eventhubproducerclient.createBatch#CreateBatchOptions-int -->
*
* @see EventHubClientBuilder#buildProducerClient()
* @see EventHubProducerAsyncClient To asynchronously generate events to an Event Hub, see EventHubProducerAsyncClient.
*/
@ServiceClient(builder = EventHubClientBuilder.class)
public class EventHubProducerClient implements Closeable {
    private final EventHubProducerAsyncClient producer;
    private final Duration tryTimeout;

    /**
     * Creates a synchronous {@link EventHubProducerClient} that delegates every operation to the supplied
     * asynchronous producer.
     *
     * @param producer The asynchronous producer that each call on this client is forwarded to.
     * @param tryTimeout The maximum amount of time to wait on the blocking calls that are bounded by a timeout.
     *
     * @throws NullPointerException if {@code producer} or {@code tryTimeout} is null.
     */
    EventHubProducerClient(EventHubProducerAsyncClient producer, Duration tryTimeout) {
        // Validate in declaration order so the reported argument matches the first offending parameter.
        Objects.requireNonNull(producer, "'producer' cannot be null.");
        Objects.requireNonNull(tryTimeout, "'tryTimeout' cannot be null.");

        this.producer = producer;
        this.tryTimeout = tryTimeout;
    }

    /**
     * Gets the name of the Event Hub that this client interacts with.
     *
     * @return The name of the Event Hub that this client interacts with.
     */
    public String getEventHubName() {
        return this.producer.getEventHubName();
    }

    /**
     * Gets the fully qualified Event Hubs namespace that the connection is associated with. This is likely similar to
     * {@code {yournamespace}.servicebus.windows.net}.
     *
     * @return The fully qualified Event Hubs namespace that the connection is associated with.
     */
    public String getFullyQualifiedNamespace() {
        return this.producer.getFullyQualifiedNamespace();
    }

    /**
     * Retrieves information about the Event Hub, including the number of partitions present and their identifiers.
     * The call blocks for at most the try timeout this client was created with.
     *
     * @return The set of information for the Event Hub that this client is associated with.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public EventHubProperties getEventHubProperties() {
        final EventHubProperties properties = this.producer.getEventHubProperties().block(this.tryTimeout);
        return properties;
    }

    /**
     * Retrieves the identifiers for the partitions of the Event Hub.
     *
     * @return The identifiers for the partitions of the Event Hub, exposed as an {@link IterableStream}.
     */
    @ServiceMethod(returns = ReturnType.COLLECTION)
    public IterableStream<String> getPartitionIds() {
        // The asynchronous result is wrapped as-is; no timeout is applied by this method.
        return new IterableStream<>(this.producer.getPartitionIds());
    }

    /**
     * Retrieves information about a specific partition of the Event Hub, including elements that describe the
     * available events in the partition event stream. The call blocks for at most the try timeout this client was
     * created with.
     *
     * @param partitionId The unique identifier of a partition associated with the Event Hub.
     *
     * @return The set of information for the requested partition under the Event Hub this client is associated with.
     *
     * @throws NullPointerException if {@code partitionId} is null.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public PartitionProperties getPartitionProperties(String partitionId) {
        final PartitionProperties properties =
            this.producer.getPartitionProperties(partitionId).block(this.tryTimeout);
        return properties;
    }

    /**
     * Creates an {@link EventDataBatch} that can fit as many events as the transport allows. The call blocks for at
     * most the try timeout this client was created with.
     *
     * @return A new {@link EventDataBatch} that can fit as many events as the transport allows.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public EventDataBatch createBatch() {
        final EventDataBatch batch = this.producer.createBatch().block(this.tryTimeout);
        return batch;
    }

    /**
     * Creates an {@link EventDataBatch} configured with the options specified. The call blocks for at most the try
     * timeout this client was created with.
     *
     * @param options A set of options used to configure the {@link EventDataBatch}.
     *
     * @return A new {@link EventDataBatch} created using the given {@code options}.
     *
     * @throws NullPointerException if {@code options} is null.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public EventDataBatch createBatch(CreateBatchOptions options) {
        final EventDataBatch batch = this.producer.createBatch(options).block(this.tryTimeout);
        return batch;
    }

    /**
     * Sends a single event to the associated Event Hub. If the size of the single event exceeds the maximum size
     * allowed, an exception will be triggered and the send will fail. This call blocks until the underlying
     * asynchronous send completes.
     *
     * <p>
     * For more information regarding the maximum event size allowed, see
     * <a href="https://docs.microsoft.com/azure/event-hubs/event-hubs-quotas">Azure Event Hubs Quotas and
     * Limits</a>.
     * </p>
     *
     * @param event Event to send to the service.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    void send(EventData event) {
        this.producer.send(event).block();
    }

    /**
     * Sends a single event to the associated Event Hub using the given send options. If the size of the single event
     * exceeds the maximum size allowed, an exception will be triggered and the send will fail. This call blocks until
     * the underlying asynchronous send completes.
     *
     * <p>
     * For more information regarding the maximum event size allowed, see
     * <a href="https://docs.microsoft.com/azure/event-hubs/event-hubs-quotas">Azure Event Hubs Quotas and
     * Limits</a>.
     * </p>
     *
     * @param event Event to send to the service.
     * @param options The set of options to consider when sending this event.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    void send(EventData event, SendOptions options) {
        this.producer.send(event, options).block();
    }

    /**
     * Sends a set of events to the associated Event Hub using a batched approach. If the size of events exceed the
     * maximum size of a single batch, an exception will be triggered and the send will fail. By default, the message
     * size is the max amount allowed on the link. This call blocks until the underlying asynchronous send completes.
     *
     * <!-- src_embed com.azure.messaging.eventhubs.eventhubproducerclient.send#Iterable -->
     * <pre>
     * List&lt;EventData&gt; events = Arrays.asList(new EventData("maple"), new EventData("aspen"),
     *     new EventData("oak"));
     * producer.send(events);
     * </pre>
     * <!-- end com.azure.messaging.eventhubs.eventhubproducerclient.send#Iterable -->
     *
     * <p>
     * For more information regarding the maximum event size allowed, see
     * <a href="https://docs.microsoft.com/azure/event-hubs/event-hubs-quotas">Azure Event Hubs Quotas and
     * Limits</a>.
     * </p>
     *
     * @param events Events to send to the service.
     *
     * @throws AmqpException if the size of {@code events} exceed the maximum size of a single batch.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public void send(Iterable<EventData> events) {
        this.producer.send(events).block();
    }

    /**
     * Sends a set of events to the associated Event Hub using a batched approach and the given send options. If the
     * size of events exceed the maximum size of a single batch, an exception will be triggered and the send will
     * fail. By default, the message size is the max amount allowed on the link. This call blocks until the underlying
     * asynchronous send completes.
     *
     * <!-- src_embed com.azure.messaging.eventhubs.eventhubproducerclient.send#Iterable-SendOptions -->
     * <pre>
     * List&lt;EventData&gt; events = Arrays.asList(new EventData("Melbourne"), new EventData("London"),
     *     new EventData("New York"));
     * SendOptions sendOptions = new SendOptions().setPartitionKey("cities");
     * producer.send(events, sendOptions);
     * </pre>
     * <!-- end com.azure.messaging.eventhubs.eventhubproducerclient.send#Iterable-SendOptions -->
     *
     * <p>
     * For more information regarding the maximum event size allowed, see
     * <a href="https://docs.microsoft.com/azure/event-hubs/event-hubs-quotas">Azure Event Hubs Quotas and
     * Limits</a>.
     * </p>
     *
     * @param events Events to send to the service.
     * @param options The set of options to consider when sending this batch.
     *
     * @throws AmqpException if the size of {@code events} exceed the maximum size of a single batch.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public void send(Iterable<EventData> events, SendOptions options) {
        this.producer.send(events, options).block();
    }

    /**
     * Sends the batch to the associated Event Hub. This call blocks until the underlying asynchronous send
     * completes.
     *
     * @param batch The batch to send to the service.
     *
     * @throws NullPointerException if {@code batch} is {@code null}.
     * @see EventHubProducerClient#createBatch()
     * @see EventHubProducerClient#createBatch(CreateBatchOptions)
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public void send(EventDataBatch batch) {
        this.producer.send(batch).block();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void close() {
        // Closing this client closes the asynchronous producer it wraps.
        this.producer.close();
    }

    /**
     * Gets the client identifier.
     *
     * @return The unique identifier string for the current client.
     */
    public String getIdentifier() {
        return this.producer.getIdentifier();
    }
}