
Leftover zk (#1167)
* no need to expose 2181

* remove references to zookeeper and removed Client

* fetchCommitsV1 is the new fetchCommits

* remove zk option, add commit doc to CG
hyperlink committed Jan 10, 2019
1 parent 0fbcb76 commit 9521495
Showing 3 changed files with 61 additions and 121 deletions.
109 changes: 28 additions & 81 deletions README.md
@@ -8,7 +8,7 @@ Kafka-node
<!--[![NPM](https://nodei.co/npm-dl/kafka-node.png?height=3)](https://nodei.co/npm/kafka-node/)-->


Kafka-node is a Node.js client with Zookeeper integration for Apache Kafka 0.9 and later.
Kafka-node is a Node.js client for Apache Kafka 0.9 and later.

# Table of Contents
<!-- START doctoc generated TOC please keep comment here to allow auto update -->
@@ -85,7 +85,7 @@ const client = new kafka.KafkaClient({kafkaHost: '10.3.100.196:9092'});
```

## Producer
### Producer(client, [options], [customPartitioner])
### Producer(KafkaClient, [options], [customPartitioner])
* `client`: client which keeps a connection with the Kafka server.
* `options`: options for producer,
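
A minimal construction sketch (the broker address is a placeholder, not part of the original docs):

``` js
var kafka = require('kafka-node');
// hypothetical broker address -- point this at your own cluster
var client = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' });
var producer = new kafka.Producer(client);

producer.on('ready', function () {
  // safe to start calling producer.send(...) from here
});
producer.on('error', function (err) {
  console.error(err);
});
```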

@@ -122,7 +122,7 @@ var kafka = require('kafka-node'),
key: 'theKey', // string or buffer, only needed when using keyed partitioner
partition: 0, // default 0
attributes: 2, // default: 0
timestamp: Date.now() // <-- defaults to Date.now() (only available with kafka v0.10 and KafkaClient only)
timestamp: Date.now() // <-- defaults to Date.now() (only available with kafka v0.10+)
}
```
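
A hedged sketch of sending a message with the fields above (topic name and message content are placeholders; `messages` is the payload field from the full docs):

``` js
producer.send([
  {
    topic: 'topicName',           // placeholder topic
    messages: ['hello kafka'],    // single message or an array of messages
    key: 'theKey',                // only meaningful with the keyed partitioner
    partition: 0,
    attributes: 2,                // 2 = snappy compression
    timestamp: Date.now()
  }
], function (err, result) {
  // on success, result maps topic -> partition -> assigned offset
});
```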

@@ -191,7 +191,7 @@ var topicsToCreate = [{
],
// Optional explicit partition / replica assignment
// When this property exists, partitions and replicationFactor properties are ignored
replicaAssignment: [
replicaAssignment: [
{
partition: 0,
replicas: [3, 4]
@@ -204,13 +204,13 @@
}];

client.createTopics(topicsToCreate, (error, result) => {
// result is an array of any errors if a given topic could not be created
// result is an array of any errors if a given topic could not be created
});

```

## HighLevelProducer
### HighLevelProducer(client, [options], [customPartitioner])
### HighLevelProducer(KafkaClient, [options], [customPartitioner])
* `client`: client which keeps a connection with the Kafka server. Round-robins produce requests to the available topic partitions
* `options`: options for producer,

@@ -281,7 +281,7 @@ Example:
``` js
var kafka = require('kafka-node'),
HighLevelProducer = kafka.HighLevelProducer,
client = new kafka.Client(),
client = new kafka.KafkaClient(),
producer = new HighLevelProducer(client);
// Create topics sync
producer.createTopics(['t','t1'], false, function (err, data) {
@@ -543,25 +543,12 @@ Similar API as `Consumer` with some exceptions. Methods like `pause` and `resume

## ConsumerGroup

The new consumer group uses Kafka broker coordinators instead of Zookeeper to manage consumer groups. This is supported in **Kafka version 0.9** and above only.

### Coming from the highLevelConsumer

The API is very similar to `HighLevelConsumer` since it extends directly from HLC, so many of the same options apply, with some exceptions noted below:

* In an effort to make the API simpler, you no longer need to create a `client`; this is done inside the `ConsumerGroup` (see the sketch after this list)
* Consumer IDs do not need to be defined. There's a new ID to represent consumers called the *member ID*, which is assigned to a consumer after it joins the group
* Offsets, group members, and ownership details are not stored in Zookeeper
* `ConsumerGroup` does not emit a `registered` event
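
A minimal sketch of the first point above (topic and group names are placeholders): no client is created by hand, the `ConsumerGroup` builds its own `KafkaClient` from `kafkaHost`.

```js
var ConsumerGroup = require('kafka-node').ConsumerGroup;

var consumerGroup = new ConsumerGroup(
  { kafkaHost: 'broker:9092', groupId: 'ExampleTestGroup' }, // no separate client needed
  ['ExampleTopic']
);

consumerGroup.on('message', function (message) {
  console.log(message);
});
```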

### ConsumerGroup(options, topics)

```js
var options = {
host: 'zookeeper:2181', // zookeeper host omit if connecting directly to broker (see kafkaHost below)
kafkaHost: 'broker:9092', // connect directly to kafka broker (instantiates a KafkaClient)
zk : undefined, // put client zk settings if you need them (see Client)
batch: undefined, // put client batch settings if you need them (see Client)
batch: undefined, // put client batch settings if you need them
ssl: true, // optional (defaults to false) or tls options hash
groupId: 'ExampleTestGroup',
sessionTimeout: 15000,
@@ -709,6 +696,19 @@ consumer.on('message', function (message) {

### on('offsetOutOfRange', function (err) {})

### commit(force, cb)
Manually commit the offsets of the current topics; this method should be called when a consumer leaves the group.

* `force`: **Boolean**, force a commit even if there's a pending commit, default false (optional)
* `cb`: **Function**, the callback

Example:

``` js
consumer.commit(function(err, data) {
});
```
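
To force a commit even while one is pending, pass `true` as the first argument (a small sketch based on the signature above):

``` js
consumer.commit(true, function (err, data) {
  // err is set if the commit could not be completed
});
```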

### pause()
Pause the consumer. ***Calling `pause` does not automatically stop messages from being emitted.*** This is because pause just stops the kafka consumer fetch loop. Each iteration of the fetch loop can obtain a batch of messages (limited by `fetchMaxBytes`).
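
A small sketch of the implication (assumes the `consumer` from the examples above): messages fetched before `pause()` may still be emitted, so be prepared to handle or buffer them.

``` js
consumer.pause();   // stops the fetch loop only
consumer.on('message', function (message) {
  // messages from a batch fetched before pause() can still arrive here
});
// when ready to fetch again:
consumer.resume();
```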

@@ -755,7 +755,7 @@ Closes the `ConsumerGroup`. Calls `callback` when complete. If `autoCommit` is e
* `client`: client which keeps a connection with the Kafka server.

### events
* `ready`: when zookeeper is ready
* `ready`: when all brokers are discovered
* `connect` when broker is ready
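
A minimal sketch (broker address is a placeholder): wait for `ready` before issuing offset requests.

```js
var kafka = require('kafka-node');
var client = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' });
var offset = new kafka.Offset(client);

offset.on('ready', function () {
  // offset.fetch / offset.fetchCommits can be called safely from here
});
```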

### fetch(payloads, cb)
@@ -781,7 +781,7 @@ Example

```js
var kafka = require('kafka-node'),
client = new kafka.Client(),
client = new kafka.KafkaClient(),
offset = new kafka.Offset(client);
offset.fetch([
{ topic: 't', partition: 0, time: Date.now(), maxNum: 1 }
@@ -791,38 +791,8 @@ var kafka = require('kafka-node'),
});
```

### commit(groupId, payloads, cb)

> ⚠️**WARNING**: commits are made to zookeeper and are only compatible with `HighLevelConsumer`; they will **NOT** work with the new `ConsumerGroup`
* `groupId`: consumer group
* `payloads`: **Array**, array of `OffsetCommitRequest`, where `OffsetCommitRequest` is a JSON object like:

``` js
{
topic: 'topicName',
partition: 0, //default 0
offset: 1,
metadata: 'm', //default 'm'
}
```

Example

```js
var kafka = require('kafka-node'),
client = new kafka.Client(),
offset = new kafka.Offset(client);
offset.commit('groupId', [
{ topic: 't', partition: 0, offset: 10 }
], function (err, data) {
});
```

### fetchCommits(groupid, payloads, cb)

> ⚠️**WARNING**: commits are read from zookeeper and are only compatible with `HighLevelConsumer`; they will **NOT** work with the new `ConsumerGroup`
Fetch the last committed offset in a topic of a specific consumer group

* `groupId`: consumer group
@@ -839,41 +809,18 @@ Example

```js
var kafka = require('kafka-node'),
client = new kafka.Client(),
client = new kafka.KafkaClient(),
offset = new kafka.Offset(client);
offset.fetchCommits('groupId', [
offset.fetchCommitsV1('groupId', [
{ topic: 't', partition: 0 }
], function (err, data) {
});
```

### fetchCommitsV1(groupid, payloads, cb)

> ⚠️**WARNING**: commits are read from the broker and are only compatible with the new `ConsumerGroup`; they will **NOT** work with the old `HighLevelConsumer`
Fetch the last committed offset in a topic of a specific consumer group

* `groupId`: consumer group
* `payloads`: **Array**, array of `OffsetFetchRequest`, where `OffsetFetchRequest` is a JSON object like:

``` js
{
topic: 'topicName',
partition: 0 //default 0
}
```

Example
Alias of `fetchCommits`.

```js
var kafka = require('kafka-node'),
client = new kafka.Client(),
offset = new kafka.Offset(client);
offset.fetchCommitsV1('groupId', [
{ topic: 't', partition: 0 }
], function (err, data) {
});
```
### fetchLatestOffsets(topics, cb)

Example
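
A minimal sketch, assuming the `offset` instance from above and a placeholder topic; `offsets` maps topic to partition to the latest offset:

```js
offset.fetchLatestOffsets(['topicName'], function (err, offsets) {
  if (err) { return console.error(err); }
  console.log(offsets['topicName'][0]); // latest offset of partition 0
});
```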
@@ -906,8 +853,8 @@ Example

This class provides administrative APIs that can be used to monitor and administer the Kafka cluster.

### Admin(kafkaClient)
* `kafkaClient`: client which keeps a connection with the Kafka server. (**`KafkaClient` only**, `client` not supported)
### Admin (KafkaClient)
* `kafkaClient`: client which keeps a connection with the Kafka server.

### listGroups(cb)
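
A minimal sketch (requires a `KafkaClient`):

```js
var kafka = require('kafka-node');
var client = new kafka.KafkaClient();
var admin = new kafka.Admin(client); // the client must be a KafkaClient instance
admin.listGroups(function (err, res) {
  console.log('consumerGroups', res);
});
```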

2 changes: 0 additions & 2 deletions docker-compose.yml
@@ -2,8 +2,6 @@ version: '2'
services:
zookeeper:
image: jplock/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:latest
ports:
71 changes: 33 additions & 38 deletions lib/offset.js
@@ -56,15 +56,7 @@ Offset.prototype.commit = function (groupId, payloads, cb) {
this.client.sendOffsetCommitRequest(groupId, this.buildPayloads(payloads), cb);
};

Offset.prototype.fetchCommits = function (groupId, payloads, cb) {
if (!this.ready) {
this.once('ready', () => this.fetchCommits(groupId, payloads, cb));
return;
}
this.client.sendOffsetFetchRequest(groupId, this.buildPayloads(payloads), cb);
};

Offset.prototype.fetchCommitsV1 = function (groupId, payloads, cb) {
Offset.prototype.fetchCommits = Offset.prototype.fetchCommitsV1 = function (groupId, payloads, cb) {
if (!this.ready) {
this.once('ready', () => this.fetchCommitsV1(groupId, payloads, cb));
return;
@@ -90,41 +82,44 @@ function fetchOffsets (offset, topics, cb, when) {
}
return;
}
async.waterfall([
callback => {
offset.client.loadMetadataForTopics(topics, callback);
},
(topicsMetaData, callback) => {
var payloads = [];
var metaDatas = topicsMetaData[1].metadata;
Object.keys(metaDatas).forEach(function (topicName) {
var topic = metaDatas[topicName];
Object.keys(topic).forEach(function (partition) {
payloads.push({
topic: topicName,
partition: partition,
time: when
async.waterfall(
[
callback => {
offset.client.loadMetadataForTopics(topics, callback);
},
(topicsMetaData, callback) => {
var payloads = [];
var metaDatas = topicsMetaData[1].metadata;
Object.keys(metaDatas).forEach(function (topicName) {
var topic = metaDatas[topicName];
Object.keys(topic).forEach(function (partition) {
payloads.push({
topic: topicName,
partition: partition,
time: when
});
});
});
});

if (payloads.length === 0) {
return callback(new Error('Topic(s) does not exist'));
}
if (payloads.length === 0) {
return callback(new Error('Topic(s) does not exist'));
}

offset.fetch(payloads, callback);
},
function (results, callback) {
Object.keys(results).forEach(function (topicName) {
var topic = results[topicName];
offset.fetch(payloads, callback);
},
function (results, callback) {
Object.keys(results).forEach(function (topicName) {
var topic = results[topicName];

Object.keys(topic).forEach(function (partitionName) {
topic[partitionName] = topic[partitionName][0];
Object.keys(topic).forEach(function (partitionName) {
topic[partitionName] = topic[partitionName][0];
});
});
});
callback(null, results);
}
], cb);
callback(null, results);
}
],
cb
);
}

module.exports = Offset;
