Skip to content
This repository has been archived by the owner on Jan 26, 2018. It is now read-only.

Commit

Permalink
static consumer is working
Browse files Browse the repository at this point in the history
  • Loading branch information
dannycoates committed Nov 2, 2012
1 parent b7201a7 commit d47a247
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 42 deletions.
4 changes: 2 additions & 2 deletions broker.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ module.exports = function (
this.topicPartitions = {}
}

Broker.prototype.fetch = function (topic, partition, maxSize) {
this.client.fetch(topic, partition, maxSize)
Broker.prototype.fetch = function (topic, partition, maxSize, cb) {
this.client.fetch(topic, partition, maxSize, cb)
}

Broker.prototype.publish = function (topic, messages) {
Expand Down
12 changes: 9 additions & 3 deletions client.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,16 @@ module.exports = function (
return this.ready
}

Client.prototype.fetch = function (topic, partition, maxSize) {
// cb: function (err, length, messages) {}
Client.prototype.fetch = function (topic, partition, maxSize, cb) {
return this._send(
new FetchRequest(topic.name, topic.offset, partition, maxSize),
topic.parseMessages.bind(topic)
new FetchRequest(
topic.name,
topic.offset,
partition,
maxSize
),
cb
)
}

Expand Down
24 changes: 19 additions & 5 deletions consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,25 @@ module.exports = function (

Consumer.prototype.consume = function (topic, interval, partitions) {
console.assert(Array.isArray(partitions))
var owner = this.owners[topic.name] || new Owner(this.allBrokers)
owner.addPartitions(partitions)
owner.interval = interval
this.owners[topic.name] = owner
owner.consume()
var name = topic.name
var owner = this.owners[name] || new Owner(topic, this.allBrokers)
this.owners[name] = owner
owner.consume(partitions, interval)
}

Consumer.prototype.stop = function (topic, partitions) {
if (!topic) { // stop all
var topics = Object.keys(this.owners)
for (var i = 0; i < topics.length; i++) {
this.stop(topics[i])
}
}
var name = topic.name
var owner = this.owners[name]
owner.stop(partitions)
if (!owner.hasPartitions()) {
delete this.owners[name]
}
}

return Consumer
Expand Down
22 changes: 15 additions & 7 deletions example.js
Original file line number Diff line number Diff line change
@@ -1,31 +1,39 @@
var Kafka = require('./index')

var kafka = new Kafka({
zookeeper: 'localhost:2181',
//zookeeper: 'localhost:2181',
brokers: [{
id: 0,
host: 'localhost',
port: 9092,
topics: {
foo: 2,
bar: 2
}
}],
compression: 'gzip',
queueTime: 2000,
batchSize: 200
})

kafka.connect(function () {

var foo = kafka.topic('foo')
var bar = kafka.consume('bar', 200)
//var foo = kafka.topic('foo')
var bar = kafka.consume('bar', 200, ['0-0', '0-1'])

bar.on(
'message',
function (m) {
console.log("bar offset: " + bar.offset)
console.log(m.toString())
}
)

setInterval(
function () {
foo.publish("the time is: " + Date.now())
bar.publish("a random number is: " + Math.random())
bar.publish("the time is: " + Date.now())
//foo.publish("a random number is: " + Math.random())
},
5
100
)

}
Expand Down
85 changes: 80 additions & 5 deletions owner.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,92 @@
module.exports = function () {

function Owner(brokers) {
this.brokers = brokers
function handleResponse(err, length, messages) {
if (err) {
return this.topic.error(err)
}
if (length === 0) {
//TODO no new messages, backoff
return
}
this.offset += length
this.topic.parseMessages(messages)
this._loop()
}

function fetch() {
this.broker.fetch(this.topic, this.partition, this.maxSize, this.fetchResponder)
}

function Partition(topic, broker, partition) {
this.topic = topic
this.broker = broker
this.partition = partition
this.interval = 0
this.offset = 0
this.maxSize = 300 * 1024 //TODO set via option
this.fetcher = fetch.bind(this)
this.fetchResponder = handleResponse.bind(this)
this.timer = null
}

Partition.prototype._loop = function () {
if (this.interval) {
this.timer = setTimeout(this.fetcher, this.interval)
}
}

Partition.prototype.start = function () {
this.fetcher()
}

Partition.prototype.stop = function () {
clearTimeout(this.timer)
}

Partition.prototype.reset = function () {
this.stop()
this.start()
}

Owner.prototype.addPartitions = function (partitions) {
function Owner(topic, brokers) {
this.topic = topic
this.brokers = brokers
this.partitions = {}
}

Owner.prototype.consume = function (partitions, interval) {
for (var i = 0; i < partitions.length; i++) {
var name = partitions[i]
var split = name.split('-')
if (split.length === 2) {
var brokerId = +split[0]
var partition = +split[1]
var broker = this.brokers.get(brokerId)
var pc = this.partitions[name] || new Partition(this.topic, broker, partition)
pc.interval = interval
pc.reset()
this.partitions[name] = pc
}
}
}

Owner.prototype.consume = function () {
Owner.prototype.stop = function (partitions) {
if (!partitions) { // stop all
partitions = Object.keys(this.partitions)
}
for (var i = 0; i < partitions.length; i++) {
var name = partitions[i]
var p = this.partitions[name]
if (p) {
p.stop()
delete this.partitions[name]
}
}
}

Owner.prototype.hasPartitions = function () {
return Object.keys(this.partitions).length > 0
}

return Owner
}
}
4 changes: 2 additions & 2 deletions protocol/fetch-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ module.exports = function (
// REQUEST_HEADER = See REQUEST_HEADER above
// OFFSET = int64 // Offset in topic and partition to start from
// MAX_SIZE = int32 // MAX_SIZE of the message set to return
FetchRequest.prototype.serialize = function (stream) {
FetchRequest.prototype.serialize = function (stream, cb) {
var payload = new Buffer(12)
int53.writeUInt64BE(this.offset, payload)
payload.writeUInt32BE(this.maxSize, 8)
Expand All @@ -41,7 +41,7 @@ module.exports = function (
}

FetchRequest.prototype.response = function (cb) {
return new Response(FetchBody)
return new Response(FetchBody, cb)
}

return FetchRequest
Expand Down
13 changes: 9 additions & 4 deletions static-connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ module.exports = function (
// }
function StaticConnector(options) {
var self = this
this.options = options
this.allBrokers = new BrokerPool()
this.producer = new Producer(this.allBrokers)
this.consumer = new Consumer(options.groupId, this.allBrokers)
Expand All @@ -35,7 +36,7 @@ module.exports = function (
this.options.brokers.forEach(
function (b) {
var broker = new Broker(b.id, b.host, b.port)
Object.keys(b.topics).forEach(
Object.keys(b.topics).forEach( //TODO would be great to get rid of this
function (t) {
broker.setTopicPartitions(t, b.topics[t])
}
Expand All @@ -57,9 +58,13 @@ module.exports = function (
}
inherits(StaticConnector, EventEmitter)

StaticConnector.prototype.consume = function (topic, interval, options) {
console.assert(options)
this.consumer.consume(topic, options)
StaticConnector.prototype.consume = function (topic, interval, partitions) {
console.assert(partitions)
this.consumer.consume(topic, interval, partitions)
}

StaticConnector.prototype.stopConsuming = function (topic, partitions) {
this.consumer.stop(topic, partitions)
}

StaticConnector.prototype.publish = function (topic, messages) {
Expand Down
10 changes: 2 additions & 8 deletions topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,17 @@ module.exports = function (
MessageBuffer) {

function Topic(name, connector, compression, batchSize, queueTime) {
this.offset = 0
this.name = name || ''
this.partitions = []
this.connector = connector
this.ready = true
this.compression = 0
this.compression = compression
this.messages = new MessageBuffer(this, batchSize, queueTime, connector)
EventEmitter.call(this)
}
inherits(Topic, EventEmitter)

Topic.prototype.parseMessages = function(err, length, messages) {
if (err) {
return this.emit('error', err)
}
Topic.prototype.parseMessages = function(messages) {
var self = this
this.offset += length
for (var i = 0; i < messages.length; i++) {
//XXX do we need to preserve the order?
messages[i].unpack(
Expand Down
23 changes: 17 additions & 6 deletions zkconnector.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ module.exports = function (
this.brokerReady = function () {
self.emit('brokerReady', this)
}
this.pendingConsumers = []
this.registerConsumers = registerConsumers.bind(this)
this.consumer = new Consumer(options.groupId, this.allBrokers)
this.connect()
EventEmitter.call(this)
Expand Down Expand Up @@ -92,19 +94,28 @@ module.exports = function (

}

ZKConnector.prototype.consume = function (topic, interval) {
var self = this
//TODO: need to be able to consume an array of topics
if (!this.consumerRegistered) {
function registerConsumers() {
if (this.pendingConsumers.length > 0) {
var self = this
this.consumer.foo(this.pendingConsumers) //TODO name me
this.zk.registerConsumers(
this.consumer,
function (partitions) {
self.consumer.consume(topic, interval, partitions)
function (topicPartitions) {
for(var i = 0; i < topicPartitions.length; i++) {
var tp = topicPartitions[i]
self.consumer.consume(tp.topic, tp.interval, tp.partitions)
}
}
)
this.pendingConsumers = []
}
}

ZKConnector.prototype.consume = function (topic, interval) {
this.pendingConsumers.push({topic: topic, interval: interval})
process.nextTick(this.registerConsumers)
}

ZKConnector.prototype.publish = function (topic, messages) {
return this.producer.publish(topic, messages)
}
Expand Down

0 comments on commit d47a247

Please sign in to comment.