Permalink
Browse files

topic doesn't need to track 'ready' itself

  • Loading branch information...
1 parent 6bf5f60 commit a7d8ba54212a0dc3d97e0e17c7f7b8955fdf864a @dannycoates committed Nov 18, 2012
Showing with 20 additions and 35 deletions.
  1. +8 −9 example.js
  2. +9 −13 message-buffer.js
  3. +3 −13 topic.js
View
@@ -48,15 +48,14 @@ file.once('open', function () {
baz.on('drain', function() { ready = true })
- setInterval(
- function () {
- if (ready) {
- ready = baz.write('i is ' + i + '\n')
- i++
- }
- },
- 10
- )
+ function writeLoop() {
+ if (ready) {
+ ready = baz.write('i is ' + i + '\n')
+ i++
+ }
+ setTimeout(writeLoop, 10)
+ }
+ writeLoop()
}
)
View
@@ -20,20 +20,19 @@ module.exports = function (
this.timer = null
}
- MessageBuffer.prototype.push = function(message) {
- if (!this.timer) {
- this.timer = setTimeout(this.send, this.queueTime)
- }
- if (this.messages.push(message) >= this.batchSize) {
- return this.send()
- }
- return true
+ MessageBuffer.prototype.push = function (message) {
+ this.messages.push(message)
+ return this.flush()
}
MessageBuffer.prototype.flush = function () {
- if (this.messages.length > 0) {
- this.send()
+ if (this.messages.length >= this.batchSize) {
+ return this.send()
+ }
+ if (!this.timer) {
+ this.timer = setTimeout(this.send, this.queueTime)
}
+ return true
}
function handleResponse(err) {
@@ -68,9 +67,6 @@ module.exports = function (
}
this.reset()
}
- else {
- this.emit('full')
- }
return sent
}
View
@@ -29,7 +29,6 @@ module.exports = function (
function Topic(name, kafka, options) {
this.name = name || ''
this.kafka = kafka
- this.ready = true
this.encoding = null
this.readable = true // required Stream property
this.writable = true // required Stream property
@@ -53,9 +52,7 @@ module.exports = function (
options.queueTime
)
this.onError = this.error.bind(this)
- this.onProduceBufferFull = produceBufferFull.bind(this)
this.produceBuffer.on('error', this.onError)
- this.produceBuffer.on('full', this.onProduceBufferFull)
this.bufferedMessages = []
this.emitMessages = emitMessages.bind(this)
@@ -67,17 +64,10 @@ module.exports = function (
//emit close
function partitionsReady() {
- if(!this.ready) {
- logger.log('topic ready', this.name)
- this.produceBuffer.flush()
+ if(this.produceBuffer.flush()) {
+ logger.info('drain', this.name)
this.emit('drain')
}
- this.ready = true
- }
-
- function produceBufferFull() {
- logger.info('topic full', this.name)
- this.ready = false
}
function emitMessages(payloads) {
@@ -99,7 +89,7 @@ module.exports = function (
}
}
- Topic.prototype.parseMessages = function(partition, messages) {
+ Topic.prototype.parseMessages = function (partition, messages) {
this.emit('offset', partition.name, partition.offset)
for (var i = 0; i < messages.length; i++) {
messages[i].unpack(this.emitMessages)

0 comments on commit a7d8ba5

Please sign in to comment.