Skip to content

Commit

Permalink
Upgrade to lodash 4
Browse files Browse the repository at this point in the history
  • Loading branch information
hyperlink committed Jan 17, 2017
1 parent 399e91c commit 38765c4
Show file tree
Hide file tree
Showing 9 changed files with 21 additions and 21 deletions.
2 changes: 1 addition & 1 deletion lib/assignment/roundrobin.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ function assignRoundRobin (topicPartition, groupMembers, callback) {

topicPartitionList.forEach(function (tp) {
var topic = tp.topic;
while (!_.contains(subscriberMap[assigner.peek()], topic)) {
while (!_.includes(subscriberMap[assigner.peek()], topic)) {
assigner.next();
}
assignment[assigner.next()].push(tp);
Expand Down
2 changes: 1 addition & 1 deletion lib/baseProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ BaseProducer.prototype.send = function (payloads, cb) {
BaseProducer.prototype.buildPayloads = function (payloads, topicMetadata) {
const topicPartitionRequests = Object.create(null);
payloads.forEach((p) => {
p.partition = p.hasOwnProperty('partition') ? p.partition : this.partitioner.getPartition(_.pluck(topicMetadata[p.topic], 'partition'), p.key);
p.partition = p.hasOwnProperty('partition') ? p.partition : this.partitioner.getPartition(_.map(topicMetadata[p.topic], 'partition'), p.key);
p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
let messages = _.isArray(p.messages) ? p.messages : [p.messages];

Expand Down
4 changes: 2 additions & 2 deletions lib/consumerGroup.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ function ConsumerGroup (memberOptions, topics) {
this.emit('error', new errors.InvalidConsumerOffsetError(`Fetching ${this.options.outOfRangeOffset} offset failed`, error));
return;
}
const offset = _.first(result[topic.topic][topic.partition]);
const offset = _.head(result[topic.topic][topic.partition]);
const oldOffset = _.find(this.topicPayloads, {topic: topic.topic, partition: topic.partition}).offset;

debug('replacing %s-%s stale offset of %d with %d', topic.topic, topic.partition, oldOffset, offset);
Expand Down Expand Up @@ -249,7 +249,7 @@ ConsumerGroup.prototype.saveDefaultOffsets = function (topicPartitionList, callb
return callback(error);
}
self.defaultOffsets = _.mapValues(result, function (partitionOffsets) {
return _.mapValues(partitionOffsets, _.first);
return _.mapValues(partitionOffsets, _.head);
});
callback(null);
});
Expand Down
12 changes: 6 additions & 6 deletions lib/protocol/protocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ function _encodeFetchRequest (clientId, correlationId, payloads, maxWaitMs, minB
request.Int16BE(topic.length)
.string(topic);

var partitions = _.pairs(payloads[topic]).map(function (pairs) { return pairs[1]; });
var partitions = _.toPairs(payloads[topic]).map(function (pairs) { return pairs[1]; });
request.Int32BE(partitions.length);
partitions.forEach(function (p) {
request.Int32BE(p.partition)
Expand Down Expand Up @@ -306,7 +306,7 @@ function encodeOffsetCommitV2Request (clientId, correlationId, group, generation
request.Int16BE(topic.length)
.string(topic);

var partitions = _.pairs(payloads[topic]).map(function (pairs) { return pairs[1]; });
var partitions = _.toPairs(payloads[topic]).map(function (pairs) { return pairs[1]; });
request.Int32BE(partitions.length);
partitions.forEach(function (p) {
request.Int32BE(p.partition)
Expand All @@ -332,7 +332,7 @@ function _encodeOffsetCommitRequest (clientId, correlationId, group, payloads) {
request.Int16BE(topic.length)
.string(topic);

var partitions = _.pairs(payloads[topic]).map(function (pairs) { return pairs[1]; });
var partitions = _.toPairs(payloads[topic]).map(function (pairs) { return pairs[1]; });
request.Int32BE(partitions.length);
partitions.forEach(function (p) {
request.Int32BE(p.partition)
Expand Down Expand Up @@ -384,7 +384,7 @@ function _encodeProduceRequest (clientId, correlationId, payloads, requireAcks,
request.Int16BE(topic.length)
.string(topic);

var reqs = _.pairs(payloads[topic]).map(function (pairs) { return pairs[1]; });
var reqs = _.toPairs(payloads[topic]).map(function (pairs) { return pairs[1]; });
request.Int32BE(reqs.length);
reqs.forEach(function (p) {
var messageSet = encodeMessageSet(p.messages);
Expand Down Expand Up @@ -507,7 +507,7 @@ function _encodeOffsetFetchRequest (clientId, correlationId, group, payloads) {
request.Int16BE(topic.length)
.string(topic);

var partitions = _.pairs(payloads[topic]).map(function (pairs) { return pairs[1]; });
var partitions = _.toPairs(payloads[topic]).map(function (pairs) { return pairs[1]; });
request.Int32BE(partitions.length);
partitions.forEach(function (p) {
request.Int32BE(p.partition);
Expand Down Expand Up @@ -591,7 +591,7 @@ function encodeOffsetRequest (clientId, correlationId, payloads) {
request.Int16BE(topic.length)
.string(topic);

var partitions = _.pairs(payloads[topic]).map(function (pairs) { return pairs[1]; });
var partitions = _.toPairs(payloads[topic]).map(function (pairs) { return pairs[1]; });
request.Int32BE(partitions.length);
partitions.forEach(function (p) {
request.Int32BE(p.partition)
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"buffer-crc32": "~0.2.5",
"buffermaker": "~1.2.0",
"debug": "^2.1.3",
"lodash": ">3.0 <4.0",
"lodash": "^4.17.4",
"minimatch": "^3.0.2",
"nested-error-stacks": "^2.0.0",
"node-zookeeper-client": "~0.2.2",
Expand Down
6 changes: 3 additions & 3 deletions test/assignment/test.range.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ describe('Range Assignment', function () {
it('should partition two topics of three partitions between two consumers', function (done) {
range.assign(topicPartition, groupMembers, function (error, result) {
should(error).be.empty;
const consumer1 = _.first(result);
const consumer1 = _.head(result);
consumer1.memberId.should.eql('consumer1');
Object.keys(consumer1.topicPartitions).should.eql(['RebalanceTopic', 'RebalanceTest']);
consumer1.topicPartitions['RebalanceTest'].should.eql([0, 1]);
Expand Down Expand Up @@ -75,7 +75,7 @@ describe('Range Assignment', function () {

range.assign(topicPartition, gm, function (error, result) {
should(error).be.empty;
const consumer1 = _.first(result);
const consumer1 = _.head(result);
consumer1.memberId.should.eql('consumer1');
Object.keys(consumer1.topicPartitions).should.eql(['RebalanceTopic', 'RebalanceTest']);
consumer1.topicPartitions['RebalanceTest'].should.eql([0]);
Expand Down Expand Up @@ -119,7 +119,7 @@ describe('Range Assignment', function () {

range.assign(topicPartition, gm, function (error, result) {
should(error).be.empty;
const consumer1 = _.first(result);
const consumer1 = _.head(result);
consumer1.memberId.should.eql('consumer1');
Object.keys(consumer1.topicPartitions).should.eql(['RebalanceTopic', 'RebalanceTest']);
consumer1.topicPartitions['RebalanceTest'].should.eql([0]);
Expand Down
6 changes: 3 additions & 3 deletions test/assignment/test.roundrobin.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ describe('Round Robin Assignment', function () {

roundRobin.assign(topicPartition, groupMembers, function (error, assignment) {
should(error).be.empty;
const consumer1 = _.first(assignment);
const consumer1 = _.head(assignment);
consumer1.memberId.should.eql('consumer1');
Object.keys(consumer1.topicPartitions).should.eql(['RebalanceTopic', 'RebalanceTest']);
consumer1.topicPartitions['RebalanceTest'].should.eql(['1']);
Expand Down Expand Up @@ -93,7 +93,7 @@ describe('Round Robin Assignment', function () {
should(error).be.empty;
assignment = _.sortBy(assignment, 'memberId');

const consumer1 = _.first(assignment);
const consumer1 = _.head(assignment);
consumer1.memberId.should.eql('consumer1');
Object.keys(consumer1.topicPartitions).should.eql(['RebalanceTopic', 'RebalanceTest']);
consumer1.topicPartitions['RebalanceTest'].should.eql(['0']);
Expand Down Expand Up @@ -154,7 +154,7 @@ describe('Round Robin Assignment', function () {
should(error).be.empty;
assignment = _.sortBy(assignment, 'memberId');

const consumer1 = _.first(assignment);
const consumer1 = _.head(assignment);
consumer1.memberId.should.eql('consumer1');
Object.keys(consumer1.topicPartitions).should.eql(['RebalanceTopic', 'RebalanceTest']);
consumer1.topicPartitions['RebalanceTest'].should.eql(['1']);
Expand Down
6 changes: 3 additions & 3 deletions test/helpers/Childrearer.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Childrearer.prototype.closeAll = function (callback) {
};

Childrearer.prototype.kill = function (numberOfChildren, callback) {
var children = _.sample(this.children, numberOfChildren);
var children = _.sampleSize(this.children, numberOfChildren);
this._killEachChild(children, callback);
};

Expand All @@ -44,7 +44,7 @@ Childrearer.prototype.killLast = function (callback) {
};

Childrearer.prototype.killFirst = function (callback) {
var child = _.first(this.children);
var child = _.head(this.children);
this._killEachChild([child], callback);
};

Expand All @@ -61,7 +61,7 @@ Childrearer.prototype._killEachChild = function (children, callback) {
};

Childrearer.prototype.raise = function (children, callback, waitTime) {
var newChildren = _.times(children, this._raiseChild, this);
var newChildren = _.times(children, _.bind(this._raiseChild, this));

this.children = this.children.concat(newChildren);

Expand Down
2 changes: 1 addition & 1 deletion test/test.highlevelProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ var host = process.env['KAFKA_TEST_HOST'] || '';
assert(_.chain(result)
.find({topic: topic, partition: partition})
.result('messages')
.pluck('value')
.map('value')
.includes(value)
.value()
, `Value "${value}" is not in topic "${topic}" partition ${partition}`);
Expand Down

0 comments on commit 38765c4

Please sign in to comment.