Permalink
Browse files

static consuming is now working

  • Loading branch information...
1 parent fc95b86 commit 36ef45ce42c2c3bc62d6b28ce43a8f6d78aafe1d @dannycoates committed Nov 17, 2012
Showing with 36 additions and 11 deletions.
  1. +15 −8 client/produce-request.js
  2. +5 −0 kafka.js
  3. +0 −1 partition.js
  4. +16 −2 topic.js
@@ -25,14 +25,21 @@ module.exports = function (
ProduceRequest.prototype._compress = function (cb) {
var messageBuffers = this.messages.map(messageToBuffer)
var messagesLength = messageBuffers.reduce(sumLength, 0)
- var wrapper = new Message()
- wrapper.setData(
- Buffer.concat(messageBuffers, messagesLength),
- this.topic.compression,
- function (err) {
- cb(err, wrapper.toBuffer())
- }
- )
+ var payload = Buffer.concat(messageBuffers, messagesLength)
+
+ if (this.topic.compression === Message.compression.NONE) {
+ cb(null, payload)
+ }
+ else {
+ var wrapper = new Message()
+ wrapper.setData(
+ payload,
+ this.topic.compression,
+ function (err) {
+ cb(err, wrapper.toBuffer())
+ }
+ )
+ }
}
// 0 1 2 3
View
@@ -117,6 +117,11 @@ module.exports = function (
return topic
}
+ Kafka.prototype.resumeTopic = function (topic) {
+ // TODO this is where we might trigger client registration
+ // and partition rebalancing
+ }
+
Kafka.prototype.broker = function (id) {
return this.allBrokers.get(id)
}
View
@@ -47,7 +47,6 @@ module.exports = function (logger, inherits, EventEmitter, Broker) {
this.emit('ready', this)
}
- // TODO consider Partition as an EventEmitter with no reference to topic
function Partition(topic, broker, id, offset) {
this.topic = topic
this.broker = broker
View
@@ -37,7 +37,7 @@ module.exports = function (
this.partitions.on('ready', partitionsReady.bind(this))
if (options.partitions) {
this.addWritablePartitions(options.partitions.produce)
- this.consumePartitions = options.partitions.consume
+ this.makePartitionsReadable(options.partitions.consume)
}
this.ready = true
this.compression = options.compression
@@ -147,6 +147,20 @@ module.exports = function (
}
}
+ Topic.prototype.makePartitionsReadable = function (partitionInfo) {
+ if (!Array.isArray(partitionInfo)) {
+ return
+ }
+ for (var i = 0; i < partitionInfo.length; i++) {
+ var info = partitionInfo[i]
+ var nameOffset = info.split(':')
+ var name = nameOffset[0]
+ //TODO offset
+ var partition = this.partitions.get(name)
+ partition.isReadable(true)
+ }
+ }
+
// TODO a way to add/remove readablePartitions
// Readable Stream
@@ -166,8 +180,8 @@ module.exports = function (
}
Topic.prototype.resume = function () {
- //TODO first resume setup
logger.info('resume', this.name)
+ this.kafka.resumeTopic(this)
this.paused = this._flushBufferedMessages()
if (!this.paused) {
this.partitions.resume()

0 comments on commit 36ef45c

Please sign in to comment.