Skip to content
This repository has been archived by the owner on Jan 26, 2018. It is now read-only.

Commit

Permalink
added a logger and moved consume interval to topic
Browse files Browse the repository at this point in the history
  • Loading branch information
dannycoates committed Nov 4, 2012
1 parent 51597f8 commit 90c4340
Show file tree
Hide file tree
Showing 14 changed files with 63 additions and 42 deletions.
1 change: 0 additions & 1 deletion client/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ module.exports = function (
// messages: array of: string, Buffer, Message
// partition: number
Client.prototype.publish = function (topic, messages, partition) {
//console.log('publish ' + topic.name + partition)
return this._send(
new ProduceRequest(
topic.name,
Expand Down
1 change: 0 additions & 1 deletion client/fetch-body.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ module.exports = function (
// / MESSAGES (0 or more) /
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
FetchBody.prototype.parse = function () {
console.assert(this.complete())
var messages = []
var offset = 0
while (offset < this.buffer.length) {
Expand Down
1 change: 0 additions & 1 deletion client/offsets-body.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ module.exports = function (
// NUMBER_OFFSETS = int32 // How many offsets are being returned
// OFFSETS = int64[] // List of offsets
OffsetsBody.prototype.parse = function () {
console.assert(this.complete())
var offsets = []
var count = this.buffer.readUInt32BE(0)
for (var i = 0; i < count; i++) {
Expand Down
1 change: 0 additions & 1 deletion client/response-header.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ module.exports = function (
// RESPONSE_LENGTH = int32 // Length in bytes of entire response (excluding this field)
// ERROR_CODE = int16
ResponseHeader.prototype.parse = function () {
console.assert(this.complete())
this.length = this.buffer.readUInt32BE(0)
this.errno = this.buffer.readUInt16BE(4)
}
Expand Down
1 change: 0 additions & 1 deletion client/state.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ module.exports = function (inherits) {
if (this.complete()) { return true }
var data = stream.read(this.remainingBytes)
if (!data) { return false }
console.assert(data.length <= this.remainingBytes)
data.copy(this.buffer, this.buffer.length - this.remainingBytes)
this.remainingBytes = this.remainingBytes - data.length
return this.complete()
Expand Down
7 changes: 4 additions & 3 deletions consumer.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
module.exports = function (
logger,
async,
os,
inherits,
Expand All @@ -16,12 +17,12 @@ module.exports = function (
this.owners = {}
}

Consumer.prototype.consume = function (topic, interval, partitions) {
console.assert(Array.isArray(partitions))
Consumer.prototype.consume = function (topic, partitions) {
logger.assert(Array.isArray(partitions))
var name = topic.name
var owner = this.owners[name] || new Owner(topic, this.allBrokers)
this.owners[name] = owner
owner.consume(partitions, interval)
owner.consume(partitions)
}

Consumer.prototype.stop = function (topic, partitions) {
Expand Down
22 changes: 11 additions & 11 deletions example.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
var Kafka = require('./index')

var kafka = new Kafka({
//zookeeper: 'localhost:2181',
brokers: [{
id: 0,
host: 'localhost',
port: 9092,
topics: {
//foo: 2,
bar: 1
}
}],
zookeeper: 'localhost:2181',
// brokers: [{
// id: 0,
// host: 'localhost',
// port: 9092,
// topics: {
// //foo: 2,
// bar: 1
// }
// }],
compression: 'gzip',
queueTime: 2000,
batchSize: 200
Expand All @@ -24,7 +24,7 @@ kafka.connect(function () {
bar.on(
'message',
function (m) {
console.log(m.toString())
//console.log(m.toString())
}
)

Expand Down
12 changes: 6 additions & 6 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
var net = require('net')
var os = require('os')
var async = require('async')
var inherits = require('util').inherits
var EventEmitter = require('events').EventEmitter
var ZooKeeper = require('zookeeper')
var logger = console

var Client = require('./client')
var Partition = require('./partition')()
var Partition = require('./partition')(logger)
var Owner = require('./owner')(Partition)
var Consumer = require('./consumer')(async, os, inherits, EventEmitter, Owner)
var Consumer = require('./consumer')(logger, async, os, inherits, EventEmitter, Owner)
var Broker = require('./broker')(inherits, EventEmitter, Client)
var BrokerPool = require('./broker-pool')(inherits, EventEmitter)
var Producer = require('./producer')(inherits, EventEmitter, BrokerPool)
var StaticConnector = require('./static-connector')(inherits, EventEmitter, Producer, Consumer, BrokerPool, Broker)
var ZK = require('./zk')(async, inherits, EventEmitter, ZooKeeper)
var ZKConnector = require('./zkconnector')(async, inherits, EventEmitter, ZK, Producer, Consumer, BrokerPool, Broker)
var StaticConnector = require('./static-connector')(logger, inherits, EventEmitter, Producer, Consumer, BrokerPool, Broker)
var ZK = require('./zk')(logger, async, inherits, EventEmitter, ZooKeeper)
var ZKConnector = require('./zkconnector')(logger, async, inherits, EventEmitter, ZK, Producer, Consumer, BrokerPool, Broker)
var MessageBuffer = require('./message-buffer')()
var Topic = require('./topic')(inherits, EventEmitter, MessageBuffer)
var kafka = require('./kafka')(inherits, EventEmitter, Topic, ZKConnector, StaticConnector, Client.compression)
Expand Down
3 changes: 1 addition & 2 deletions owner.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module.exports = function (Partition) {
this.partitions = {}
}

Owner.prototype.consume = function (partitions, interval) {
Owner.prototype.consume = function (partitions) {
for (var i = 0; i < partitions.length; i++) {
var name = partitions[i]
var split = name.split('-')
Expand All @@ -16,7 +16,6 @@ module.exports = function (Partition) {
var broker = this.brokers.get(brokerId)
var partition = this.partitions[name] ||
new Partition(this.topic, broker, partitionNo)
partition.interval = interval
partition.reset()
this.partitions[name] = partition
}
Expand Down
5 changes: 3 additions & 2 deletions partition.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module.exports = function () {
module.exports = function (logger) {
function handleResponse(err, length, messages) {
if (err) {
return this.topic.error(err)
Expand All @@ -13,14 +13,15 @@ module.exports = function () {
}

function fetch() {
logger.log("fetching " + this.topic.name + ':' + this.broker.id + '-' + this.partition)
this.broker.fetch(this.topic, this.partition, this.maxSize, this.fetchResponder)
}

function Partition(topic, broker, partition) {
this.topic = topic
this.broker = broker
this.partition = partition
this.interval = 0
this.interval = this.topic.interval
this.offset = 0
this.maxSize = 300 * 1024 //TODO set via option
this.fetcher = fetch.bind(this)
Expand Down
7 changes: 4 additions & 3 deletions static-connector.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
module.exports = function (
logger,
inherits,
EventEmitter,
Producer,
Expand Down Expand Up @@ -58,9 +59,9 @@ module.exports = function (
}
inherits(StaticConnector, EventEmitter)

StaticConnector.prototype.consume = function (topic, interval, partitions) {
console.assert(partitions)
this.consumer.consume(topic, interval, partitions)
StaticConnector.prototype.consume = function (topic, partitions) {
logger.assert(partitions)
this.consumer.consume(topic, partitions)
}

StaticConnector.prototype.stopConsuming = function (topic, partitions) {
Expand Down
4 changes: 3 additions & 1 deletion topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ module.exports = function (

function Topic(name, connector, compression, batchSize, queueTime) {
this.name = name || ''
this.interval = 0
this.connector = connector
this.ready = true
this.compression = compression
Expand Down Expand Up @@ -49,7 +50,8 @@ module.exports = function (
}

Topic.prototype.consume = function (interval, partitions) { //TODO: starting offset?
this.connector.consume(this, interval, partitions)
this.interval = interval
this.connector.consume(this, partitions)
}

return Topic
Expand Down
27 changes: 23 additions & 4 deletions zk.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
module.exports = function (
logger,
async,
inherits,
EventEmitter,
Expand All @@ -15,7 +16,7 @@ module.exports = function (
})
this.zk.once(
'close',
function () { console.log('zk close')}
function () { logger.log('zk close')}
)
EventEmitter.call(this)
}
Expand Down Expand Up @@ -141,6 +142,8 @@ module.exports = function (
function (next) {
self.zk.a_exists(path, false,
function (rc, err, stat) {
logger.log('exists ' + path)
logger.log(stat)
next(err, stat)
}
)
Expand All @@ -149,13 +152,17 @@ module.exports = function (
if (stat) {
self.zk.a_set(path, data, stat.version,
function (rc, err, stat) {
logger.log('set ' + path)
logger.log(stat)
next(err, stat)
}
)
}
else {
self.zk.a_create(path, data, options,
function (rc, err, stat) {
logger.log('create ' + path)
logger.log(stat)
next(err, stat)
}
)
Expand All @@ -179,37 +186,49 @@ module.exports = function (
},
function (err) {
if (err) {
console.log(err)
logger.log(err)
}
logger.log('created roots')
cb(err)
}
)
}

function toTopicString(topics) {
var names = Object.keys(topics)
var ts = {}
for (var i = 0; i < names.length; i++) {
ts[names[i]] = 1
}
return JSON.stringify(ts)
}

ZK.prototype.registerTopics = function (topics, consumer, cb) {
var self = this
logger.log('registerTopics')
async.series([
function (next) {
self._createConsumerRoots(consumer.groupId, next)
},
function (next) {
self._createOrReplace(
'/consumers/' + consumer.groupId + '/ids/' + consumer.consumerId,
JSON.stringify(topics),
toTopicString(topics),
ZooKeeper.ZOO_EPHEMERAL,
next
)
}
],
function (err) {
logger.log('registeredTopics')
cb(err)
}
)
}

ZK.prototype.getTopicPartitions = function (topics, consumer, cb) {
//TODO
cb([{topic: 'bar', interval: 200, partitions: ['0-0']}])
cb(null, [{topic: topics['bar'], partitions: ['0-0']}])
}

return ZK
Expand Down
13 changes: 8 additions & 5 deletions zkconnector.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
module.exports = function (
logger,
async,
inherits,
EventEmitter,
Expand All @@ -23,14 +24,14 @@ module.exports = function (
this.allBrokers.on(
'brokerAdded',
function (b) {
console.log('added ' + b.id)
logger.log('added ' + b.id)
self.emit('brokerAdded', b)
}
)
this.allBrokers.on(
'brokerRemoved',
function (b) {
console.log('removed ' + b.id)
logger.log('removed ' + b.id)
self.emit('brokerRemoved', b)
}
)
Expand Down Expand Up @@ -93,6 +94,7 @@ module.exports = function (

ZKConnector.prototype._rebalance = function () {
var self = this
logger.log('rebalancing')
async.waterfall([
function (next) {
self.consumer.drain(next)
Expand All @@ -102,9 +104,10 @@ module.exports = function (
self.zk.getTopicPartitions(self.interestedTopics, self.consumer, next)
},
function (topicPartitions) {
logger.log(topicPartitions)
for(var i = 0; i < topicPartitions.length; i++) {
var tp = topicPartitions[i]
self.consumer.consume(tp.topic, tp.interval, tp.partitions)
self.consumer.consume(tp.topic, tp.partitions)
}
}
]
Expand All @@ -125,9 +128,9 @@ module.exports = function (
}
}

ZKConnector.prototype.consume = function (topic, interval) {
ZKConnector.prototype.consume = function (topic) {
this.hasPendingTopics = true
this.interestedTopics[topic.name] = 1 //TODO propagate interval
this.interestedTopics[topic.name] = topic
process.nextTick(this.registerTopics)
}

Expand Down

0 comments on commit 90c4340

Please sign in to comment.