// stream-consumer-group.js
// A sample stream consumer using the blocking variant of XREADGROUP.
// https://redis.io/commands/xreadgroup/
// This consumer works in collaboration with other instances of itself
// in the same consumer group such that the group as a whole receives
// every entry from the stream.
//
// This consumes entries from a stream created by stream-producer.js
//
// Run this as follows:
//
// $ node stream-consumer-group.js <consumerName>
//
// Run multiple instances with different values of <consumerName>
// to see them processing the stream as a group:
//
// $ node stream-consumer-group.js consumer1
//
// In another terminal:
//
// $ node stream-consumer-group.js consumer2
import { createClient, commandOptions } from 'redis';
// Build the Redis client with default options; the connection itself is
// opened further down, once the command line has been validated.
const client = createClient();

// Exactly one command-line argument is required: the name under which this
// process reads from the consumer group.
const [consumerName, ...extraArgs] = process.argv.slice(2);
if (consumerName === undefined || extraArgs.length > 0) {
  console.log(`usage: node stream-consumer-group.js <consumerName>`);
  process.exit(1);
}

await client.connect();
// Create the consumer group (and, via MKSTREAM, the stream itself) if needed...
try {
  // https://redis.io/commands/xgroup-create/
  await client.xGroupCreate('mystream', 'myconsumergroup', '0', {
    MKSTREAM: true,
  });
  console.log('Created consumer group.');
} catch (err) {
  // Redis answers XGROUP CREATE with a BUSYGROUP error when the group already
  // exists; that is the only failure that is safe to ignore here. Any other
  // error (connection problem, key of the wrong type, ...) is rethrown rather
  // than being misreported as "already exists".
  if (!String(err?.message).includes('BUSYGROUP')) {
    throw err;
  }
  console.log('Consumer group already exists, skipped creation.');
}
console.log(`Starting consumer ${consumerName}.`);

// How long to wait after a failed read/ack before trying again. Without a
// pause, an error that recurs immediately (for example the stream or group
// being deleted while we run) would make the loop below spin at full speed.
const ERROR_RETRY_DELAY_MS = 1000;

// Main consumer loop: read the next undelivered entry for this consumer,
// "process" it (here: print it), then acknowledge it. Runs until the process
// is terminated.
while (true) {
  try {
    // https://redis.io/commands/xreadgroup/
    const response = await client.xReadGroup(
      // Run this blocking command in isolation from other commands issued
      // through the same client.
      commandOptions({
        isolated: true,
      }),
      'myconsumergroup',
      consumerName,
      [
        // XREADGROUP can read from multiple streams, starting at a
        // different ID for each...
        {
          key: 'mystream',
          id: '>', // Next entry ID that no consumer in this group has read
        },
      ],
      {
        // Read 1 entry at a time, block for 5 seconds if there are none.
        COUNT: 1,
        BLOCK: 5000,
      },
    );

    if (response) {
      // Response is an array of streams, each containing an array of
      // entries:
      //
      // [
      //   {
      //     "name": "mystream",
      //     "messages": [
      //       {
      //         "id": "1642088708425-0",
      //         "message": {
      //           "num": "999"
      //         }
      //       }
      //     ]
      //   }
      // ]
      console.log(JSON.stringify(response));

      // Use XACK to acknowledge successful processing of every entry that
      // was returned. With COUNT 1 on a single stream this is exactly one
      // entry, but walking the whole response keeps this correct if COUNT
      // or the list of streams above is changed.
      // https://redis.io/commands/xack/
      for (const { name: streamKey, messages } of response) {
        for (const { id: entryId } of messages) {
          await client.xAck(streamKey, 'myconsumergroup', entryId);
          console.log(`Acknowledged processing of entry ${entryId}.`);
        }
      }
    } else {
      // Response is null, we have read everything that is
      // in the stream right now...
      console.log('No new stream entries.');
    }
  } catch (err) {
    console.error(err);
    // Back off briefly before retrying so a persistent failure does not
    // turn into a busy loop that floods the log.
    await new Promise((resolve) => setTimeout(resolve, ERROR_RETRY_DELAY_MS));
  }
}