Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix describe configs for multiple brokers (#1772) #1280

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
64 changes: 39 additions & 25 deletions lib/kafkaClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -1379,46 +1379,60 @@ KafkaClient.prototype.describeConfigs = function (payload, callback) {
return callback(new Error('Client is not ready (describeConfigs)'));
}
let err;

// Broker resource requests must go to the specific node
// other requests can go to any node
const brokerResourceRequests = [];
const nonBrokerResourceRequests = [];

_.forEach(payload.resources, function (resource) {
if (resourceTypeMap[resource.resourceType] === undefined) {
err = new Error(`Unexpected resource type ${resource.resourceType} for resource ${resource.resourceName}`);
return false;
} else {
resource.resourceType = resourceTypeMap[resource.resourceType];
}

if (resource.resourceType === resourceTypeMap['broker']) {
brokerResourceRequests.push(resource);
} else {
nonBrokerResourceRequests.push(resource);
}
});

if (err) {
return callback(err);
}
const brokers = this.brokerMetadata;
async.mapValuesLimit(
brokers,
this.options.maxAsyncRequests,
(brokerMetadata, brokerId, cb) => {
const broker = this.brokerForLeader(brokerId);
if (!broker || !broker.isConnected()) {
return cb(new errors.BrokerNotAvailableError('Broker not available (describeConfigs)'));
}

const correlationId = this.nextId();

let apiVersion = 0;
if (broker.apiSupport && broker.apiSupport.describeConfigs) {
apiVersion = broker.apiSupport.describeConfigs.max;
async.parallelLimit([
(cb) => {
if (nonBrokerResourceRequests.length > 0) {
this.sendRequestToAnyBroker('describeConfigs', [{ resources: nonBrokerResourceRequests, includeSynonyms: payload.includeSynonyms }], cb);
} else {
cb(null, []);
}
apiVersion = Math.min(apiVersion, 2);
const request = protocol.encodeDescribeConfigsRequest(this.clientId, correlationId, payload, apiVersion);
this.sendWhenReady(broker, correlationId, request, protocol.decodeDescribeConfigsResponse(apiVersion), cb);
},
(err, results) => {
if (err) {
callback(err);
return;
}
results = _.values(results);
callback(null, _.merge.apply({}, results));
...brokerResourceRequests.map(r => {
return (cb) => {
this.sendRequestToBroker(r.resourceName, 'describeConfigs', [{ resources: [r], includeSynonyms: payload.includeSynonyms }], cb);
};
})
], this.options.maxAsyncRequests, (err, result) => {
if (err) {
return callback(err);
}
);

callback(null, _.flatten(result));
});
};

/**
* Sends a request to any broker in the cluster
*/
KafkaClient.prototype.sendRequestToAnyBroker = function (requestType, args, callback) {
// For now just select the first broker
const brokerId = Object.keys(this.brokerMetadata)[0];
this.sendRequestToBroker(brokerId, requestType, args, callback);
};

module.exports = KafkaClient;
32 changes: 27 additions & 5 deletions lib/protocol/protocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -1668,7 +1668,19 @@ function decodeVersionsResponse (resp) {
return error || versions;
}

function encodeDescribeConfigsRequest (clientId, correlationId, payload, apiVersion) {
function encodeDescribeConfigsRequest (clientId, correlationId, payload) {
return _encodeDescribeConfigsRequest(clientId, correlationId, payload, 0);
}

function encodeDescribeConfigsRequestV1 (clientId, correlationId, payload) {
return _encodeDescribeConfigsRequest(clientId, correlationId, payload, 1);
}

function encodeDescribeConfigsRequestV2 (clientId, correlationId, payload) {
return _encodeDescribeConfigsRequest(clientId, correlationId, payload, 2);
}

function _encodeDescribeConfigsRequest (clientId, correlationId, payload, apiVersion) {
let request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.describeConfigs, apiVersion);
const resources = payload.resources;
request.Int32BE(resources.length);
Expand All @@ -1692,10 +1704,16 @@ function encodeDescribeConfigsRequest (clientId, correlationId, payload, apiVers
return encodeRequestWithLength(request.make());
}

function decodeDescribeConfigsResponse (apiVersion) {
return function (resp) {
return _decodeDescribeConfigsResponse(resp, apiVersion);
};
function decodeDescribeConfigsResponse (resp) {
return _decodeDescribeConfigsResponse(resp, 0);
}

function decodeDescribeConfigsResponseV1 (resp) {
return _decodeDescribeConfigsResponse(resp, 1);
}

function decodeDescribeConfigsResponseV2 (resp) {
return _decodeDescribeConfigsResponse(resp, 2);
}

function _decodeDescribeConfigsResponse (resp, apiVersion) {
Expand Down Expand Up @@ -1858,4 +1876,8 @@ exports.decodeListGroups = decodeListGroups;
exports.encodeVersionsRequest = encodeVersionsRequest;
exports.decodeVersionsResponse = decodeVersionsResponse;
exports.encodeDescribeConfigsRequest = encodeDescribeConfigsRequest;
exports.encodeDescribeConfigsRequestV1 = encodeDescribeConfigsRequestV1;
exports.encodeDescribeConfigsRequestV2 = encodeDescribeConfigsRequestV2;
exports.decodeDescribeConfigsResponse = decodeDescribeConfigsResponse;
exports.decodeDescribeConfigsResponseV1 = decodeDescribeConfigsResponseV1;
exports.decodeDescribeConfigsResponseV2 = decodeDescribeConfigsResponseV2;
6 changes: 5 additions & 1 deletion lib/protocol/protocolVersions.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ const API_MAP = {
[p.encodeCreateTopicRequestV1, p.decodeCreateTopicResponseV1]
],
deleteTopics: null,
describeConfigs: [[p.encodeDescribeConfigsRequest, p.decodeDescribeConfigsResponse]],
describeConfigs: [
[p.encodeDescribeConfigsRequest, p.decodeDescribeConfigsResponse],
[p.encodeDescribeConfigsRequestV1, p.decodeDescribeConfigsResponseV1],
[p.encodeDescribeConfigsRequestV2, p.decodeDescribeConfigsResponseV2]
],
saslAuthenticate: [[p.encodeSaslAuthenticationRequest, p.decodeSaslAuthenticationResponse]]
};

Expand Down
2 changes: 1 addition & 1 deletion test/test.admin.js
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ describe('Admin', function () {
};
admin.describeConfigs(payload, function (error, res) {
should.not.exist(res);
error.should.have.property('message').and.containEql('Unexpected broker id');
error.should.have.property('message').and.containEql('No broker with id ' + brokerId);
done();
});
});
Expand Down