-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
71 lines (64 loc) · 1.57 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
const { Kafka } = require("kafkajs");
const { delay } = require("bluebird");
const topic = 'EVENTS';
const numPartitions = 4;
const groupId = 'CONSUMER_GROUP_ID';
let client = new Kafka({
brokers: ['kafka:9092'],
});
async function createTopic() {
const admin = client.admin();
await admin.connect();
await admin.createTopics({
topics: [{
topic,
numPartitions,
}],
});
await admin.disconnect();
}
async function produce() {
const producer = client.producer();
await producer.connect();
for (let id = 0; id < numPartitions*10; id++) {
const message = { id };
const messageStr = JSON.stringify(message);
const partition = id % numPartitions;
await producer.send({
topic,
messages: [{
value: messageStr,
partition,
}],
});
}
await producer.disconnect();
setTimeout(produce, 1000);
}
async function consume() {
const consumer = client.consumer({
groupId,
maxBytesPerPartition: 1024,
readUncommitted: false,
});
await consumer.connect();
await consumer.subscribe({ topic });
await consumer.run({
autoCommitInterval: 500,
partitionsConsumedConcurrently: 2,
eachMessage: async ({ topic, partition, message }) => {
const msg = JSON.parse(message.value.toString());
if (partition === 0) {
// 5 seconds delay on partition 0
await delay(5 * 1000);
}
console.log(`Message processed from [${topic}:${partition}] with id [${msg.id}]`);
},
});
}
async function main() {
await createTopic();
await consume();
produce();
}
main();