Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

event handler accounting

  • Loading branch information...
commit 956284f2d07e7b0dc805982020abb953a4481ae3 1 parent 24877e7
@dannycoates authored
View
27 broker-pool.js
@@ -38,33 +38,6 @@ module.exports = function (logger, inherits, EventEmitter) {
return this.brokers[this.current]
}
- BrokerPool.prototype.nextReady = function () {
- for (var i = 0; i < this.brokers.length; i++) {
- var b = this.next()
- if (b.isReady()) {
- break
- }
- }
- return b
- }
-
- BrokerPool.prototype.randomReady = function () {
- var len = this.brokers.length
- var n = (Math.floor(Math.random() * len))
- for (var i = 0; i < len; i++) {
- var b = this.brokers[n]
- if (b.isReady()) {
- break
- }
- n = (n + 1) % len
- }
- return b
- }
-
- BrokerPool.prototype.areAnyReady = function () {
- return this.brokers.some(function (b) { return b.isReady() })
- }
-
BrokerPool.prototype.get = function (id) {
return this.brokersById[id]
}
View
86 broker.js
@@ -11,16 +11,13 @@ module.exports = function (
this.reconnectAttempts = 0
this.options = options
this.connector = this.connect.bind(this)
+ this.onClientEnd = clientEnd.bind(this)
+ this.onClientReady = clientReady.bind(this)
+ this.onClientConnect = clientConnect.bind(this)
EventEmitter.call(this)
}
inherits(Broker, EventEmitter)
- function exponentialBackoff(attempt) {
- return Math.floor(
- Math.random() * Math.pow(2, attempt) * 10
- )
- }
-
Broker.prototype.connect = function () {
var options = this.options
logger.info(
@@ -29,33 +26,12 @@ module.exports = function (
'port', options.port
)
this.client = new Client(this.id, options)
- this.client.once(
- 'connect',
- function () {
- logger.info('broker connected', this.id)
- this.reconnectAttempts = 0
- this.emit('connect', this)
- this.emit('ready')
- }.bind(this)
- )
- this.client.once(
- 'end',
- function () {
- this.reconnectAttempts++
- logger.info('broker ended', this.id, this.reconnectAttempts)
- setTimeout(
- this.connector,
- exponentialBackoff(this.reconnectAttempts)
- )
- }.bind(this)
- )
- this.client.on(
- 'ready',
- function () {
- logger.info('broker ready', this.id)
- this.emit('ready')
- }.bind(this)
- )
+
+ this.client.once('connect', this.onClientConnect)
+ this.client.once('end', this.onClientEnd)
+ this.client.on('ready', this.onClientReady)
+
+ this.reconnectTimer = null
}
Broker.prototype.isReady = function () {
@@ -74,6 +50,50 @@ module.exports = function (
this.client.drain(cb)
}
+ Broker.prototype.destroy = function () {
+ clearTimeout(this.reconnectTimer)
+ this.reconnectTimer = null
+ this.client.removeListener('connect', this.onClientConnect)
+ this.client.removeListener('end', this.onClientEnd)
+ this.client.removeListener('ready', this.onClientReady)
+ this.client.end()
+ this.client = Client.nil
+ this.emit('destroy')
+ }
+
+ function exponentialBackoff(attempt) {
+ return Math.floor(
+ Math.random() * Math.pow(2, attempt) * 10
+ )
+ }
+
+ function clientConnect() {
+ logger.info('broker connected', this.id)
+ this.reconnectAttempts = 0
+ this.emit('connect', this)
+ this.emit('ready')
+ }
+
+ function clientEnd() {
+ this.reconnectAttempts++
+ logger.info(
+ 'broker ended', this.id,
+ 'reconnects', this.reconnectAttempts
+ )
+ this.client.removeListener('connect', this.onClientConnect)
+ this.client.removeListener('end', this.onClientEnd)
+ this.client.removeListener('ready', this.onClientReady)
+ this.reconnectTimer = setTimeout(
+ this.connector,
+ exponentialBackoff(this.reconnectAttempts)
+ )
+ }
+
+ function clientReady() {
+ logger.info('broker ready', this.id)
+ this.emit('ready')
+ }
+
Broker.nil = new Broker()
return Broker
View
98 client/client.js
@@ -11,53 +11,24 @@ module.exports = function (
OffsetsRequest
) {
function Client(id, options) {
- this.connection = net.connect(options)
- this.connection.on(
- 'connect',
- function () {
- logger.info('client connect')
- this.readableSteam = new ReadableStream()
- this.readableSteam.wrap(this.connection)
- this.receiver = new Receiver(this.readableSteam)
- this.ready = true
- this.emit('connect')
- }.bind(this)
- )
- this.connection.on(
- 'end',
- function () {
- logger.info('client end')
- this.ready = false
- this.emit('end')
- this.connection = null
- }.bind(this)
- )
- this.connection.on(
- 'drain',
- function () {
- if (!this.ready) { //TODO: why is connection.drain so frequent?
- this.ready = true
- this.emit('ready')
- }
- }.bind(this)
- )
- this.connection.on(
- 'error',
- function (err) {
- logger.info('client error', err.message)
- }
- )
- this.connection.on(
- 'close',
- function (hadError) {
- logger.info('client closed. with error:', hadError)
- this.emit('end')
- }.bind(this)
- )
this.id = id
this.ready = false
this.readableSteam = null
this.receiver = null
+
+ this.connection = net.connect(options)
+ this.onConnectionConnect = connectionConnect.bind(this)
+ this.onConnectionEnd = connectionEnd.bind(this)
+ this.onConnectionDrain = connectionDrain.bind(this)
+ this.onConnectionError = connectionError.bind(this)
+ this.onConnectionClose = connectionClose.bind(this)
+
+ this.connection.on('connect', this.onConnectionConnect)
+ this.connection.on('end', this.onConnectionEnd)
+ this.connection.on('drain', this.onConnectionDrain)
+ this.connection.on('error', this.onConnectionError)
+ this.connection.on('close', this.onConnectionClose)
+
EventEmitter.call(this)
}
inherits(Client, EventEmitter)
@@ -74,6 +45,10 @@ module.exports = function (
)
}
+ Client.prototype.end = function () {
+ this.connection.end()
+ }
+
Client.prototype._send = function (request, cb) {
request.serialize(
this.connection,
@@ -137,5 +112,42 @@ module.exports = function (
Client.nil = { ready: false }
+ function connectionConnect() {
+ logger.info('client connect')
+ this.readableSteam = new ReadableStream()
+ this.readableSteam.wrap(this.connection)
+ this.receiver = new Receiver(this.readableSteam)
+ this.ready = true
+ this.emit('connect')
+ }
+
+ function connectionEnd() {
+ logger.info('client end')
+ this.ready = false
+ this.emit('end')
+ this.connection = null
+ }
+
+ function connectionDrain() {
+ if (!this.ready) { //TODO: why is connection.drain so frequent?
+ this.ready = true
+ this.emit('ready')
+ }
+ }
+
+ function connectionError() {
+ logger.info('client error', err.message)
+ }
+
+ function connectionClose(hadError) {
+ logger.info('client closed. with error', hadError)
+ this.connection.removeListener('connect', this.onConnectionConnect)
+ this.connection.removeListener('end', this.onConnectionEnd)
+ this.connection.removeListener('drain', this.onConnectionDrain)
+ this.connection.removeListener('error', this.onConnectionError)
+ this.connection.removeListener('close', this.onConnectionClose)
+ this.emit('end')
+ }
+
return Client
}
View
16 client/fetch-body.js
@@ -3,14 +3,6 @@ module.exports = function (
State,
Message) {
- function FetchError(message, messageLength) {
- this.message = message
- this.messageLength = messageLength
- Error.call(this)
- }
- inherits(FetchError, Error)
- FetchError.prototype.name = 'Fetch Error'
-
function FetchBody(bytes) {
this.bytesParsed = 0
this.lastMessageLength = 0
@@ -55,5 +47,13 @@ module.exports = function (
return err
}
+ function FetchError(message, messageLength) {
+ this.message = message
+ this.messageLength = messageLength
+ Error.call(this)
+ }
+ inherits(FetchError, Error)
+ FetchError.prototype.name = 'Fetch Error'
+
return FetchBody
}
View
16 client/produce-request.js
@@ -4,14 +4,6 @@ module.exports = function (
Message,
State) {
- function ProduceError(message, length) {
- this.message = message
- this.length = length
- Error.call(this)
- }
- inherits(ProduceError, Error)
- ProduceError.prototype.name = 'Produce Error'
-
function ProduceRequest(topic, partitionId, messages) {
this.topic = topic
this.partitionId = partitionId
@@ -91,5 +83,13 @@ module.exports = function (
return State.done
}
+ function ProduceError(message, length) {
+ this.message = message
+ this.length = length
+ Error.call(this)
+ }
+ inherits(ProduceError, Error)
+ ProduceError.prototype.name = 'Produce Error'
+
return ProduceRequest
}
View
60 client/receiver.js
@@ -6,36 +6,18 @@ module.exports = function (
function Receiver(stream) {
this.stream = stream
- this.stream.on(
- 'readable',
- function () {
- this.read()
- }.bind(this)
- )
- this.stream.on(
- 'end',
- function () {
- logger.info(
- 'receiver', 'ended',
- 'queue', this.queue.length
- )
- while (this.queue.length > 0) {
- this.current.abort()
- this.next()
- }
- this.current.abort()
- this.closed = true
- }.bind(this)
- )
- this.stream.on(
- 'error',
- function (err) {
- logger.info('receiver error', err.message)
- }
- )
this.queue = []
this.current = State.nil
this.closed = false
+
+ this.onStreamReadable = streamReadable.bind(this)
+ this.onStreamEnd = streamEnd.bind(this)
+ this.onStreamError = streamError.bind(this)
+
+ this.stream.on('readable', this.onStreamReadable)
+ this.stream.on('end', this.onStreamEnd)
+ this.stream.on('error', this.onStreamError)
+
EventEmitter.call(this)
}
inherits(Receiver, EventEmitter)
@@ -86,5 +68,29 @@ module.exports = function (
return true
}
+ function streamReadable() {
+ this.read()
+ }
+
+ function streamEnd() {
+ logger.info(
+ 'receiver', 'ended',
+ 'queue', this.queue.length
+ )
+ while (this.queue.length > 0) {
+ this.current.abort()
+ this.next()
+ }
+ this.current.abort()
+ this.closed = true
+ this.stream.removeListener('readable', this.onStreamReadable)
+ this.stream.removeListener('end', this.onStreamEnd)
+ this.stream.removeListener('error', this.onStreamError)
+ }
+
+ function streamError(err) {
+ logger.info('receiver error', err.message)
+ }
+
return Receiver
}
View
68 message-buffer.js
@@ -2,6 +2,40 @@ module.exports = function (
inherits,
EventEmitter) {
+ function MessageBuffer(partitions, batchSize, queueTime) {
+ this.partitions = partitions
+ this.batchSize = batchSize
+ this.queueTime = queueTime
+ this.messages = []
+ this.timer = null
+ this.send = send.bind(this)
+ this.produceResponder = handleResponse.bind(this)
+ EventEmitter.call(this)
+ }
+ inherits(MessageBuffer, EventEmitter)
+
+ MessageBuffer.prototype.reset = function () {
+ this.messages = []
+ clearTimeout(this.timer)
+ this.timer = null
+ }
+
+ MessageBuffer.prototype.push = function(message) {
+ if (!this.timer) {
+ this.timer = setTimeout(this.send, this.queueTime)
+ }
+ if (this.messages.push(message) >= this.batchSize) {
+ return this.send()
+ }
+ return true
+ }
+
+ MessageBuffer.prototype.flush = function () {
+ if (this.messages.length > 0) {
+ this.send()
+ }
+ }
+
function handleResponse(err) {
if (err) {
this.emit('error', err)
@@ -40,39 +74,5 @@ module.exports = function (
return sent
}
- function MessageBuffer(partitions, batchSize, queueTime) {
- this.partitions = partitions
- this.batchSize = batchSize
- this.queueTime = queueTime
- this.messages = []
- this.timer = null
- this.send = send.bind(this)
- this.produceResponder = handleResponse.bind(this)
- EventEmitter.call(this)
- }
- inherits(MessageBuffer, EventEmitter)
-
- MessageBuffer.prototype.reset = function () {
- this.messages = []
- clearTimeout(this.timer)
- this.timer = null
- }
-
- MessageBuffer.prototype.push = function(message) {
- if (!this.timer) {
- this.timer = setTimeout(this.send, this.queueTime)
- }
- if (this.messages.push(message) >= this.batchSize) {
- return this.send()
- }
- return true
- }
-
- MessageBuffer.prototype.flush = function () {
- if (this.messages.length > 0) {
- this.send()
- }
- }
-
return MessageBuffer
}
View
56 partition-set.js
@@ -10,34 +10,13 @@ module.exports = function (
this.onReadableChanged = readableChanged.bind(this)
this.onWritableChanged = writableChanged.bind(this)
this.onPartitionReady = partitionReady.bind(this)
+ this.onPartitionDestroy = partitionDestroy.bind(this)
this.readables = {}
this.writables = {}
EventEmitter.call(this)
}
inherits(PartitionSet, EventEmitter)
- function readableChanged(partition) {
- if (partition.isReadable()) {
- this.readables[partition.name()] = partition
- }
- else {
- delete this.readables[partition.name()]
- }
- }
-
- function writableChanged(partition) {
- if (partition.isWritable()) {
- this.writables[partition.name()] = partition
- }
- else {
- delete this.writables[partition.name()]
- }
- }
-
- function partitionReady(partition) {
- this.emit('ready')
- }
-
PartitionSet.prototype.get = function (name) {
return this.partitionsByName[name]
}
@@ -47,6 +26,7 @@ module.exports = function (
partition.on('writable', this.onWritableChanged)
partition.on('readable', this.onReadableChanged)
partition.on('ready', this.onPartitionReady)
+ partition.on('destroy', this.onPartitionDestroy)
this.partitionsByName[partition.name()] = partition
this.partitions.push(partition)
}
@@ -56,10 +36,14 @@ module.exports = function (
var i = this.partitions.indexOf(partition)
if (i >= 0) {
var p = this.partitions[i]
+ var name = p.name()
p.removeListener('writable', this.onWritableChanged)
p.removeListener('readable', this.onReadableChanged)
p.removeListener('ready', this.onPartitionReady)
- delete this.partitionsByName[p.name()]
+ p.removeListener('destroy', this.onPartitionDestroy)
+ delete this.partitionsByName[name]
+ delete this.readables[name]
+ delete this.writables[name]
this.partitions.splice(i, 1)
}
}
@@ -122,5 +106,31 @@ module.exports = function (
PartitionSet.nil = new PartitionSet()
+ function readableChanged(partition) {
+ if (partition.isReadable()) {
+ this.readables[partition.name()] = partition
+ }
+ else {
+ delete this.readables[partition.name()]
+ }
+ }
+
+ function writableChanged(partition) {
+ if (partition.isWritable()) {
+ this.writables[partition.name()] = partition
+ }
+ else {
+ delete this.writables[partition.name()]
+ }
+ }
+
+ function partitionReady(partition) {
+ this.emit('ready')
+ }
+
+ function partitionDestroy(partition) {
+ this.remove(partition)
+ }
+
return PartitionSet
}
View
118 partition.js
@@ -1,52 +1,5 @@
module.exports = function (logger, inherits, EventEmitter, Broker) {
-
- function exponentialBackoff(attempt, delay) {
- return Math.floor(
- Math.random() * Math.pow(2, attempt) * 10 + delay
- )
- }
-
- function handleResponse(err, length, messages) {
- this.pending = false
- if (err) {
- return this.topic.error(err)
- }
- this.offset += length
- if (this.paused) {
- logger.info(
- 'buffered', messages.length,
- 'topic', this.topic.name,
- 'broker', this.broker.id,
- 'partition', this.id
- )
- this.bufferedMessages = messages
- }
- else {
- this.topic.parseMessages(this, messages)
- this._setFetchDelay(length === 0)
- this._loop()
- }
- }
-
- function fetch() {
- if (this.isReady()) {
- this.broker.fetch(
- this.topic,
- this,
- this.fetchResponder
- )
- }
- else {
- this._setFetchDelay(true)
- this._loop()
- }
- }
-
- function brokerReady() {
- this.emit('ready', this)
- }
-
function Partition(topic, broker, id, offset) {
this.topic = topic
this.broker = broker
@@ -54,20 +7,17 @@ module.exports = function (logger, inherits, EventEmitter, Broker) {
this.fetchDelay = this.topic.minFetchDelay
this.emptyFetches = 0
this.offset = offset || 0
- this.fetcher = fetch.bind(this)
- this.fetchResponder = handleResponse.bind(this)
this.paused = true
this.bufferedMessages = null
this.timer = null
- this.broker.on('ready', brokerReady.bind(this))
- // TODO: readable and writable determine whether this partition
- // can be used for consuming or producing.
- // I think writable might always be true, but readable is determined
- // by the "connector" (as it exists now).
- // I'm trying to factor out the connector in favor of a "controller"
- // that does partition and broker management
this.readable = null
this.writable = null
+ this.fetcher = fetch.bind(this)
+ this.onFetchResponse = fetchResponse.bind(this)
+ this.onBrokerReady = brokerReady.bind(this)
+ this.onBrokerDestroy = brokerDestroy.bind(this)
+ this.broker.on('ready', this.onBrokerReady)
+ this.broker.on('destroy', this.onBrokerDestroy)
EventEmitter.call(this)
}
inherits(Partition, EventEmitter)
@@ -163,5 +113,61 @@ module.exports = function (logger, inherits, EventEmitter, Broker) {
Partition.nil = new Partition({ minFetchDelay: 0 }, Broker.nil, -1)
+ function exponentialBackoff(attempt, delay) {
+ return Math.floor(
+ Math.random() * Math.pow(2, attempt) * 10 + delay
+ )
+ }
+
+ function fetchResponse(err, length, messages) {
+ this.pending = false
+ if (err) {
+ return this.topic.error(err)
+ }
+ this.offset += length
+ if (this.paused) {
+ logger.info(
+ 'buffered', messages.length,
+ 'topic', this.topic.name,
+ 'broker', this.broker.id,
+ 'partition', this.id
+ )
+ this.bufferedMessages = messages
+ }
+ else {
+ this.topic.parseMessages(this, messages)
+ this._setFetchDelay(length === 0)
+ this._loop()
+ }
+ }
+
+ function fetch() {
+ if (this.isReady()) {
+ this.broker.fetch(
+ this.topic,
+ this,
+ this.onFetchResponse
+ )
+ }
+ else {
+ this._setFetchDelay(true)
+ this._loop()
+ }
+ }
+
+ function brokerReady() {
+ this.emit('ready', this)
+ }
+
+ function brokerDestroy() {
+ this.pause()
+ this.isWritable(false)
+ this.isReadable(false)
+ this.broker.removeListener('ready', this.onBrokerReady)
+ this.broker.removeListener('destroy', this.onBrokerDestroy)
+ this.broker = Broker.nil
+ this.emit('destroy', this)
+ }
+
return Partition
}
View
24 topic.js
@@ -28,29 +28,35 @@ module.exports = function (
// }
function Topic(name, kafka, options) {
this.name = name || ''
+ this.kafka = kafka
+ this.ready = true
+ this.encoding = null
+ this.readable = true // required Stream property
+ this.writable = true // required Stream property
+ this.compression = options.compression
this.minFetchDelay = options.minFetchDelay
this.maxFetchDelay = options.maxFetchDelay
this.maxFetchSize = options.maxFetchSize
this.maxMessageSize = options.maxMessageSize
- this.kafka = kafka
+
this.partitions = new PartitionSet()
- this.partitions.on('ready', partitionsReady.bind(this))
+ this.onPartitionsReady = partitionsReady.bind(this)
+ this.partitions.on('ready', this.onPartitionsReady)
if (options.partitions) {
this.addWritablePartitions(options.partitions.produce)
this.makePartitionsReadable(options.partitions.consume)
}
- this.ready = true
- this.compression = options.compression
- this.readable = true
- this.writable = true
- this.encoding = null
+
this.produceBuffer = new MessageBuffer(
this.partitions,
options.batchSize,
options.queueTime
)
- this.produceBuffer.on('error', this.error.bind(this))
- this.produceBuffer.on('full', produceBufferFull.bind(this))
+ this.onError = this.error.bind(this)
+ this.onProduceBufferFull = produceBufferFull.bind(this)
+ this.produceBuffer.on('error', this.onError)
+ this.produceBuffer.on('full', this.onProduceBufferFull)
+
this.bufferedMessages = []
this.emitMessages = emitMessages.bind(this)
Stream.call(this)
Please sign in to comment.
Something went wrong with that request. Please try again.