Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

began a consumer refactor to support zk consumers

  • Loading branch information...
commit 42866feae54eb5dbc8bb911c502f242973fdce21 1 parent 1d0262b
@dannycoates authored
View
1  client/client.js
@@ -18,6 +18,7 @@ module.exports = function (
this.receiver = null
this.connection = net.connect(options)
+ this.connection.setKeepAlive(true, 1000 * 60 * 5)
this.onConnectionConnect = connectionConnect.bind(this)
this.onConnectionEnd = connectionEnd.bind(this)
this.onConnectionDrain = connectionDrain.bind(this)
View
123 consumer.js
@@ -0,0 +1,123 @@
+module.exports = function (logger, inherits, EventEmitter, RingList) {
+
+ function Consumer() {
+ EventEmitter.call(this)
+ this.partitions = new RingList()
+ this.pending = false
+ this.paused = true
+ this.emptyFetches = 0
+ this.fetch = fetch.bind(this)
+ this.fetchCallback = fetchCallback.bind(this)
+ this.on('response', response)
+ this.bufferedMessages = null
+ this.timer = null
+ }
+ inherits(Consumer, EventEmitter)
+
+ function exponentialBackoff(attempt, delay) {
+ return Math.floor(Math.random() * Math.pow(2, attempt) * 10 + delay)
+ }
+
+ Consumer.prototype._nextFetchDelay = function (shouldDelay) {
+ this.emptyFetches = shouldDelay ? this.emptyFetches + 1 : 0
+ var min = 0
+ var max = 10000
+ var partition = this.partitions.current()
+ if (partition) {
+ min = partition.minFetchDelay()
+ max = partition.maxFetchDelay()
+ }
+ return Math.min(exponentialBackoff(this.emptyFetches, min), max)
+ }
+
+ Consumer.prototype._loop = function (shouldDelay) {
+ var fetchDelay = this._nextFetchDelay(shouldDelay)
+ if (fetchDelay) {
+ this.timer = setTimeout(this.fetch, fetchDelay)
+ }
+ else {
+ this.fetch()
+ }
+ }
+
+ Consumer.prototype.flush = function () {
+ if (this.bufferedMessages) {
+ this.emit('messages', this, this.bufferedMessages)
+ this.bufferedMessages = null
+ }
+ }
+
+ Consumer.prototype.add = function (partition) {
+ this.partitions.add(partition)
+ }
+
+ Consumer.prototype.remove = function (partition) {
+ this.partitions.remove(partition)
+ }
+
+ Consumer.prototype.pause = function () {
+ this.paused = true
+ clearTimeout(this.timer)
+ }
+
+ Consumer.prototype.resume = function () {
+ this.paused = false
+ this.flush()
+ if (!this.paused && !this.pending) {
+ this.fetch()
+ }
+ }
+
+ Consumer.prototype.drain = function (cb) {
+ this.pause()
+ if (this.pending) {
+ this.once(
+ 'response',
+ function () {
+ this.flush()
+ cb()
+ }.bind(this)
+ )
+ }
+ else {
+ this.flush()
+ cb()
+ }
+ }
+
+ function isReady(partition) {
+ return partition.isReady()
+ }
+
+ function fetch() {
+ clearTimeout(this.timer)
+ var partition = this.partitions.next(isReady)
+ if (partition) {
+ this.pending = true
+ partition.fetch(this.fetchCallback)
+ }
+ else {
+ this._loop(true)
+ }
+ }
+
+ function fetchCallback(err, messages) {
+ this.pending = false
+ this.emit('response', err, messages)
+ }
+
+ function response(err, messages) {
+ if (err) {
+ return this.emit('error', err)
+ }
+ if (this.paused) {
+ this.bufferedMessages = messages
+ }
+ else {
+ this._loop(messages.length === 0)
+ this.emit('messages', this, messages)
+ }
+ }
+
+ return Consumer
+}
View
6 index.js
@@ -5,6 +5,8 @@ var inherits = require('util').inherits
var EventEmitter = require('events').EventEmitter
var Stream = require('stream')
+var RingList = require('./ring-list')()
+
var noop = function () {}
var nullLogger = {}
Object.keys(console).forEach(function (f) { nullLogger[f] = noop })
@@ -31,10 +33,12 @@ module.exports = function (options) {
var logger = setLogger(options.logger)
var Client = require('./client')(logger)
+ var Consumer = require('./consumer')(logger, inherits, EventEmitter, RingList)
+ var Producer = require('./producer')(logger, inherits, RingList)
var Broker = require('./broker')(logger, inherits, EventEmitter, Client)
var BrokerPool = require('./broker-pool')(logger, inherits, EventEmitter)
var Partition = require('./partition')(logger, inherits, EventEmitter, Broker)
- var PartitionSet = require('./partition-set')(logger, inherits, EventEmitter)
+ var PartitionSet = require('./partition-set')(logger, inherits, EventEmitter, Consumer, Producer)
var MessageBuffer = require('./message-buffer')(inherits, EventEmitter)
var Topic = require('./topic')(logger, inherits, Stream, MessageBuffer, Partition, PartitionSet)
var StaticConnector = require('./static-connector')(logger, inherits, EventEmitter, Broker)
View
6 kafka.js
@@ -97,6 +97,12 @@ module.exports = function (
Kafka.prototype.close = function () {
this.allBrokers.close()
this.connector.close()
+ var topicNames = Object.keys(this.topics)
+ for (var i = 0; i < topicNames.length; i++) {
+ var name = topicNames[i]
+ this.topics[name].destroy()
+ delete this.topics[name]
+ }
}
function setTopicOptions(topicOptions, defaults) {
View
3  message-buffer.js
@@ -63,8 +63,7 @@ module.exports = function (inherits, EventEmitter) {
if (this.partitions.isReady() && this.messages.length > 0) {
var batches = batchify(this.messages, this.batchSize)
for (var i = 0; i < batches.length; i++) {
- var partition = this.partitions.nextWritable()
- sent = partition.write(batches[i], this.onProduceResponse)
+ this.partitions.write(batches[i], this.onProduceResponse)
}
this.messages = []
}
View
108 partition-set.js
@@ -1,124 +1,108 @@
-module.exports = function (logger, inherits, EventEmitter) {
+module.exports = function (logger, inherits, EventEmitter, Consumer, Producer) {
// A PartitionSet contains all of the known Partitions (for a Topic)
// It tracks which partitions are 'readable' and 'writable'
function PartitionSet() {
- this.partitionsByName = {}
- this.partitions = []
- this.current = 0
+ this.partitions = {}
+ this.consumer = new Consumer()
+ this.producer = new Producer()
+
+ this.onConsumerMessages = consumerMessages.bind(this)
+ this.onConsumerError = consumerError.bind(this)
this.onReadableChanged = readableChanged.bind(this)
this.onWritableChanged = writableChanged.bind(this)
this.onPartitionReady = partitionReady.bind(this)
this.onPartitionDestroy = partitionDestroy.bind(this)
- this.readables = {}
- this.writables = {}
+
+ this.consumer.on('messages', this.onConsumerMessages)
+ this.consumer.on('error', this.onConsumerError)
EventEmitter.call(this)
}
inherits(PartitionSet, EventEmitter)
PartitionSet.prototype.get = function (name) {
- return this.partitionsByName[name]
+ return this.partitions[name]
}
PartitionSet.prototype.add = function (partition) {
- if (this.partitions.indexOf(partition) < 0) {
+ if(!this.partitions[partition.name]) {
partition.on('writable', this.onWritableChanged)
partition.on('readable', this.onReadableChanged)
partition.on('ready', this.onPartitionReady)
partition.on('destroy', this.onPartitionDestroy)
- this.partitionsByName[partition.name] = partition
- this.partitions.push(partition)
+ this.partitions[partition.name] = partition
logger.info('added partition', partition.name)
}
}
PartitionSet.prototype.remove = function (partition) {
- var i = this.partitions.indexOf(partition)
- if (i >= 0) {
- var p = this.partitions[i]
- var name = p.name
- p.removeListener('writable', this.onWritableChanged)
- p.removeListener('readable', this.onReadableChanged)
- p.removeListener('ready', this.onPartitionReady)
- p.removeListener('destroy', this.onPartitionDestroy)
- delete this.partitionsByName[name]
- delete this.readables[name]
- delete this.writables[name]
- this.partitions.splice(i, 1)
- logger.info('removed partition', name)
- }
+ this.consumer.remove(partition)
+ this.producer.remove(partition)
+ partition.removeListener('writable', this.onWritableChanged)
+ partition.removeListener('readable', this.onReadableChanged)
+ partition.removeListener('ready', this.onPartitionReady)
+ partition.removeListener('destroy', this.onPartitionDestroy)
+ delete this.partitions[partition.name]
+ logger.info('removed partition', partition.name)
}
- PartitionSet.prototype.next = function () {
- this.current = (this.current + 1) % this.partitions.length
- return this.partitions[this.current]
+ PartitionSet.prototype.isReady = function () {
+ return this.producer.isReady()
}
- PartitionSet.prototype.all = function () {
- return this.partitions
+ PartitionSet.prototype.pause = function () {
+ this.consumer.pause()
}
- function isReadyAndWritable(p) { return p.isReady() && p.isWritable() }
-
- PartitionSet.prototype.isReady = function () {
- return this.partitions.some(isReadyAndWritable)
+ PartitionSet.prototype.resume = function () {
+ this.consumer.resume()
}
- PartitionSet.prototype.nextWritable = function () {
- var partition = null
- for (var i = 0; i < this.partitions.length; i++) {
- partition = this.next()
- if (isReadyAndWritable(partition)) {
- return partition
- }
+ PartitionSet.prototype.stopConsuming = function () {
+ var names = Object.keys(this.partitions)
+ for (var i = 0; i < names.length; i++) {
+ var partition = this.partitions[names[i]]
+ partition.isReadable(false)
}
- return partition
}
- PartitionSet.prototype.length = function () {
- return this.partitions.length
+ PartitionSet.prototype.drain = function (cb) {
+ this.consumer.drain(cb)
}
- function readablePartition(p) { return p.isReadable() }
- PartitionSet.prototype.readable = function () {
- return this.partitions.filter(readablePartition)
+ PartitionSet.prototype.write = function (messages, cb) {
+ this.producer.write(messages, cb)
}
- function pausePartition(p) { p.pause() }
- PartitionSet.prototype.pause = function () {
- this.readable().forEach(pausePartition)
- }
+ // Event handlers
- function resumePartition(p) { p.resume() }
- PartitionSet.prototype.resume = function () {
- this.readable().forEach(resumePartition)
+ function consumerMessages(partition, messages) {
+ this.emit('messages', partition, messages)
}
- function stopPartition(p) { p.stop() }
- PartitionSet.prototype.stop = function () {
- this.readable().forEach(stopPartition)
- }
+ function consumerError(err) {
- // Event handlers
+ }
function readableChanged(partition) {
if (partition.isReadable()) {
- this.readables[partition.name] = partition
+ this.consumer.add(partition)
}
else {
- delete this.readables[partition.name]
+ this.consumer.remove(partition)
}
}
function writableChanged(partition) {
if (partition.isWritable()) {
- this.writables[partition.name] = partition
+ this.producer.add(partition)
+ // TODO: emit less
if (this.isReady()) {
this.emit('ready')
}
}
else {
- delete this.writables[partition.name]
+ this.producer.remove(partition)
}
}
View
128 partition.js
@@ -2,27 +2,14 @@ module.exports = function (logger, inherits, EventEmitter, Broker) {
// A Partition represents the location of messages for a Topic.
// Partitions may be 'readable' and/or 'writable'.
- //
- // A readable Partition repeatedly fetches new messages from the server
- // for consumption. The fetch loop is controlled with 'pause' and 'resume'.
- //
- // A writable Partition will write messages to the server for production.
function Partition(topic, broker, id) {
this.topic = topic
this.broker = broker
this.id = id
this.name = this.broker.id + '-' + this.id
- this.fetchDelay = this.topic.minFetchDelay
- this.emptyFetches = 0
this.offset = 0
- this.paused = true
- this.bufferedMessages = null
- this.timer = null
this.readable = null
this.writable = null
- this.fetcher = fetch.bind(this)
- this.pending = false
- this.onFetchResponse = fetchResponse.bind(this)
this.onBrokerReady = brokerReady.bind(this)
this.onBrokerDestroy = brokerDestroy.bind(this)
this.broker.on('ready', this.onBrokerReady)
@@ -31,65 +18,6 @@ module.exports = function (logger, inherits, EventEmitter, Broker) {
}
inherits(Partition, EventEmitter)
- Partition.prototype._setFetchDelay = function (shouldDelay) {
- this.emptyFetches = shouldDelay ? this.emptyFetches + 1 : 0
- this.fetchDelay = Math.min(
- exponentialBackoff(this.emptyFetches, this.topic.minFetchDelay),
- this.topic.maxFetchDelay
- )
- logger.info(
- 'fetch', this.topic.name,
- 'broker', this.broker.id,
- 'partition', this.id,
- 'delay', this.fetchDelay,
- 'empty', this.emptyFetches
- )
- }
-
- Partition.prototype._loop = function () {
- if (this.fetchDelay) {
- this.timer = setTimeout(this.fetcher, this.fetchDelay)
- }
- else {
- this.fetcher()
- }
- }
-
- Partition.prototype.flush = function () {
- if (this.bufferedMessages) {
- this.topic.parseMessages(this, this.bufferedMessages)
- this.bufferedMessages = null
- }
- }
-
- Partition.prototype.resume = function () {
- logger.info(
- 'resume', this.topic.name,
- 'broker', this.broker.id,
- 'partition', this.id
- )
- this.paused = false
- this.flush()
- if (!this.paused && !this.pending) {
- this.fetcher()
- }
- }
-
- Partition.prototype.pause = function () {
- logger.info(
- 'pause', this.topic.name,
- 'broker', this.broker.id,
- 'partition', this.id
- )
- this.paused = true
- clearTimeout(this.timer)
- }
-
- Partition.prototype.reset = function () {
- this.pause()
- this.resume()
- }
-
Partition.prototype.write = function (messages, cb) {
return this.broker.write(this, messages, cb)
}
@@ -114,48 +42,27 @@ module.exports = function (logger, inherits, EventEmitter, Broker) {
return this.readable
}
- function exponentialBackoff(attempt, delay) {
- return Math.floor(
- Math.random() * Math.pow(2, attempt) * 10 + delay
- )
+ Partition.prototype.fetch = function(cb) {
+ this.broker.fetch(this.topic, this, fetchResponse.bind(this, cb))
}
- function fetchResponse(err, length, messages) {
- this.pending = false
- if (err) {
- return this.topic.error(err)
- }
- this.offset += length
- if (this.paused) {
- logger.info(
- 'buffered', messages.length,
- 'topic', this.topic.name,
- 'broker', this.broker.id,
- 'partition', this.id
- )
- this.bufferedMessages = messages
- }
- else {
- this.topic.parseMessages(this, messages)
- this._setFetchDelay(length === 0)
- this._loop()
- }
+ Partition.prototype.minFetchDelay = function () {
+ return this.topic.minFetchDelay
}
- function fetch() {
- clearTimeout(this.timer)
- if (this.isReady()) {
- this.pending = true
- this.broker.fetch(
- this.topic,
- this,
- this.onFetchResponse
- )
- }
- else {
- this._setFetchDelay(true)
- this._loop()
+ Partition.prototype.maxFetchDelay = function () {
+ return this.topic.maxFetchDelay
+ }
+
+ Partition.prototype.toString = function () {
+ return '(topic ' + this.topic.name + ' partition ' + this.name + ')'
+ }
+
+ function fetchResponse(cb, err, length, messages) {
+ if (!err) {
+ this.offset += length
}
+ cb(err, messages)
}
function brokerReady() {
@@ -163,7 +70,6 @@ module.exports = function (logger, inherits, EventEmitter, Broker) {
}
function brokerDestroy() {
- this.pause()
this.isWritable(false)
this.isReadable(false)
this.broker.removeListener('ready', this.onBrokerReady)
@@ -172,7 +78,5 @@ module.exports = function (logger, inherits, EventEmitter, Broker) {
this.emit('destroy', this)
}
- Partition.nil = new Partition({ minFetchDelay: 0 }, Broker.nil, -1)
-
return Partition
}
View
20 producer.js
@@ -0,0 +1,20 @@
+module.exports = function (logger, inherits, RingList) {
+
+ function Producer() {
+ RingList.call(this)
+ }
+ inherits(Producer, RingList)
+
+ function isReadyAndWritable(p) { return p.isReady() && p.isWritable() }
+
+ Producer.prototype.isReady = function () {
+ return this.items.some(isReadyAndWritable)
+ }
+
+ Producer.prototype.write = function (messages, cb) {
+ console.assert(this.isReady()) // TODO
+ this.next(isReadyAndWritable).write(messages, cb)
+ }
+
+ return Producer
+}
View
2  readme.md
@@ -15,7 +15,7 @@ var kafka = new Kafka({
logger: console
})
-kafka.on('connect', function () {
+kafka.connect(function () {
// topics are Streams
var foo = kafka.topic('foo')
View
40 ring-list.js
@@ -0,0 +1,40 @@
+module.exports = function () {
+ function RingList() {
+ this.items = []
+ this.index = 0
+ }
+
+ RingList.prototype.add = function (item) {
+ if (this.items.indexOf(item) < 0) {
+ this.items.push(item)
+ }
+ }
+
+ RingList.prototype.remove = function (item) {
+ var i = this.items.indexOf(item)
+ if (i >= 0) {
+ this.items.splice(i, 1)
+ }
+ }
+
+ RingList.prototype.next = function (filter) {
+ if (filter) {
+ var item = null
+ for (var i = 0; i < this.items.length; i++) {
+ item = this.next()
+ if (filter(item)) {
+ return item
+ }
+ }
+ return null
+ }
+ this.index = (this.index + 1) % this.items.length
+ return this.current()
+ }
+
+ RingList.prototype.current = function () {
+ return this.items[this.index]
+ }
+
+ return RingList
+}
View
18 topic.js
@@ -27,6 +27,8 @@ module.exports = function (
// }
// }
function Topic(name, kafka, options) {
+ options = options || {}
+ options.partitions = options.partitions || {}
this.name = name || ''
this.kafka = kafka
this.encoding = null
@@ -42,7 +44,9 @@ module.exports = function (
this.partitions = new PartitionSet()
this.onPartitionsReady = partitionsReady.bind(this)
+ this.onPartitionMessages = this.parseMessages.bind(this)
this.partitions.on('ready', this.onPartitionsReady)
+ this.partitions.on('messages', this.onPartitionMessages)
this.produceBuffer = new MessageBuffer(
this.partitions,
@@ -52,10 +56,13 @@ module.exports = function (
this.onError = this.error.bind(this)
this.produceBuffer.on('error', this.onError)
- if (options.partitions) {
- this.addWritablePartitions(options.partitions.produce)
+ if (options.partitions.consume) {
this.addReadablePartitions(options.partitions.consume)
}
+
+ if (options.partitions.produce) {
+ this.addWritablePartitions(options.partitions.produce)
+ }
else {
// create a partition to start with
// TODO: which broker to pick?
@@ -201,6 +208,11 @@ module.exports = function (
return false
}
+ Topic.prototype.resetConsumer = function (cb) {
+ this.partitions.stopConsuming()
+ this.partitions.drain(cb)
+ }
+
// Readable Stream
Topic.prototype.pause = function () {
@@ -219,7 +231,7 @@ module.exports = function (
}
Topic.prototype.destroy = function () {
- this.partitions.stop()
+ this.resetConsumer(function () {})
}
Topic.prototype.setEncoding = function (encoding) {
View
19 zk.js
@@ -10,7 +10,8 @@ module.exports = function (
// A feable attempt a wrangling the horrible ZooKeeper API
function ZK(options) {
this.zk = new ZooKeeper({
- hosts: options.zookeeper
+ hosts: options.zookeeper,
+ logger: logger
})
this.zk.once(
'expired',
@@ -177,7 +178,6 @@ module.exports = function (
if (err) {
logger.error('create consumer roots', err)
}
- logger.info('created roots')
cb(err)
}
)
@@ -192,16 +192,16 @@ module.exports = function (
return JSON.stringify(ts)
}
- ZK.prototype.registerTopics = function (topics, consumer, cb) {
+ ZK.prototype.registerTopics = function (topics, kafka, cb) {
var self = this
- logger.info('registerTopics')
+ logger.info('register', topics)
async.series([
function (next) {
- self._createConsumerRoots(consumer.groupId, next)
+ self._createConsumerRoots(kafka.groupId, next)
},
function (next) {
self._createOrReplace(
- '/consumers/' + consumer.groupId + '/ids/' + consumer.consumerId,
+ '/consumers/' + kafka.groupId + '/ids/' + kafka.consumerId,
toTopicString(topics),
self.zk.create.EPHEMERAL,
next
@@ -209,7 +209,7 @@ module.exports = function (
}
],
function (err) {
- logger.info('registeredTopics')
+ logger.info('registered')
cb(err)
}
)
@@ -217,8 +217,9 @@ module.exports = function (
ZK.prototype.getTopicPartitions = function (topics, consumer, cb) {
//TODO
- throw new Error("Not Implemented")
- cb(null, [{topic: topics['bazzz'], partitions: ['0-0:0']}])
+ return cb(null, [])
+ //return cb(null, [{topic: 'foo', partitions: ['0-0','0-1','1-0','1-1']}])
+ //cb(null, [{topic: topics['bazzz'], partitions: ['0-0:0']}])
}
return ZK
View
23 zkconnector.js
@@ -105,25 +105,32 @@ module.exports = function (
logger.info('rebalancing')
async.waterfall([
function (next) {
- async.forEach(
+ async.forEachSeries(
this._topics(),
function (topic, done) {
- topic.stop()
- topic.drain(done)
+ logger.info('draining', topic.name)
+ topic.resetConsumer(done)
},
function (err) {
+ logger.info('drained')
next()
})
}.bind(this),
function (next) {
- self.zk.getTopicPartitions(self.interestedTopics, self.consumer, next)
- },
+ this.zk.getTopicPartitions(this.interestedTopics, this.kafka, next)
+ }.bind(this),
function (topicPartitions) {
for(var i = 0; i < topicPartitions.length; i++) {
var tp = topicPartitions[i]
- self.consumer.consume(tp.topic, tp.partitions)
+ var topic = this.kafka.topic(tp.topic)
+ logger.info(
+ 'consume', tp.topic,
+ 'partitions', tp.partitions
+ )
+ topic.addReadablePartitions(tp.partitions)
+ topic.resume()
}
- }
+ }.bind(this)
])
}
@@ -132,7 +139,7 @@ module.exports = function (
var self = this
this.zk.registerTopics(
this.interestedTopics,
- this.consumer,
+ this.kafka,
function () {
self.rebalance()
}

0 comments on commit 42866fe

Please sign in to comment.
Something went wrong with that request. Please try again.