Skip to content

Commit

Permalink
KAFKA-15661: KIP-951: protocol changes (apache#14627)
Browse files Browse the repository at this point in the history
Separating out the protocol changes from apache#14444 in an effort to more quickly unblock the client side PR.

This is the protocol changes to populate the fields in KIP-951. On NOT_LEADER_OR_FOLLOWER errors in both FETCH and PRODUCE the new leader ID and epoch are included in the response. The endpoint for the new leader is retrieved from the metadata cache. The new fields are all optional (tagged) and an IBP bump is required.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client

Reviewers: Justine Olshan <jolshan@confluent.io>, Mayank Shekhar Narula <mayanks.narula@gmail.com>
  • Loading branch information
chb2ab authored and AnatolyPopov committed Feb 16, 2024
1 parent c466297 commit beb0c5c
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 7 deletions.
4 changes: 3 additions & 1 deletion clients/src/main/resources/common/message/FetchRequest.json
Expand Up @@ -53,7 +53,9 @@
//
// Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also,
// deprecate the old ReplicaId field and set its default value to -1. (KIP-903)
"validVersions": "0-15",
//
// Version 16 is the same as version 15 (KIP-951).
"validVersions": "0-16",
"flexibleVersions": "12+",
"fields": [
{ "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
Expand Down
13 changes: 12 additions & 1 deletion clients/src/main/resources/common/message/FetchResponse.json
Expand Up @@ -45,7 +45,9 @@
// Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException (KIP-405)
//
// Version 15 is the same as version 14 (KIP-903).
"validVersions": "0-15",
//
// Version 16 adds the 'NodeEndpoints' field (KIP-951).
"validVersions": "0-16",
"flexibleVersions": "12+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
Expand Down Expand Up @@ -102,6 +104,15 @@
"about": "The preferred read replica for the consumer to use on its next fetch request"},
{ "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."}
]}
]},
{ "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "16+", "taggedVersions": "16+", "tag": 0,
"about": "Endpoints for all current-leaders enumerated in PartitionData, with errors NOT_LEADER_OR_FOLLOWER & FENCED_LEADER_EPOCH.", "fields": [
{ "name": "NodeId", "type": "int32", "versions": "16+",
"mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node."},
{ "name": "Host", "type": "string", "versions": "16+", "about": "The node's hostname." },
{ "name": "Port", "type": "int32", "versions": "16+", "about": "The node's port." },
{ "name": "Rack", "type": "string", "versions": "16+", "nullableVersions": "16+", "default": "null",
"about": "The rack of the node, or null if it has not been assigned to a rack." }
]}
]
}
Expand Up @@ -33,7 +33,9 @@
// Starting in Version 8, response has RecordErrors and ErrorMessage. See KIP-467.
//
// Version 9 enables flexible versions.
"validVersions": "0-9",
//
// Version 10 is the same as version 9 (KIP-951).
"validVersions": "0-10",
"flexibleVersions": "9+",
"fields": [
{ "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "entityType": "transactionalId",
Expand Down
24 changes: 21 additions & 3 deletions clients/src/main/resources/common/message/ProduceResponse.json
Expand Up @@ -32,7 +32,9 @@
// records that cause the whole batch to be dropped. See KIP-467 for details.
//
// Version 9 enables flexible versions.
"validVersions": "0-9",
//
// Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields (KIP-951)
"validVersions": "0-10",
"flexibleVersions": "9+",
"fields": [
{ "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+",
Expand All @@ -59,10 +61,26 @@
"about": "The error message of the record that caused the batch to be dropped"}
]},
{ "name": "ErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+", "ignorable": true,
"about": "The global error message summarizing the common root cause of the records that caused the batch to be dropped"}
"about": "The global error message summarizing the common root cause of the records that caused the batch to be dropped"},
{ "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "10+", "taggedVersions": "10+", "tag": 0, "fields": [
{ "name": "LeaderId", "type": "int32", "versions": "10+", "default": "-1", "entityType": "brokerId",
"about": "The ID of the current leader or -1 if the leader is unknown."},
{ "name": "LeaderEpoch", "type": "int32", "versions": "10+", "default": "-1", "about": "The latest known leader epoch"}
]}
]}
]},
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, "default": "0",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "10+", "taggedVersions": "10+", "tag": 0,
"about": "Endpoints for all current-leaders enumerated in PartitionProduceResponses, with errors NOT_LEADER_OR_FOLLOWER.", "fields": [
{ "name": "NodeId", "type": "int32", "versions": "10+",
"mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node."},
{ "name": "Host", "type": "string", "versions": "10+",
"about": "The node's hostname." },
{ "name": "Port", "type": "int32", "versions": "10+",
"about": "The node's port." },
{ "name": "Rack", "type": "string", "versions": "10+", "nullableVersions": "10+", "default": "null",
"about": "The rack of the node, or null if it has not been assigned to a rack." }
]}
]
}
Expand Up @@ -360,7 +360,9 @@ public short partitionRecordVersion() {
}

public short fetchRequestVersion() {
if (this.isAtLeast(IBP_3_5_IV1)) {
if (this.isAtLeast(IBP_3_7_IV0)) {
return 16;
} else if (this.isAtLeast(IBP_3_5_IV1)) {
return 15;
} else if (this.isAtLeast(IBP_3_5_IV0)) {
return 14;
Expand Down

0 comments on commit beb0c5c

Please sign in to comment.