Skip to content

Commit

Permalink
prettier formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
hyperlink committed Jan 30, 2018
1 parent 11230a0 commit bb7fde9
Showing 1 changed file with 30 additions and 14 deletions.
44 changes: 30 additions & 14 deletions lib/kafkaClient.js
Expand Up @@ -333,7 +333,7 @@ KafkaClient.prototype.updateMetadatas = function (metadatas, replaceTopicMetadat
logger.debug('updating metadatas');
this.setBrokerMetadata(metadatas[0]);
if (replaceTopicMetadata) {
this.topicMetadata = metadatas[1].metadata;
this.topicMetadata = metadatas[1].metadata;
} else {
_.extend(this.topicMetadata, metadatas[1].metadata);
}
Expand Down Expand Up @@ -423,7 +423,9 @@ KafkaClient.prototype.getListGroups = function (callback) {
return callback(new Error('Client is not ready (getListGroups)'));
}
const brokers = this.brokerMetadata;
async.mapValuesLimit(brokers, this.options.maxAsyncRequests,
async.mapValuesLimit(
brokers,
this.options.maxAsyncRequests,
(brokerMetadata, brokerId, cb) => {
const broker = this.brokerForLeader(brokerId);
if (!broker || !broker.isConnected()) {
Expand All @@ -434,7 +436,8 @@ KafkaClient.prototype.getListGroups = function (callback) {
const request = protocol.encodeListGroups(this.clientId, correlationId);
this.queueCallback(broker.socket, correlationId, [protocol.decodeListGroups, cb]);
broker.write(request);
}, (err, results) => {
},
(err, results) => {
if (err) {
callback(err);
return;
Expand All @@ -452,7 +455,9 @@ KafkaClient.prototype.getDescribeGroups = function (groups, callback) {
return callback(new Error('Client is not ready (getDescribeGroups)'));
}

async.groupByLimit(groups, this.options.maxAsyncRequests,
async.groupByLimit(
groups,
this.options.maxAsyncRequests,
(group, cb) => {
this.sendGroupCoordinatorRequest(group, (err, coordinator) => {
cb(err || null, coordinator ? coordinator.coordinatorId : undefined);
Expand All @@ -464,7 +469,9 @@ KafkaClient.prototype.getDescribeGroups = function (groups, callback) {
return;
}

async.mapValuesLimit(results, this.options.maxAsyncRequests,
async.mapValuesLimit(
results,
this.options.maxAsyncRequests,
(groups, coordinator, cb) => {
const broker = this.brokerForLeader(coordinator);
if (!broker || !broker.isConnected()) {
Expand All @@ -481,15 +488,24 @@ KafkaClient.prototype.getDescribeGroups = function (groups, callback) {
return callback(err);
}

callback(null, _.reduce(res, (result, describes, broker) => {
_.each(describes, (values, consumer) => {
result[consumer] = values;
result[consumer].brokerId = broker;
});
return result;
}, {}));
});
});
callback(
null,
_.reduce(
res,
(result, describes, broker) => {
_.each(describes, (values, consumer) => {
result[consumer] = values;
result[consumer].brokerId = broker;
});
return result;
},
{}
)
);
}
);
}
);
};

KafkaClient.prototype.close = function (callback) {
Expand Down

0 comments on commit bb7fde9

Please sign in to comment.