Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15661: KIP-951: protocol changes #14627

Merged
merged 5 commits into from Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion clients/src/main/resources/common/message/FetchRequest.json
Expand Up @@ -53,7 +53,9 @@
//
// Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also,
// deprecate the old ReplicaId field and set its default value to -1. (KIP-903)
"validVersions": "0-15",
//
// Version 16 is the same as version 15 (KIP-951).
"validVersions": "0-16",
jolshan marked this conversation as resolved.
Show resolved Hide resolved
"flexibleVersions": "12+",
"fields": [
{ "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
Expand Down
13 changes: 12 additions & 1 deletion clients/src/main/resources/common/message/FetchResponse.json
Expand Up @@ -45,7 +45,9 @@
// Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException (KIP-405)
//
// Version 15 is the same as version 14 (KIP-903).
"validVersions": "0-15",
//
// Version 16 adds the 'NodeEndpoints' field (KIP-951).
"validVersions": "0-16",
"flexibleVersions": "12+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
Expand Down Expand Up @@ -102,6 +104,15 @@
"about": "The preferred read replica for the consumer to use on its next fetch request"},
{ "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."}
]}
]},
{ "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "16+", "taggedVersions": "16+", "tag": 0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's not reuse the tags as I mentioned in the other PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I replied to this in the other PR #14444 (comment)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like tags are scoped to the list level so this isn't really the same tag. They also need to be contiguous within their scope so this gives an error if I try to tag NodeEndpoints to something other than 0

Sharing the comment here for clarity.

"about": "Endpoints for all current-leaders enumerated in PartitionData, with errors NOT_LEADER_OR_FOLLOWER & FENCED_LEADER_EPOCH.", "fields": [
{ "name": "NodeId", "type": "int32", "versions": "16+",
"mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node."},
{ "name": "Host", "type": "string", "versions": "16+", "about": "The node's hostname." },
{ "name": "Port", "type": "int32", "versions": "16+", "about": "The node's port." },
{ "name": "Rack", "type": "string", "versions": "16+", "nullableVersions": "16+", "default": "null",
"about": "The rack of the node, or null if it has not been assigned to a rack." }
]}
]
}
Expand Up @@ -33,7 +33,9 @@
// Starting in Version 8, response has RecordErrors and ErrorMessage. See KIP-467.
//
// Version 9 enables flexible versions.
"validVersions": "0-9",
//
// Version 10 is the same as version 9 (KIP-951).
"validVersions": "0-10",
"flexibleVersions": "9+",
"fields": [
{ "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "entityType": "transactionalId",
Expand Down
24 changes: 21 additions & 3 deletions clients/src/main/resources/common/message/ProduceResponse.json
Expand Up @@ -32,7 +32,9 @@
// records that cause the whole batch to be dropped. See KIP-467 for details.
//
// Version 9 enables flexible versions.
"validVersions": "0-9",
//
// Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields (KIP-951)
"validVersions": "0-10",
"flexibleVersions": "9+",
"fields": [
{ "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+",
Expand All @@ -59,10 +61,26 @@
"about": "The error message of the record that caused the batch to be dropped"}
]},
{ "name": "ErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+", "ignorable": true,
"about": "The global error message summarizing the common root cause of the records that caused the batch to be dropped"}
"about": "The global error message summarizing the common root cause of the records that caused the batch to be dropped"},
{ "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "10+", "taggedVersions": "10+", "tag": 0, "fields": [
{ "name": "LeaderId", "type": "int32", "versions": "10+", "default": "-1", "entityType": "brokerId",
"about": "The ID of the current leader or -1 if the leader is unknown."},
{ "name": "LeaderEpoch", "type": "int32", "versions": "10+", "default": "-1", "about": "The latest known leader epoch"}
]}
]}
]},
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, "default": "0",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "10+", "taggedVersions": "10+", "tag": 0,
"about": "Endpoints for all current-leaders enumerated in PartitionProduceResponses, with errors NOT_LEADER_OR_FOLLOWER.", "fields": [
{ "name": "NodeId", "type": "int32", "versions": "10+",
"mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node."},
{ "name": "Host", "type": "string", "versions": "10+",
"about": "The node's hostname." },
{ "name": "Port", "type": "int32", "versions": "10+",
"about": "The node's port." },
{ "name": "Rack", "type": "string", "versions": "10+", "nullableVersions": "10+", "default": "null",
"about": "The rack of the node, or null if it has not been assigned to a rack." }
]}
]
}
Expand Up @@ -360,7 +360,9 @@ public short partitionRecordVersion() {
}

public short fetchRequestVersion() {
if (this.isAtLeast(IBP_3_5_IV1)) {
if (this.isAtLeast(IBP_3_7_IV0)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this work because all the new fields are tagged (so no real changes in handling)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is supposed to be the first approach mentioned in this comment #14444 (comment)

My understanding is clusters would first be upgraded to 3.7 and then the IBP would be bumped after all the brokers are upgraded, but please correct me if that's wrong. You're right though, all the fields are tagged and there's no change in handling on the broker side.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was referring to whether the code here that we want to merge will break the build if we haven't implemented the new fields.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, yeah we shouldn't need any other changes, the fields are all tagged and not getting used anywhere so we can leave them as their default values.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized that if we set the version as unstable, we may not be able to use it here. 🤦‍♀️ Maybe we should remove the unstable version true if this causes issues in tests.

Sorry for confusion.

return 16;
} else if (this.isAtLeast(IBP_3_5_IV1)) {
return 15;
} else if (this.isAtLeast(IBP_3_5_IV0)) {
return 14;
Expand Down