-
Notifications
You must be signed in to change notification settings - Fork 51
/
KafkaNode.js
127 lines (126 loc) · 4.97 KB
/
KafkaNode.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const kafka_node_1 = require("kafka-node");
const common_types_1 = require("./common-types");
const rxjs_1 = require("rxjs");
const KafkaMessageStream_1 = require("./KafkaMessageStream");
const KafkaBase_1 = require("./KafkaBase");
const retry = require("retry");
class Kafka extends KafkaBase_1.default {
    /**
     * Connects a kafka-node KafkaClient to the first seed URL in `config`,
     * retrying with the default `retry` policy on connection errors.
     * @param config - must expose `kafkaSeedUrls`; only the first entry is used.
     * @returns {Promise<KafkaClient>} a connected client. Rejects with the
     *   main (most significant) error once all retries are exhausted.
     */
    async getKafkaClient(config) {
        return new Promise((resolve, reject) => {
            const operation = retry.operation({});
            operation.attempt(() => {
                const client = new kafka_node_1.KafkaClient({ kafkaHost: config.kafkaSeedUrls[0] });
                // Guards this attempt: once we resolved or scheduled a retry,
                // late 'ready'/'error' events on this client must be ignored —
                // otherwise a post-connect error would spawn an orphaned retry
                // client that no caller ever receives.
                let settled = false;
                client.connect();
                client.on('ready', () => {
                    if (settled) {
                        return;
                    }
                    settled = true;
                    resolve(client);
                });
                client.on('error', err => {
                    if (settled) {
                        // Post-connect errors belong to whatever consumer/producer
                        // was built on this client; do not retry here.
                        return;
                    }
                    settled = true;
                    // Close the failed client so each retry does not leak a socket.
                    client.close();
                    if (!operation.retry(err)) {
                        reject(operation.mainError());
                    }
                });
            });
        });
    }
    /**
     * Creates a Producer backed by a freshly connected client.
     * NOTE(review): a new client is opened per call and never closed by this
     * class; long-running callers should consider reusing producers — confirm
     * against call sites before changing.
     */
    async getProducer(config) {
        const client = await this.getKafkaClient(config);
        const producer = new kafka_node_1.Producer(client);
        return producer;
    }
    /**
     * Creates an auto-committing Consumer subscribed to `topicId`,
     * using the topic id itself as the consumer group id.
     */
    async getConsumer(topicId, config) {
        const client = await this.getKafkaClient(config);
        const consumer = new kafka_node_1.Consumer(client, [
            { topic: topicId },
        ], {
            groupId: topicId,
            autoCommit: true,
        });
        return consumer;
    }
    /**
     * Creates `topicId` with one partition and replication factor 1.
     * Resolves on success; rejects with the kafka-node error on failure.
     */
    async createTopic(topicId, config) {
        const client = await this.getKafkaClient(config);
        await new Promise((resolve, reject) => {
            client.createTopics([{ topic: topicId, partitions: 1, replicationFactor: 1 }], (err) => {
                if (err) {
                    // tslint:disable-next-line:no-console
                    console.log(`Error creating topic ${topicId}`);
                    reject(err);
                }
                else {
                    // tslint:disable-next-line:no-console
                    console.log(`Topic created ${topicId}`);
                    resolve();
                }
            });
        });
    }
    /** Sends a single message to `topicId`. Thin wrapper over sendPayloads. */
    sendMessage(topicId, message, config) {
        return this.sendPayloads([{ topic: topicId, messages: message }], config);
    }
    /**
     * Sends an array of kafka-node payloads through a newly created producer.
     * Resolves when the broker acknowledges the send; rejects with the
     * kafka-node error otherwise.
     */
    async sendPayloads(payloads, config) {
        const producer = await this.getProducer(config);
        // tslint:disable-next-line:no-console
        console.log(`Sending ${JSON.stringify(payloads)}`);
        const sendPromise = new Promise((resolve, reject) => {
            producer.send(payloads, (err) => {
                if (err) {
                    // tslint:disable-next-line:no-console
                    console.log(`Error sending ${JSON.stringify(payloads)}`);
                    reject(err);
                }
                else {
                    // tslint:disable-next-line:no-console
                    console.log(`Sent ${JSON.stringify(payloads)}`);
                    resolve();
                }
            });
        });
        return sendPromise;
    }
    /** Serializes `basicParams` to JSON and sends it to `topicId`. */
    sendParams(topicId, basicParams, config) {
        return this.sendMessage(topicId, JSON.stringify(basicParams.serialize()), config);
    }
    /**
     * Subscribes to `topicId` and returns an rxjs Subject that re-emits the
     * raw kafka-node messages. Consumer errors are surfaced on the stream.
     */
    async rawMessages(topicId, config) {
        const consumer = await this.getConsumer(topicId, config);
        const kafkaStream = new rxjs_1.Subject();
        // tslint:disable-next-line:no-console
        console.log(`Listening on ${topicId}`);
        consumer.on('message', message => {
            try {
                // tslint:disable-next-line:no-console
                console.log(`Message on ${topicId}: ${JSON.stringify(message)}`);
                kafkaStream.next(message);
            }
            catch (error) {
                kafkaStream.error(`error while trying to parse message. topic: ${topicId} error: ${JSON.stringify(error)}, message: ${JSON.stringify(message)}`);
            }
        });
        consumer.on('error', err => {
            // tslint:disable-next-line:no-console
            console.log(`Consumer error on ${topicId}: ${JSON.stringify(err)}`);
            kafkaStream.error(`Consumer error. topic: ${topicId} error: ${JSON.stringify(err)}`);
        });
        return kafkaStream;
    }
    /**
     * Subscribes to `topicId` and maps each raw message into
     * `{ type, protocol, contents }` (contents is the raw JSON string),
     * wrapped in a KafkaMessageStream.
     */
    async messages(topicId, config) {
        const stream = (await this.rawMessages(topicId, config)).map(message => {
            const messageString = message.value.toString();
            const messageObject = JSON.parse(messageString);
            return {
                type: messageObject.type,
                protocol: messageObject.protocol,
                contents: messageString,
            };
        });
        return new KafkaMessageStream_1.default(common_types_1.Observable.fromObservable(stream, topicId));
    }
    /**
     * Probes connectivity by opening (and then closing) a client.
     * Resolves `true` on success; rejects if the connection cannot be made.
     */
    async isConnected(config) {
        const client = await this.getKafkaClient(config);
        // Close the probe client so the connectivity check does not leak a socket.
        client.close();
        return true;
    }
}
// CommonJS default export (compiled from a TypeScript `export default`).
exports.default = Kafka;
//# sourceMappingURL=KafkaNode.js.map