Skip to content
This repository has been archived by the owner on Jan 26, 2018. It is now read-only.

Commit

Permalink
more logification
Browse files Browse the repository at this point in the history
  • Loading branch information
dannycoates committed Nov 5, 2012
1 parent 022802c commit d21b3f3
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 22 deletions.
19 changes: 14 additions & 5 deletions broker-pool.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module.exports = function (inherits, EventEmitter) {
function BrokerPool() {
module.exports = function (logger, inherits, EventEmitter) {
function BrokerPool(name) {
this.name = name
this.brokers = []
this.brokersById = {}
this.current = 0
Expand All @@ -12,6 +13,10 @@ module.exports = function (inherits, EventEmitter) {
if (i >= 0) {
this.brokers.splice(i, 1)
delete this.brokersById[broker.id]
logger.log(
'brokerpool', this.name,
'removed', broker.id
)
this.emit('brokerRemoved', broker)
}
}
Expand All @@ -20,6 +25,10 @@ module.exports = function (inherits, EventEmitter) {
if (this.brokers.indexOf(broker) < 0) {
this.brokers.push(broker)
this.brokersById[broker.id] = broker
logger.log(
'brokerpool', this.name,
'added', broker.id
)
this.emit('brokerAdded', broker)
}
}
Expand All @@ -32,7 +41,7 @@ module.exports = function (inherits, EventEmitter) {
BrokerPool.prototype.nextReady = function () {
for (var i = 0; i < this.brokers.length; i++) {
var b = this.next()
if (b.ready()) {
if (b.isReady()) {
break
}
}
Expand All @@ -44,7 +53,7 @@ module.exports = function (inherits, EventEmitter) {
var n = (Math.floor(Math.random() * len))
for (var i = 0; i < len; i++) {
var b = this.brokers[n]
if (b.ready()) {
if (b.isReady()) {
break
}
n = (n + 1) % len
Expand All @@ -68,7 +77,7 @@ module.exports = function (inherits, EventEmitter) {
return this.brokers
}

BrokerPool.nil = new BrokerPool()
BrokerPool.nil = new BrokerPool('nil')

return BrokerPool
}
27 changes: 20 additions & 7 deletions broker.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
module.exports = function (
logger,
inherits,
EventEmitter,
Client) {
Expand All @@ -16,48 +17,54 @@ module.exports = function (

function Broker(id, host, port) {
this.id = id
this.host = host
this.port = port
this.topicPartitions = {}
this.client = null
this.connect()
this.connect(host, port)
EventEmitter.call(this)
}
inherits(Broker, EventEmitter)

Broker.prototype.connect = function () {
Broker.prototype.connect = function (host, port) {
var self = this
if (!this.client) {
logger.log(
'connecting broker', self.id,
'host', host,
'port', port
)
this.client = new Client(
this.id,
{
host: this.host,
port: this.port
host: host,
port: port
}
)
this.client.once(
'connect',
function () {
logger.log('broker connected', self.id)
self.emit('connect')
}
)
this.client.once(
'end',
function () {
//TODO: smarter reconnect
logger.log('broker ended', self.id)
self.connect()
}
)
this.client.on(
'ready',
function () {
logger.log('broker ready', self.id)
self.emit('ready')
}
)
}
}

Broker.prototype.ready = function () {
Broker.prototype.isReady = function () {
return this.client.ready
}

Expand All @@ -66,10 +73,16 @@ module.exports = function (
}

Broker.prototype.setTopicPartitions = function (name, count) {
logger.log(
'set broker partitions', this.id,
'topic', name,
'partitions', count
)
this.topicPartitions[name] = new TopicPartition(name, count)
}

Broker.prototype.clearTopicPartitions = function () {
logger.log('clear broker partitions', this.id)
this.topicPartitions = {}
}

Expand Down
8 changes: 6 additions & 2 deletions client/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ module.exports = function (
this.connection.on(
'drain',
function () {
self.ready = true
self.emit('ready')
if (!self.ready) { //TODO: why is connection.drain so frequent?
self.ready = true
self.emit('ready')
}
}
)
this.id = id
Expand All @@ -44,8 +46,10 @@ module.exports = function (

Client.prototype.drain = function (cb) {
var self = this
logger.log('draining', this.id)
this.receiver.close(
function () {
logger.log('drained', self.id)
// XXX is reopening correct here?
self.receiver.open()
cb()
Expand Down
6 changes: 6 additions & 0 deletions consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ module.exports = function (

Consumer.prototype.consume = function (topic, partitions) {
logger.assert(Array.isArray(partitions))
logger.log(
'consuming', topic.name,
'partitions', partitions.length,
'group', this.groupId,
'consumer', this.consumerId
)
var name = topic.name
var owner = this.owners[name] || new Owner(topic, this.allBrokers)
this.owners[name] = owner
Expand Down
4 changes: 2 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ module.exports = function (options) {
var Partition = require('./partition')(logger)
var Owner = require('./owner')(Partition)
var Consumer = require('./consumer')(logger, async, os, inherits, EventEmitter, Owner)
var Broker = require('./broker')(inherits, EventEmitter, Client)
var BrokerPool = require('./broker-pool')(inherits, EventEmitter)
var Broker = require('./broker')(logger, inherits, EventEmitter, Client)
var BrokerPool = require('./broker-pool')(logger, inherits, EventEmitter)
var Producer = require('./producer')(inherits, EventEmitter, BrokerPool)
var MessageBuffer = require('./message-buffer')()
var Topic = require('./topic')(inherits, EventEmitter, MessageBuffer)
Expand Down
2 changes: 1 addition & 1 deletion producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ module.exports = function (
Producer.prototype.setBrokerTopicPartitionCount = function (id, name, count) {
var b = this.allBrokers.get(id)
if (b) {
var topicBrokers = this.topicBrokers[name] || new BrokerPool()
var topicBrokers = this.topicBrokers[name] || new BrokerPool(name)
topicBrokers.add(b)
this.topicBrokers[name] = topicBrokers
b.setTopicPartitions(name, count)
Expand Down
2 changes: 1 addition & 1 deletion static-connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ module.exports = function (
function StaticConnector(options) {
var self = this
this.options = options
this.allBrokers = new BrokerPool()
this.allBrokers = new BrokerPool('all')
this.producer = new Producer(this.allBrokers)
this.consumer = new Consumer(options.groupId, this.allBrokers)

Expand Down
5 changes: 1 addition & 4 deletions zkconnector.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,17 @@ module.exports = function (
function ZKConnector(options) {
var self = this
this.zk = new ZK(options)
this.allBrokers = new BrokerPool()
this.allBrokers = new BrokerPool('all')
this.producer = new Producer(this.allBrokers)
this.allBrokers.on(
'brokerAdded',
function (b) {
logger.log('broker added', b.id)
self.emit('brokerAdded', b)
}
)
this.allBrokers.on(
'brokerRemoved',
function (b) {
logger.log('broker removed', b.id)
self.emit('brokerRemoved', b)
}
)
Expand Down Expand Up @@ -104,7 +102,6 @@ module.exports = function (
self.zk.getTopicPartitions(self.interestedTopics, self.consumer, next)
},
function (topicPartitions) {
logger.log(topicPartitions)
for(var i = 0; i < topicPartitions.length; i++) {
var tp = topicPartitions[i]
self.consumer.consume(tp.topic, tp.partitions)
Expand Down

0 comments on commit d21b3f3

Please sign in to comment.