Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
Change metadata request to version 1.
This allow client to identify broker rack.
Rack-aware partitioner implementation for producers.
  • Loading branch information
spasam committed Feb 1, 2018
1 parent a841fba commit e9cc168c60ea310c957dc1b67cd10b5ccc06dc0e
Showing with 100 additions and 30 deletions.
  1. +1 −0 kafka.js
  2. +15 −10 lib/baseProducer.js
  3. +2 −2 lib/highLevelProducer.js
  4. +66 −13 lib/partitioner.js
  5. +11 −3 lib/protocol/protocol.js
  6. +1 −1 lib/protocol/protocol_struct.js
  7. +4 −1 test/test.partitioner.js
@@ -16,3 +16,4 @@ exports.CyclicPartitioner = require('./lib/partitioner').CyclicPartitioner;
exports.RandomPartitioner = require('./lib/partitioner').RandomPartitioner;
exports.KeyedPartitioner = require('./lib/partitioner').KeyedPartitioner;
exports.CustomPartitioner = require('./lib/partitioner').CustomPartitioner;
exports.RackawarePartitioner = require('./lib/partitioner').RackawarePartitioner;
@@ -14,21 +14,24 @@ var RandomPartitioner = partitioner.RandomPartitioner;
var CyclicPartitioner = partitioner.CyclicPartitioner;
var KeyedPartitioner = partitioner.KeyedPartitioner;
var CustomPartitioner = partitioner.CustomPartitioner;
var RackawarePartitioner = partitioner.RackawarePartitioner;

var PARTITIONER_TYPES = {
default: 0,
random: 1,
cyclic: 2,
keyed: 3,
custom: 4
custom: 4,
rackaware: 5
};

var PARTITIONER_MAP = {
0: DefaultPartitioner,
1: RandomPartitioner,
2: CyclicPartitioner,
3: KeyedPartitioner,
4: CustomPartitioner
4: CustomPartitioner,
5: RackawarePartitioner
};

