Skip to content
This repository has been archived by the owner on Jan 26, 2018. It is now read-only.

Commit

Permalink
consume work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
dannycoates committed Nov 2, 2012
1 parent 7a1606a commit b7201a7
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 44 deletions.
4 changes: 2 additions & 2 deletions broker.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ module.exports = function (
this.topicPartitions = {}
}

Broker.prototype.fetch = function () {

Broker.prototype.fetch = function (topic, partition, maxSize) {
this.client.fetch(topic, partition, maxSize)
}

Broker.prototype.publish = function (topic, messages) {
Expand Down
4 changes: 2 additions & 2 deletions client.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ module.exports = function (
return this.ready
}

Client.prototype.fetch = function (topic, maxSize) {
Client.prototype.fetch = function (topic, partition, maxSize) {
return this._send(
new FetchRequest(topic.name, topic.offset, topic.partition, maxSize),
new FetchRequest(topic.name, topic.offset, partition, maxSize),
topic.parseMessages.bind(topic)
)
}
Expand Down
21 changes: 13 additions & 8 deletions consumer.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
module.exports = function (os, inherits, EventEmitter) {
module.exports = function (
os,
inherits,
EventEmitter,
Owner) {

function genConsumerId(groupId) {
return groupId + '_' + os.hostname() + '-' + Date.now() + '-' + "DEADBEEF"
Expand All @@ -8,15 +12,16 @@ module.exports = function (os, inherits, EventEmitter) {
this.groupId = groupId
this.consumerId = genConsumerId(this.groupId)
this.allBrokers = allBrokers
this.topics = {}
this.owners = {}
}

Consumer.prototype.consume = function (topic) {

}

Consumer.prototype.fetch = function (topic) {

Consumer.prototype.consume = function (topic, interval, partitions) {
console.assert(Array.isArray(partitions))
var owner = this.owners[topic.name] || new Owner(this.allBrokers)
owner.addPartitions(partitions)
owner.interval = interval
this.owners[topic.name] = owner
owner.consume()
}

return Consumer
Expand Down
3 changes: 2 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ var Client = require('./client')(
protocol.ProduceRequest,
protocol.OffsetsRequest
)
var Owner = require('./owner')()
var Broker = require('./broker')(inherits, EventEmitter, Client)
var BrokerPool = require('./broker-pool')(inherits, EventEmitter)
var Producer = require('./producer')(inherits, EventEmitter, BrokerPool)
var Consumer = require('./consumer')(os, inherits, EventEmitter)
var Consumer = require('./consumer')(os, inherits, EventEmitter, Owner)
var ZK = require('./zk')(async, inherits, EventEmitter, ZooKeeper)
var ZKConnector = require('./zkconnector')(async, inherits, EventEmitter, ZK, Producer, Consumer, BrokerPool, Broker)
var StaticConnector = require('./static-connector')(inherits, EventEmitter, Producer, Consumer, BrokerPool, Broker)
Expand Down
4 changes: 2 additions & 2 deletions kafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ module.exports = function (
return topic
}

Kafka.prototype.consume = function (name, interval) {
Kafka.prototype.consume = function (name, interval, partitions) {
var topic = this.topic(name)
topic.consume(interval || 1000)
topic.consume(interval || 1000, partitions)
return topic
}

Expand Down
17 changes: 17 additions & 0 deletions owner.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
module.exports = function () {

function Owner(brokers) {
this.brokers = brokers
this.interval = 0
}

Owner.prototype.addPartitions = function (partitions) {

}

Owner.prototype.consume = function () {

}

return Owner
}
24 changes: 3 additions & 21 deletions static-connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@ module.exports = function (
// bar: 2
// }
// }
// ],
// consumers: [
// {
//
// }
// ]
// }
function StaticConnector(options) {
Expand Down Expand Up @@ -58,31 +53,18 @@ module.exports = function (
)
}
)
this.options.consumers.forEach(
function (c) {

}
)
EventEmitter.call(this)
}
inherits(StaticConnector, EventEmitter)

StaticConnector.prototype.consume = function (topic, interval) {
this.consumer.consume(
topic,
function () {

}
)
StaticConnector.prototype.consume = function (topic, interval, options) {
console.assert(options)
this.consumer.consume(topic, options)
}

StaticConnector.prototype.publish = function (topic, messages) {
return this.producer.publish(topic, messages)
}

StaticConnector.prototype.fetch = function (topic) {
return this.consumer.fetch(topic)
}

return StaticConnector
}
4 changes: 2 additions & 2 deletions topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ module.exports = function (
)
}

Topic.prototype.consume = function (interval) { //TODO: starting offset?
this.connector.consume(this, interval)
Topic.prototype.consume = function (interval, partitions) { //TODO: starting offset?
this.connector.consume(this, interval, partitions)
}

return Topic
Expand Down
15 changes: 9 additions & 6 deletions zkconnector.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,15 @@ module.exports = function (

ZKConnector.prototype.consume = function (topic, interval) {
var self = this
//this.consumer.consume(topic)
this.zk.registerConsumers(this.groupId, this.consumerId, this.consumer, noop)
}

ZKConnector.prototype.fetch = function (topic) {

//TODO: need to be able to consume an array of topics
if (!this.consumerRegistered) {
this.zk.registerConsumers(
this.consumer,
function (partitions) {
self.consumer.consume(topic, interval, partitions)
}
)
}
}

ZKConnector.prototype.publish = function (topic, messages) {
Expand Down

0 comments on commit b7201a7

Please sign in to comment.