-
Notifications
You must be signed in to change notification settings - Fork 3
/
confluent-kafka-producer.ts
90 lines (80 loc) · 2.38 KB
/
confluent-kafka-producer.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/**
* Project: @wisrtoni40/confluent-schema
* Department code: ML8100
* File description: Confluent Kafka Producer
* @CREATE Wed Jan 13 2021 1:47:20 PM
* @author Steve Y Lin
* @contact Steve_Y_Lin@wistron.com #1342
* -----------------------------------------------------------------------------
* @NOTE
*/
import { HighLevelProducer, KafkaClient } from 'kafka-node';
import { v4 as uuidv4 } from 'uuid';
import {
ConfluentAvroStrategy,
ConfluentMultiRegistry,
ConfluentPubResolveStrategy,
} from 'wisrtoni40-confluent-schema';
/**
* -----------------------------------------------------------------------------
* Config
* -----------------------------------------------------------------------------
*/
/** Broker addresses, handed to `KafkaClient` as a single comma-separated string. */
const kafkaHost = ['localhost:9193', 'localhost:9193', 'localhost:9193'].join(
  ',',
);

/** Topic the sample record is published to; also given to the publish resolver. */
const topic = 'input.your.topic';

/** Schema registry URLs, handed to `ConfluentMultiRegistry` as a single comma-separated string. */
const registryHost = [
  'http://localhost:8585',
  'http://localhost:8585',
  'http://localhost:8585',
].join(',');
/**
* -----------------------------------------------------------------------------
* Kafka Client and Producer
* -----------------------------------------------------------------------------
*/
/**
 * Kafka client for the brokers listed in `kafkaHost`.
 *
 * A fresh UUID is used as the client id on every run. Connection attempts are
 * retried up to five times and authentication uses SASL/PLAIN.
 */
const kafkaClient = new KafkaClient({
  kafkaHost,
  clientId: uuidv4(),
  connectTimeout: 60000,
  requestTimeout: 60000,
  connectRetryOptions: {
    retries: 5,
    // The retry delay is computed as minTimeout * factor^attempt, capped at
    // maxTimeout. A factor of 0 collapses every delay after the first attempt
    // to 0 ms; a factor of 1 gives the constant 1000 ms delay that the
    // identical min/max timeouts and `randomize: false` describe.
    factor: 1,
    minTimeout: 1000,
    maxTimeout: 1000,
    randomize: false,
  },
  sasl: {
    mechanism: 'plain',
    // NOTE(review): these look like placeholder credentials - replace them
    // with real values before connecting to a secured cluster.
    username: 'username',
    password: 'password',
  },
});
/** Acknowledgement settings passed to the high-level producer. */
const producerOptions = {
  requireAcks: 1,
  ackTimeoutMs: 100,
};

/** High-level producer that publishes through `kafkaClient`. */
const producer = new HighLevelProducer(kafkaClient, producerOptions);
/**
* -----------------------------------------------------------------------------
* Confluent Resolver
* -----------------------------------------------------------------------------
*/
/**
 * Publish-side resolver for `topic`, assembled from a multi-host schema
 * registry client (pointed at `registryHost`) and an Avro strategy.
 * Its `resolve()` result is what gets sent as the Kafka message payload.
 */
const resolver = new ConfluentPubResolveStrategy(
  new ConfluentMultiRegistry(registryHost),
  new ConfluentAvroStrategy(),
  topic
);
/**
* -----------------------------------------------------------------------------
* Produce
* -----------------------------------------------------------------------------
*/
/**
 * Resolves one sample record through `resolver` and publishes the result to
 * `topic`, logging either the send result or the error.
 *
 * Failures raised while resolving the record (or thrown synchronously by
 * `send`) are caught and logged here so they do not escape as an unhandled
 * promise rejection. `void` marks the promise as intentionally not awaited.
 */
void (async () => {
  const data = {
    params1: 100,
    params2: 'param2',
  };

  try {
    const processedData = await resolver.resolve(data);
    producer.send([{ topic, messages: processedData }], (error, result) => {
      if (error) {
        console.error(error);
      } else {
        console.log(result);
      }
    });
  } catch (error) {
    console.error(error);
  }
})();