-
Notifications
You must be signed in to change notification settings - Fork 577
/
EventProcessorSample.java
197 lines (176 loc) · 10.8 KB
/
EventProcessorSample.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/
package com.microsoft.azure.eventhubs.samples.Basic;
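/*
* This sample depends on EventProcessorHost (the com.microsoft.azure.eventprocessorhost package imported below).
* This note originally explained that, until the official release, no package was distributed for
* EventProcessorHost, and hence there was no good portable way of putting a reference to it in the samples POM,
* so the contents of this sample were commented out by default to avoid blocking or breaking anything.
* The code in this file is not commented out: to build and run it, make sure the project declares a dependency
* on EventProcessorHost.
*/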
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventprocessorhost.CloseReason;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs;
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
import com.microsoft.azure.eventprocessorhost.PartitionContext;
import com.microsoft.azure.servicebus.ConnectionStringBuilder;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
// Console sample showing how to receive events from an Event Hub with EventProcessorHost: it builds a host,
// registers an event processor class, waits for the user to press enter, then unregisters and shuts down.
public class EventProcessorSample
{
    public static void main(String[] args)
    {
        // SETUP SETUP SETUP SETUP
        // Fill these strings in with the information of the Event Hub you wish to use. The consumer group
        // can probably be left as-is. You will also need the connection string for an Azure Storage account,
        // which is used to persist the lease and checkpoint data for this Event Hub.
        String consumerGroupName = "$Default";
        String namespaceName = "----ServiceBusNamespaceName----";
        String eventHubName = "----EventHubName----";
        String sasKeyName = "----SharedAccessSignatureKeyName----";
        String sasKey = "----SharedAccessSignatureKey----";
        String storageConnectionString = "----AzureStorageConnectionString----";

        // To conveniently construct the Event Hub connection string from the raw information, use the ConnectionStringBuilder class.
        ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder(namespaceName, eventHubName, sasKeyName, sasKey);

        // Create the instance of EventProcessorHost. Azure Storage is used for persisting partition leases and
        // checkpoints. This call passes an explicit host name (a string that uniquely identifies the instance of
        // EventProcessorHost) as the first argument and an explicit Storage container name as the last argument,
        // so neither is generated automatically: replace both placeholders along with the settings above.
        EventProcessorHost host = new EventProcessorHost(
                "----EventProcessorHostName----",
                eventHubName,
                consumerGroupName,
                eventHubConnectionString.toString(),
                storageConnectionString,
                "----StorageContainerName----");

        // Registering an event processor class with an instance of EventProcessorHost starts event processing. The host instance
        // obtains leases on some partitions of the Event Hub, possibly stealing some from other host instances, in a way that
        // converges on an even distribution of partitions across all host instances. For each leased partition, the host instance
        // creates an instance of the provided event processor class, then receives events from that partition and passes them to
        // the event processor instance.
        //
        // There are two error notification systems in EventProcessorHost. Notification of errors tied to a particular partition,
        // such as a receiver failing, are delivered to the event processor instance for that partition via the onError method.
        // Notification of errors not tied to a particular partition, such as initialization failures, are delivered to a general
        // notification handler that is specified via an EventProcessorOptions object. You are not required to provide such a
        // notification handler, but if you don't, then you may not know that certain errors have occurred.
        System.out.println("Registering host named " + host.getHostName());
        EventProcessorOptions options = new EventProcessorOptions();
        options.setExceptionNotification(new ErrorNotificationHandler());
        try
        {
            // The Future returned by the register* APIs completes when initialization is done and
            // message pumping is about to start. It is important to call get() on the Future because
            // initialization failures will result in an ExecutionException, with the failure as the
            // inner exception, and are not otherwise reported.
            host.registerEventProcessor(EventProcessor.class, options).get();
        }
        catch (Exception e)
        {
            System.out.print("Failure while registering: ");
            // Report the wrapped failure when there is one. Fall back to the caught exception itself so that an
            // ExecutionException without a cause cannot trigger a NullPointerException while reporting.
            Throwable inner = (e instanceof ExecutionException) ? e.getCause() : null;
            System.out.println((inner != null) ? inner.toString() : e.toString());
        }

        // Note that the sample reaches this point even when registration failed above.
        System.out.println("Press enter to stop");
        try
        {
            System.in.read();

            // Processing of events continues until unregisterEventProcessor is called. Unregistering shuts down the
            // receivers on all currently owned leases, shuts down the instances of the event processor class, and
            // releases the leases for other instances of EventProcessorHost to claim.
            System.out.println("Calling unregister");
            host.unregisterEventProcessor();

            // There are two options for shutting down EventProcessorHost's internal thread pool: automatic and manual.
            // Both have their advantages and drawbacks. See the JavaDocs for setAutoExecutorShutdown and forceExecutorShutdown
            // for more details. This example uses forceExecutorShutdown because it is the safe option, at the expense of
            // another line of code.
            System.out.println("Calling forceExecutorShutdown");
            EventProcessorHost.forceExecutorShutdown(120);
        }
        catch (Exception e)
        {
            System.out.println(e.toString());
            e.printStackTrace();
        }

        System.out.println("End of sample");
    }

    // The general notification handler is an object that derives from Consumer<> and takes an ExceptionReceivedEventArgs object
    // as an argument. The argument provides the details of the error: the exception that occurred and the action (what EventProcessorHost
    // was doing) during which the error occurred. The complete list of actions can be found in EventProcessorHostActionStrings.
    public static class ErrorNotificationHandler implements Consumer<ExceptionReceivedEventArgs>
    {
        @Override
        public void accept(ExceptionReceivedEventArgs t)
        {
            // String concatenation renders the exception via String.valueOf, so a missing exception prints as "null"
            // instead of throwing from inside the error handler.
            System.out.println("SAMPLE: Host " + t.getHostname() + " received general error notification during " + t.getAction() + ": " + t.getException());
        }
    }

    // Event processor used by this sample: it prints every lifecycle notification and every received event to
    // standard output, and checkpoints periodically.
    public static class EventProcessor implements IEventProcessor
    {
        // Number of events to process between checkpoints. Five is an arbitrary choice for this sample.
        private static final int CHECKPOINT_INTERVAL = 5;

        // Events seen by this processor instance since its last checkpoint attempt.
        private int checkpointBatchingCount = 0;

        // OnOpen is called when a new event processor instance is created by the host. In a real implementation, this
        // is the place to do initialization so that events can be processed when they arrive, such as opening a database
        // connection.
        @Override
        public void onOpen(PartitionContext context) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is opening");
        }

        // OnClose is called when an event processor instance is being shut down. The reason argument indicates whether the shut down
        // is because another host has stolen the lease for this partition or due to error or host shutdown. In a real implementation,
        // this is the place to do cleanup for resources that were opened in onOpen.
        @Override
        public void onClose(PartitionContext context, CloseReason reason) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is closing for reason " + reason);
        }

        // onError is called when an error occurs in EventProcessorHost code that is tied to this partition, such as a receiver failure.
        // It is NOT called for exceptions thrown out of onOpen/onClose/onEvents. EventProcessorHost is responsible for recovering from
        // the error, if possible, or shutting the event processor down if not, in which case there will be a call to onClose. The
        // notification provided to onError is primarily informational.
        @Override
        public void onError(PartitionContext context, Throwable error)
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " onError: " + error);
        }

        // onEvents is called when events are received on this partition of the Event Hub. The maximum number of events in a batch
        // can be controlled via EventProcessorOptions. Also, if the "invoke processor after receive timeout" option is set to true,
        // this method will be called with null when a receive timeout occurs, so null is handled explicitly below.
        @Override
        public void onEvents(PartitionContext context, Iterable<EventData> messages) throws Exception
        {
            if (messages == null)
            {
                // Receive timeout notification: there is nothing to iterate over, and nothing new to checkpoint.
                System.out.println("SAMPLE: Partition " + context.getPartitionId() + " receive timed out with no events");
                return;
            }

            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " got message batch");
            int messageCount = 0;
            for (EventData data : messages)
            {
                // The event body is decoded as UTF-8 text for display.
                System.out.println("SAMPLE (" + context.getPartitionId() + "," + data.getSystemProperties().getOffset() + "," +
                        data.getSystemProperties().getSequenceNumber() + "): " + new String(data.getBytes(), StandardCharsets.UTF_8));
                messageCount++;

                // Checkpointing persists the current position in the event stream for this partition and means that the next
                // time any host opens an event processor on this event hub+consumer group+partition combination, it will start
                // receiving at the event after this one. Checkpointing is usually not a fast operation, so there is a tradeoff
                // between checkpointing frequently (to minimize the number of events that will be reprocessed after a crash, or
                // if the partition lease is stolen) and checkpointing infrequently (to reduce the impact on event processing
                // performance). Checkpointing every CHECKPOINT_INTERVAL events is an arbitrary choice for this sample.
                this.checkpointBatchingCount++;
                if (this.checkpointBatchingCount >= CHECKPOINT_INTERVAL)
                {
                    // Reset before checkpointing so the counter never grows without bound and, if the checkpoint
                    // throws, the next attempt still happens a full interval later.
                    this.checkpointBatchingCount = 0;
                    System.out.println("SAMPLE: Partition " + context.getPartitionId() + " checkpointing at " +
                            data.getSystemProperties().getOffset() + "," + data.getSystemProperties().getSequenceNumber());
                    context.checkpoint(data);
                }
            }
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " batch size was " + messageCount + " for host " + context.getOwner());
        }
    }
}