Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

added starting offset to consume partition options

  • Loading branch information...
commit 6056ff40d6fe7a2d335395bea827010b0b543925 1 parent 0263928
@dannycoates authored
View
8 consumer.js
@@ -18,18 +18,18 @@ module.exports = function (
this.owners = {}
}
- Consumer.prototype.consume = function (topic, partitionNames) {
- logger.assert(Array.isArray(partitionNames))
+ Consumer.prototype.consume = function (topic, partitionNamesWithOffsets) {
+ logger.assert(Array.isArray(partitionNamesWithOffsets))
logger.info(
'consuming', topic.name,
- 'partitions', partitionNames.length,
+ 'partitions', partitionNamesWithOffsets.length,
'group', this.groupId,
'consumer', this.consumerId
)
var name = topic.name
var owner = this.owners[name] || new Owner(topic, this.allBrokers)
this.owners[name] = owner
- owner.consume(partitionNames)
+ owner.consume(partitionNamesWithOffsets)
}
Consumer.prototype.stop = function (topic, partitionNames) {
View
1  kafka.js
@@ -46,7 +46,6 @@ module.exports = function (
Kafka.prototype.defaultOptions = function (options) {
var defaults = {}
- defaults.startOffset = options.startOffset || 0
defaults.queueTime = options.queueTime || 5000
defaults.batchSize = options.batchSize || 200
defaults.minFetchDelay = options.minFetchDelay || 0
View
18 owner.js
@@ -8,17 +8,19 @@ module.exports = function (Partition) {
this.paused = true
}
- Owner.prototype.consume = function (partitionNames) {
+ Owner.prototype.consume = function (partitionNamesWithOffsets) {
this.paused = false
- for (var i = 0; i < partitionNames.length; i++) {
- var name = partitionNames[i]
- var split = name.split('-')
- if (split.length === 2) {
- var brokerId = +split[0]
- var partitionNo = +split[1]
+ for (var i = 0; i < partitionNamesWithOffsets.length; i++) {
+ var nameAndOffset = partitionNamesWithOffsets[i].split(':')
+ var name = nameAndOffset[0]
+ var offset = +(nameAndOffset[1] || 0)
+ var brokerPartition = name.split('-')
+ if (brokerPartition.length === 2) {
+ var brokerId = +brokerPartition[0]
+ var partitionNo = +brokerPartition[1]
var broker = this.brokers.get(brokerId)
var partition = this.partitionsByName[name] ||
- new Partition(this.topic, broker, partitionNo)
+ new Partition(this.topic, broker, partitionNo, offset)
this.partitionsByName[name] = partition
if(this.partitions.indexOf(partition) === -1) {
View
4 partition.js
@@ -48,12 +48,12 @@ module.exports = function (logger) {
}
}
- function Partition(topic, broker, id) {
+ function Partition(topic, broker, id, offset) {
this.topic = topic
this.broker = broker
this.id = id
this.fetchDelay = this.topic.minFetchDelay
- this.emptyFetches = 0
+ this.emptyFetches = offset || 0
this.offset = 0
this.fetcher = fetch.bind(this)
this.fetchResponder = handleResponse.bind(this)
View
2  topic.js
@@ -21,7 +21,7 @@ module.exports = function (
// batchSize: number (count)
// queueTime: number (ms)
// partitions: {
- // consume: [string] (broker-partition) ex. '0-0'
+ // consume: [string] (broker-partition:offset) ex. '0-0:123'
// produce: [string] (broker:partitionCount) ex. '0:5'
// }
// }
View
3  zk.js
@@ -226,7 +226,8 @@ module.exports = function (
ZK.prototype.getTopicPartitions = function (topics, consumer, cb) {
//TODO
- cb(null, [{topic: topics['bazzz'], partitions: ['0-0']}])
+ throw new Error("Not Implemented")
+ cb(null, [{topic: topics['bazzz'], partitions: ['0-0:0']}])
}
return ZK
Please sign in to comment.
Something went wrong with that request. Please try again.