
Comparing changes

  • 3 commits
  • 8 files changed
  • 0 commit comments
  • 1 contributor
broker.js (4 changes)
@@ -93,8 +93,8 @@ module.exports = function (
this.topicPartitions = {}
}
- Broker.prototype.fetch = function (name, partition, maxSize, cb) {
- this.client.fetch(name, partition, maxSize, cb)
+ Broker.prototype.fetch = function (topic, partition, cb) {
+ this.client.fetch(topic, partition, cb)
}
Broker.prototype.write = function (topic, messages, cb) {
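
Broker.fetch now takes the Topic and Partition objects themselves instead of a topic name plus an explicit maxSize; the size cap moves onto the topic (see the fetch-request change below). A minimal sketch of the new call shape, assuming broker, topic, and partition are already wired up, with the (err, length, messages) callback form noted in client.js:

    broker.fetch(topic, partition, function (err, length, messages) {
      if (err) return console.error('fetch failed', err)
      console.log('got', length, 'message bytes from', topic.name)
    })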
client/client.js (28 changes)
@@ -94,21 +94,13 @@ module.exports = function (
}
// cb: function (err, length, messages) {}
- Client.prototype.fetch = function (name, partition, maxSize, cb) {
+ Client.prototype.fetch = function (topic, partition, cb) {
logger.info(
- 'fetching', name,
+ 'fetching', topic.name,
'broker', this.id,
'partition', partition.id
)
- return this._send(
- new FetchRequest(
- name,
- partition.offset,
- partition.id,
- maxSize
- ),
- cb
- )
+ return this._send(new FetchRequest(topic, partition), cb)
}
// topic: a Topic object
@@ -123,22 +115,22 @@ module.exports = function (
)
return this._send(
new ProduceRequest(
- topic.name,
- messages.map(Message.create),
+ topic,
partitionId,
- topic.compression,
- topic.maxMessageSize
+ messages.map(Message.create)
),
cb
)
}
- Client.prototype.offsets = function (time, maxCount, cb) {
+ Client.prototype.offsets = function (topic, partition, time, maxCount, cb) {
logger.info(
'offsets', time,
- 'broker', this.id
+ 'topic', topic.name,
+ 'broker', this.id,
+ 'partition', partition.id
)
- return this._send(new OffsetsRequest(time, maxCount), cb)
+ return this._send(new OffsetsRequest(topic, partition, time, maxCount), cb)
}
Client.compression = Message.compression
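
The client API follows the same shape: fetch and offsets both receive the Topic and Partition objects, and the request constructors pull offset, partition id, compression, and size limits off of them rather than taking loose arguments. A rough sketch of the new offsets call; the time and maxCount values are placeholders (-1 is the usual Kafka convention for "latest"), and the callback shape is assumed:

    client.offsets(topic, partition, -1, 1, function (err, offsets) {
      if (err) return console.error('offsets failed', err)
      // offsets here correspond to partition.id of topic.name
    })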
client/fetch-request.js (16 changes)
@@ -4,11 +4,9 @@ module.exports = function (
FetchBody,
int53) {
- function FetchRequest(topic, offset, partitionId, maxSize) {
- this.topic = topic || ""
- this.partitionId = partitionId || 0
- this.offset = offset || 0
- this.maxSize = maxSize || (1024 * 1024)
+ function FetchRequest(topic, partition) {
+ this.topic = topic
+ this.partition = partition
}
// 0 1 2 3
@@ -29,13 +27,13 @@ module.exports = function (
FetchRequest.prototype.serialize = function (stream, cb) {
var err = null
var payload = new Buffer(12)
- int53.writeUInt64BE(this.offset, payload)
- payload.writeUInt32BE(this.maxSize, 8)
+ int53.writeUInt64BE(this.partition.offset, payload)
+ payload.writeUInt32BE(this.topic.maxFetchSize, 8)
var header = new RequestHeader(
payload.length,
RequestHeader.types.FETCH,
- this.topic,
- this.partitionId
+ this.topic.name,
+ this.partition.id
)
try {
header.serialize(stream)
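
A FetchRequest is now built from just the two objects: the 8-byte offset comes from partition.offset and the 4-byte size cap from topic.maxFetchSize. A sketch using duck-typed stand-ins for the real Topic and Partition (field names taken from the diff above; stream is assumed to be a writable socket, and the callback shape is assumed):

    var req = new FetchRequest(
      { name: 'events', maxFetchSize: 300 * 1024 }, // stands in for a Topic
      { id: 0, offset: 0 }                          // stands in for a Partition
    )
    req.serialize(stream, function (err) {
      if (err) console.error('serialize failed', err)
    })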
client/offsets-request.js (8 changes)
@@ -4,7 +4,9 @@ module.exports = function (
OffsetsBody,
int53) {
- function OffsetsRequest(time, maxCount) {
+ function OffsetsRequest(topic, partition, time, maxCount) {
+ this.topic = topic
+ this.partition = partition
this.time = time
this.maxCount = maxCount
}
@@ -34,7 +36,9 @@ module.exports = function (
var header = new RequestHeader(
payload.length,
RequestHeader.types.OFFSETS,
- "test")
+ this.topic.name,
+ this.partition.id
+ )
try {
header.serialize(stream)
var written = stream.write(payload)
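
Besides threading the topic and partition through to the request header, this fixes a stub: the header previously sent the hardcoded topic name "test". Construction now mirrors the other requests (placeholder time/maxCount values):

    var req = new OffsetsRequest(topic, partition, -1, 1)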
client/produce-request.js (14 changes)
@@ -12,12 +12,10 @@ module.exports = function (
inherits(ProduceError, Error)
ProduceError.prototype.name = 'Produce Error'
- function ProduceRequest(topic, messages, partitionId, compression, maxSize) {
- this.topic = topic || ""
- this.messages = messages || []
+ function ProduceRequest(topic, partitionId, messages) {
+ this.topic = topic
this.partitionId = partitionId
- this.compression = compression
- this.maxSize = maxSize
+ this.messages = messages || []
}
function messageToBuffer(m) { return m.toBuffer() }
@@ -30,7 +28,7 @@ module.exports = function (
var wrapper = new Message()
wrapper.setData(
Buffer.concat(messageBuffers, messagesLength),
- this.compression,
+ this.topic.compression,
function (err) {
cb(err, wrapper.toBuffer())
}
@@ -59,13 +57,13 @@ module.exports = function (
if (err) {
return cb(err)
}
- if (buffer.length > this.maxSize) {
+ if (buffer.length > this.topic.maxMessageSize) {
return cb(new ProduceError("message too big", buffer.length))
}
var header = new RequestHeader(
buffer.length + 4,
RequestHeader.types.PRODUCE,
- this.topic,
+ this.topic.name,
this.partitionId
)
try {
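
ProduceRequest likewise stops carrying its own copies of compression and maxSize; both are read from the topic at serialization time, so the oversize check compares against topic.maxMessageSize. The new constructor order is topic object, partition id, then messages, which Client.prototype.write has already mapped through Message.create:

    var req = new ProduceRequest(topic, partition.id, messages)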
kafka.js (5 changes)
@@ -17,11 +17,13 @@ module.exports = function (
// maxMessageSize: -1
// queueTime: 5000
// batchSize: 200
+ // groupId: 'franz-kafka'
// }
//
function Kafka(options) {
this.topics = {}
this.options = options || {}
+ this.options.groupId = this.options.groupId || 'franz-kafka'
this.connector = null
this.topicDefaults = this.defaultOptions(options)
EventEmitter.call(this)
@@ -109,13 +111,12 @@ module.exports = function (
//
// }
Kafka.prototype.topic = function (name, options) {
- options = setTopicOptions(options, this.topicDefaults)
var topic = this.topics[name] ||
new Topic(
name,
this.connector.producer,
this.connector.consumer,
- options
+ setTopicOptions(options, this.topicDefaults)
)
this.topics[name] = topic
return topic
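
Kafka now guarantees a groupId, defaulting to 'franz-kafka' when the caller omits one. The topic() tweak is behavior-preserving: inlining setTopicOptions into the constructor call means the options merge only runs when a new Topic is actually created. A sketch of the option in use:

    var kafka = new Kafka({ groupId: 'my-consumers' }) // omit groupId to get 'franz-kafka'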
partition.js (7 changes)
@@ -32,9 +32,8 @@ module.exports = function (logger) {
function fetch() {
if (this.broker.isReady()) {
this.broker.fetch(
- this.topic.name,
+ this.topic,
this,
- this.topic.maxFetchSize,
this.fetchResponder
)
}
@@ -49,8 +48,8 @@ module.exports = function (logger) {
this.broker = broker
this.id = id
this.fetchDelay = this.topic.minFetchDelay
- this.emptyFetches = offset || 0
- this.offset = 0
+ this.emptyFetches = 0
+ this.offset = offset || 0
this.fetcher = fetch.bind(this)
this.fetchResponder = handleResponse.bind(this)
this.paused = true
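
The constructor hunk fixes transposed assignments: the supplied offset was being stored in emptyFetches while offset itself was zeroed, so a partition created at a nonzero offset would silently restart from zero. After the fix, the empty-fetch counter always starts at 0 and the passed-in offset seeds where fetching resumes.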
readme.md (1 change)
@@ -72,6 +72,7 @@ var kafka = new Kafka({
batchSize: 200, // number of messages to bundle before producing
// consumer defaults
+ groupId: 'franz-kafka', // the consumer group name this instance is part of
minFetchDelay: 0, // minimum milliseconds to wait between fetches
maxFetchDelay: 10000, // maximum milliseconds to wait between fetches
maxFetchSize: 300*1024, // limits the size of a fetched message
