added some doc comments

commit fa403f3db0104d9592ea2ee5a632c9e18ba45baf 1 parent 5726b0d
@dannycoates authored
2  broker-pool.js
@@ -1,4 +1,6 @@
module.exports = function (logger, inherits, EventEmitter) {
+
+ // A collection of Broker objects, accessible individually by id or all at once
function BrokerPool() {
this.brokers = []
this.brokersById = {}
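
The comment above describes a dual index: an array for iterating all brokers plus an id map for direct lookup. A minimal sketch of that idea; the 'add', 'get', and 'all' methods are illustrative names, not necessarily the module's real API:

```js
// Sketch only: method names are assumptions, not the module's actual API.
function BrokerPool() {
  this.brokers = []       // ordered list, for iterating over all brokers
  this.brokersById = {}   // id -> Broker, for O(1) lookup by id
}

BrokerPool.prototype.add = function (broker) {
  this.brokers.push(broker)
  this.brokersById[broker.id] = broker
}

BrokerPool.prototype.get = function (id) { return this.brokersById[id] }
BrokerPool.prototype.all = function () { return this.brokers }
```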
11 broker.js
@@ -1,9 +1,8 @@
-module.exports = function (
- logger,
- inherits,
- EventEmitter,
- Client) {
+module.exports = function (logger, inherits, EventEmitter, Client) {
+ // A Broker represents a Kafka server. It attempts to maintain a connection
+ // to the server until it is 'destroy'ed. If the connection is dropped,
+ // the Broker will attempt to reconnect, backing off exponentially on failure.
function Broker(id, options) {
this.id = id
this.client = Client.nil
@@ -69,6 +68,8 @@ module.exports = function (
)
}
+ // Event handlers
+
function clientConnect() {
logger.info('broker connected', this.id)
this.reconnectAttempts = 0
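
The reconnect behavior the new comment describes (retry until 'destroy', exponential backoff on failure) is a standard pattern. A hedged sketch, reusing the 'exponentialBackoff' formula that appears in partition.js below; the 'destroyed' flag and 'connect' method are assumptions:

```js
// Sketch of reconnect-with-backoff; not the module's exact implementation.
function exponentialBackoff(attempt, delay) {
  return Math.floor(Math.random() * Math.pow(2, attempt) * 10 + delay)
}

function clientEnd() {
  if (this.destroyed) return  // 'destroy' ends the reconnect loop (assumed flag)
  this.reconnectAttempts++
  setTimeout(
    this.connect.bind(this),  // assumed method that re-opens the client
    exponentialBackoff(this.reconnectAttempts, 1000)
  )
}
```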
0  client/fetch-body.js → client/fetch-response.js
File renamed without changes
8 client/index.js
@@ -21,12 +21,12 @@ module.exports = function (logger) {
var Message = require('./message')(zlib, snappy, crc32)
var RequestHeader = require('./request-header')()
var ResponseHeader = require('./response-header')(inherits, State)
- var FetchBody = require('./fetch-body')(inherits, State, Message)
- var OffsetsBody = require('./offsets-body')(inherits, State, int53)
+ var FetchResponse = require('./fetch-response')(inherits, State, Message)
+ var OffsetsResponse = require('./offsets-response')(inherits, State, int53)
var Response = require('./response')(logger, State, ResponseHeader)
var Receiver = require('./receiver')(logger, inherits, EventEmitter, State)
- var FetchRequest = require('./fetch-request')(RequestHeader, Response, FetchBody, int53)
- var OffsetsRequest = require('./offsets-request')(RequestHeader, Response, OffsetsBody, int53)
+ var FetchRequest = require('./fetch-request')(RequestHeader, Response, FetchResponse, int53)
+ var OffsetsRequest = require('./offsets-request')(RequestHeader, Response, OffsetsResponse, int53)
var ProduceRequest = require('./produce-request')(inherits, RequestHeader, Message, State)
var Client = require('./client')(
logger,
0  client/offsets-body.js → client/offsets-response.js
File renamed without changes
10 example.js
@@ -4,7 +4,7 @@ var fs = require('fs')
var file = fs.createWriteStream('./test.txt')
var kafka = new Kafka({
- zookeeper: 'localhost:2181',
+ //zookeeper: 'localhost:2181',
brokers: [{
id: 0,
host: 'localhost',
@@ -24,10 +24,10 @@ file.once('open', function () {
kafka.connect(function () {
var baz = kafka.topic('bazzz', {
- // partitions: {
- // consume: ['0-0'],
- // produce: ['0:1']
- // }
+ partitions: {
+ consume: ['0-0'],
+ produce: ['0:1']
+ }
})
baz.pipe(file)
45 kafka.js
@@ -13,13 +13,17 @@ module.exports = function (
// Topics to your heart's content.
//
// options: {
- // zookeeper: 'address:port'
- // brokers: [{name: host: port: },...]
- // compression: 'none', 'gzip', 'snappy'
- // maxMessageSize: -1
- // queueTime: 5000
- // batchSize: 200
- // groupId: 'franz-kafka'
+ // zookeeper: 'address:port',
+ // brokers: [{id: host: port: },...],
+ // compression: 'none',
+ // maxMessageSize: 1000000,
+ // queueTime: 5000,
+ // batchSize: 200,
+ // groupId: 'franz-kafka',
+ // minFetchDelay: 0,
+ // maxFetchDelay: 10000,
+ // maxFetchSize: 300*1024,
+ // logger: null,
// }
//
function Kafka(options) {
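
Put together, the options documented above can be passed straight to the constructor. Values here are illustrative, and the require path assumes the package name:

```js
var Kafka = require('franz-kafka') // assumed package name

var kafka = new Kafka({
  brokers: [{ id: 0, host: 'localhost', port: 9092 }],
  compression: 'none',
  maxMessageSize: 1000000,
  queueTime: 5000,
  batchSize: 200,
  groupId: 'franz-kafka',
  minFetchDelay: 0,
  maxFetchDelay: 10000,
  maxFetchSize: 300 * 1024,
  logger: null
})
```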
@@ -62,6 +66,7 @@ module.exports = function (
Kafka.prototype.defaultOptions = function (options) {
var defaults = {}
+ options = options || {}
defaults.queueTime = options.queueTime || 5000
defaults.batchSize = options.batchSize || 200
defaults.minFetchDelay = options.minFetchDelay || 0
@@ -104,8 +109,20 @@ module.exports = function (
//
// name: string
// options: {
- //
+ // minFetchDelay: 0, // defaults to the kafka.minFetchDelay
+ // maxFetchDelay: 10000, // defaults to the kafka.maxFetchDelay
+ // maxFetchSize: 1000000, // defaults to the kafka.maxFetchSize
+ // compression: 'none', // defaults to the kafka.compression
+ // batchSize: 200, // defaults to the kafka.batchSize
+ // queueTime: 5000, // defaults to the kafka.queueTime
+ // partitions: {
+ // consume: ['0-0:0'], // array of strings with the form:
+ // // 'brokerId-partitionId:startOffset'
+ // produce: ['0:1'] // array of strings with the form:
+ // // 'brokerId:partitionCount'
+ // }
// }
+ // returns: a Topic object
Kafka.prototype.topic = function (name, options) {
var topic = this.topics[name] ||
new Topic(
@@ -117,14 +134,26 @@ module.exports = function (
return topic
}
+ // Register the topic with the connector.
+ //
+ // topic: a Topic object
+ //
+ // Specifically, this is for use with the ZooKeeper connector. For the static
+ // connector this is a no-op.
Kafka.prototype.register = function (topic) {
this.connector.register(topic)
}
+ // Get a broker by id from the list of all brokers.
+ //
+ // id: the number of the broker in its configuration
+ // returns: a Broker object or undefined
Kafka.prototype.broker = function (id) {
return this.allBrokers.get(id)
}
+ // Event handlers
+
function brokerAdded(broker) {
this.emit('connect')
}
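
The partition strings documented in the topic options encode broker, partition, and offset. An illustrative parser for the consume form; this helper is hypothetical, not part of the module:

```js
// 'brokerId-partitionId:startOffset', e.g. '0-0:0'
function parseConsumeString(s) {
  var halves = s.split(':')        // ['12-6', '42']
  var ids = halves[0].split('-')   // ['12', '6']
  return {
    brokerId: Number(ids[0]),
    partitionId: Number(ids[1]),
    startOffset: Number(halves[1] || 0)
  }
}

parseConsumeString('12-6:42')
// -> { brokerId: 12, partitionId: 6, startOffset: 42 }
```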
8 message-buffer.js
@@ -1,7 +1,9 @@
-module.exports = function (
- inherits,
- EventEmitter) {
+module.exports = function (inherits, EventEmitter) {
+ // A MessageBuffer holds messages for batch writing until either 'batchSize'
+ // messages accumulate or 'queueTime' ms has passed since the last write.
+ // If no partitions are ready for the 'write', the MessageBuffer will buffer
+ // an unbounded number of messages until a partition is ready.
function MessageBuffer(partitions, batchSize, queueTime) {
this.partitions = partitions
this.batchSize = batchSize
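
The batch-or-timeout behavior described in the new comment can be sketched roughly as below; the 'messages' array, 'timer' field, and 'flush' method are assumptions about the internals:

```js
// Sketch: flush once 'batchSize' messages accumulate, or after 'queueTime' ms.
MessageBuffer.prototype.push = function (message) {
  this.messages.push(message)
  if (this.messages.length >= this.batchSize) {
    return this.flush()                // size threshold reached
  }
  if (!this.timer) {                   // arm the queueTime timer once
    this.timer = setTimeout(this.flush.bind(this), this.queueTime)
  }
}
```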
23 partition-set.js
@@ -1,8 +1,7 @@
-module.exports = function (
- logger,
- inherits,
- EventEmitter) {
+module.exports = function (logger, inherits, EventEmitter) {
+ // A PartitionSet contains all of the known Partitions (for a Topic)
+ // It tracks which partitions are 'readable' and 'writable'
function PartitionSet() {
this.partitionsByName = {}
this.partitions = []
@@ -59,19 +58,17 @@ module.exports = function (
return this.partitions
}
+ function isReadyAndWritable(p) { return p.isReady() && p.isWritable() }
+
PartitionSet.prototype.isReady = function () {
- return this.partitions.some(
- function (p) {
- return p.isReady() && p.isWritable()
- }
- )
+ return this.partitions.some(isReadyAndWritable)
}
PartitionSet.prototype.nextWritable = function () {
var partition = null
for (var i = 0; i < this.partitions.length; i++) {
partition = this.next()
- if (partition.isWritable() && partition.isReady()) {
+ if (isReadyAndWritable(partition)) {
return partition
}
}
@@ -83,29 +80,27 @@ module.exports = function (
}
function readablePartition(p) { return p.isReadable() }
-
PartitionSet.prototype.readable = function () {
return this.partitions.filter(readablePartition)
}
function pausePartition(p) { p.pause() }
-
PartitionSet.prototype.pause = function () {
this.readable().forEach(pausePartition)
}
function resumePartition(p) { p.resume() }
-
PartitionSet.prototype.resume = function () {
this.readable().forEach(resumePartition)
}
function stopPartition(p) { p.stop() }
-
PartitionSet.prototype.stop = function () {
this.readable().forEach(stopPartition)
}
+ // Event handlers
+
function readableChanged(partition) {
if (partition.isReadable()) {
this.readables[partition.name] = partition
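
'nextWritable' above leans on a 'next()' cursor that this hunk does not show. A plausible round-robin sketch; the 'current' index is an assumption:

```js
// Assumed round-robin cursor; the module's real next() may differ.
PartitionSet.prototype.next = function () {
  this.current = (this.current + 1) % this.partitions.length
  return this.partitions[this.current]
}
```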
11 partition.js
@@ -1,5 +1,12 @@
module.exports = function (logger, inherits, EventEmitter, Broker) {
+ // A Partition represents the location of messages for a Topic.
+ // Partitions may be 'readable' and/or 'writable'.
+ //
+ // A readable Partition repeatedly fetches new messages from the server
+ // for consumption. The fetch loop is controlled with 'pause' and 'resume'.
+ //
+ // A writable Partition will write messages to the server for production.
function Partition(topic, broker, id) {
this.topic = topic
this.broker = broker
@@ -104,8 +111,6 @@ module.exports = function (logger, inherits, EventEmitter, Broker) {
return this.readable
}
- Partition.nil = new Partition({ minFetchDelay: 0 }, Broker.nil, -1)
-
function exponentialBackoff(attempt, delay) {
return Math.floor(
Math.random() * Math.pow(2, attempt) * 10 + delay
@@ -162,5 +167,7 @@ module.exports = function (logger, inherits, EventEmitter, Broker) {
this.emit('destroy', this)
}
+ Partition.nil = new Partition({ minFetchDelay: 0 }, Broker.nil, -1)
+
return Partition
}
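
Given the 'exponentialBackoff' above, the delay is uniform in [delay, delay + 2^attempt * 10), so with a base delay of 1000 ms (illustrative) the range widens per attempt:

```js
// attempt 1:  1000..1020 ms
// attempt 5:  1000..1320 ms
// attempt 10: 1000..11240 ms
for (var attempt = 1; attempt <= 10; attempt++) {
  console.log(attempt, exponentialBackoff(attempt, 1000))
}
```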
4 readme.md
@@ -147,8 +147,8 @@ var foo = kafka.topic('foo', {
batchSize: 200, // defaults to the kafka.batchSize
queueTime: 5000, // defaults to the kafka.queueTime
partitions: {
- consume: ['0-0:0'] // array of strings with the form 'brokerId-partitionId:startOffset'
- produce: ['0:1'] // array of strings with the form 'brokerId:partitionCount'
+ consume: ['0-0:0'], // array of strings with the form 'brokerId-partitionId:startOffset'
+ produce: ['0:1'] // array of strings with the form 'brokerId:partitionCount'
}
})
```
7 static-connector.js
@@ -1,9 +1,4 @@
-module.exports = function (
- logger,
- inherits,
- EventEmitter,
- Broker
- ) {
+module.exports = function (logger, inherits, EventEmitter, Broker) {
// options: {
// brokers: [
25 topic.js
@@ -61,9 +61,10 @@ module.exports = function (
}
inherits(Topic, Stream)
- //emit end
- //emit close
+ //TODO emit end
+ //TODO emit close
+ // A partition is ready for writing
function partitionsReady() {
if(this.produceBuffer.flush()) {
logger.info('drain', this.name)
@@ -71,6 +72,11 @@ module.exports = function (
}
}
+ // Emits a 'data' event for each message of a fetch response.
+ // If the stream is paused, the message is added to bufferedMessages,
+ // which is flushed when the stream is resumed.
+ //
+ // payloads: an array of Buffers
function emitMessages(payloads) {
for (var i = 0; i < payloads.length; i++) {
var data = payloads[i]
@@ -90,6 +96,11 @@ module.exports = function (
}
}
+ // Emits the offset of the given messages, then unpacks and emits the
+ // message data
+ //
+ // partition: a Partition object
+ // messages: an Array of Message objects
Topic.prototype.parseMessages = function (partition, messages) {
this.emit('offset', partition.name, partition.offset)
for (var i = 0; i < messages.length; i++) {
@@ -97,6 +108,9 @@ module.exports = function (
}
}
+ // Emits the messages that were buffered while the stream was paused
+ //
+ // returns: boolean pause state of this topic
Topic.prototype._flushBufferedMessages = function () {
this.paused = false
while(!this.paused && this.bufferedMessages.length > 0) {
@@ -110,8 +124,11 @@ module.exports = function (
return this.paused || this.bufferedMessages.length > 0
}
- // Partitions
-
+ // Get or create a Partition object.
+ //
+ // name: string in the form of brokerId-partitionId
+ // ex. '12-6'
+ // returns: a Partition object
Topic.prototype.partition = function (name) {
var partition = this.partitions.get(name)
if (!partition) {
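
The pause handling documented in this file boils down to buffer-while-paused, flush-on-resume. A hedged sketch of the emit side; '_emitOrBuffer' is a hypothetical name:

```js
Topic.prototype._emitOrBuffer = function (data) {
  if (this.paused) {
    this.bufferedMessages.push(data)  // drained by _flushBufferedMessages()
  } else {
    this.emit('data', data)
  }
}
```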
20 zkconnector.js
@@ -46,6 +46,12 @@ module.exports = function (
this.zk.on('consumers-changed', this._rebalance.bind(this))
}
+ function topicObject(name) { return this.interestedTopics[name] }
+
+ ZKConnector.prototype._topics = function () {
+ return Object.keys(this.interestedTopics).map(topicObject.bind(this))
+ }
+
ZKConnector.prototype._brokersChanged = function (brokerIds) {
var self = this
async.forEachSeries(
@@ -97,13 +103,19 @@ module.exports = function (
}
ZKConnector.prototype._rebalance = function () {
- var self = this
logger.info('rebalancing')
async.waterfall([
function (next) {
- self.consumer.stop()
- self.consumer.drain(next)
- },
+ async.forEach(
+ this._topics(),
+ function (topic, done) {
+ topic.stop()
+ topic.drain(done)
+ },
+ function (err) {
+ next()
+ }
+ )
+ }.bind(this),
function (next) {
self.zk.getTopicPartitions(self.interestedTopics, self.consumer, next)
},
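
With the close paren restored above, the first waterfall step stops and drains each interested topic in place of the old single-consumer calls. The same stop-then-drain step in isolation, assuming the async library and topics exposing 'stop' and 'drain':

```js
// Isolated sketch of the rebalance's first step.
function stopAndDrainAll(topics, next) {
  async.forEach(
    topics,
    function (topic, done) {
      topic.stop()        // stop fetching new messages
      topic.drain(done)   // call done once in-flight writes finish
    },
    function (err) {
      next(err)           // hand control to the next waterfall step
    }
  )
}
```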