diff --git a/broker.js b/broker.js index c62eed4..816791a 100644 --- a/broker.js +++ b/broker.js @@ -70,8 +70,8 @@ module.exports = function ( this.topicPartitions = {} } - Broker.prototype.fetch = function (topic, partition, maxSize) { - this.client.fetch(topic, partition, maxSize) + Broker.prototype.fetch = function (topic, partition, maxSize, cb) { + this.client.fetch(topic, partition, maxSize, cb) } Broker.prototype.publish = function (topic, messages) { diff --git a/client.js b/client.js index a72c643..a58375f 100644 --- a/client.js +++ b/client.js @@ -54,10 +54,16 @@ module.exports = function ( return this.ready } - Client.prototype.fetch = function (topic, partition, maxSize) { + // cb: function (err, length, messages) {} + Client.prototype.fetch = function (topic, partition, maxSize, cb) { return this._send( - new FetchRequest(topic.name, topic.offset, partition, maxSize), - topic.parseMessages.bind(topic) + new FetchRequest( + topic.name, + topic.offset, + partition, + maxSize + ), + cb ) } diff --git a/consumer.js b/consumer.js index 1b56923..19404ee 100644 --- a/consumer.js +++ b/consumer.js @@ -17,11 +17,25 @@ module.exports = function ( Consumer.prototype.consume = function (topic, interval, partitions) { console.assert(Array.isArray(partitions)) - var owner = this.owners[topic.name] || new Owner(this.allBrokers) - owner.addPartitions(partitions) - owner.interval = interval - this.owners[topic.name] = owner - owner.consume() + var name = topic.name + var owner = this.owners[name] || new Owner(topic, this.allBrokers) + this.owners[name] = owner + owner.consume(partitions, interval) + } + + Consumer.prototype.stop = function (topic, partitions) { + if (!topic) { // stop all + var topics = Object.keys(this.owners) + for (var i = 0; i < topics.length; i++) { + this.stop(topics[i]) + } + } + var name = topic.name + var owner = this.owners[name] + owner.stop(partitions) + if (!owner.hasPartitions()) { + delete this.owners[name] + } } return Consumer diff --git a/example.js b/example.js index d4a01be..dbe1664 100644 --- a/example.js +++ b/example.js @@ -1,7 +1,16 @@ var Kafka = require('./index') var kafka = new Kafka({ - zookeeper: 'localhost:2181', + //zookeeper: 'localhost:2181', + brokers: [{ + id: 0, + host: 'localhost', + port: 9092, + topics: { + foo: 2, + bar: 2 + } + }], compression: 'gzip', queueTime: 2000, batchSize: 200 @@ -9,23 +18,22 @@ var kafka = new Kafka({ kafka.connect(function () { - var foo = kafka.topic('foo') - var bar = kafka.consume('bar', 200) + //var foo = kafka.topic('foo') + var bar = kafka.consume('bar', 200, ['0-0', '0-1']) bar.on( 'message', function (m) { - console.log("bar offset: " + bar.offset) console.log(m.toString()) } ) setInterval( function () { - foo.publish("the time is: " + Date.now()) - bar.publish("a random number is: " + Math.random()) + bar.publish("the time is: " + Date.now()) + //foo.publish("a random number is: " + Math.random()) }, - 5 + 100 ) } diff --git a/owner.js b/owner.js index 3510f39..5fb3830 100644 --- a/owner.js +++ b/owner.js @@ -1,17 +1,92 @@ module.exports = function () { - function Owner(brokers) { - this.brokers = brokers + function handleResponse(err, length, messages) { + if (err) { + return this.topic.error(err) + } + if (length === 0) { + //TODO no new messages, backoff + return + } + this.offset += length + this.topic.parseMessages(messages) + this._loop() + } + + function fetch() { + this.broker.fetch(this.topic, this.partition, this.maxSize, this.fetchResponder) + } + + function Partition(topic, broker, partition) { + this.topic = topic + this.broker = broker + this.partition = partition this.interval = 0 + this.offset = 0 + this.maxSize = 300 * 1024 //TODO set via option + this.fetcher = fetch.bind(this) + this.fetchResponder = handleResponse.bind(this) + this.timer = null + } + + Partition.prototype._loop = function () { + if (this.interval) { + this.timer = setTimeout(this.fetcher, this.interval) + } + } + + Partition.prototype.start = function () { + this.fetcher() + } + + Partition.prototype.stop = function () { + clearTimeout(this.timer) + } + + Partition.prototype.reset = function () { + this.stop() + this.start() } - Owner.prototype.addPartitions = function (partitions) { + function Owner(topic, brokers) { + this.topic = topic + this.brokers = brokers + this.partitions = {} + } + Owner.prototype.consume = function (partitions, interval) { + for (var i = 0; i < partitions.length; i++) { + var name = partitions[i] + var split = name.split('-') + if (split.length === 2) { + var brokerId = +split[0] + var partition = +split[1] + var broker = this.brokers.get(brokerId) + var pc = this.partitions[name] || new Partition(this.topic, broker, partition) + pc.interval = interval + pc.reset() + this.partitions[name] = pc + } + } } - Owner.prototype.consume = function () { + Owner.prototype.stop = function (partitions) { + if (!partitions) { // stop all + partitions = Object.keys(this.partitions) + } + for (var i = 0; i < partitions.length; i++) { + var name = partitions[i] + var p = this.partitions[name] + if (p) { + p.stop() + delete this.partitions[name] + } + } + } + Owner.prototype.hasPartitions = function () { + return Object.keys(this.partitions).length > 0 } return Owner -} \ No newline at end of file +} diff --git a/protocol/fetch-request.js b/protocol/fetch-request.js index 69da449..8137a42 100644 --- a/protocol/fetch-request.js +++ b/protocol/fetch-request.js @@ -26,7 +26,7 @@ module.exports = function ( // REQUEST_HEADER = See REQUEST_HEADER above // OFFSET = int64 // Offset in topic and partition to start from // MAX_SIZE = int32 // MAX_SIZE of the message set to return - FetchRequest.prototype.serialize = function (stream) { + FetchRequest.prototype.serialize = function (stream, cb) { var payload = new Buffer(12) int53.writeUInt64BE(this.offset, payload) payload.writeUInt32BE(this.maxSize, 8) @@ -41,7 +41,7 @@ module.exports = function ( } FetchRequest.prototype.response = function (cb) { - return new Response(FetchBody) + return new Response(FetchBody, cb) } return FetchRequest diff --git a/static-connector.js b/static-connector.js index 68fe64a..d6109aa 100644 --- a/static-connector.js +++ b/static-connector.js @@ -22,6 +22,7 @@ module.exports = function ( // } function StaticConnector(options) { var self = this + this.options = options this.allBrokers = new BrokerPool() this.producer = new Producer(this.allBrokers) this.consumer = new Consumer(options.groupId, this.allBrokers) @@ -35,7 +36,7 @@ module.exports = function ( this.options.brokers.forEach( function (b) { var broker = new Broker(b.id, b.host, b.port) - Object.keys(b.topics).forEach( + Object.keys(b.topics).forEach( //TODO would be great to get rid of this function (t) { broker.setTopicPartitions(t, b.topics[t]) } @@ -57,9 +58,13 @@ module.exports = function ( } inherits(StaticConnector, EventEmitter) - StaticConnector.prototype.consume = function (topic, interval, options) { - console.assert(options) - this.consumer.consume(topic, options) + StaticConnector.prototype.consume = function (topic, interval, partitions) { + console.assert(partitions) + this.consumer.consume(topic, interval, partitions) + } + + StaticConnector.prototype.stopConsuming = function (topic, partitions) { + this.consumer.stop(topic, partitions) } StaticConnector.prototype.publish = function (topic, messages) { diff --git a/topic.js b/topic.js index 17b9e37..3c5e6b1 100644 --- a/topic.js +++ b/topic.js @@ -4,23 +4,17 @@ module.exports = function ( MessageBuffer) { function Topic(name, connector, compression, batchSize, queueTime) { - this.offset = 0 this.name = name || '' - this.partitions = [] this.connector = connector this.ready = true - this.compression = 0 + this.compression = compression this.messages = new MessageBuffer(this, batchSize, queueTime, connector) EventEmitter.call(this) } inherits(Topic, EventEmitter) - Topic.prototype.parseMessages = function(err, length, messages) { - if (err) { - return this.emit('error', err) - } + Topic.prototype.parseMessages = function(messages) { var self = this - this.offset += length for (var i = 0; i < messages.length; i++) { //XXX do we need to preserve the order? messages[i].unpack( diff --git a/zkconnector.js b/zkconnector.js index 2c5b430..6aca111 100644 --- a/zkconnector.js +++ b/zkconnector.js @@ -37,6 +37,8 @@ module.exports = function ( this.brokerReady = function () { self.emit('brokerReady', this) } + this.pendingConsumers = [] + this.registerConsumers = registerConsumers.bind(this) this.consumer = new Consumer(options.groupId, this.allBrokers) this.connect() EventEmitter.call(this) @@ -92,19 +94,28 @@ module.exports = function ( } - ZKConnector.prototype.consume = function (topic, interval) { - var self = this - //TODO: need to be able to consume an array of topics - if (!this.consumerRegistered) { + function registerConsumers() { + if (this.pendingConsumers.length > 0) { + var self = this + this.consumer.foo(this.pendingConsumers) //TODO name me this.zk.registerConsumers( this.consumer, - function (partitions) { - self.consumer.consume(topic, interval, partitions) + function (topicPartitions) { + for(var i = 0; i < topicPartitions.length; i++) { + var tp = topicPartitions[i] + self.consumer.consume(tp.topic, tp.interval, tp.partitions) + } } ) + this.pendingConsumers = [] } } + ZKConnector.prototype.consume = function (topic, interval) { + this.pendingConsumers.push({topic: topic, interval: interval}) + process.nextTick(this.registerConsumers) + } + ZKConnector.prototype.publish = function (topic, messages) { return this.producer.publish(topic, messages) }