From c8f687ac1505456cb568de2b60df235eb1ceb5f0 Mon Sep 17 00:00:00 2001 From: Crispin Bernier Date: Tue, 31 Oct 2023 20:16:11 -0400 Subject: [PATCH] KAFKA-15661: KIP-951: protocol changes (#14627) Separating out the protocol changes from #14444 in an effort to more quickly unblock the client side PR. This is the protocol changes to populate the fields in KIP-951. On NOT_LEADER_OR_FOLLOWER errors in both FETCH and PRODUCE the new leader ID and epoch are included in the response. The endpoint for the new leader is retrieved from the metadata cache. The new fields are all optional (tagged) and an IBP bump is required. https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client Reviewers: Justine Olshan , Mayank Shekhar Narula --- .../common/message/FetchRequest.json | 4 +++- .../common/message/FetchResponse.json | 13 +++++++++- .../common/message/ProduceRequest.json | 4 +++- .../common/message/ProduceResponse.json | 24 ++++++++++++++++--- .../kafka/server/common/MetadataVersion.java | 4 +++- 5 files changed, 42 insertions(+), 7 deletions(-) diff --git a/clients/src/main/resources/common/message/FetchRequest.json b/clients/src/main/resources/common/message/FetchRequest.json index 295cbf3aa82f..4f1b7b17ad3c 100644 --- a/clients/src/main/resources/common/message/FetchRequest.json +++ b/clients/src/main/resources/common/message/FetchRequest.json @@ -53,7 +53,9 @@ // // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also, // deprecate the old ReplicaId field and set its default value to -1. (KIP-903) - "validVersions": "0-15", + // + // Version 16 is the same as version 15 (KIP-951). + "validVersions": "0-16", "flexibleVersions": "12+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", diff --git a/clients/src/main/resources/common/message/FetchResponse.json b/clients/src/main/resources/common/message/FetchResponse.json index 366e702cfbd5..e5f49ba6fde9 100644 --- a/clients/src/main/resources/common/message/FetchResponse.json +++ b/clients/src/main/resources/common/message/FetchResponse.json @@ -45,7 +45,9 @@ // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException (KIP-405) // // Version 15 is the same as version 14 (KIP-903). - "validVersions": "0-15", + // + // Version 16 adds the 'NodeEndpoints' field (KIP-951). + "validVersions": "0-16", "flexibleVersions": "12+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, @@ -102,6 +104,15 @@ "about": "The preferred read replica for the consumer to use on its next fetch request"}, { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."} ]} + ]}, + { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "16+", "taggedVersions": "16+", "tag": 0, + "about": "Endpoints for all current-leaders enumerated in PartitionData, with errors NOT_LEADER_OR_FOLLOWER & FENCED_LEADER_EPOCH.", "fields": [ + { "name": "NodeId", "type": "int32", "versions": "16+", + "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node."}, + { "name": "Host", "type": "string", "versions": "16+", "about": "The node's hostname." }, + { "name": "Port", "type": "int32", "versions": "16+", "about": "The node's port." }, + { "name": "Rack", "type": "string", "versions": "16+", "nullableVersions": "16+", "default": "null", + "about": "The rack of the node, or null if it has not been assigned to a rack." } ]} ] } diff --git a/clients/src/main/resources/common/message/ProduceRequest.json b/clients/src/main/resources/common/message/ProduceRequest.json index 82a168e63cc6..6b2d909ab843 100644 --- a/clients/src/main/resources/common/message/ProduceRequest.json +++ b/clients/src/main/resources/common/message/ProduceRequest.json @@ -33,7 +33,9 @@ // Starting in Version 8, response has RecordErrors and ErrorMessage. See KIP-467. // // Version 9 enables flexible versions. - "validVersions": "0-9", + // + // Version 10 is the same as version 9 (KIP-951). + "validVersions": "0-10", "flexibleVersions": "9+", "fields": [ { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "entityType": "transactionalId", diff --git a/clients/src/main/resources/common/message/ProduceResponse.json b/clients/src/main/resources/common/message/ProduceResponse.json index 0c47f6d938e0..d294fb8aa2ef 100644 --- a/clients/src/main/resources/common/message/ProduceResponse.json +++ b/clients/src/main/resources/common/message/ProduceResponse.json @@ -32,7 +32,9 @@ // records that cause the whole batch to be dropped. See KIP-467 for details. // // Version 9 enables flexible versions. - "validVersions": "0-9", + // + // Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields (KIP-951) + "validVersions": "0-10", "flexibleVersions": "9+", "fields": [ { "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+", @@ -59,10 +61,26 @@ "about": "The error message of the record that caused the batch to be dropped"} ]}, { "name": "ErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+", "ignorable": true, - "about": "The global error message summarizing the common root cause of the records that caused the batch to be dropped"} + "about": "The global error message summarizing the common root cause of the records that caused the batch to be dropped"}, + { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "10+", "taggedVersions": "10+", "tag": 0, "fields": [ + { "name": "LeaderId", "type": "int32", "versions": "10+", "default": "-1", "entityType": "brokerId", + "about": "The ID of the current leader or -1 if the leader is unknown."}, + { "name": "LeaderEpoch", "type": "int32", "versions": "10+", "default": "-1", "about": "The latest known leader epoch"} + ]} ]} ]}, { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, "default": "0", - "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." } + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "10+", "taggedVersions": "10+", "tag": 0, + "about": "Endpoints for all current-leaders enumerated in PartitionProduceResponses, with errors NOT_LEADER_OR_FOLLOWER.", "fields": [ + { "name": "NodeId", "type": "int32", "versions": "10+", + "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node."}, + { "name": "Host", "type": "string", "versions": "10+", + "about": "The node's hostname." }, + { "name": "Port", "type": "int32", "versions": "10+", + "about": "The node's port." }, + { "name": "Rack", "type": "string", "versions": "10+", "nullableVersions": "10+", "default": "null", + "about": "The rack of the node, or null if it has not been assigned to a rack." } + ]} ] } diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java index 8f22ed582a6c..ee3c3fdd23c4 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java @@ -360,7 +360,9 @@ public short partitionRecordVersion() { } public short fetchRequestVersion() { - if (this.isAtLeast(IBP_3_5_IV1)) { + if (this.isAtLeast(IBP_3_7_IV0)) { + return 16; + } else if (this.isAtLeast(IBP_3_5_IV1)) { return 15; } else if (this.isAtLeast(IBP_3_5_IV0)) { return 14;