node-sinek


kafka client (consumer + producer) that is polite right out of the box

make it about them, not about you

  • Simon Sinek

info

  • promise-based API
  • core builds on the kafka-node module (check it out for options & tweaking)
  • uses ConsumerGroup(s), which means your Kafka broker needs to be > 0.9.x (works with 0.10.2+)

offers

  • provides incoming message flow control for consumers
  • provides a drain-once mechanism for consumers
  • provides an easy API for producers
  • documentation is still a work in progress; check out /test/int/Sinek.test.js

install

npm install --save sinek

test

//requires a local Kafka broker + Zookeeper running @ localhost:2181
npm test

Usage

const {Kafka, Drainer, Publisher, PartitionDrainer} = require("sinek");

producer (Publisher)

const kafkaClient = new Kafka(ZK_CON_STR, LOGGER);
kafkaClient.becomeProducer([TEST_TOPIC], CLIENT_NAME, OPTIONS);

kafkaClient.on("ready", () => {
    const producer = new Publisher(kafkaClient, PARTITION_COUNT); //partition count should match the default partition count of your broker's topics, e.g. 30
    
    producer.send(topic, messages, partitionKey, partition, compressionType)
    producer.batch(topic, [])
    
    producer.appendBuffer(topic, identifier, object, compressionType)
    producer.flushBuffer(topic)
    
    //easy api that uses a KeyedPartitioner type and identifies the
    //target partition for the object's identifier by itself
    //it also brings your payload (object) into perfect shape for
    //a nicely consumable topic
    //call producer.flushBuffer(topic) to batch-send the payloads
    producer.bufferPublishMessage(topic, identifier, object, version, compressionType)
    producer.bufferUnpublishMessage(topic, identifier, object, version, compressionType)
    producer.bufferUpdateMessage(topic, identifier, object, version, compressionType)
});

kafkaClient.on("error", err => console.log("producer error: " + err));

consumer (Drainer)

const kafkaClient = new Kafka(ZK_CON_STR, LOGGER);
kafkaClient.becomeConsumer([TEST_TOPIC], GROUP_ID, OPTIONS);

kafkaClient.on("ready", () => {
    const consumer = new Drainer(kafkaClient, 1); //1 = thread/worker/parallel count
    
    consumer.drain((message, done) => {
        console.log(message);
        done();
    });
    
    consumer.stopDrain();
    
    consumer.drainOnce((message, done) => {
        console.log(message);
        done();
    }, DRAIN_THRESHOLD, DRAIN_TIMEOUT).then(r => {
        console.log("drain done: " + r);
    }).catch(e => {
        console.log("drain timeout: " + e);
    });
    
    consumer.resetConsumer([TEST_TOPIC]).then(_ => {});
});

kafkaClient.on("error", err => console.log("consumer error: " + err));

consumer (PartitionDrainer) [faster ~ runs a queue per topic partition]

const kafkaClient = new Kafka(ZK_CON_STR, LOGGER);
kafkaClient.becomeConsumer([TEST_TOPIC], GROUP_ID, OPTIONS);

kafkaClient.on("ready", () => {
    const consumer = new PartitionDrainer(kafkaClient, 1); //1 = thread/worker/parallel count per partition
    
    //drain requires a topic-name and returns a promise 
    consumer.drain(TEST_TOPIC, (message, done) => {
        console.log(message);
        done();
    }).then(() => { /* drain is running */ }).catch(e => console.log(e));
    
    consumer.stopDrain();
    
    //drainOnce requires a topic-name
    consumer.drainOnce(TEST_TOPIC, (message, done) => {
        console.log(message);
        done();
    }, DRAIN_THRESHOLD, DRAIN_TIMEOUT).then(r => {
        console.log("drain done: " + r);
    }).catch(e => {
        console.log("drain timeout: " + e);
    });
    
    consumer.resetConsumer([TEST_TOPIC]).then(_ => {});
});

kafkaClient.on("error", err => console.log("consumer error: " + err));

hints

  • interesting options for tweaking consumers
const OPTIONS = {
    sessionTimeout: 12500,
    protocol: ["roundrobin"],
    fromOffset: "latest", //earliest
    fetchMaxBytes: 1024 * 100,
    fetchMinBytes: 1,
    fetchMaxWaitMs: 100,
    autoCommit: true,
    autoCommitIntervalMs: 5000
};
  • the remove and create topic APIs require a special broker configuration; without it these calls will simply have no effect at all
drainer.removeTopics([TEST_TOPIC]).then(r => console.log(r))
publisher.createTopics([TEST_TOPIC]).then(r => console.log(r))
  • using the .getStats() function on Drainer, Publisher or PartitionDrainer you can get some valuable insights into what is currently going on in your client; see the sketch below
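A minimal sketch of polling stats (the 5-second interval and plain logging are assumptions; the shape of the returned object depends on the client):

//periodically dump client insights
setInterval(() => {
    console.log(JSON.stringify(consumer.getStats()));
    console.log(JSON.stringify(producer.getStats()));
}, 5000);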

  • when using "Drainer" to consume and write upserts into a database that require ACID functionality and a build-up of models/message-payloads you must set the AsyncLimit of new Drainer(.., 1) to "1" or you will have trouble with data integrity

  • if your data is spread entity-wise across partitions you can use the "PartitionDrainer" to drain multiple partitions at the same time

  • the "Publisher" offers a simple API to create such (keyed) partitioned topics

  • it is probably a good idea to spawn a Consumer per topic; a sketch follows
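A sketch of one client + Drainer per topic (the topic names and the per-topic group-id suffix are assumptions, not requirements):

//spawn an isolated consumer for every topic
["orders", "customers"].forEach(topic => {
    const client = new Kafka(ZK_CON_STR, LOGGER);
    client.becomeConsumer([topic], GROUP_ID + "-" + topic, OPTIONS);

    client.on("ready", () => {
        const drainer = new Drainer(client, 1);
        drainer.drain((message, done) => {
            console.log(topic + ": " + message.value);
            done();
        });
    });

    client.on("error", err => console.log(topic + " error: " + err));
});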
