Permalink
Browse files

made the topic readable/writable api symmetric

  • Loading branch information...
1 parent a7d8ba5 commit 92cac95b9f3543ae71961a8850b42c14653ded14 @dannycoates committed Nov 18, 2012
Showing with 25 additions and 17 deletions.
  1. +3 −3 message-buffer.js
  2. +22 −14 topic.js
View
@@ -9,7 +9,7 @@ module.exports = function (
this.messages = []
this.timer = null
this.send = send.bind(this)
- this.produceResponder = handleResponse.bind(this)
+ this.onProduceResponse = produceResponse.bind(this)
EventEmitter.call(this)
}
inherits(MessageBuffer, EventEmitter)
@@ -35,7 +35,7 @@ module.exports = function (
return true
}
- function handleResponse(err) {
+ function produceResponse(err) {
if (err) {
this.emit('error', err)
}
@@ -63,7 +63,7 @@ module.exports = function (
var batches = batchify(this.messages, this.batchSize)
for (var i = 0; i < batches.length; i++) {
var partition = this.partitions.nextWritable()
- sent = partition.write(batches[i], this.produceResponder)
+ sent = partition.write(batches[i], this.onProduceResponse)
}
this.reset()
}
View
@@ -43,7 +43,7 @@ module.exports = function (
this.partitions.on('ready', this.onPartitionsReady)
if (options.partitions) {
this.addWritablePartitions(options.partitions.produce)
- this.makePartitionsReadable(options.partitions.consume)
+ this.addReadablePartitions(options.partitions.consume)
}
this.produceBuffer = new MessageBuffer(
@@ -111,12 +111,17 @@ module.exports = function (
// Partitions
- Topic.prototype.partition = function (brokerId, partitionId) {
- var name = brokerId + '-' + partitionId
+ Topic.prototype.partition = function (name) {
var partition = this.partitions.get(name)
if (!partition) {
- partition = new Partition(this, this.kafka.broker(brokerId), partitionId)
- this.partitions.add(partition)
+ var brokerPartition = name.split('-')
+ var brokerId = +(brokerPartition[0])
+ var partitionId = +(brokerPartition[1])
+ var broker = this.kafka.broker(brokerId)
+ if (broker) {
+ partition = new Partition(this, broker, partitionId)
+ this.partitions.add(partition)
+ }
}
return partition
}
@@ -132,14 +137,16 @@ module.exports = function (
var brokerId = +brokerPartitionCount[0]
var partitionCount = +brokerPartitionCount[1]
for (var j = 0; j < partitionCount; j++) {
- var p = this.partition(brokerId, j)
- p.isWritable(true)
+ var p = this.partition(brokerId + '-' + j)
+ if (p) {
+ p.isWritable(true)
+ }
}
}
}
}
- Topic.prototype.makePartitionsReadable = function (partitionInfo) {
+ Topic.prototype.addReadablePartitions = function (partitionInfo) {
if (!Array.isArray(partitionInfo)) {
return
}
@@ -148,12 +155,13 @@ module.exports = function (
var nameOffset = info.split(':')
var name = nameOffset[0]
- var partition = this.partitions.get(name)
- partition.isReadable(true)
-
- if (nameOffset.length === 2) {
- var offset = +nameOffset[1]
- partition.offset = offset
+ var partition = this.partition(name)
+ if (partition) {
+ partition.isReadable(true)
+ if (nameOffset.length === 2) {
+ var offset = +nameOffset[1]
+ partition.offset = offset
+ }
}
}
}

0 comments on commit 92cac95

Please sign in to comment.