/* * node-rdkafka - Node.js wrapper for RdKafka C/C++ library * * Copyright (c) 2016 Blizzard Entertainment * * This software may be modified and distributed under the terms * of the MIT license. See the LICENSE.txt file for details. */ var Kafka = require('../'); var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092'; var producer; var config = { 'client.id': 'kafka-test', 'metadata.broker.list': kafkaBrokerList, 'dr_cb': true, } if (process.env.DEBUG) { config['debug'] = 'all'; } console.log(config); producer = new Kafka.Producer(config); producer.on('event.log', function(event) { console.log(event); }) .on('ready', function(info) { console.log('%s connected to kafka server', info.name); var tt = setInterval(function() { producer.poll(); }, 200); producer.once('delivery-report', function(report) { console.log('Received a message'); clearInterval(tt); producer.on('disconnected', function(info) { console.log('Disconnected from kafka server'); producer = new Kafka.Producer(config); producer.on('event.log', function(event) { console.log(event); }) .on('disconnected', function(info) { console.log('Disconnected from kafka server'); }) .on('ready', function() { console.log('2nd client connected'); var total = 0; var max = 10000; var verified_received = 0; var tt = setInterval(function() { producer.poll(); }, 200); producer.on('delivery-report', function(report) { verified_received++; if (verified_received === max) { clearInterval(tt); producer.disconnect(function() { console.log('Disconnecting 2nd producer'); }); } }); // Produce for (total = 0; total <= max; total++) { console.log('Sending message ' + total + ' to topic string'); producer.produce('test', null, new Buffer('message ' + total), null); } }); producer.connect(); }); console.log('Disconnecting 1st producer'); producer.disconnect(); }); console.log('Sending message to topic object'); producer.produce(producer.Topic('test', {}), null, new Buffer('value'), 'key'); //producer.produce('test', null, new Buffer('value'), 'key'); }); producer.connect();