Permalink
Browse files

fixed topic initialization order for static partitions

  • Loading branch information...
1 parent 2b9022a commit d348a404a736ce1f6db1308c36bb3718bfb618fe @dannycoates committed Nov 18, 2012
Showing with 8 additions and 6 deletions.
  1. +1 −0 broker.js
  2. +7 −6 topic.js
View
1 broker.js
@@ -73,6 +73,7 @@ module.exports = function (
logger.info('broker connected', this.id)
this.reconnectAttempts = 0
this.emit('connect', this)
+ this.emit('ready')
}
function clientEnd() {
View
13 topic.js
@@ -37,14 +37,12 @@ module.exports = function (
this.maxFetchDelay = options.maxFetchDelay
this.maxFetchSize = options.maxFetchSize
this.maxMessageSize = options.maxMessageSize
+ this.bufferedMessages = []
+ this.emitMessages = emitMessages.bind(this)
this.partitions = new PartitionSet()
this.onPartitionsReady = partitionsReady.bind(this)
this.partitions.on('ready', this.onPartitionsReady)
- if (options.partitions) {
- this.addWritablePartitions(options.partitions.produce)
- this.addReadablePartitions(options.partitions.consume)
- }
this.produceBuffer = new MessageBuffer(
this.partitions,
@@ -54,8 +52,11 @@ module.exports = function (
this.onError = this.error.bind(this)
this.produceBuffer.on('error', this.onError)
- this.bufferedMessages = []
- this.emitMessages = emitMessages.bind(this)
+ if (options.partitions) {
+ this.addWritablePartitions(options.partitions.produce)
+ this.addReadablePartitions(options.partitions.consume)
+ }
+
Stream.call(this)
}
inherits(Topic, Stream)

0 comments on commit d348a40

Please sign in to comment.