Permalink
Browse files

lots of little things... deal with it

  • Loading branch information...
1 parent f4df8d9 commit 6bf5f60e603f796293b0239be84dcadfc5d5c9ee @dannycoates committed Nov 18, 2012
Showing with 63 additions and 84 deletions.
  1. +5 −24 broker-pool.js
  2. +1 −1 broker.js
  3. +0 −1 client/client.js
  4. +5 −5 example.js
  5. +1 −1 index.js
  6. +13 −24 kafka.js
  7. +12 −9 partition-set.js
  8. +1 −4 partition.js
  9. +7 −8 static-connector.js
  10. +1 −1 topic.js
  11. +17 −6 zkconnector.js
View
29 broker-pool.js
@@ -1,9 +1,7 @@
module.exports = function (logger, inherits, EventEmitter) {
- function BrokerPool(name) {
- this.name = name
+ function BrokerPool() {
this.brokers = []
this.brokersById = {}
- this.current = 0
EventEmitter.call(this)
}
inherits(BrokerPool, EventEmitter)
@@ -13,44 +11,27 @@ module.exports = function (logger, inherits, EventEmitter) {
if (i >= 0) {
this.brokers.splice(i, 1)
delete this.brokersById[broker.id]
- logger.info(
- 'brokerpool', this.name,
- 'removed', broker.id
- )
- this.emit('brokerRemoved', broker)
+ logger.info('removed', broker.id)
+ this.emit('removed', broker)
}
}
BrokerPool.prototype.add = function (broker) {
if (this.brokers.indexOf(broker) < 0) {
this.brokers.push(broker)
this.brokersById[broker.id] = broker
- logger.info(
- 'brokerpool', this.name,
- 'added', broker.id
- )
- this.emit('brokerAdded', broker)
+ logger.info('added', broker.id)
+ this.emit('added', broker)
}
}
- BrokerPool.prototype.next = function () {
- this.current = (this.current + 1) % this.brokers.length
- return this.brokers[this.current]
- }
-
BrokerPool.prototype.get = function (id) {
return this.brokersById[id]
}
- BrokerPool.prototype.contains = function (id) {
- return !!this.get(id)
- }
-
BrokerPool.prototype.all = function () {
return this.brokers
}
- BrokerPool.nil = new BrokerPool('nil')
-
return BrokerPool
}
View
2 broker.js
@@ -59,6 +59,7 @@ module.exports = function (
this.client.removeListener('ready', this.onClientReady)
this.client.end()
this.client = Client.nil
+ logger.info('broker destroyed', this.id)
this.emit('destroy')
}
@@ -72,7 +73,6 @@ module.exports = function (
logger.info('broker connected', this.id)
this.reconnectAttempts = 0
this.emit('connect', this)
- this.emit('ready')
}
function clientEnd() {
View
1 client/client.js
@@ -126,7 +126,6 @@ module.exports = function (
logger.info('client end')
this.ready = false
this.emit('end')
- this.connection = null
}
function connectionDrain() {
View
10 example.js
@@ -4,7 +4,7 @@ var fs = require('fs')
var file = fs.createWriteStream('./test.txt')
var kafka = new Kafka({
- //zookeeper: 'localhost:2181',
+ zookeeper: 'localhost:2181',
brokers: [{
id: 0,
host: 'localhost',
@@ -24,10 +24,10 @@ file.once('open', function () {
kafka.connect(function () {
var baz = kafka.topic('bazzz', {
- partitions: {
- consume: ['0-0'],
- produce: ['0:1']
- }
+ // partitions: {
+ // consume: ['0-0'],
+ // produce: ['0:1']
+ // }
})
baz.pipe(file)
View
2 index.js
@@ -34,7 +34,7 @@ module.exports = function (options) {
var Broker = require('./broker')(logger, inherits, EventEmitter, Client)
var BrokerPool = require('./broker-pool')(logger, inherits, EventEmitter)
var Partition = require('./partition')(logger, inherits, EventEmitter, Broker)
- var PartitionSet = require('./partition-set')(inherits, EventEmitter, Partition)
+ var PartitionSet = require('./partition-set')(logger, inherits, EventEmitter, Partition)
var MessageBuffer = require('./message-buffer')(inherits, EventEmitter)
var Topic = require('./topic')(logger, inherits, Stream, MessageBuffer, Partition, PartitionSet)
var StaticConnector = require('./static-connector')(logger, inherits, EventEmitter, Broker)
View
37 kafka.js
@@ -28,14 +28,20 @@ module.exports = function (
this.connector = null
this.groupId = this.options.groupId || 'franz-kafka'
this.consumerId = genConsumerId(this.groupId)
- this.allBrokers = new BrokerPool('all')
+ this.onBrokerAdded = brokerAdded.bind(this)
+ this.onBrokerRemoved = brokerRemoved.bind(this)
+ this.allBrokers = new BrokerPool()
+ this.allBrokers.once('added', this.onBrokerAdded)
+ this.allBrokers.on('removed', this.onBrokerRemoved)
this.topicDefaults = this.defaultOptions(options)
EventEmitter.call(this)
}
inherits(Kafka, EventEmitter)
function genConsumerId(groupId) {
- return groupId + '_' + os.hostname() + '-' + Date.now() + '-' + "DEADBEEF"
+ var rand = Buffer(4)
+ rand.writeUInt32BE(Math.floor(Math.random() * 0xFFFFFFFF), 0)
+ return groupId + '_' + os.hostname() + '-' + Date.now() + '-' + rand.toString('hex')
}
function setCompression(string) {
@@ -73,17 +79,11 @@ module.exports = function (
// onconnect: function () {}
Kafka.prototype.connect = function (onconnect) {
if (this.options.zookeeper) {
- this.connector = new ZKConnector(this, this.options)
+ this.connector = new ZKConnector(this, this.allBrokers, this.options)
}
else if (this.options.brokers) {
- this.connector = new StaticConnector(this, this.options)
+ this.connector = new StaticConnector(this, this.allBrokers, this.options)
}
- this.allBrokers.once(
- 'brokerAdded', // TODO: create a more definitive event in the connectors
- function () {
- this.emit('connect')
- }.bind(this)
- )
if (typeof(onconnect) === 'function') {
this.once('connect', onconnect)
}
@@ -125,24 +125,13 @@ module.exports = function (
return this.allBrokers.get(id)
}
- Kafka.prototype.addBroker = function (broker) {
- this.allBrokers.add(broker)
+ function brokerAdded(broker) {
+ this.emit('connect')
}
- Kafka.prototype.removeBroker = function (broker) {
- this.allBrokers.remove(broker)
+ function brokerRemoved(broker) {
broker.destroy()
}
- Kafka.prototype.removeBrokersNotIn = function (brokerIds) {
- var brokers = this.allBrokers.all()
- for (var i = 0; i < brokers.length; i++) {
- var broker = brokers[i]
- if (brokerIds.indexOf(broker.id) === -1) {
- this.removeBroker(broker)
- }
- }
- }
-
return Kafka
}
View
21 partition-set.js
@@ -1,4 +1,5 @@
module.exports = function (
+ logger,
inherits,
EventEmitter,
Partition) {
@@ -27,16 +28,17 @@ module.exports = function (
partition.on('readable', this.onReadableChanged)
partition.on('ready', this.onPartitionReady)
partition.on('destroy', this.onPartitionDestroy)
- this.partitionsByName[partition.name()] = partition
+ this.partitionsByName[partition.name] = partition
this.partitions.push(partition)
+ logger.info('added partition', partition.name)
}
}
PartitionSet.prototype.remove = function (partition) {
var i = this.partitions.indexOf(partition)
if (i >= 0) {
var p = this.partitions[i]
- var name = p.name()
+ var name = p.name
p.removeListener('writable', this.onWritableChanged)
p.removeListener('readable', this.onReadableChanged)
p.removeListener('ready', this.onPartitionReady)
@@ -45,9 +47,7 @@ module.exports = function (
delete this.readables[name]
delete this.writables[name]
this.partitions.splice(i, 1)
- }
- if (this.partitions.length === 0) {
- this.emit('empty')
+ logger.info('removed partition', name)
}
}
@@ -111,19 +111,22 @@ module.exports = function (
function readableChanged(partition) {
if (partition.isReadable()) {
- this.readables[partition.name()] = partition
+ this.readables[partition.name] = partition
}
else {
- delete this.readables[partition.name()]
+ delete this.readables[partition.name]
}
}
function writableChanged(partition) {
if (partition.isWritable()) {
- this.writables[partition.name()] = partition
+ this.writables[partition.name] = partition
+ if (this.isReady()) {
+ this.emit('ready')
+ }
}
else {
- delete this.writables[partition.name()]
+ delete this.writables[partition.name]
}
}
View
5 partition.js
@@ -4,6 +4,7 @@ module.exports = function (logger, inherits, EventEmitter, Broker) {
this.topic = topic
this.broker = broker
this.id = id
+ this.name = this.broker.id + '-' + this.id
this.fetchDelay = this.topic.minFetchDelay
this.emptyFetches = 0
this.offset = 0
@@ -46,10 +47,6 @@ module.exports = function (logger, inherits, EventEmitter, Broker) {
}
}
- Partition.prototype.name = function () {
- return this.broker.id + '-' + this.id
- }
-
Partition.prototype.flush = function () {
if (this.bufferedMessages) {
this.topic.parseMessages(this, this.bufferedMessages)
View
15 static-connector.js
@@ -14,13 +14,12 @@ module.exports = function (
// }
// ]
// }
- function StaticConnector(kafka, options) {
- this.kafka = kafka
- this.options = options
- this.onBrokerConnect = addBroker.bind(this)
+ function StaticConnector(kafka, brokers, options) {
+ this.brokers = brokers
+ this.onBrokerConnect = brokerConnect.bind(this)
- for (var i = 0; i < this.options.brokers.length; i++) {
- var b = this.options.brokers[i]
+ for (var i = 0; i < options.brokers.length; i++) {
+ var b = options.brokers[i]
var broker = new Broker(b.id, { host: b.host, port: b.port })
broker.once('connect', this.onBrokerConnect)
broker.connect()
@@ -29,8 +28,8 @@ module.exports = function (
}
inherits(StaticConnector, EventEmitter)
- function addBroker(broker) {
- this.kafka.addBroker(broker)
+ function brokerConnect(broker) {
+ this.brokers.add(broker)
}
StaticConnector.prototype.register = function (topic) {
View
2 topic.js
@@ -100,7 +100,7 @@ module.exports = function (
}
Topic.prototype.parseMessages = function(partition, messages) {
- this.emit('offset', partition.name(), partition.offset)
+ this.emit('offset', partition.name, partition.offset)
for (var i = 0; i < messages.length; i++) {
messages[i].unpack(this.emitMessages)
}
View
23 zkconnector.js
@@ -20,8 +20,9 @@ module.exports = function (
// zookeeper:
// groupId:
// }
- function ZKConnector(kafka, options) {
+ function ZKConnector(kafka, brokers, options) {
this.kafka = kafka
+ this.brokers = brokers
this.options = options
this.zk = new ZK(options)
this.hasPendingTopics = false
@@ -50,12 +51,12 @@ module.exports = function (
async.forEachSeries(
brokerIds,
function (id, next) {
- if (!self.kafka.broker(id)) {
+ if (!self.brokers.get(id)) {
self.zk.getBroker(id, self._createBroker.bind(self))
}
},
function (err) {
- self.kafka.removeBrokersNotIn(brokerIds)
+ self._removeBrokersNotIn(brokerIds)
}
)
}
@@ -65,13 +66,13 @@ module.exports = function (
if (hostPort.length > 2) {
var host = hostPort[1]
var port = hostPort[2]
- var oldBroker = this.kafka.broker(id)
+ var oldBroker = this.brokers.get(id)
if (oldBroker) {
if (oldBroker.host === host && oldBroker.port === port) {
return
}
else {
- this.kafka.removeBroker(oldBroker)
+ this.brokers.remove(oldBroker)
}
}
var broker = new Broker(id, { host: host, port: port })
@@ -85,6 +86,16 @@ module.exports = function (
topic.addWritablePartitions([brokerId + ':' + count])
}
+ ZKConnector.prototype._removeBrokersNotIn = function (brokerIds) {
+ var brokers = this.brokers.all()
+ for (var i = 0; i < brokers.length; i++) {
+ var broker = brokers[i]
+ if (brokerIds.indexOf(broker.id) === -1) {
+ this.brokers.remove(broker)
+ }
+ }
+ }
+
ZKConnector.prototype._rebalance = function () {
var self = this
logger.info('rebalancing')
@@ -129,7 +140,7 @@ module.exports = function (
}
function brokerConnect(broker) {
- this.kafka.addBroker(broker)
+ this.brokers.add(broker)
}
return ZKConnector

0 comments on commit 6bf5f60

Please sign in to comment.