/
receiveEventsUsingCheckpointStore.ts
97 lines (82 loc) · 3.7 KB
/
receiveEventsUsingCheckpointStore.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
/**
* @summary Demonstrates how to use the EventHubConsumerClient to process events from all partitions
* of a consumer group in an Event Hubs instance, as well as checkpointing along the way.
*
* Checkpointing using a durable store allows your application to be more resilient. When you restart
* your application after a crash (or an intentional stop), your application can continue consuming
* events from where it last checkpointed.
*/
import { EventHubConsumerClient, CheckpointStore } from "@azure/event-hubs";
import { ContainerClient, StorageSharedKeyCredential } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
const connectionString =
process.env["EVENT_HUB_CONNECTION_STRING"] || "<event-hub-connection-string>";
const eventHubName = process.env["EVENT_HUB_NAME"] || "<eventHubName>";
const consumerGroup =
process.env["EVENT_HUB_CONSUMER_GROUP"] || EventHubConsumerClient.defaultConsumerGroupName;
const storageContainerUrl =
process.env["STORAGE_CONTAINER_URL"] ||
"https://<storageaccount>.blob.core.windows.net/<containername>";
const storageAccountName = process.env["STORAGE_ACCOUNT_NAME"] || "<storageaccount>";
const storageAccountKey = process.env["STORAGE_ACCOUNT_KEY"] || "<key>";
export async function main() {
// this client will be used by our eventhubs-checkpointstore-blob, which
// persists any checkpoints from this session in Azure Storage
const storageCredential = new StorageSharedKeyCredential(storageAccountName, storageAccountKey);
const containerClient = new ContainerClient(storageContainerUrl, storageCredential);
if (!(await containerClient.exists())) {
await containerClient.create();
}
const checkpointStore: CheckpointStore = new BlobCheckpointStore(containerClient);
const consumerClient = new EventHubConsumerClient(
consumerGroup,
connectionString,
eventHubName,
checkpointStore
);
// The below code will set up your program to listen to events from your Event Hub instance.
// If your Event Hub instance doesn't have any events, then please run "sendEvents.ts" from the event-hubs project
// located here: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/eventhub/event-hubs/samples/sendEvents.ts
const subscription = consumerClient.subscribe({
processEvents: async (events, context) => {
if (events.length === 0) {
// If the wait time expires (configured via options in maxWaitTimeInSeconds) Event Hubs
// will pass you an empty array.
return;
}
for (const event of events) {
console.log(
`Received event: '${event.body}' from partition: '${context.partitionId}' and consumer group: '${context.consumerGroup}'`
);
}
try {
// save a checkpoint for the last event now that we've processed this batch.
await context.updateCheckpoint(events[events.length - 1]);
} catch (err: any) {
console.log(`Error when checkpointing on partition ${context.partitionId}: `, err);
throw err;
}
console.log(
`Successfully checkpointed event with sequence number: ${
events[events.length - 1].sequenceNumber
} from partition: 'partitionContext.partitionId'`
);
},
processError: async (err, context) => {
console.log(`Error on partition "${context.partitionId}": ${err}`);
}
});
// after 30 seconds, stop processing
await new Promise<void>((resolve) => {
setTimeout(async () => {
await subscription.close();
await consumerClient.close();
resolve();
}, 30000);
});
}
main().catch((err) => {
console.log("Error occurred: ", err);
});