Skip to content

Commit

Permalink
Allow for a global config to be passed to the createWriteStream/creat…
Browse files Browse the repository at this point in the history
…eReadStream methods

This resolves issue #8.
  • Loading branch information
Derek Miller committed Mar 13, 2019
1 parent 3aea941 commit 316fdf1
Showing 1 changed file with 18 additions and 9 deletions.
27 changes: 18 additions & 9 deletions src/kafka-pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export interface IKafkaOptions {
port: string
logger?: Logger,
groupId?: any,
globalConfig?: object,
}

export interface IKafkaProducer {
Expand Down Expand Up @@ -86,9 +87,11 @@ export class KafkaPubSub implements PubSubEngine {
}

private createProducer(topic: string) {
const producer = Kafka.Producer.createWriteStream({
'metadata.broker.list': this.brokerList()
}, {}, { topic })
const producer = Kafka.Producer.createWriteStream(
Object.assign({}, {'metadata.broker.list': this.brokerList()}, this.options.globalConfig),
{},
{topic}
);
producer.on('error', (err) => {
this.logger.error(err, 'Error in our kafka stream')
})
Expand All @@ -98,12 +101,18 @@ export class KafkaPubSub implements PubSubEngine {
private createConsumer(topic: string) {
// Create a group for each instance. The consumer will receive all messages from the topic
const groupId = this.options.groupId || Math.ceil(Math.random() * 9999)
const consumer = Kafka.KafkaConsumer.createReadStream({
'group.id': `kafka-group-${groupId}`,
'metadata.broker.list': this.brokerList(),
}, {}, {
topics: [topic]
})
const consumer = Kafka.KafkaConsumer.createReadStream(
Object.assign(
{},
{
'group.id': `kafka-group-${groupId}`,
'metadata.broker.list': this.brokerList(),
},
this.options.globalConfig,
),
{},
{ topics: [topic] }
);
consumer.on('data', (message) => {
let parsedMessage = JSON.parse(message.value.toString())

Expand Down

0 comments on commit 316fdf1

Please sign in to comment.