Browse files

Merge pull request #2 from dannycoates/partition-refactor

Partition refactor
  • Loading branch information...
2 parents d09d224 + d348a40 commit e8bd7d18fa35c937f476228c62b28f56ddb11b02 @dannycoates committed Nov 17, 2012
Showing with 685 additions and 754 deletions.
  1. +5 −51 broker-pool.js
  2. +64 −76 broker.js
  3. +60 −45 client/client.js
  4. +8 −8 client/fetch-body.js
  5. +28 −20 client/produce-request.js
  6. +33 −27 client/receiver.js
  7. +0 −94 consumer.js
  8. +15 −12 example.js
  9. +7 −9 index.js
  10. +34 −23 kafka.js
  11. +41 −42 message-buffer.js
  12. +0 −83 owner.js
  13. +139 −0 partition-set.js
  14. +94 −54 partition.js
  15. +0 −80 producer.js
  16. +11 −43 static-connector.js
  17. +102 −38 topic.js
  18. +44 −49 zkconnector.js
View
56 broker-pool.js
@@ -1,9 +1,7 @@
module.exports = function (logger, inherits, EventEmitter) {
- function BrokerPool(name) {
- this.name = name
+ function BrokerPool() {
this.brokers = []
this.brokersById = {}
- this.current = 0
EventEmitter.call(this)
}
inherits(BrokerPool, EventEmitter)
@@ -13,71 +11,27 @@ module.exports = function (logger, inherits, EventEmitter) {
if (i >= 0) {
this.brokers.splice(i, 1)
delete this.brokersById[broker.id]
- logger.info(
- 'brokerpool', this.name,
- 'removed', broker.id
- )
- this.emit('brokerRemoved', broker)
+ logger.info('removed', broker.id)
+ this.emit('removed', broker)
}
}
BrokerPool.prototype.add = function (broker) {
if (this.brokers.indexOf(broker) < 0) {
this.brokers.push(broker)
this.brokersById[broker.id] = broker
- logger.info(
- 'brokerpool', this.name,
- 'added', broker.id
- )
- this.emit('brokerAdded', broker)
+ logger.info('added', broker.id)
+ this.emit('added', broker)
}
}
- BrokerPool.prototype.next = function () {
- this.current = (this.current + 1) % this.brokers.length
- return this.brokers[this.current]
- }
-
- BrokerPool.prototype.nextReady = function () {
- for (var i = 0; i < this.brokers.length; i++) {
- var b = this.next()
- if (b.isReady()) {
- break
- }
- }
- return b
- }
-
- BrokerPool.prototype.randomReady = function () {
- var len = this.brokers.length
- var n = (Math.floor(Math.random() * len))
- for (var i = 0; i < len; i++) {
- var b = this.brokers[n]
- if (b.isReady()) {
- break
- }
- n = (n + 1) % len
- }
- return b
- }
-
- BrokerPool.prototype.areAnyReady = function () {
- return this.brokers.some(function (b) { return b.isReady() })
- }
-
BrokerPool.prototype.get = function (id) {
return this.brokersById[id]
}
- BrokerPool.prototype.contains = function (id) {
- return !!this.get(id)
- }
-
BrokerPool.prototype.all = function () {
return this.brokers
}
- BrokerPool.nil = new BrokerPool('nil')
-
return BrokerPool
}
View
140 broker.js
@@ -4,111 +4,99 @@ module.exports = function (
EventEmitter,
Client) {
- function TopicPartition(name, count) {
- this.name = name
- this.count = count
- this.current = 0
- }
-
- TopicPartition.prototype.next = function () {
- this.current = (this.current + 1) % this.count
- return this.current
- }
-
- function Broker(id, host, port, options) {
+ function Broker(id, options) {
this.id = id
- this.topicPartitions = {}
- this.client = null
+ this.client = Client.nil
this.reconnectAttempts = 0
- options = options || {}
- options.host = host
- options.port = port
- this.connect(options)
+ this.options = options
+ this.host = options.host
+ this.port = options.port
+ this.connector = this.connect.bind(this)
+ this.onClientEnd = clientEnd.bind(this)
+ this.onClientReady = clientReady.bind(this)
+ this.onClientConnect = clientConnect.bind(this)
EventEmitter.call(this)
}
inherits(Broker, EventEmitter)
- function exponentialBackoff(attempt) {
- return Math.floor(
- Math.random() * Math.pow(2, attempt) * 10
- )
- }
-
- Broker.prototype.connect = function (options) {
+ Broker.prototype.connect = function () {
+ var options = this.options
logger.info(
'connecting broker', this.id,
'host', options.host,
'port', options.port
)
this.client = new Client(this.id, options)
- this.client.once(
- 'connect',
- function () {
- logger.info('broker connected', this.id)
- this.reconnectAttempts = 0
- this.emit('connect', this)
- }.bind(this)
- )
- this.client.once(
- 'end',
- function () {
- this.reconnectAttempts++
- logger.info('broker ended', this.id, this.reconnectAttempts)
- setTimeout(
- function () {
- this.connect(options)
- },
- exponentialBackoff(this.reconnectAttempts)
- )
- }.bind(this)
- )
- this.client.on(
- 'ready',
- function () {
- logger.info('broker ready', this.id)
- this.emit('ready', this)
- }.bind(this)
- )
+
+ this.client.once('connect', this.onClientConnect)
+ this.client.once('end', this.onClientEnd)
+ this.client.on('ready', this.onClientReady)
+
+ this.reconnectTimer = null
}
Broker.prototype.isReady = function () {
return this.client.ready
}
- Broker.prototype.hasTopic = function (name) {
- return !!this.topicPartitions[name]
+ Broker.prototype.fetch = function (topic, partition, cb) {
+ this.client.fetch(topic, partition, cb)
}
- Broker.prototype.setTopicPartitions = function (name, count) {
- logger.info(
- 'set broker partitions', this.id,
- 'topic', name,
- 'partitions', count
- )
- this.topicPartitions[name] = new TopicPartition(name, count)
+ Broker.prototype.write = function (partition, messages, cb) {
+ return this.client.write(partition.topic, messages, partition.id, cb)
}
- Broker.prototype.clearTopicPartitions = function () {
- logger.info('clear broker partitions', this.id)
- this.topicPartitions = {}
+ Broker.prototype.drain = function (cb) {
+ this.client.drain(cb)
}
- Broker.prototype.fetch = function (topic, partition, cb) {
- this.client.fetch(topic, partition, cb)
+ Broker.prototype.destroy = function () {
+ clearTimeout(this.reconnectTimer)
+ this.reconnectTimer = null
+ this.client.removeListener('connect', this.onClientConnect)
+ this.client.removeListener('end', this.onClientEnd)
+ this.client.removeListener('ready', this.onClientReady)
+ this.client.end()
+ this.client = Client.nil
+ logger.info('broker destroyed', this.id)
+ this.emit('destroy')
}
- Broker.prototype.write = function (topic, messages, cb) {
- var partitionId = 0
- var tp = this.topicPartitions[topic.name]
- if (tp) {
- partitionId = tp.next()
- }
- return this.client.write(topic, messages, partitionId, cb)
+ function exponentialBackoff(attempt) {
+ return Math.floor(
+ Math.random() * Math.pow(2, attempt) * 10
+ )
}
- Broker.prototype.drain = function (cb) {
- this.client.drain(cb)
+ function clientConnect() {
+ logger.info('broker connected', this.id)
+ this.reconnectAttempts = 0
+ this.emit('connect', this)
+ this.emit('ready')
}
+ function clientEnd() {
+ this.reconnectAttempts++
+ logger.info(
+ 'broker ended', this.id,
+ 'reconnects', this.reconnectAttempts
+ )
+ this.client.removeListener('connect', this.onClientConnect)
+ this.client.removeListener('end', this.onClientEnd)
+ this.client.removeListener('ready', this.onClientReady)
+ this.reconnectTimer = setTimeout(
+ this.connector,
+ exponentialBackoff(this.reconnectAttempts)
+ )
+ }
+
+ function clientReady() {
+ logger.info('broker ready', this.id)
+ this.emit('ready')
+ }
+
+ Broker.nil = new Broker(-1, {})
+
return Broker
}
View
105 client/client.js
@@ -10,54 +10,26 @@ module.exports = function (
ProduceRequest,
OffsetsRequest
) {
+
function Client(id, options) {
- this.connection = net.connect(options)
- this.connection.on(
- 'connect',
- function () {
- logger.info('client connect')
- this.readableSteam = new ReadableStream()
- this.readableSteam.wrap(this.connection)
- this.receiver = new Receiver(this.readableSteam)
- this.ready = true
- this.emit('connect')
- }.bind(this)
- )
- this.connection.on(
- 'end',
- function () {
- logger.info('client end')
- this.ready = false
- this.emit('end')
- this.connection = null
- }.bind(this)
- )
- this.connection.on(
- 'drain',
- function () {
- if (!this.ready) { //TODO: why is connection.drain so frequent?
- this.ready = true
- this.emit('ready')
- }
- }.bind(this)
- )
- this.connection.on(
- 'error',
- function (err) {
- //logger.info('client error', err)
- }
- )
- this.connection.on(
- 'close',
- function (hadError) {
- logger.info('client closed with error:', hadError)
- this.emit('end')
- }.bind(this)
- )
this.id = id
this.ready = false
this.readableSteam = null
this.receiver = null
+
+ this.connection = net.connect(options)
+ this.onConnectionConnect = connectionConnect.bind(this)
+ this.onConnectionEnd = connectionEnd.bind(this)
+ this.onConnectionDrain = connectionDrain.bind(this)
+ this.onConnectionError = connectionError.bind(this)
+ this.onConnectionClose = connectionClose.bind(this)
+
+ this.connection.on('connect', this.onConnectionConnect)
+ this.connection.on('end', this.onConnectionEnd)
+ this.connection.on('drain', this.onConnectionDrain)
+ this.connection.on('error', this.onConnectionError)
+ this.connection.on('close', this.onConnectionClose)
+
EventEmitter.call(this)
}
inherits(Client, EventEmitter)
@@ -74,20 +46,25 @@ module.exports = function (
)
}
+ Client.prototype.end = function () {
+ this.connection.end()
+ }
+
Client.prototype._send = function (request, cb) {
request.serialize(
this.connection,
- afterSend.bind(this, cb, request)
+ afterSend.bind(this, request, cb)
)
return this.ready
}
- function afterSend(cb, request, err, written) {
+ function afterSend(request, cb, err, written) {
if (err) {
this.ready = false
return cb(err)
}
if (!written) {
+ logger.info('connection', 'wait')
this.ready = false
}
this.receiver.push(request, cb)
@@ -135,5 +112,43 @@ module.exports = function (
Client.compression = Message.compression
+ Client.nil = { ready: false }
+
+ function connectionConnect() {
+ logger.info('client connect')
+ this.readableSteam = new ReadableStream()
+ this.readableSteam.wrap(this.connection)
+ this.receiver = new Receiver(this.readableSteam)
+ this.ready = true
+ this.emit('connect')
+ }
+
+ function connectionEnd() {
+ logger.info('client end')
+ this.ready = false
+ this.emit('end')
+ }
+
+ function connectionDrain() {
+ if (!this.ready) { //TODO: why is connection.drain so frequent?
+ this.ready = true
+ this.emit('ready')
+ }
+ }
+
+ function connectionError(err) {
+ logger.info('client error', err.message)
+ }
+
+ function connectionClose(hadError) {
+ logger.info('client closed. with error', hadError)
+ this.connection.removeListener('connect', this.onConnectionConnect)
+ this.connection.removeListener('end', this.onConnectionEnd)
+ this.connection.removeListener('drain', this.onConnectionDrain)
+ this.connection.removeListener('error', this.onConnectionError)
+ this.connection.removeListener('close', this.onConnectionClose)
+ this.emit('end')
+ }
+
return Client
}
View
16 client/fetch-body.js
@@ -3,14 +3,6 @@ module.exports = function (
State,
Message) {
- function FetchError(message, messageLength) {
- this.message = message
- this.messageLength = messageLength
- Error.call(this)
- }
- inherits(FetchError, Error)
- FetchError.prototype.name = 'Fetch Error'
-
function FetchBody(bytes) {
this.bytesParsed = 0
this.lastMessageLength = 0
@@ -55,5 +47,13 @@ module.exports = function (
return err
}
+ function FetchError(message, messageLength) {
+ this.message = message
+ this.messageLength = messageLength
+ Error.call(this)
+ }
+ inherits(FetchError, Error)
+ FetchError.prototype.name = 'Fetch Error'
+
return FetchBody
}
View
48 client/produce-request.js
@@ -4,14 +4,6 @@ module.exports = function (
Message,
State) {
- function ProduceError(message, length) {
- this.message = message
- this.length = length
- Error.call(this)
- }
- inherits(ProduceError, Error)
- ProduceError.prototype.name = 'Produce Error'
-
function ProduceRequest(topic, partitionId, messages) {
this.topic = topic
this.partitionId = partitionId
@@ -25,14 +17,21 @@ module.exports = function (
ProduceRequest.prototype._compress = function (cb) {
var messageBuffers = this.messages.map(messageToBuffer)
var messagesLength = messageBuffers.reduce(sumLength, 0)
- var wrapper = new Message()
- wrapper.setData(
- Buffer.concat(messageBuffers, messagesLength),
- this.topic.compression,
- function (err) {
- cb(err, wrapper.toBuffer())
- }
- )
+ var payload = Buffer.concat(messageBuffers, messagesLength)
+
+ if (this.topic.compression === Message.compression.NONE) {
+ cb(null, payload)
+ }
+ else {
+ var wrapper = new Message()
+ wrapper.setData(
+ payload,
+ this.topic.compression,
+ function (err) {
+ cb(err, wrapper.toBuffer())
+ }
+ )
+ }
}
// 0 1 2 3
@@ -57,7 +56,8 @@ module.exports = function (
if (err) {
return cb(err)
}
- if (buffer.length > this.topic.maxMessageSize) {
+ if (this.topic.compression !== Message.compression.NONE &&
+ buffer.length > this.topic.maxMessageSize) {
return cb(new ProduceError("message too big", buffer.length))
}
var header = new RequestHeader(
@@ -67,11 +67,11 @@ module.exports = function (
this.partitionId
)
try {
- header.serialize(stream)
+ var written = header.serialize(stream)
var mlen = new Buffer(4)
mlen.writeUInt32BE(buffer.length, 0)
- stream.write(mlen)
- var written = stream.write(buffer)
+ written = stream.write(mlen) && written
+ written = stream.write(buffer) && written
}
catch (e) {
err = e
@@ -84,5 +84,13 @@ module.exports = function (
return State.done
}
+ function ProduceError(message, length) {
+ this.message = message
+ this.length = length
+ Error.call(this)
+ }
+ inherits(ProduceError, Error)
+ ProduceError.prototype.name = 'Produce Error'
+
return ProduceRequest
}
View
60 client/receiver.js
@@ -6,36 +6,18 @@ module.exports = function (
function Receiver(stream) {
this.stream = stream
- this.stream.on(
- 'readable',
- function () {
- this.read()
- }.bind(this)
- )
- this.stream.on(
- 'end',
- function () {
- logger.info(
- 'receiver', 'ended',
- 'queue', this.queue.length
- )
- while (this.queue.length > 0) {
- this.current.abort()
- this.next()
- }
- this.current.abort()
- this.closed = true
- }.bind(this)
- )
- this.stream.on(
- 'error',
- function (err) {
- logger.info('receiver error', err.message)
- }
- )
this.queue = []
this.current = State.nil
this.closed = false
+
+ this.onStreamReadable = streamReadable.bind(this)
+ this.onStreamEnd = streamEnd.bind(this)
+ this.onStreamError = streamError.bind(this)
+
+ this.stream.on('readable', this.onStreamReadable)
+ this.stream.on('end', this.onStreamEnd)
+ this.stream.on('error', this.onStreamError)
+
EventEmitter.call(this)
}
inherits(Receiver, EventEmitter)
@@ -86,5 +68,29 @@ module.exports = function (
return true
}
+ function streamReadable() {
+ this.read()
+ }
+
+ function streamEnd() {
+ logger.info(
+ 'receiver', 'ended',
+ 'queue', this.queue.length
+ )
+ while (this.queue.length > 0) {
+ this.current.abort()
+ this.next()
+ }
+ this.current.abort()
+ this.closed = true
+ this.stream.removeListener('readable', this.onStreamReadable)
+ this.stream.removeListener('end', this.onStreamEnd)
+ this.stream.removeListener('error', this.onStreamError)
+ }
+
+ function streamError(err) {
+ logger.info('receiver error', err.message)
+ }
+
return Receiver
}
View
94 consumer.js
@@ -1,94 +0,0 @@
-module.exports = function (
- logger,
- async,
- os,
- inherits,
- EventEmitter,
- Owner) {
-
- function genConsumerId(groupId) {
- return groupId + '_' + os.hostname() + '-' + Date.now() + '-' + "DEADBEEF"
- }
-
- function Consumer(connector, groupId, allBrokers) {
- this.connector = connector
- this.groupId = groupId
- this.consumerId = genConsumerId(this.groupId)
- this.allBrokers = allBrokers
- this.owners = {}
- }
-
- Consumer.prototype.consume = function (topic, partitionNamesWithOffsets) {
- logger.assert(Array.isArray(partitionNamesWithOffsets))
- logger.info(
- 'consuming', topic.name,
- 'partitions', partitionNamesWithOffsets.length,
- 'group', this.groupId,
- 'consumer', this.consumerId
- )
- var name = topic.name
- var owner = this.owners[name] || new Owner(topic, this.allBrokers)
- this.owners[name] = owner
- owner.consume(partitionNamesWithOffsets)
- }
-
- Consumer.prototype.stop = function (topic, partitionNames) {
- if (!topic) { // stop all
- var topics = Object.keys(this.owners)
- for (var i = 0; i < topics.length; i++) {
- this.stop(topics[i])
- }
- }
- else {
- var name = topic.name
- var owner = this.owners[name]
- owner.stop(partitionNames)
- if (!owner.hasPartitions()) {
- delete this.owners[name]
- }
- }
- }
-
- Consumer.prototype.drain = function (cb) {
- var owners = Object.keys(this.owners)
- for (var i = 0; i < owners.length; i++) {
- this.owners[owners[i]].pause()
- }
- async.forEach(
- this.allBrokers.all(),
- function (broker, next) {
- broker.drain(next)
- },
- cb
- )
- }
-
- Consumer.prototype.pause = function (topic) {
- var name = topic.name
- var owner = this.owners[name]
- if (owner) {
- owner.pause()
- }
- }
-
- Consumer.prototype.resume = function (topic) {
- var name = topic.name
- var owner = this.owners[name]
- if (!owner) {
- this.connector.consume(topic, topic.consumePartitions)
- }
- else {
- owner.resume()
- }
- }
-
- Consumer.prototype.saveOffsets = function (topic) {
- var name = topic.name
- var owner = this.owners[name]
- if (owner) {
- owner.saveOffsets(this.connector)
- }
- }
-
- return Consumer
-}
View
27 example.js
@@ -4,7 +4,7 @@ var fs = require('fs')
var file = fs.createWriteStream('./test.txt')
var kafka = new Kafka({
- //zookeeper: 'localhost:2181',
+ zookeeper: 'localhost:2181',
brokers: [{
id: 0,
host: 'localhost',
@@ -18,15 +18,16 @@ var kafka = new Kafka({
logger: console
})
var i = 0
+var ready = true
file.once('open', function () {
kafka.connect(function () {
var baz = kafka.topic('bazzz', {
- partitions: {
- consume: ['0-0'],
- produce: ['0:1']
- }
+ // partitions: {
+ // consume: ['0-0'],
+ // produce: ['0:1']
+ // }
})
baz.pipe(file)
@@ -45,14 +46,16 @@ file.once('open', function () {
this.resume()
})
- setInterval(
- function () {
- baz.write('i is ' + i + '\n')
+ baz.on('drain', function() { ready = true })
+
+ function writeLoop() {
+ if (ready) {
+ ready = baz.write('i is ' + i + '\n')
i++
- //baz.write("the time is: " + Date.now())
- },
- 10
- )
+ }
+ setTimeout(writeLoop, 10)
+ }
+ writeLoop()
}
)
View
16 index.js
@@ -31,28 +31,26 @@ module.exports = function (options) {
var logger = setLogger(options.logger)
var Client = require('./client')(logger)
- var Partition = require('./partition')(logger)
- var Owner = require('./owner')(Partition)
- var Consumer = require('./consumer')(logger, async, os, inherits, EventEmitter, Owner)
var Broker = require('./broker')(logger, inherits, EventEmitter, Client)
var BrokerPool = require('./broker-pool')(logger, inherits, EventEmitter)
- var Producer = require('./producer')(inherits, EventEmitter, BrokerPool)
- var MessageBuffer = require('./message-buffer')()
- var Topic = require('./topic')(logger, inherits, Stream, MessageBuffer)
- var StaticConnector = require('./static-connector')(logger, inherits, EventEmitter, Producer, Consumer, BrokerPool, Broker)
+ var Partition = require('./partition')(logger, inherits, EventEmitter, Broker)
+ var PartitionSet = require('./partition-set')(logger, inherits, EventEmitter)
+ var MessageBuffer = require('./message-buffer')(inherits, EventEmitter)
+ var Topic = require('./topic')(logger, inherits, Stream, MessageBuffer, Partition, PartitionSet)
+ var StaticConnector = require('./static-connector')(logger, inherits, EventEmitter, Broker)
if (options.zookeeper) {
try {
var ZooKeeper = require('zookeeper')
var ZK = require('./zk')(logger, async, inherits, EventEmitter, ZooKeeper)
- var ZKConnector = require('./zkconnector')(logger, async, inherits, EventEmitter, ZK, Producer, Consumer, BrokerPool, Broker)
+ var ZKConnector = require('./zkconnector')(logger, async, inherits, EventEmitter, ZK, Broker)
}
catch (e) {
logger.error('node-zookeeper could not be loaded')
throw e
}
}
- var Kafka = require('./kafka')(inherits, EventEmitter, Topic, ZKConnector, StaticConnector, Client.compression)
+ var Kafka = require('./kafka')(inherits, EventEmitter, os, BrokerPool, Topic, ZKConnector, StaticConnector, Client.compression)
return new Kafka(options)
}
View
57 kafka.js
@@ -1,6 +1,8 @@
module.exports = function (
inherits,
EventEmitter,
+ os,
+ BrokerPool,
Topic,
ZKConnector,
StaticConnector,
@@ -23,13 +25,25 @@ module.exports = function (
function Kafka(options) {
this.topics = {}
this.options = options || {}
- this.options.groupId = this.options.groupId || 'franz-kafka'
this.connector = null
+ this.groupId = this.options.groupId || 'franz-kafka'
+ this.consumerId = genConsumerId(this.groupId)
+ this.onBrokerAdded = brokerAdded.bind(this)
+ this.onBrokerRemoved = brokerRemoved.bind(this)
+ this.allBrokers = new BrokerPool()
+ this.allBrokers.once('added', this.onBrokerAdded)
+ this.allBrokers.on('removed', this.onBrokerRemoved)
this.topicDefaults = this.defaultOptions(options)
EventEmitter.call(this)
}
inherits(Kafka, EventEmitter)
+ function genConsumerId(groupId) {
+ var rand = Buffer(4)
+ rand.writeUInt32BE(Math.floor(Math.random() * 0xFFFFFFFF), 0)
+ return groupId + '_' + os.hostname() + '-' + Date.now() + '-' + rand.toString('hex')
+ }
+
function setCompression(string) {
var compression
switch (string && string.toLowerCase()) {
@@ -65,29 +79,11 @@ module.exports = function (
// onconnect: function () {}
Kafka.prototype.connect = function (onconnect) {
if (this.options.zookeeper) {
- this.connector = new ZKConnector(this.options)
+ this.connector = new ZKConnector(this, this.allBrokers, this.options)
}
else if (this.options.brokers) {
- this.connector = new StaticConnector(this.options)
+ this.connector = new StaticConnector(this, this.allBrokers, this.options)
}
- this.connector.once(
- 'brokerAdded', // TODO: create a more definitive event in the connectors
- function () {
- this.emit('connect')
- }.bind(this)
- )
- this.connector.on(
- 'brokerReady',
- function (b) {
- var topics = Object.keys(this.topics)
- for (var i = 0; i < topics.length; i++) {
- var name = topics[i]
- if (b.hasTopic(name)) {
- this.topics[name].setReady(true)
- }
- }
- }.bind(this)
- )
if (typeof(onconnect) === 'function') {
this.once('connect', onconnect)
}
@@ -114,13 +110,28 @@ module.exports = function (
var topic = this.topics[name] ||
new Topic(
name,
- this.connector.producer,
- this.connector.consumer,
+ this,
setTopicOptions(options, this.topicDefaults)
)
this.topics[name] = topic
return topic
}
+ Kafka.prototype.register = function (topic) {
+ this.connector.register(topic)
+ }
+
+ Kafka.prototype.broker = function (id) {
+ return this.allBrokers.get(id)
+ }
+
+ function brokerAdded(broker) {
+ this.emit('connect')
+ }
+
+ function brokerRemoved(broker) {
+ broker.destroy()
+ }
+
return Kafka
}
View
83 message-buffer.js
@@ -1,8 +1,43 @@
-module.exports = function () {
+module.exports = function (
+ inherits,
+ EventEmitter) {
- function handleResponse(err) {
+ function MessageBuffer(partitions, batchSize, queueTime) {
+ this.partitions = partitions
+ this.batchSize = batchSize
+ this.queueTime = queueTime
+ this.messages = []
+ this.timer = null
+ this.send = send.bind(this)
+ this.onProduceResponse = produceResponse.bind(this)
+ EventEmitter.call(this)
+ }
+ inherits(MessageBuffer, EventEmitter)
+
+ MessageBuffer.prototype.reset = function () {
+ this.messages = []
+ clearTimeout(this.timer)
+ this.timer = null
+ }
+
+ MessageBuffer.prototype.push = function (message) {
+ this.messages.push(message)
+ return this.flush()
+ }
+
+ MessageBuffer.prototype.flush = function () {
+ if (this.messages.length >= this.batchSize) {
+ return this.send()
+ }
+ if (!this.timer) {
+ this.timer = setTimeout(this.send, this.queueTime)
+ }
+ return true
+ }
+
+ function produceResponse(err) {
if (err) {
- this.topic.error(err)
+ this.emit('error', err)
}
}
@@ -24,52 +59,16 @@ module.exports = function () {
function send() {
var sent = false
- if (this.producer.isReady(this.topic)) {
+ if (this.partitions.isReady()) {
var batches = batchify(this.messages, this.batchSize)
for (var i = 0; i < batches.length; i++) {
- sent = this.producer.write(
- this.topic,
- batches[i],
- this.produceResponder
- )
+ var partition = this.partitions.nextWritable()
+ sent = partition.write(batches[i], this.onProduceResponse)
}
this.reset()
}
return sent
}
- function MessageBuffer(topic, batchSize, queueTime, producer) {
- this.topic = topic
- this.batchSize = batchSize
- this.queueTime = queueTime
- this.producer = producer
- this.messages = []
- this.timer = null
- this.send = send.bind(this)
- this.produceResponder = handleResponse.bind(this)
- }
-
- MessageBuffer.prototype.reset = function () {
- this.messages = []
- clearTimeout(this.timer)
- this.timer = null
- }
-
- MessageBuffer.prototype.push = function(message) {
- if (!this.timer) {
- this.timer = setTimeout(this.send, this.queueTime)
- }
- if (this.messages.push(message) >= this.batchSize) {
- return this.send()
- }
- return true
- }
-
- MessageBuffer.prototype.flush = function () {
- if (this.messages.length > 0) {
- this.send()
- }
- }
-
return MessageBuffer
}
View
83 owner.js
@@ -1,83 +0,0 @@
-module.exports = function (Partition) {
-
- function Owner(topic, brokers) {
- this.topic = topic
- this.brokers = brokers
- this.partitionsByName = {}
- this.partitions = []
- this.paused = true
- }
-
- Owner.prototype.consume = function (partitionNamesWithOffsets) {
- this.paused = false
- for (var i = 0; i < partitionNamesWithOffsets.length; i++) {
- var nameAndOffset = partitionNamesWithOffsets[i].split(':')
- var name = nameAndOffset[0]
- var offset = +(nameAndOffset[1] || 0)
- var brokerPartition = name.split('-')
- if (brokerPartition.length === 2) {
- var brokerId = +brokerPartition[0]
- var partitionNo = +brokerPartition[1]
- var broker = this.brokers.get(brokerId)
- var partition = this.partitionsByName[name] ||
- new Partition(this.topic, broker, partitionNo, offset)
-
- this.partitionsByName[name] = partition
- if(this.partitions.indexOf(partition) === -1) {
- this.partitions.push(partition)
- }
- partition.reset()
- }
- }
- }
-
- Owner.prototype.stop = function (partitionNames) {
- if (!partitionNames) { // stop all
- partitionNames = Object.keys(this.partitionsByName)
- }
- for (var i = 0; i < partitionNames.length; i++) {
- var name = partitionNames[i]
- var p = this.partitionsByName[name]
- if (p) {
- p.pause()
- var x = this.partitions.indexOf(p)
- if (x >= 0) {
- this.partitions.splice(x, 1)
- }
- delete this.partitionsByName[name]
- }
- }
- }
-
- Owner.prototype.hasPartitions = function () {
- return this.partitions.length > 0
- }
-
- function pausePartition(p) { p.pause() }
-
- Owner.prototype.pause = function () {
- if (!this.paused) {
- this.partitions.forEach(pausePartition)
- }
- this.paused = true
- }
-
- function resumePartition(p) { p.resume() }
-
- Owner.prototype.resume = function () {
- if (this.paused) {
- this.partitions.forEach(resumePartition)
- }
- this.paused = false
- }
-
- Owner.prototype.saveOffsets = function (saver) {
- this.partitions.forEach(
- function (p) {
- p.saveOffset(saver)
- }
- )
- }
-
- return Owner
-}
View
139 partition-set.js
@@ -0,0 +1,139 @@
+module.exports = function (
+ logger,
+ inherits,
+ EventEmitter) {
+
+ function PartitionSet() {
+ this.partitionsByName = {}
+ this.partitions = []
+ this.current = 0
+ this.onReadableChanged = readableChanged.bind(this)
+ this.onWritableChanged = writableChanged.bind(this)
+ this.onPartitionReady = partitionReady.bind(this)
+ this.onPartitionDestroy = partitionDestroy.bind(this)
+ this.readables = {}
+ this.writables = {}
+ EventEmitter.call(this)
+ }
+ inherits(PartitionSet, EventEmitter)
+
+ PartitionSet.prototype.get = function (name) {
+ return this.partitionsByName[name]
+ }
+
+ PartitionSet.prototype.add = function (partition) {
+ if (this.partitions.indexOf(partition) < 0) {
+ partition.on('writable', this.onWritableChanged)
+ partition.on('readable', this.onReadableChanged)
+ partition.on('ready', this.onPartitionReady)
+ partition.on('destroy', this.onPartitionDestroy)
+ this.partitionsByName[partition.name] = partition
+ this.partitions.push(partition)
+ logger.info('added partition', partition.name)
+ }
+ }
+
+ PartitionSet.prototype.remove = function (partition) {
+ var i = this.partitions.indexOf(partition)
+ if (i >= 0) {
+ var p = this.partitions[i]
+ var name = p.name
+ p.removeListener('writable', this.onWritableChanged)
+ p.removeListener('readable', this.onReadableChanged)
+ p.removeListener('ready', this.onPartitionReady)
+ p.removeListener('destroy', this.onPartitionDestroy)
+ delete this.partitionsByName[name]
+ delete this.readables[name]
+ delete this.writables[name]
+ this.partitions.splice(i, 1)
+ logger.info('removed partition', name)
+ }
+ }
+
+ PartitionSet.prototype.next = function () {
+ this.current = (this.current + 1) % this.partitions.length
+ return this.partitions[this.current]
+ }
+
+ PartitionSet.prototype.all = function () {
+ return this.partitions
+ }
+
+ PartitionSet.prototype.isReady = function () {
+ return this.partitions.some(
+ function (p) {
+ return p.isReady() && p.isWritable()
+ }
+ )
+ }
+
+ PartitionSet.prototype.nextWritable = function () {
+ var partition = null
+ for (var i = 0; i < this.partitions.length; i++) {
+ partition = this.next()
+ if (partition.isWritable() && partition.isReady()) {
+ return partition
+ }
+ }
+ return partition
+ }
+
+ PartitionSet.prototype.length = function () {
+ return this.partitions.length
+ }
+
+ function readablePartition(p) { return p.isReadable() }
+
+ PartitionSet.prototype.readable = function () {
+ return this.partitions.filter(readablePartition)
+ }
+
+ function pausePartition(p) { p.pause() }
+
+ PartitionSet.prototype.pause = function () {
+ this.readable().forEach(pausePartition)
+ }
+
+ function resumePartition(p) { p.resume() }
+
+ PartitionSet.prototype.resume = function () {
+ this.readable().forEach(resumePartition)
+ }
+
+ function stopPartition(p) { p.stop() }
+
+ PartitionSet.prototype.stop = function () {
+ this.readable().forEach(stopPartition)
+ }
+
+ function readableChanged(partition) {
+ if (partition.isReadable()) {
+ this.readables[partition.name] = partition
+ }
+ else {
+ delete this.readables[partition.name]
+ }
+ }
+
+ function writableChanged(partition) {
+ if (partition.isWritable()) {
+ this.writables[partition.name] = partition
+ if (this.isReady()) {
+ this.emit('ready')
+ }
+ }
+ else {
+ delete this.writables[partition.name]
+ }
+ }
+
+ function partitionReady(partition) {
+ this.emit('ready')
+ }
+
+ function partitionDestroy(partition) {
+ this.remove(partition)
+ }
+
+ return PartitionSet
+}
View
148 partition.js
@@ -1,61 +1,27 @@
-module.exports = function (logger) {
+module.exports = function (logger, inherits, EventEmitter, Broker) {
-
- function exponentialBackoff(attempt, delay) {
- return Math.floor(
- Math.random() * Math.pow(2, attempt) * 10 + delay
- )
- }
-
- function handleResponse(err, length, messages) {
- this.pending = false
- if (err) {
- return this.topic.error(err)
- }
- this.offset += length
- if (this.paused) {
- logger.info(
- 'buffered', messages.length,
- 'topic', this.topic.name,
- 'broker', this.broker.id,
- 'partition', this.id
- )
- this.bufferedMessages = messages
- }
- else {
- this.topic.parseMessages(this, messages)
- this._setFetchDelay(length === 0)
- this._loop()
- }
- }
-
- function fetch() {
- if (this.broker.isReady()) {
- this.broker.fetch(
- this.topic,
- this,
- this.fetchResponder
- )
- }
- else {
- this._setFetchDelay(true)
- this._loop()
- }
- }
-
- function Partition(topic, broker, id, offset) {
+ function Partition(topic, broker, id) {
this.topic = topic
this.broker = broker
this.id = id
+ this.name = this.broker.id + '-' + this.id
this.fetchDelay = this.topic.minFetchDelay
this.emptyFetches = 0
- this.offset = offset || 0
- this.fetcher = fetch.bind(this)
- this.fetchResponder = handleResponse.bind(this)
+ this.offset = 0
this.paused = true
this.bufferedMessages = null
this.timer = null
+ this.readable = null
+ this.writable = null
+ this.fetcher = fetch.bind(this)
+ this.onFetchResponse = fetchResponse.bind(this)
+ this.onBrokerReady = brokerReady.bind(this)
+ this.onBrokerDestroy = brokerDestroy.bind(this)
+ this.broker.on('ready', this.onBrokerReady)
+ this.broker.on('destroy', this.onBrokerDestroy)
+ EventEmitter.call(this)
}
+ inherits(Partition, EventEmitter)
Partition.prototype._setFetchDelay = function (shouldDelay) {
this.emptyFetches = shouldDelay ? this.emptyFetches + 1 : 0
@@ -81,10 +47,6 @@ module.exports = function (logger) {
}
}
- Partition.prototype.name = function () {
- return this.broker.id + '-' + this.id
- }
-
Partition.prototype.flush = function () {
if (this.bufferedMessages) {
this.topic.parseMessages(this, this.bufferedMessages)
@@ -118,8 +80,86 @@ module.exports = function (logger) {
this.resume()
}
- Partition.prototype.saveOffset = function (saver) {
- saver.saveOffset(this)
+ Partition.prototype.write = function (messages, cb) {
+ return this.broker.write(this, messages, cb)
+ }
+
+ Partition.prototype.isReady = function () {
+ return this.broker.isReady()
+ }
+
+ Partition.prototype.isWritable = function (writable) {
+ if (writable !== undefined && this.writable !== writable) {
+ this.writable = writable
+ this.emit('writable', this)
+ }
+ return this.writable
+ }
+
+ Partition.prototype.isReadable = function (readable) {
+ if (readable !== undefined && this.readable !== readable) {
+ this.readable = readable
+ this.emit('readable', this)
+ }
+ return this.readable
+ }
+
+ Partition.nil = new Partition({ minFetchDelay: 0 }, Broker.nil, -1)
+
+ function exponentialBackoff(attempt, delay) {
+ return Math.floor(
+ Math.random() * Math.pow(2, attempt) * 10 + delay
+ )
+ }
+
+ function fetchResponse(err, length, messages) {
+ this.pending = false
+ if (err) {
+ return this.topic.error(err)
+ }
+ this.offset += length
+ if (this.paused) {
+ logger.info(
+ 'buffered', messages.length,
+ 'topic', this.topic.name,
+ 'broker', this.broker.id,
+ 'partition', this.id
+ )
+ this.bufferedMessages = messages
+ }
+ else {
+ this.topic.parseMessages(this, messages)
+ this._setFetchDelay(length === 0)
+ this._loop()
+ }
+ }
+
+ function fetch() {
+ if (this.isReady()) {
+ this.broker.fetch(
+ this.topic,
+ this,
+ this.onFetchResponse
+ )
+ }
+ else {
+ this._setFetchDelay(true)
+ this._loop()
+ }
+ }
+
+ function brokerReady() {
+ this.emit('ready', this)
+ }
+
+ function brokerDestroy() {
+ this.pause()
+ this.isWritable(false)
+ this.isReadable(false)
+ this.broker.removeListener('ready', this.onBrokerReady)
+ this.broker.removeListener('destroy', this.onBrokerDestroy)
+ this.broker = Broker.nil
+ this.emit('destroy', this)
}
return Partition
View
80 producer.js
@@ -1,80 +0,0 @@
-module.exports = function (
- inherits,
- EventEmitter,
- BrokerPool) {
-
- function Producer(allBrokers) {
- this.allBrokers = allBrokers
- this.topicBrokers = {}
- EventEmitter.call(this)
- }
- inherits(Producer, EventEmitter)
-
- Producer.prototype._remove = function (broker) {
- var names = Object.keys(this.topicBrokers)
- for (var i = 0; i < names.length; i++) {
- var name = names[i]
- this.topicBrokers[name].remove(broker)
- }
- }
-
- Producer.prototype.removeBrokersNotIn = function (ids) {
- var brokers = this.allBrokers.all()
- for (var i = 0; i < brokers.length; i++) {
- var b = brokers[i]
- if (ids.indexOf(b.id) < 0) {
- this._remove(b)
- }
- }
- }
-
- Producer.prototype.addPartitions = function (topicName, partitionNames) {
- if (!Array.isArray(partitionNames)) {
- return
- }
- for (var i = 0; i < partitionNames.length; i++) {
- var name = partitionNames[i]
- var split = name.split(':')
- if (split.length === 2) {
- var brokerId = +split[0]
- var partitionCount = +split[1]
- this.setBrokerTopicPartitionCount(brokerId, topicName, partitionCount)
- }
- }
- }
-
- Producer.prototype.setBrokerTopicPartitionCount = function (id, name, count) {
- var b = this.allBrokers.get(id)
- if (b) {
- var topicBrokers = this.topicBrokers[name] || new BrokerPool(name)
- topicBrokers.add(b)
- this.topicBrokers[name] = topicBrokers
- b.setTopicPartitions(name, count)
- }
- }
-
- Producer.prototype.brokerForTopic = function (name) {
- return (this.topicBrokers[name] || BrokerPool.nil).nextReady()
- }
-
- Producer.prototype.write = function (topic, messages, cb) {
- var broker = this.brokerForTopic(topic.name)
- if (broker) {
- var ready = broker.write(topic, messages, cb) || this.isReady(topic)
- topic.setReady(ready)
- return ready
- }
- // new topic
- // XXX im not sure how to best handle this case.
- // for instance if you blast a bunch of writes
- // before the broker-partition assignments arrive
- this.allBrokers.randomReady().write(topic, messages, cb)
- return true
- }
-
- Producer.prototype.isReady = function (topic) {
- return this.topicBrokers[topic.name].areAnyReady()
- }
-
- return Producer
-}
View
54 static-connector.js
@@ -2,9 +2,6 @@ module.exports = function (
logger,
inherits,
EventEmitter,
- Producer,
- Consumer,
- BrokerPool,
Broker
) {
@@ -17,55 +14,26 @@ module.exports = function (
// }
// ]
// }
- function StaticConnector(options) {
- this.options = options
- this.allBrokers = new BrokerPool('all')
- this.producer = new Producer(this.allBrokers)
- this.consumer = new Consumer(this, options.groupId, this.allBrokers)
- this.onBrokerConnect = addBroker.bind(this)
- this.onBrokerReady = brokerReady.bind(this)
+ function StaticConnector(kafka, brokers, options) {
+ this.brokers = brokers
+ this.onBrokerConnect = brokerConnect.bind(this)
- this.allBrokers.once(
- 'brokerAdded',
- function (broker) {
- this.emit('brokerAdded', broker)
- }.bind(this)
- )
-
- for (var i = 0; i < this.options.brokers.length; i++) {
- var b = this.options.brokers[i]
- var broker = new Broker(b.id, b.host, b.port, this.options)
+ for (var i = 0; i < options.brokers.length; i++) {
+ var b = options.brokers[i]
+ var broker = new Broker(b.id, { host: b.host, port: b.port })
broker.once('connect', this.onBrokerConnect)
- broker.on('ready', this.onBrokerReady)
+ broker.connect()
}
EventEmitter.call(this)
}
inherits(StaticConnector, EventEmitter)
- function addBroker(broker) {
- this.allBrokers.add(broker)
- }
-
- function brokerReady(broker) {
- this.emit('brokerReady', broker)
- }
-
- StaticConnector.prototype.consume = function (topic, partitions) {
- logger.assert(partitions)
- this.consumer.consume(topic, partitions)
- }
-
- StaticConnector.prototype.stopConsuming = function (topic, partitions) {
- this.consumer.stop(topic, partitions)
+ function brokerConnect(broker) {
+ this.brokers.add(broker)
}
- StaticConnector.prototype.saveOffset = function (partition) {
- logger.info(
- 'saving', partition.id,
- 'broker', partition.broker.id,
- 'offset', partition.offset
- )
- //TODO actually save
+ StaticConnector.prototype.register = function (topic) {
+ // noop
}
return StaticConnector
View
140 topic.js
@@ -2,7 +2,9 @@ module.exports = function (
logger,
inherits,
Stream,
- MessageBuffer) {
+ MessageBuffer,
+ Partition,
+ PartitionSet) {
// A Topic is Readable/Writable Stream.
// It's the main interaction point of the API.
@@ -11,8 +13,7 @@ module.exports = function (
// API API API
//
// name: string
- // producer: Producer
- // consumer: Consumer
+ // kafka: Kafka
// options: {
// minFetchDelay: number (ms)
// maxFetchDelay: number (ms)
@@ -25,38 +26,51 @@ module.exports = function (
// produce: [string] (broker:partitionCount) ex. '0:5'
// }
// }
- function Topic(name, producer, consumer, options) {
+ function Topic(name, kafka, options) {
this.name = name || ''
+ this.kafka = kafka
+ this.encoding = null
+ this.readable = true // required Stream property
+ this.writable = true // required Stream property
+ this.compression = options.compression
this.minFetchDelay = options.minFetchDelay
this.maxFetchDelay = options.maxFetchDelay
this.maxFetchSize = options.maxFetchSize
this.maxMessageSize = options.maxMessageSize
- this.producer = producer
- this.consumer = consumer
- if (options.partitions) {
- this.producer.addPartitions(name, options.partitions.produce)
- this.consumePartitions = options.partitions.consume
- }
- this.ready = true
- this.compression = options.compression
- this.readable = true
- this.writable = true
- this.encoding = null
- this.outgoingMessages = new MessageBuffer(
- this,
- options.batchSize,
- options.queueTime,
- this.producer
- )
this.bufferedMessages = []
this.emitMessages = emitMessages.bind(this)
+
+ this.partitions = new PartitionSet()
+ this.onPartitionsReady = partitionsReady.bind(this)
+ this.partitions.on('ready', this.onPartitionsReady)
+
+ this.produceBuffer = new MessageBuffer(
+ this.partitions,
+ options.batchSize,
+ options.queueTime
+ )
+ this.onError = this.error.bind(this)
+ this.produceBuffer.on('error', this.onError)
+
+ if (options.partitions) {
+ this.addWritablePartitions(options.partitions.produce)
+ this.addReadablePartitions(options.partitions.consume)
+ }
+
Stream.call(this)
}
inherits(Topic, Stream)
//emit end
//emit close
+ function partitionsReady() {
+ if(this.produceBuffer.flush()) {
+ logger.info('drain', this.name)
+ this.emit('drain')
+ }
+ }
+
function emitMessages(payloads) {
for (var i = 0; i < payloads.length; i++) {
var data = payloads[i]
@@ -76,8 +90,8 @@ module.exports = function (
}
}
- Topic.prototype.parseMessages = function(partition, messages) {
- this.emit('offset', partition.name(), partition.offset)
+ Topic.prototype.parseMessages = function (partition, messages) {
+ this.emit('offset', partition.name, partition.offset)
for (var i = 0; i < messages.length; i++) {
messages[i].unpack(this.emitMessages)
}
@@ -96,36 +110,91 @@ module.exports = function (
return this.paused || this.bufferedMessages.length > 0
}
- Topic.prototype.saveOffsets = function () {
- this.consumer.saveOffsets(this)
+ // Partitions
+
+ Topic.prototype.partition = function (name) {
+ var partition = this.partitions.get(name)
+ if (!partition) {
+ var brokerPartition = name.split('-')
+ var brokerId = +(brokerPartition[0])
+ var partitionId = +(brokerPartition[1])
+ var broker = this.kafka.broker(brokerId)
+ if (broker) {
+ partition = new Partition(this, broker, partitionId)
+ this.partitions.add(partition)
+ }
+ }
+ return partition
}
- // Readable Stream
+ Topic.prototype.addWritablePartitions = function (partitionInfo) {
+ if (!Array.isArray(partitionInfo)) {
+ return
+ }
+ for (var i = 0; i < partitionInfo.length; i++) {
+ var info = partitionInfo[i]
+ var brokerPartitionCount = info.split(':')
+ if (brokerPartitionCount.length === 2) {
+ var brokerId = +brokerPartitionCount[0]
+ var partitionCount = +brokerPartitionCount[1]
+ for (var j = 0; j < partitionCount; j++) {
+ var p = this.partition(brokerId + '-' + j)
+ if (p) {
+ p.isWritable(true)
+ }
+ }
+ }
+ }
+ }
+
+ Topic.prototype.addReadablePartitions = function (partitionInfo) {
+ if (!Array.isArray(partitionInfo)) {
+ return
+ }
+ for (var i = 0; i < partitionInfo.length; i++) {
+ var info = partitionInfo[i]
+ var nameOffset = info.split(':')
+ var name = nameOffset[0]
+
+ var partition = this.partition(name)
+ if (partition) {
+ partition.isReadable(true)
+ if (nameOffset.length === 2) {
+ var offset = +nameOffset[1]
+ partition.offset = offset
+ }
+ }
+ }
+ }
Topic.prototype.error = function (err) {
if (!this.paused) {
this.pause()
}
logger.info('topic', this.name, 'error', err.message)
this.emit('error', err)
+ return false
}
+ // Readable Stream
+
Topic.prototype.pause = function () {
logger.info('pause', this.name)
this.paused = true
- this.consumer.pause(this)
+ this.partitions.pause()
}
Topic.prototype.resume = function () {
logger.info('resume', this.name)
+ this.kafka.register(this)
this.paused = this._flushBufferedMessages()
if (!this.paused) {
- this.consumer.resume(this)
+ this.partitions.resume()
}
}
Topic.prototype.destroy = function () {
- this.consumer.stop(this)
+ this.partitions.stop()
}
Topic.prototype.setEncoding = function (encoding) {
@@ -134,20 +203,15 @@ module.exports = function (
//Writable Stream
- Topic.prototype.setReady = function (ready) {
- if(ready && !this.ready) {
- this.outgoingMessages.flush()
- this.emit('drain')
- }
- this.ready = ready
- }
-
Topic.prototype.write = function (data, encoding) {
if(!Buffer.isBuffer(data)) {
encoding = encoding || 'utf8'
data = new Buffer(data, encoding)
}
- return this.outgoingMessages.push(data)
+ if (data.length > this.maxMessageSize) {
+ return this.error(new Error("message too big"))
+ }
+ return this.produceBuffer.push(data)
}
Topic.prototype.end = function (data, encoding) {
View
93 zkconnector.js
@@ -4,9 +4,6 @@ module.exports = function (
inherits,
EventEmitter,
ZK,
- Producer,
- Consumer,
- BrokerPool,
Broker
) {
@@ -23,31 +20,15 @@ module.exports = function (
// zookeeper:
// groupId:
// }
- function ZKConnector(options) {
- var self = this
+ function ZKConnector(kafka, brokers, options) {
+ this.kafka = kafka
+ this.brokers = brokers
this.options = options
this.zk = new ZK(options)
- this.allBrokers = new BrokerPool('all')
- this.producer = new Producer(this.allBrokers)
- this.allBrokers.on(
- 'brokerAdded',
- function (b) {
- self.emit('brokerAdded', b)
- }
- )
- this.allBrokers.on(
- 'brokerRemoved',
- function (b) {
- self.emit('brokerRemoved', b)
- }
- )
- this.brokerReady = function () {
- self.emit('brokerReady', this)
- }
this.hasPendingTopics = false
this.interestedTopics = {}
this.registerTopics = registerTopics.bind(this)
- this.consumer = new Consumer(this, options.groupId, this.allBrokers)
+ this.onBrokerConnect = brokerConnect.bind(this)
this.connect()
EventEmitter.call(this)
}
@@ -61,7 +42,7 @@ module.exports = function (
self.zk.subscribeToTopics()
})
this.zk.on('brokers', this._brokersChanged.bind(this))
- this.zk.on('broker-topic-partition', this._setBrokerTopicPartitionCount.bind(this))
+ this.zk.on('broker-topic-partition', this._setPartitionCount.bind(this))
this.zk.on('consumers-changed', this._rebalance.bind(this))
}
@@ -70,33 +51,49 @@ module.exports = function (
async.forEachSeries(
brokerIds,
function (id, next) {
- if (!self.allBrokers.contains(id)) {
+ if (!self.brokers.get(id)) {
self.zk.getBroker(id, self._createBroker.bind(self))
}
},
function (err) {
- self.producer.removeBrokersNotIn(brokerIds)
+ self._removeBrokersNotIn(brokerIds)
}
)
}
ZKConnector.prototype._createBroker = function (id, info) {
- var self = this
- var split = info.split(':')
- if (split.length > 2) {
- var broker = new Broker(id, split[1], split[2], this.options)
- broker.on('ready', this.brokerReady)
- broker.once(
- 'connect',
- function () {
- self.allBrokers.add(broker)
+ var hostPort = info.split(':')
+ if (hostPort.length > 2) {
+ var host = hostPort[1]
+ var port = hostPort[2]
+ var oldBroker = this.brokers.get(id)
+ if (oldBroker) {
+ if (oldBroker.host === host && oldBroker.port === port) {
+ return
}
- )
+ else {
+ this.brokers.remove(oldBroker)
+ }
+ }
+ var broker = new Broker(id, { host: host, port: port })
+ broker.once('connect', this.onBrokerConnect)
+ broker.connect()
}
}
- ZKConnector.prototype._setBrokerTopicPartitionCount = function (broker, topic, count) {
- this.producer.setBrokerTopicPartitionCount(broker, topic, count)
+ ZKConnector.prototype._setPartitionCount = function (brokerId, topicName, count) {
+ var topic = this.kafka.topic(topicName)
+ topic.addWritablePartitions([brokerId + ':' + count])
+ }
+
+ ZKConnector.prototype._removeBrokersNotIn = function (brokerIds) {
+ var brokers = this.brokers.all()
+ for (var i = 0; i < brokers.length; i++) {
+ var broker = brokers[i]
+ if (brokerIds.indexOf(broker.id) === -1) {
+ this.brokers.remove(broker)
+ }
+ }
}
ZKConnector.prototype._rebalance = function () {
@@ -133,19 +130,17 @@ module.exports = function (
}
}
- ZKConnector.prototype.consume = function (topic) {
- this.hasPendingTopics = true
- this.interestedTopics[topic.name] = topic
- process.nextTick(this.registerTopics)
+ ZKConnector.prototype.register = function (topic) {
+ return false // TODO enable when rebalance works
+ if (!this.interestedTopics[topic.name]) {
+ this.hasPendingTopics = true
+ this.interestedTopics[topic.name] = topic
+ process.nextTick(this.registerTopics)
+ }
}
- ZKConnector.prototype.saveOffset = function (partition) {
- logger.info(
- 'saving', partition.id,
- 'broker', partition.broker.id,
- 'offset', partition.offset
- )
- //TODO implement
+ function brokerConnect(broker) {
+ this.brokers.add(broker)
}
return ZKConnector

0 comments on commit e8bd7d1

Please sign in to comment.