-
Notifications
You must be signed in to change notification settings - Fork 5
/
kafka-message.js
35 lines (32 loc) · 1.04 KB
/
kafka-message.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* Observable operator: extracts a string value from a kafka message
*
* @example
* const opts = { brokers: 'kafka://127.0.0.1:9092' };
* const KafkaObservable = require('kafka-observable')(opts);
* KafkaObservable.fromTopic('my_topic')
* .let(KafkaObservable.TextMessage())
* .subscribe(message => console.info(message));
*
* @module operators/TextMessage
* @author ghermeto
**/
'use strict';
const Observable = require('rxjs').Observable;
const kafkaMessage = (mapper = x => x) =>
(source) =>
Observable.create(observer =>
source
.map(({message}) => message.value.toString('utf8'))
.subscribe(
message => {
try { observer.next(mapper(message)); }
catch(err) { observer.error(err); }
},
err => observer.error(err),
() => observer.complete()));
/**
* @function TextMessage
* @param {Function} mapper mapping function
*/
module.exports = kafkaMessage;