From 1cd396783cc1075e221a6ec72f678640e0809313 Mon Sep 17 00:00:00 2001 From: Justine Date: Fri, 21 Jun 2024 13:16:28 -0700 Subject: [PATCH 1/5] Do not return 0 in apiversons response --- .../common/requests/ApiVersionsResponse.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java index 04ee2014d1d4..ec5c431ebd0b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java @@ -37,6 +37,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; /** * Possible error codes: @@ -258,11 +259,23 @@ private static ApiVersionsResponseData createApiVersionsResponseData( final long finalizedFeaturesEpoch, final boolean zkMigrationEnabled ) { + Features backwardsCompatibleFeatures = Features.supportedFeatures(latestSupportedFeatures.features().entrySet() + .stream() + .collect(Collectors.toMap( + entry -> entry.getKey(), + entry -> { + short newMin = entry.getValue().min() == 0 ? 1 : entry.getValue().min(); + short newMax = entry.getValue().max() == 0 ? 1 : entry.getValue().max(); + return new SupportedVersionRange(newMin, newMax); + } + )) + ); + final ApiVersionsResponseData data = new ApiVersionsResponseData(); data.setThrottleTimeMs(throttleTimeMs); data.setErrorCode(error.code()); data.setApiKeys(apiKeys); - data.setSupportedFeatures(createSupportedFeatureKeys(latestSupportedFeatures)); + data.setSupportedFeatures(createSupportedFeatureKeys(backwardsCompatibleFeatures)); data.setFinalizedFeatures(createFinalizedFeatureKeys(finalizedFeatures)); data.setFinalizedFeaturesEpoch(finalizedFeaturesEpoch); data.setZkMigrationReady(zkMigrationEnabled); From 38a8b072ff98dddb668dc5453fd9aeecc35b8418 Mon Sep 17 00:00:00 2001 From: Justine Date: Fri, 21 Jun 2024 14:15:38 -0700 Subject: [PATCH 2/5] Add unit test --- .../requests/ApiVersionsResponseTest.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java index cf832050f2a5..7657a640bc9c 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.feature.SupportedVersionRange; import org.apache.kafka.common.message.ApiMessageType; import org.apache.kafka.common.message.ApiMessageType.ListenerType; +import org.apache.kafka.common.message.ApiVersionsResponseData; import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection; import org.apache.kafka.common.message.ApiVersionsResponseData.FinalizedFeatureKey; @@ -275,6 +276,27 @@ public void testIntersect() { assertEquals(expected, ApiVersionsResponse.intersect(other, thisVersion).get()); } + @Test + public void testZeroVersionNotReturned() { + String featureName = "test.feature.version"; + ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse( + 10, + RecordVersion.V1, + Features.supportedFeatures(Collections.singletonMap(featureName, new SupportedVersionRange((short) 0, (short) 0))), + Collections.emptyMap(), + ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, + null, + ListenerType.BROKER, + true, + false, + false + ); + + ApiVersionsResponseData.SupportedFeatureKey feature = response.data().supportedFeatures().find(featureName); + assertEquals(1, feature.maxVersion()); + assertEquals(1, feature.minVersion()); + } + private void verifyVersions(short forwardableAPIKey, short minVersion, short maxVersion, From 6b897b43cbf86418d873e24fc9c5ac55f182c555 Mon Sep 17 00:00:00 2001 From: Justine Date: Fri, 21 Jun 2024 17:23:06 -0700 Subject: [PATCH 3/5] Suppress 0 versions for older versions --- .../kafka/common/requests/ApiVersionsResponse.java | 11 +++++------ .../common/requests/ApiVersionsResponseTest.java | 10 ++++------ 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java index ec5c431ebd0b..cf9cb6b4bdf2 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java @@ -260,14 +260,13 @@ private static ApiVersionsResponseData createApiVersionsResponseData( final boolean zkMigrationEnabled ) { Features backwardsCompatibleFeatures = Features.supportedFeatures(latestSupportedFeatures.features().entrySet() - .stream() + .stream().filter(entry -> { + SupportedVersionRange supportedVersionRange = entry.getValue(); + return supportedVersionRange.min() != 0; + }) .collect(Collectors.toMap( entry -> entry.getKey(), - entry -> { - short newMin = entry.getValue().min() == 0 ? 1 : entry.getValue().min(); - short newMax = entry.getValue().max() == 0 ? 1 : entry.getValue().max(); - return new SupportedVersionRange(newMin, newMax); - } + entry -> entry.getValue() )) ); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java index 7657a640bc9c..7776aa34945a 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.feature.SupportedVersionRange; import org.apache.kafka.common.message.ApiMessageType; import org.apache.kafka.common.message.ApiMessageType.ListenerType; -import org.apache.kafka.common.message.ApiVersionsResponseData; import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection; import org.apache.kafka.common.message.ApiVersionsResponseData.FinalizedFeatureKey; @@ -282,19 +281,18 @@ public void testZeroVersionNotReturned() { ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse( 10, RecordVersion.V1, - Features.supportedFeatures(Collections.singletonMap(featureName, new SupportedVersionRange((short) 0, (short) 0))), + Features.supportedFeatures(Collections.singletonMap(featureName, new SupportedVersionRange((short) 0, (short) 1))), Collections.emptyMap(), ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, null, ListenerType.BROKER, true, false, - false + true ); - ApiVersionsResponseData.SupportedFeatureKey feature = response.data().supportedFeatures().find(featureName); - assertEquals(1, feature.maxVersion()); - assertEquals(1, feature.minVersion()); + // Feature should not be in the supported features due to the 0 version. + assertEquals(null, response.data().supportedFeatures().find(featureName)); } private void verifyVersions(short forwardableAPIKey, From 58a9cfccaa9c7191c50049bb2a06c5bfc171e653 Mon Sep 17 00:00:00 2001 From: Justine Date: Mon, 24 Jun 2024 11:06:51 -0700 Subject: [PATCH 4/5] test fixes --- .../kafka/server/AbstractApiVersionsRequestTest.scala | 8 +++----- .../org/apache/kafka/tools/FeatureCommandTest.java | 10 ++-------- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala index b79cf12aa4b3..482d76e5f937 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala @@ -26,7 +26,7 @@ import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.record.RecordVersion import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, RequestUtils} import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.common.{GroupVersion, MetadataVersion} +import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.test.TestUtils import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Tag @@ -69,12 +69,10 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) { assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel()) assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel()) - assertEquals(2, apiVersionsResponse.data().supportedFeatures().size()) + // Since the min version of group version is 0, it is not included in the response. + assertEquals(1, apiVersionsResponse.data().supportedFeatures().size()) assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).minVersion()) assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersion()) - - assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(GroupVersion.FEATURE_NAME).minVersion()) - assertEquals(GroupVersion.GV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(GroupVersion.FEATURE_NAME).maxVersion()) } val expectedApis = if (!cluster.isKRaftTest) { ApiVersionsResponse.collectApis( diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java index b6d4cea0edbc..f90903b61504 100644 --- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java @@ -64,12 +64,9 @@ public void testDescribeWithKRaft(ClusterInstance cluster) { List features = Arrays.stream(commandOutput.split("\n")).sorted().collect(Collectors.toList()); - assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" + - "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(0))); - // Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version) assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" + - "SupportedMaxVersion: 4.0-IV0\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(features.get(1))); + "SupportedMaxVersion: 4.0-IV0\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(features.get(0))); } @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_7_IV4) @@ -80,12 +77,9 @@ public void testDescribeWithKRaftAndBootstrapControllers(ClusterInstance cluster List features = Arrays.stream(commandOutput.split("\n")).sorted().collect(Collectors.toList()); - assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" + - "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(0))); - // Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version) assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" + - "SupportedMaxVersion: 4.0-IV0\tFinalizedVersionLevel: 3.7-IV4\t", outputWithoutEpoch(features.get(1))); + "SupportedMaxVersion: 4.0-IV0\tFinalizedVersionLevel: 3.7-IV4\t", outputWithoutEpoch(features.get(0))); } @ClusterTest(types = {Type.ZK}, metadataVersion = MetadataVersion.IBP_3_3_IV1) From 1faf2abc69803dd542b60727cbf89b1aa8f5cee7 Mon Sep 17 00:00:00 2001 From: Justine Date: Tue, 25 Jun 2024 11:44:16 -0700 Subject: [PATCH 5/5] Add comment --- .../org/apache/kafka/common/requests/ApiVersionsResponse.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java index cf9cb6b4bdf2..0df15382a96d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java @@ -259,6 +259,9 @@ private static ApiVersionsResponseData createApiVersionsResponseData( final long finalizedFeaturesEpoch, final boolean zkMigrationEnabled ) { + // Older versions do not support 0 version in SupportedVersionRange. If an older broker receives a response with a zero version, it will fail with + // IllegalArgumentException. In order to not block upgrades, filter out features with 0 versions from the response. + // Future versions will bump the ApiVersionsRequest to handle this case and return features with version 0, but that version bump is not in 3.8. Features backwardsCompatibleFeatures = Features.supportedFeatures(latestSupportedFeatures.features().entrySet() .stream().filter(entry -> { SupportedVersionRange supportedVersionRange = entry.getValue();