var DEFAULTS = {
@@ -49,10 +52,12 @@ var DEFAULTS = {
* @param {Number} [options.ackTimeoutMs=100] The amount of time in milliseconds to wait for all acks before considered
* the message as errored
* @param {Number} [defaultPartitionType] The default partitioner type
* @param {Object} [customPartitioner] a custom partitinoer to use of the form: function (partitions, key)
* @param {Object} [partitionerArgs] for custom partitioner use of the form: function (partitions, key).
* For rack-aware partitioner type, this should be the rack closest to the client.
* For all other partitioner's this will be ignored.
* @constructor
*/
function BaseProducer (client, options, defaultPartitionerType, customPartitioner) {
function BaseProducer (client, options, defaultPartitionerType, partitionerArgs) {
options = options || {};

this.ready = false;
@@ -61,16 +66,16 @@ function BaseProducer (client, options, defaultPartitionerType, customPartitione
this.requireAcks = options.requireAcks === undefined ? DEFAULTS.requireAcks : options.requireAcks;
this.ackTimeoutMs = options.ackTimeoutMs === undefined ? DEFAULTS.ackTimeoutMs : options.ackTimeoutMs;

if (customPartitioner !== undefined && options.partitionerType !== PARTITIONER_TYPES.custom) {
throw new Error('Partitioner Type must be custom if providing a customPartitioner.');
} else if (customPartitioner === undefined && options.partitionerType === PARTITIONER_TYPES.custom) {
throw new Error('No customer partitioner defined');
if (partitionerArgs === undefined && options.partitionerType === PARTITIONER_TYPES.custom) {
throw new Error('No custom partitioner defined');
} else if (partitionerArgs === undefined && options.partitionerType === PARTITIONER_TYPES.rackaware) {
throw new Error('Client rack information not specified');
}

var partitionerType = PARTITIONER_MAP[options.partitionerType] || PARTITIONER_MAP[defaultPartitionerType];

// eslint-disable-next-line
this.partitioner = new partitionerType(customPartitioner);
this.partitioner = new partitionerType(partitionerArgs);

this.connect();
}
@@ -124,7 +129,7 @@ BaseProducer.prototype.buildPayloads = function (payloads, topicMetadata) {
payloads.forEach(p => {
p.partition = p.hasOwnProperty('partition')
? p.partition
: this.partitioner.getPartition(_.map(topicMetadata[p.topic], 'partition'), p.key);
: this.partitioner.getPartition(topicMetadata[p.topic], p);
p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
let messages = _.isArray(p.messages) ? p.messages : [p.messages];

@@ -4,8 +4,8 @@ var util = require('util');
var BaseProducer = require('./baseProducer');

/** @inheritdoc */
function HighLevelProducer (client, options, customPartitioner) {
BaseProducer.call(this, client, options, BaseProducer.PARTITIONER_TYPES.cyclic, customPartitioner);
function HighLevelProducer (client, options, partitionerArgs) {
BaseProducer.call(this, client, options, BaseProducer.PARTITIONER_TYPES.cyclic, partitionerArgs);
}

util.inherits(HighLevelProducer, BaseProducer);
@@ -8,37 +8,56 @@ var Partitioner = function () {};
var DefaultPartitioner = function () {};
util.inherits(DefaultPartitioner, Partitioner);

DefaultPartitioner.prototype.getPartition = function (partitions) {
DefaultPartitioner.prototype.getPartition = function (topicMetadata) {
if (!topicMetadata) {
return 0;
}

const partitions = _.map(topicMetadata, 'partition');
if (partitions && _.isArray(partitions) && partitions.length > 0) {
return partitions[0];
} else {
return 0;
}
return 0;
};

var CyclicPartitioner = function () {
this.c = 0;
};
util.inherits(CyclicPartitioner, Partitioner);

CyclicPartitioner.prototype.getPartition = function (partitions) {
if (_.isEmpty(partitions)) return 0;
return partitions[ this.c++ % partitions.length ];
CyclicPartitioner.prototype.getPartition = function (topicMetadata) {
if (!topicMetadata) {
return 0;
}

const partitions = _.map(topicMetadata, 'partition');
if (partitions && _.isArray(partitions) && partitions.length > 0) {
return partitions[ this.c++ % partitions.length ];
}
return 0;
};

var RandomPartitioner = function () {};
util.inherits(RandomPartitioner, Partitioner);

RandomPartitioner.prototype.getPartition = function (partitions) {
return partitions[Math.floor(Math.random() * partitions.length)];
RandomPartitioner.prototype.getPartition = function (topicMetadata) {
if (!topicMetadata) {
return 0;
}

const partitions = _.map(topicMetadata, 'partition');
if (partitions && _.isArray(partitions) && partitions.length > 0) {
return partitions[Math.floor(Math.random() * partitions.length)];
}
return 0;
};

var KeyedPartitioner = function () {};
util.inherits(KeyedPartitioner, Partitioner);

// Taken from oid package (Dan Bornstein)
// Copyright The Obvious Corporation.
KeyedPartitioner.prototype.hashCode = function (string) {
function hashCode (string) {
var hash = 0;
var length = string.length;

@@ -49,20 +68,54 @@ KeyedPartitioner.prototype.hashCode = function (string) {
return (hash === 0) ? 1 : hash;
};

KeyedPartitioner.prototype.getPartition = function (partitions, key) {
key = key || '';
KeyedPartitioner.prototype.getPartition = function (topicMetadata, payload) {
if (!topicMetadata) {
return 0;
}

var index = this.hashCode(key) % partitions.length;
return partitions[index];
const partitions = _.map(topicMetadata, 'partition');
if (partitions && _.isArray(partitions) && partitions.length > 0) {
const key = payload && payload.key ? payload.key : '';
const index = hashCode(key) % partitions.length;
return partitions[index];
}
return 0;
};

var CustomPartitioner = function (partitioner) {
this.getPartition = partitioner;
};
util.inherits(CustomPartitioner, Partitioner);

var RackawarePartitioner = function (rack) {
this.rack = rack;
};
util.inherits(RackawarePartitioner, Partitioner);

RackawarePartitioner.prototype.getPartition = function (topicMetadata, payload) {
if (!topicMetadata) {
return 0;
}

let rackMetadata = _.filter(topicMetadata, partition => {
return this.rack === partition.rack;
});
if (!rackMetadata || !_.isArray(rackMetadata) || rackMetadata.length < 1) {
rackMetadata = topicMetadata;
}

const partitions = _.map(rackMetadata, 'partition');
if (partitions && _.isArray(partitions) && partitions.length > 0) {
const key = payload && payload.key ? payload.key : '';
const index = hashCode(key) % partitions.length;
return partitions[index];
}
return 0;
};

module.exports.DefaultPartitioner = DefaultPartitioner;
module.exports.CyclicPartitioner = CyclicPartitioner;
module.exports.RandomPartitioner = RandomPartitioner;
module.exports.KeyedPartitioner = KeyedPartitioner;
module.exports.CustomPartitioner = CustomPartitioner;
module.exports.RackawarePartitioner = RackawarePartitioner;
@@ -225,7 +225,7 @@ function decodeMessageSet (topic, partition, messageSet, cb, maxTickMessages, hi
}

function encodeMetadataRequest (clientId, correlationId, topics) {
var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.metadata);
var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.metadata, 1);
request.Int32BE(topics.length);
topics.forEach(function (topic) {
request.Int16BE(topic.length).string(topic);
@@ -243,6 +243,7 @@ function decodeMetadataResponse (resp) {
.word32bs('correlationId')
.word32bs('brokerNum')
.loop(decodeBrokers)
.word32bs('controllerId')
.word32bs('topicNum')
.loop(_decodeTopics);

@@ -255,8 +256,13 @@ function decodeMetadataResponse (resp) {
vars.host = vars.host.toString();
})
.word32bs('port')
.word16bs('rack')
.tap(function (vars) {
brokers[vars.nodeId] = { nodeId: vars.nodeId, host: vars.host, port: vars.port };
this.buffer('rack', vars.rack);
vars.rack = vars.rack.toString();
})
.tap(function (vars) {
brokers[vars.nodeId] = { nodeId: vars.nodeId, host: vars.host, port: vars.port, rack: vars.rack };
});
}

@@ -268,6 +274,7 @@ function decodeMetadataResponse (resp) {
this.buffer('topic', vars.topic);
vars.topic = vars.topic.toString();
})
.word8bs('isInternal')
.word32bs('partitionNum')
.tap(function (vars) {
if (vars.topicError !== 0) {
@@ -298,7 +305,8 @@ function decodeMetadataResponse (resp) {
vars.partition,
vars.leader,
vars.replicas,
vars.isr
vars.isr,
brokers[vars.leader].rack
);
} else {
errors.push(ERROR_CODE[vars.errorCode]);
@@ -15,7 +15,7 @@ var KEYS = {
OffsetCommitRequest: ['topic', 'partition', 'offset', 'metadata', 'committing', 'autoCommitIntervalMs'],
OffsetCommitResponse: [],
TopicAndPartition: ['topic', 'partition'],
PartitionMetadata: ['topic', 'partition', 'leader', 'replicas', 'isr'],
PartitionMetadata: ['topic', 'partition', 'leader', 'replicas', 'isr', 'rack'],
Message: ['magic', 'attributes', 'key', 'value', 'timestamp'],
ProduceRequest: ['topic', 'partition', 'messages', 'attributes'],
Request: ['payloads', 'encoder', 'decoder', 'callback']
@@ -9,9 +9,12 @@ var KeyedPartitioner = kafka.KeyedPartitioner;
var CustomPartitioner = kafka.CustomPartitioner;

function getPartitions (partitioner, partitions, count) {
const topicMetadata = [];
partitions.forEach(p => topicMetadata.push({ partition: p }));

var arr = [];
for (var i = 0; i < count; i++) {
arr.push(partitioner.getPartition(partitions));
arr.push(partitioner.getPartition(topicMetadata));
}
return arr;
}

0 comments on commit e9cc168

Please sign in to comment.