Permalink
Browse files

producer backpressure now works in new scheme

  • Loading branch information...
1 parent a7ea7df commit fc95b863ac75d4a83420eec8a081381cea9987d6 @dannycoates committed Nov 17, 2012
Showing with 45 additions and 35 deletions.
  1. +2 −1 broker.js
  2. +1 −1 client/client.js
  3. +7 −3 example.js
  4. +0 −12 kafka.js
  5. +3 −0 message-buffer.js
  6. +8 −1 partition-set.js
  7. +5 −0 partition.js
  8. +19 −17 topic.js
View
@@ -37,6 +37,7 @@ module.exports = function (
logger.info('broker connected', this.id)
this.reconnectAttempts = 0
this.emit('connect', this)
+ this.emit('ready')
}.bind(this)
)
this.client.once(
@@ -54,7 +55,7 @@ module.exports = function (
'ready',
function () {
logger.info('broker ready', this.id)
- this.emit('ready', this)
+ this.emit('ready')
}.bind(this)
)
}
View
@@ -44,7 +44,7 @@ module.exports = function (
this.connection.on(
'error',
function (err) {
- logger.info('client error', err)
+ logger.info('client error', err.message)
}
)
this.connection.on(
View
@@ -18,6 +18,7 @@ var kafka = new Kafka({
logger: console
})
var i = 0
+var ready = true
file.once('open', function () {
kafka.connect(function () {
@@ -45,11 +46,14 @@ file.once('open', function () {
this.resume()
})
+ baz.on('drain', function() { ready = true })
+
setInterval(
function () {
- baz.write('i is ' + i + '\n')
- i++
- //baz.write("the time is: " + Date.now())
+ if (ready) {
+ ready = baz.write('i is ' + i + '\n')
+ i++
+ }
},
10
)
View
@@ -84,18 +84,6 @@ module.exports = function (
this.emit('connect')
}.bind(this)
)
- this.connector.on(
- 'brokerReady',
- function (b) {
- var topics = Object.keys(this.topics)
- for (var i = 0; i < topics.length; i++) {
- var name = topics[i]
- if (b.hasTopic(name)) {
- this.topics[name].setReady(true)
- }
- }
- }.bind(this)
- )
if (typeof(onconnect) === 'function') {
this.once('connect', onconnect)
}
View
@@ -34,6 +34,9 @@ module.exports = function (
}
this.reset()
}
+ else {
+ this.emit('full')
+ }
return sent
}
View
@@ -9,6 +9,7 @@ module.exports = function (
this.current = 0
this.onReadableChanged = readableChanged.bind(this)
this.onWritableChanged = writableChanged.bind(this)
+ this.onPartitionReady = partitionReady.bind(this)
this.readables = {}
this.writables = {}
EventEmitter.call(this)
@@ -33,6 +34,10 @@ module.exports = function (
}
}
+ function partitionReady(partition) {
+ this.emit('ready')
+ }
+
PartitionSet.prototype.get = function (name) {
return this.partitionsByName[name]
}
@@ -41,6 +46,7 @@ module.exports = function (
if (this.partitions.indexOf(partition) < 0) {
partition.on('writable', this.onWritableChanged)
partition.on('readable', this.onReadableChanged)
+ partition.on('ready', this.onPartitionReady)
this.partitionsByName[partition.name()] = partition
this.partitions.push(partition)
}
@@ -52,6 +58,7 @@ module.exports = function (
var p = this.partitions[i]
p.removeListener('writable', this.onWritableChanged)
p.removeListener('readable', this.onReadableChanged)
+ p.removeListener('ready', this.onPartitionReady)
delete this.partitionsByName[p.name()]
this.partitions.splice(i, 1)
}
@@ -69,7 +76,7 @@ module.exports = function (
PartitionSet.prototype.isReady = function () {
return this.partitions.some(
function (p) {
- return p.isReady()
+ return p.isReady() && p.isWritable()
}
)
}
View
@@ -43,6 +43,10 @@ module.exports = function (logger, inherits, EventEmitter, Broker) {
}
}
+ function brokerReady() {
+ this.emit('ready', this)
+ }
+
// TODO consider Partition as an EventEmitter with no reference to topic
function Partition(topic, broker, id, offset) {
this.topic = topic
@@ -56,6 +60,7 @@ module.exports = function (logger, inherits, EventEmitter, Broker) {
this.paused = true
this.bufferedMessages = null
this.timer = null
+ this.broker.on('ready', brokerReady.bind(this))
// TODO: readable and writable determine whether this partition
// can be used for consuming or producing.
// I think writable might always be true, but readable is determined
View
@@ -34,6 +34,7 @@ module.exports = function (
this.maxMessageSize = options.maxMessageSize
this.kafka = kafka
this.partitions = new PartitionSet()
+ this.partitions.on('ready', partitionsReady.bind(this))
if (options.partitions) {
this.addWritablePartitions(options.partitions.produce)
this.consumePartitions = options.partitions.consume
@@ -43,17 +44,13 @@ module.exports = function (
this.readable = true
this.writable = true
this.encoding = null
- this.outgoingMessages = new MessageBuffer(
+ this.produceBuffer = new MessageBuffer(
this.partitions,
options.batchSize,
options.queueTime
)
- this.outgoingMessages.on(
- 'error',
- function (err) {
- this.error(err)
- }.bind(this)
- )
+ this.produceBuffer.on('error', this.error.bind(this))
+ this.produceBuffer.on('full', produceBufferFull.bind(this))
this.bufferedMessages = []
this.emitMessages = emitMessages.bind(this)
Stream.call(this)
@@ -63,6 +60,20 @@ module.exports = function (
//emit end
//emit close
+ function partitionsReady() {
+ if(!this.ready) {
+ logger.log('topic ready', this.name)
+ this.produceBuffer.flush()
+ this.emit('drain')
+ }
+ this.ready = true
+ }
+
+ function produceBufferFull() {
+ logger.info('topic full', this.name)
+ this.ready = false
+ }
+
function emitMessages(payloads) {
for (var i = 0; i < payloads.length; i++) {
var data = payloads[i]
@@ -173,21 +184,12 @@ module.exports = function (
//Writable Stream
- //TODO figure out the new ready behaviour
- Topic.prototype.setReady = function (ready) {
- if(ready && !this.ready) {
- this.outgoingMessages.flush()
- this.emit('drain')
- }
- this.ready = ready
- }
-
Topic.prototype.write = function (data, encoding) {
if(!Buffer.isBuffer(data)) {
encoding = encoding || 'utf8'
data = new Buffer(data, encoding)
}
- return this.outgoingMessages.push(data)
+ return this.produceBuffer.push(data)
}
Topic.prototype.end = function (data, encoding) {

0 comments on commit fc95b86

Please sign in to comment.