Skip to content

Commit

Permalink
Support for Kafka-Client versions 2.4.x and above (apache#176)
Browse files Browse the repository at this point in the history
**Fixes:**
 apache#166 apache#153 apache#99 

**Issue:**
KoP uses [Kafka-2.0.0](https://github.com/streamnative/kop/blob/78d9ba3487d4d7c85a5d667d45d9b38aaa7c824f/pom.xml#L46) which supports [API_VERSION's](https://kafka.apache.org/protocol.html#The_Messages_ApiVersions) **0** --> **2**

When **_Kafka-Clients-2.4.x+_**(using `API_VERSION:  3`) connects to KoP, it panics and following error stack is observed:
`10:22:23.281 [pulsar-io-22-4] ERROR io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - error to get Response ByteBuf:
java.lang.IllegalArgumentException: Invalid version for API key API_VERSIONS: 3
    at org.apache.kafka.common.protocol.ApiKeys.schemaFor(ApiKeys.java:312) ~[?:?]
    at org.apache.kafka.common.protocol.ApiKeys.responseSchema(ApiKeys.java:286) ~[?:?]
    at org.apache.kafka.common.requests.ApiVersionsResponse.toStruct(ApiVersionsResponse.java:129) ~[?:?]
    at org.apache.kafka.common.requests.ResponseUtils.serializeResponse(ResponseUtils.java:40) ~[?:?]`


**Resolved By:**
Returning an `UNSUPPORTED_VERSION` [error-code: 35](https://kafka.apache.org/protocol.html#protocol_error_codes), which would notify the **_kafka-client_** to lower it's `API_VERSION`. As no list of `ApiKeys & versions` were available for the **kafka-clients** to refer, it safely falls-back to using `API_VERSION:  0` and KoP continues processing the kafka-messages using `API_VERSION:  0`.

**Tested producing/consuming with Kafka-Clients:**

> 2.0.0
2.2.2
2.3.1
2.4.0
2.4.1
2.5.0
2.5.1
2.6.0


**More...**
KoP could have returned the list of supported `ApiKeys & versions` while sending the `UNSUPPORTED_VERSION` error-code, which would have made the **_kafka-client_** select the **_latest_** supported `API_VERSION` and use `API_VERSION:  2` instead of falling all the way back and using `API_VERSION:  0` 


Notes on how **_Kafka-Brokers_** is supposed to handle this scenario: 
> 2. On receiving ApiVersionsRequest, a broker returns its full list of supported ApiKeys and versions regardless of current authentication state (e.g., before SASL authentication on an SASL listener, do note that no Kafka protocol requests may take place on an SSL listener before the SSL handshake is finished). If this is considered to leak information about the broker version a workaround is to use SSL with client authentication which is performed at an earlier stage of the connection where the ApiVersionRequest is not available. Also, note that broker versions older than 0.10.0.0 do not support this API and will either ignore the request or close connection in response to the request.
> 
> 3. If multiple versions of an API are supported by broker and client, clients are recommended to use the latest version supported by the broker and itself.


_Reference: [Kafka-Protocol Guide](https://kafka.apache.org/protocol.html#api_versions)_

We analyzed how various **_Kafka-Brokers_** respond to a similar situation where the **_kafka-client's_** `API_VERSION` is higher than what is supported by the **_Kafka-Broker_**.

![Kafka-Broker-Client-Wireshark-Results](https://user-images.githubusercontent.com/63665447/91243701-34d3a500-e710-11ea-9752-f9980333ce1d.png)
_Reference: Wireshark packet captures - [Kafka-Protocol-Study.zip](https://github.com/streamnative/kop/files/5127018/Kafka-Protocol-Study.zip)_

From the study we can infer that, in a similar `API_VERSION` mismatch scenario the **_Kafka-Brokers_** doesn't return the list of supported `ApiKeys & versions` when notifying the **_kafka-client_** with the `UNSUPPORTED_VERSION` [error-code: 35](https://kafka.apache.org/protocol.html#protocol_error_codes). Thus, forcing the **_kafka-clients_** to fall-back to using `API_VERSION:  0`.

To keep KoP working in sync with the **_Kafka-Broker_** working, we decided not to return the list of supported `ApiKeys & versions`.
  • Loading branch information
ptirupac-tibco committed Aug 27, 2020
1 parent 78d9ba3 commit 6192b10
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,17 @@ protected KafkaHeaderAndRequest byteBufToRequest(ByteBuf msg,
protected ByteBuf responseToByteBuf(AbstractResponse response, KafkaHeaderAndRequest request) {
try (KafkaHeaderAndResponse kafkaHeaderAndResponse =
KafkaHeaderAndResponse.responseForRequest(request, response)) {

// Lowering Client API_VERSION request to the oldest API_VERSION KoP supports, this is to make \
// Kafka-Clients 2.4.x and above compatible and prevent KoP from panicking \
// when it comes across a higher API_VERSION.
short apiVersion = kafkaHeaderAndResponse.getApiVersion();
if (request.getHeader().apiKey() == API_VERSIONS){
if (!ApiKeys.API_VERSIONS.isVersionSupported(apiVersion)) {
apiVersion = ApiKeys.API_VERSIONS.oldestVersion();
}
}
return ResponseUtils.serializeResponse(
kafkaHeaderAndResponse.getApiVersion(),
apiVersion,
kafkaHeaderAndResponse.getHeader(),
kafkaHeaderAndResponse.getResponse()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,31 +220,41 @@ protected void close() {

protected void handleApiVersionsRequest(KafkaHeaderAndRequest apiVersionRequest,
CompletableFuture<AbstractResponse> resultFuture) {
AbstractResponse apiResponse = overloadDefaultApiVersionsResponse();
resultFuture.complete(apiResponse);
if (!ApiKeys.API_VERSIONS.isVersionSupported(apiVersionRequest.getHeader().apiVersion())) {
// Notify Client that API_VERSION is UNSUPPORTED.
AbstractResponse apiResponse = overloadDefaultApiVersionsResponse(true);
resultFuture.complete(apiResponse);
} else {
AbstractResponse apiResponse = overloadDefaultApiVersionsResponse(false);
resultFuture.complete(apiResponse);
}
}

protected ApiVersionsResponse overloadDefaultApiVersionsResponse() {
protected ApiVersionsResponse overloadDefaultApiVersionsResponse(boolean unsupportedApiVersion) {
List<ApiVersionsResponse.ApiVersion> versionList = new ArrayList<>();
for (ApiKeys apiKey : ApiKeys.values()) {
if (apiKey.minRequiredInterBrokerMagic <= RecordBatch.CURRENT_MAGIC_VALUE) {
switch (apiKey) {
case FETCH:
// V4 added MessageSets responses. We need to make sure RecordBatch format is not used
versionList.add(new ApiVersionsResponse.ApiVersion((short) 1, (short) 4,
apiKey.latestVersion()));
break;
case LIST_OFFSETS:
// V0 is needed for librdkafka
versionList.add(new ApiVersionsResponse.ApiVersion((short) 2, (short) 0,
apiKey.latestVersion()));
break;
default:
versionList.add(new ApiVersionsResponse.ApiVersion(apiKey));
if (unsupportedApiVersion){
return new ApiVersionsResponse(0, Errors.UNSUPPORTED_VERSION, versionList);
} else {
for (ApiKeys apiKey : ApiKeys.values()) {
if (apiKey.minRequiredInterBrokerMagic <= RecordBatch.CURRENT_MAGIC_VALUE) {
switch (apiKey) {
case FETCH:
// V4 added MessageSets responses. We need to make sure RecordBatch format is not used
versionList.add(new ApiVersionsResponse.ApiVersion((short) 1, (short) 4,
apiKey.latestVersion()));
break;
case LIST_OFFSETS:
// V0 is needed for librdkafka
versionList.add(new ApiVersionsResponse.ApiVersion((short) 2, (short) 0,
apiKey.latestVersion()));
break;
default:
versionList.add(new ApiVersionsResponse.ApiVersion(apiKey));
}
}
}
return new ApiVersionsResponse(0, Errors.NONE, versionList);
}
return new ApiVersionsResponse(0, Errors.NONE, versionList);
}

protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest,
Expand Down

0 comments on commit 6192b10

Please sign in to comment.