Permalink
Browse files

got producing with static working the new way, still more to do

  • Loading branch information...
1 parent b3cf829 commit a7ea7dfce1136a831e828f7c7f9ae482e1592ffc @dannycoates committed Nov 16, 2012
Showing with 13 additions and 12 deletions.
  1. +1 −1 index.js
  2. +1 −1 kafka.js
  3. +2 −1 partition-set.js
  4. +9 −9 topic.js
View
@@ -30,11 +30,11 @@ function setLogger(logger) {
module.exports = function (options) {
var logger = setLogger(options.logger)
- var PartitionSet = require('./partition-set')()
var Client = require('./client')(logger)
var Broker = require('./broker')(logger, inherits, EventEmitter, Client)
var BrokerPool = require('./broker-pool')(logger, inherits, EventEmitter)
var Partition = require('./partition')(logger, inherits, EventEmitter, Broker)
+ var PartitionSet = require('./partition-set')(inherits, EventEmitter, Partition)
var MessageBuffer = require('./message-buffer')(inherits, EventEmitter)
var Topic = require('./topic')(logger, inherits, Stream, MessageBuffer, Partition, PartitionSet)
var StaticConnector = require('./static-connector')(logger, inherits, EventEmitter, Broker)
View
@@ -76,7 +76,7 @@ module.exports = function (
this.connector = new ZKConnector(this.options)
}
else if (this.options.brokers) {
- this.connector = new StaticConnector(this.options)
+ this.connector = new StaticConnector(this, this.options)
}
this.allBrokers.once(
'brokerAdded', // TODO: create a more definitive event in the connectors
View
@@ -1,6 +1,7 @@
module.exports = function (
inherits,
- EventEmitter) {
+ EventEmitter,
+ Partition) {
function PartitionSet() {
this.partitionsByName = {}
View
@@ -33,11 +33,11 @@ module.exports = function (
this.maxFetchSize = options.maxFetchSize
this.maxMessageSize = options.maxMessageSize
this.kafka = kafka
+ this.partitions = new PartitionSet()
if (options.partitions) {
this.addWritablePartitions(options.partitions.produce)
this.consumePartitions = options.partitions.consume
}
- this.partitions = new PartitionSet()
this.ready = true
this.compression = options.compression
this.readable = true
@@ -112,22 +112,22 @@ module.exports = function (
var name = brokerId + '-' + partitionId
var partition = this.partitions.get(name)
if (!partition) {
- partition = new Partition(this, kafka.broker(brokerId), partitionId) // TODO options
+ partition = new Partition(this, this.kafka.broker(brokerId), partitionId) // TODO options
this.partitions.add(partition)
}
return partition
}
Topic.prototype.addWritablePartitions = function (partitionInfo) {
- if (!Array.isArray(partitionNames)) {
+ if (!Array.isArray(partitionInfo)) {
return
}
- for (var i = 0; i < partitionNames.length; i++) {
- var name = partitionNames[i]
- var split = name.split(':')
- if (split.length === 2) {
- var brokerId = +split[0]
- var partitionCount = +split[1]
+ for (var i = 0; i < partitionInfo.length; i++) {
+ var info = partitionInfo[i]
+ var brokerPartitionCount = info.split(':')
+ if (brokerPartitionCount.length === 2) {
+ var brokerId = +brokerPartitionCount[0]
+ var partitionCount = +brokerPartitionCount[1]
for (var j = 0; j < partitionCount; j++) {
var p = this.partition(brokerId, j)
p.isWritable(true)

0 comments on commit a7ea7df

Please sign in to comment.