Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

switch to zkjs

  • Loading branch information...
commit 5adc4a35af222b86971f5c1edb75eb307d2ad599 1 parent 677c47a
Danny Coates authored
Showing with 24 additions and 26 deletions.
  1. +1 −1  example.js
  2. +2 −2 index.js
  3. +1 −1  package.json
  4. +20 −22 zk.js
2  example.js
View
@@ -4,7 +4,7 @@ var fs = require('fs')
var file = fs.createWriteStream('./test.txt')
var kafka = new Kafka({
- zookeeper: 'localhost:2181',
+ zookeeper: ['localhost:2181'],
brokers: [{
id: 0,
host: 'localhost',
4 index.js
View
@@ -41,12 +41,12 @@ module.exports = function (options) {
if (options.zookeeper) {
try {
- var ZooKeeper = require('zookeeper')
+ var ZooKeeper = require('zkjs')
var ZK = require('./zk')(logger, async, inherits, EventEmitter, ZooKeeper)
var ZKConnector = require('./zkconnector')(logger, async, inherits, EventEmitter, ZK, Broker)
}
catch (e) {
- logger.error('node-zookeeper could not be loaded')
+ logger.error('zkjs could not be loaded')
throw e
}
}
2  package.json
View
@@ -11,7 +11,7 @@
},
"optionalDependencies" : {
"snappy" : "*",
- "zookeeper" : "*"
+ "zkjs" : "*"
},
"repository": {
"type": "git",
42 zk.js
View
@@ -10,14 +10,11 @@ module.exports = function (
// A feable attempt a wrangling the horrible ZooKeeper API
function ZK(options) {
this.zk = new ZooKeeper({
- connect: options.zookeeper,
- timeout: 200000,
- debug_level: ZooKeeper.ZOO_LOG_LEVEL_WARNING,
- host_order_deterministic: false
+ hosts: options.zookeeper
})
this.zk.once(
- 'close',
- function () { logger.info('zk close') }
+ 'expired',
+ function () { logger.info('zk expired') }
)
EventEmitter.call(this)
}
@@ -25,7 +22,7 @@ module.exports = function (
ZK.prototype.connect = function () {
var self = this
- this.zk.connect(
+ this.zk.start(
function (err) {
if (err) {
return self.emit('error', err)
@@ -36,14 +33,14 @@ module.exports = function (
}
ZK.prototype.subscribeToBrokers = function () {
- this.zk.aw_get_children(
+ this.zk.getChildren(
'/brokers/ids',
this.subscribeToBrokers.bind(this),
this._brokersChanged.bind(this)
)
}
- ZK.prototype._brokersChanged = function (rc, err, brokerIds) {
+ ZK.prototype._brokersChanged = function (err, brokerIds) {
if (brokerIds) {
this.emit('brokers', brokerIds)
}
@@ -51,10 +48,9 @@ module.exports = function (
ZK.prototype.getBroker = function (id, done) {
var self = this
- this.zk.a_get(
+ this.zk.get(
'/brokers/ids/' + id,
- false,
- function (rc, err, stat, data) {
+ function (err, data) {
var str = data ? data.toString() : ''
done(id, str)
}
@@ -62,17 +58,17 @@ module.exports = function (
}
ZK.prototype.subscribeToTopics = function () {
- this.zk.aw_get_children(
+ this.zk.getChildren(
'/brokers/topics',
this.subscribeToTopics.bind(this),
this._topicsChanged.bind(this)
)
}
- ZK.prototype._topicsChanged = function (rc, err, topics) {
+ ZK.prototype._topicsChanged = function (err, topics) {
var self = this
async.forEachSeries(
- topics,
+ topics || [],
function (topic, next) {
self._getTopicBrokers(topic, next)
},
@@ -83,18 +79,18 @@ module.exports = function (
}
ZK.prototype._getTopicBrokers = function (name, done) {
- this.zk.aw_get_children(
+ this.zk.getChildren(
'/brokers/topics/' + name,
this._getTopicBrokers.bind(this, name, noop),
this._getBrokersPartitions.bind(this, name, done)
)
}
- ZK.prototype._getBrokersPartitions = function (name, done, rc, err, brokerIds) {
+ ZK.prototype._getBrokersPartitions = function (name, done, err, brokerIds) {
var self = this
if (brokerIds) {
async.forEachSeries(
- brokerIds,
+ brokerIds || [],
function (id, next) {
self._getPartitionCount(name, id, next)
},
@@ -107,7 +103,7 @@ module.exports = function (
ZK.prototype._getPartitionCount = function (name, id, done) {
var self = this
- self.zk.aw_get(
+ self.zk.get(
'/brokers/topics/' + name + '/' + id,
self._getPartitionCount.bind(self, name, id, noop),
function (rc, err, stat, data) {
@@ -119,9 +115,11 @@ module.exports = function (
)
}
+ // TODO indeed
+ /*
ZK.prototype._create = function (path, data, options, cb) {
- this.zk.a_create(path, data, options,
- function (rc, err, stat) {
+ this.zk.create(path, data, options,
+ function (err, stat) {
switch (rc) {
case ZooKeeper.ZOK:
cb(null, stat)
@@ -223,7 +221,7 @@ module.exports = function (
}
)
}
-
+ */
ZK.prototype.getTopicPartitions = function (topics, consumer, cb) {
//TODO
throw new Error("Not Implemented")
Please sign in to comment.
Something went wrong with that request. Please try again.