This repository has been archived by the owner. It is now read-only.
Permalink
Browse files

producing with zk is working the new way

  • Loading branch information...
dannycoates committed Nov 17, 2012
1 parent 956284f commit f4df8d9489db03b7a24737a3dc43ff05aa460f24
Showing with 70 additions and 70 deletions.
  1. +3 −2 broker.js
  2. +2 −1 client/client.js
  3. +1 −1 index.js
  4. +14 −4 kafka.js
  5. +3 −0 partition-set.js
  6. +2 −6 partition.js
  7. +4 −0 static-connector.js
  8. +8 −7 topic.js
  9. +33 −49 zkconnector.js
View
@@ -4,12 +4,13 @@ module.exports = function (
EventEmitter,
Client) {
- //TODO change to (id, options)
function Broker(id, options) {
this.id = id
this.client = Client.nil
this.reconnectAttempts = 0
this.options = options
+ this.host = options.host
+ this.port = options.port
this.connector = this.connect.bind(this)
this.onClientEnd = clientEnd.bind(this)
this.onClientReady = clientReady.bind(this)
@@ -94,7 +95,7 @@ module.exports = function (
this.emit('ready')
}
- Broker.nil = new Broker()
+ Broker.nil = new Broker(-1, {})
return Broker
}
View
@@ -10,6 +10,7 @@ module.exports = function (
ProduceRequest,
OffsetsRequest
) {
+
function Client(id, options) {
this.id = id
this.ready = false
@@ -135,7 +136,7 @@ module.exports = function (
}
}
- function connectionError() {
+ function connectionError(err) {
logger.info('client error', err.message)
}
View
@@ -43,7 +43,7 @@ module.exports = function (options) {
try {
var ZooKeeper = require('zookeeper')
var ZK = require('./zk')(logger, async, inherits, EventEmitter, ZooKeeper)
- var ZKConnector = require('./zkconnector')(logger, async, inherits, EventEmitter, ZK, Producer, Consumer, BrokerPool, Broker)
+ var ZKConnector = require('./zkconnector')(logger, async, inherits, EventEmitter, ZK, Broker)
}
catch (e) {
logger.error('node-zookeeper could not be loaded')
View
@@ -73,7 +73,7 @@ module.exports = function (
// onconnect: function () {}
Kafka.prototype.connect = function (onconnect) {
if (this.options.zookeeper) {
- this.connector = new ZKConnector(this.options)
+ this.connector = new ZKConnector(this, this.options)
}
else if (this.options.brokers) {
this.connector = new StaticConnector(this, this.options)
@@ -117,9 +117,8 @@ module.exports = function (
return topic
}
- Kafka.prototype.resumeTopic = function (topic) {
- // TODO this is where we might trigger client registration
- // and partition rebalancing
+ Kafka.prototype.register = function (topic) {
+ this.connector.register(topic)
}
Kafka.prototype.broker = function (id) {
@@ -132,6 +131,17 @@ module.exports = function (
Kafka.prototype.removeBroker = function (broker) {
this.allBrokers.remove(broker)
+ broker.destroy()
+ }
+
+ Kafka.prototype.removeBrokersNotIn = function (brokerIds) {
+ var brokers = this.allBrokers.all()
+ for (var i = 0; i < brokers.length; i++) {
+ var broker = brokers[i]
+ if (brokerIds.indexOf(broker.id) === -1) {
+ this.removeBroker(broker)
+ }
+ }
}
return Kafka
View
@@ -46,6 +46,9 @@ module.exports = function (
delete this.writables[name]
this.partitions.splice(i, 1)
}
+ if (this.partitions.length === 0) {
+ this.emit('empty')
+ }
}
PartitionSet.prototype.next = function () {
View
@@ -1,12 +1,12 @@
module.exports = function (logger, inherits, EventEmitter, Broker) {
- function Partition(topic, broker, id, offset) {
+ function Partition(topic, broker, id) {
this.topic = topic
this.broker = broker
this.id = id
this.fetchDelay = this.topic.minFetchDelay
this.emptyFetches = 0
- this.offset = offset || 0
+ this.offset = 0
this.paused = true
this.bufferedMessages = null
this.timer = null
@@ -83,10 +83,6 @@ module.exports = function (logger, inherits, EventEmitter, Broker) {
this.resume()
}
- Partition.prototype.saveOffset = function (saver) {
- saver.saveOffset(this)
- }
-
Partition.prototype.write = function (messages, cb) {
return this.broker.write(this, messages, cb)
}
View
@@ -33,5 +33,9 @@ module.exports = function (
this.kafka.addBroker(broker)
}
+ StaticConnector.prototype.register = function (topic) {
+ // noop
+ }
+
return StaticConnector
}
View
@@ -119,17 +119,13 @@ module.exports = function (
return this.paused || this.bufferedMessages.length > 0
}
- Topic.prototype.saveOffsets = function () {
- //this.consumer.saveOffsets(this)
- }
-
// Partitions
Topic.prototype.partition = function (brokerId, partitionId) {
var name = brokerId + '-' + partitionId
var partition = this.partitions.get(name)
if (!partition) {
- partition = new Partition(this, this.kafka.broker(brokerId), partitionId) // TODO options
+ partition = new Partition(this, this.kafka.broker(brokerId), partitionId)
this.partitions.add(partition)
}
return partition
@@ -161,9 +157,14 @@ module.exports = function (
var info = partitionInfo[i]
var nameOffset = info.split(':')
var name = nameOffset[0]
- //TODO offset
+
var partition = this.partitions.get(name)
partition.isReadable(true)
+
+ if (nameOffset.length === 2) {
+ var offset = +nameOffset[1]
+ partition.offset = offset
+ }
}
}
@@ -185,7 +186,7 @@ module.exports = function (
Topic.prototype.resume = function () {
logger.info('resume', this.name)
- this.kafka.resumeTopic(this)
+ this.kafka.register(this)
this.paused = this._flushBufferedMessages()
if (!this.paused) {
this.partitions.resume()
View
@@ -4,9 +4,6 @@ module.exports = function (
inherits,
EventEmitter,
ZK,
- Producer,
- Consumer,
- BrokerPool,
Broker
) {
@@ -23,31 +20,14 @@ module.exports = function (
// zookeeper:
// groupId:
// }
- function ZKConnector(options) {
- var self = this
+ function ZKConnector(kafka, options) {
+ this.kafka = kafka
this.options = options
this.zk = new ZK(options)
- this.allBrokers = new BrokerPool('all')
- this.producer = new Producer(this.allBrokers)
- this.allBrokers.on(
- 'brokerAdded',
- function (b) {
- self.emit('brokerAdded', b)
- }
- )
- this.allBrokers.on(
- 'brokerRemoved',
- function (b) {
- self.emit('brokerRemoved', b)
- }
- )
- this.brokerReady = function () {
- self.emit('brokerReady', this)
- }
this.hasPendingTopics = false
this.interestedTopics = {}
this.registerTopics = registerTopics.bind(this)
- this.consumer = new Consumer(this, options.groupId, this.allBrokers)
+ this.onBrokerConnect = brokerConnect.bind(this)
this.connect()
EventEmitter.call(this)
}
@@ -61,7 +41,7 @@ module.exports = function (
self.zk.subscribeToTopics()
})
this.zk.on('brokers', this._brokersChanged.bind(this))
- this.zk.on('broker-topic-partition', this._setBrokerTopicPartitionCount.bind(this))
+ this.zk.on('broker-topic-partition', this._setPartitionCount.bind(this))
this.zk.on('consumers-changed', this._rebalance.bind(this))
}
@@ -70,33 +50,39 @@ module.exports = function (
async.forEachSeries(
brokerIds,
function (id, next) {
- if (!self.allBrokers.contains(id)) {
+ if (!self.kafka.broker(id)) {
self.zk.getBroker(id, self._createBroker.bind(self))
}
},
function (err) {
- self.producer.removeBrokersNotIn(brokerIds)
+ self.kafka.removeBrokersNotIn(brokerIds)
}
)
}
ZKConnector.prototype._createBroker = function (id, info) {
- var self = this
- var split = info.split(':')
- if (split.length > 2) {
- var broker = new Broker(id, { host: split[1], port: split[2]})
- broker.on('ready', this.brokerReady)
- broker.once(
- 'connect',
- function () {
- self.allBrokers.add(broker)
+ var hostPort = info.split(':')
+ if (hostPort.length > 2) {
+ var host = hostPort[1]
+ var port = hostPort[2]
+ var oldBroker = this.kafka.broker(id)
+ if (oldBroker) {
+ if (oldBroker.host === host && oldBroker.port === port) {
+ return
}
- )
+ else {
+ this.kafka.removeBroker(oldBroker)
+ }
+ }
+ var broker = new Broker(id, { host: host, port: port })
+ broker.once('connect', this.onBrokerConnect)
+ broker.connect()
}
}
- ZKConnector.prototype._setBrokerTopicPartitionCount = function (broker, topic, count) {
- this.producer.setBrokerTopicPartitionCount(broker, topic, count)
+ ZKConnector.prototype._setPartitionCount = function (brokerId, topicName, count) {
+ var topic = this.kafka.topic(topicName)
+ topic.addWritablePartitions([brokerId + ':' + count])
}
ZKConnector.prototype._rebalance = function () {
@@ -133,19 +119,17 @@ module.exports = function (
}
}
- ZKConnector.prototype.consume = function (topic) {
- this.hasPendingTopics = true
- this.interestedTopics[topic.name] = topic
- process.nextTick(this.registerTopics)
+ ZKConnector.prototype.register = function (topic) {
+ return false // TODO enable when rebalance works
+ if (!this.interestedTopics[topic.name]) {
+ this.hasPendingTopics = true
+ this.interestedTopics[topic.name] = topic
+ process.nextTick(this.registerTopics)
+ }
}
- ZKConnector.prototype.saveOffset = function (partition) {
- logger.info(
- 'saving', partition.id,
- 'broker', partition.broker.id,
- 'offset', partition.offset
- )
- //TODO implement
+ function brokerConnect(broker) {
+ this.kafka.addBroker(broker)
}
return ZKConnector

0 comments on commit f4df8d9

Please sign in to comment.