
KAFKA-17011: SupportedFeatures.MinVersion incorrectly blocks v0 (3.8) #16420

Closed
wants to merge 5 commits

Conversation

@jolshan (Contributor, Author) commented Jun 21, 2024

For 3.8 we can't implement a new ApiVersions version bump, so for now we will omit the feature from the response in order to be backwards compatible.

Without this change, when interacting with an old version of Kafka, we see the following error:

java.lang.IllegalArgumentException: Expected minValue >= 1, maxValue >= 1 and maxValue >= minValue, but received minValue: 0, maxValue: 0
    at org.apache.kafka.common.feature.BaseVersionRange.<init>(BaseVersionRange.java:65)
    at org.apache.kafka.common.feature.SupportedVersionRange.<init>(SupportedVersionRange.java:32)
    at org.apache.kafka.clients.NodeApiVersions.<init>(NodeApiVersions.java:114)
    at org.apache.kafka.clients.NetworkClient.handleApiVersionsResponse(NetworkClient.java:960)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:927)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:580)
    at kafka.common.InterBrokerSendThread.pollOnce(InterBrokerSendThread.scala:78)
    at kafka.common.InterBrokerSendThread.doWork(InterBrokerSendThread.scala:98)
    at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
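For context, here is a minimal, self-contained Java sketch of the kind of range check that throws above. It is illustrative only: the real validation lives in org.apache.kafka.common.feature.BaseVersionRange, and the simplified class below is hypothetical.

    // Illustrative stand-in (hypothetical class name) for the strict check in older clients:
    // both bounds must be >= 1 and max must be >= min, so a feature advertised with
    // minVersion=0, such as group.version in 3.8 without this PR, cannot be parsed.
    final class StrictVersionRange {
        private final short minValue;
        private final short maxValue;

        StrictVersionRange(short minValue, short maxValue) {
            if (minValue < 1 || maxValue < 1 || maxValue < minValue) {
                throw new IllegalArgumentException(String.format(
                    "Expected minValue >= 1, maxValue >= 1 and maxValue >= minValue, " +
                        "but received minValue: %d, maxValue: %d", minValue, maxValue));
            }
            this.minValue = minValue;
            this.maxValue = maxValue;
        }

        public static void main(String[] args) {
            new StrictVersionRange((short) 0, (short) 0); // throws, matching the trace above
        }
    }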

@jolshan (Contributor, Author) commented Jun 21, 2024

Ran the KRaft upgrade tests and they passed with this change 👍

@jolshan (Contributor, Author) commented Jun 21, 2024

I ran some group coordinator tests as well, and those are also passing.

@jlprat added the "Blocker" label (this pull request is identified as solving a blocker for a release) on Jun 24, 2024
@chia7712 (Contributor) left a comment:

@jolshan thanks for this quick fix. Two trivial comments are left. PTAL

Features<SupportedVersionRange> backwardsCompatibleFeatures = Features.supportedFeatures(latestSupportedFeatures.features().entrySet()
    .stream().filter(entry -> {
        SupportedVersionRange supportedVersionRange = entry.getValue();
        return supportedVersionRange.min() != 0;
    })
    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); // closing lines reconstructed; the review excerpt was truncated here
Review comment (Contributor):

Not sure whether I'm failing to catch your description:

so for now we will change 0 to 1 in the response in order to be backwards compatible.

The code looks like it gets rid of "0" instead of changing it from 0 to 1.

Review comment (Contributor):

Another question: does this PR mean Admin#describeFeatures can't see the feature "group.version" from a broker running 3.8.0?

Reply (Contributor, Author):

Hey, sorry, the comment was from the old implementation; the new approach is to omit the feature, as you mentioned.
I believe describeFeatures will not be able to show the version for this release.
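For illustration, a hedged sketch of how an operator could check this with the Admin client; the bootstrap address is an assumption and this code is not part of the PR:

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.FeatureMetadata;

    import java.util.Properties;

    public class CheckGroupVersionFeature {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
            try (Admin admin = Admin.create(props)) {
                FeatureMetadata metadata = admin.describeFeatures().featureMetadata().get();
                // With this change, a 3.8 broker omits group.version from its supported
                // features, so the key is expected to be absent from this map.
                System.out.println("group.version advertised: "
                    + metadata.supportedFeatures().containsKey("group.version"));
            }
        }
    }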

Review comment (Contributor):

Perhaps it's useful to add a comment on why we are filtering version 0 features.

@chia7712 (Contributor) commented:

BTW, there is a description in KIP-584

Each broker’s supported dictionary of {feature → version range} will be defined in the broker code. For each supported feature, the supported version range is defined by a min_version (an int64 starting always from 1) and max_version (an int64 >=1 and >= min_version).

Not sure why #15671 changed the start version from 1 to 0:

https://github.com/apache/kafka/pull/15671/files#r1576879728

@jolshan (Contributor, Author) commented Jun 24, 2024

@chia7712 It was incorrect to set it at 1 because we cannot assume the 0 level is supported. However, the 0 level (the feature being disabled) is valid.

@chia7712 (Contributor) commented:

@jolshan I feel FeatureCommandTest and ApiVersionsRequestTest have some related failing tests

@jolshan (Contributor, Author) commented Jun 24, 2024

Thanks @chia7712 I will take a look.

@jolshan (Contributor, Author) commented Jun 25, 2024

Cleaned up the test failures. The new ones look unrelated. 👍

@chia7712 (Contributor) left a comment:

LGTM

@chia7712 (Contributor) commented:

It was incorrect to set it at 1 because we cannot assume the 0 level is supported. However, the 0 level (the feature being disabled) is valid.

@jolshan thanks for the explanation. Also, I saw the discussion on the mailing list (https://lists.apache.org/thread/7wx3j788ztf1vl7xm9ryt8cwl3tk9lw3). Both are good resources for learning the root cause of this issue :)

@chia7712 (Contributor) commented:

@jolshan sorry, I have another question:

It seems the hotfix for 3.8 is to avoid propagating min_version=0, so we remove min_version=0 when generating the ApiVersionsResponse. That is good. Another question: should we add a similar guard for BrokerRegistrationRequest? Is it a potential bug that a "new" broker tries to register with an "old" quorum controller which can't accept min_version=0?

@jolshan (Contributor, Author) commented Jun 25, 2024

That is good. Another question: should we add a similar guard for BrokerRegistrationRequest? Is it a potential bug that a "new" broker tries to register with an "old" quorum controller which can't accept min_version=0?

Hmmm. I believe the handling is OK with 0. The issue is not with 0 itself. (We can handle controllers that don't have the same range of supported versions correctly.) The issue was with SupportedVersionRange, which only seems to be used by NodeApiVersions.

@cmccabe correct me if I'm wrong.

@jsancio (Member) commented Jun 25, 2024

That is good. Another question: should we add a similar guard for BrokerRegistrationRequest? Is it a potential bug that a "new" broker tries to register with an "old" quorum controller which can't accept min_version=0?

@chia7712 the problem is specific to ApiVersionsResponse. The deserializer for that RPC response doesn't allow 0 values for the min or max supported version. Broker registration doesn't have this problem.

@jolshan (Contributor, Author) commented Jun 25, 2024

I will merge in the next hour or so unless there are further concerns.

@junrao (Contributor) left a comment:

@jolshan: Thanks for the PR. Just a minor comment. It would also be useful to include the exception caused by this issue in the Jira or PR description.

Features<SupportedVersionRange> backwardsCompatibleFeatures = Features.supportedFeatures(latestSupportedFeatures.features().entrySet()
    .stream().filter(entry -> {
        SupportedVersionRange supportedVersionRange = entry.getValue();
        return supportedVersionRange.min() != 0;
    })
    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); // closing lines reconstructed; the review excerpt was truncated here
Review comment (Contributor):

Perhaps it's useful to add a comment on why we are filtering version 0 features.

@jolshan (Contributor, Author) commented Jun 25, 2024

Thanks @junrao! Done

@chia7712 (Contributor) commented:

@jolshan @jsancio thanks for your responses. My concern was that the "min version" from broker registration is used to create SupportedVersionRange. One use case I noticed is shown below.

name -> new SupportedVersionRange(range.min(), range.max())

That is used in migration, so I'm not sure whether it will be a problem. For example, the quorum controller is running with the defective SupportedVersionRange. After controller migration, a ZK broker is rolling-upgraded to KRaft mode with 3.8.0, and it will send a BrokerRegistrationRequest with "group.version, minSupportedVersion=0". Hence, the quorum controller could end up using "min version=0" to build a SupportedVersionRange ... ?

@jolshan (Contributor, Author) commented Jun 25, 2024

@chia7712 The new SupportedVersionRange code does support version 0. The only issue is when a new broker sends a request to an older one.

Is it correct that, in order to encounter this issue, we would need to have a migration ongoing at the same time as a version rolling upgrade? Is this typical for a KRaft migration?

If there are tests that can confirm migration is safe, I'm also happy to run and double check.

@cmccabe (Contributor) commented Jun 25, 2024

It is not possible to do an inter.broker.protocol / MetadataVersion change during ZK-to-KRaft migration. The code has numerous checks that prevent anyone from doing this.

More importantly, ApiVersionsResponse.SupportedFeatures is not used in ZK migration, and never has been. And the QuorumController never had the buggy code which made feature ranges containing 0 fail. It's fine for the broker to register with a supported range including 0, both before and after this PR. The broker sets its features by filling out a BrokerRegistrationRequest, not by anything it does with ApiVersionsRequest.

@chia7712 (Contributor) commented:

@jolshan @cmccabe thanks for the confirmation. I learned a lot from you 😄

@chia7712 (Contributor) commented:

That may not be a common case, but the following error can be produced when migrating from ZK to KRaft.

[2024-06-26 15:26:26,397] ERROR Encountered zk migration fault: Unhandled error in MetadataChangeEvent (org.apache.kafka.server.fault.LoggingFaultHandler)
java.lang.IllegalArgumentException: Expected minValue >= 1, maxValue >= 1 and maxValue >= minValue, but received minValue: 0, maxValue: 1
	at org.apache.kafka.common.feature.BaseVersionRange.<init>(BaseVersionRange.java:65)
	at org.apache.kafka.common.feature.SupportedVersionRange.<init>(SupportedVersionRange.java:32)
	at kafka.cluster.Broker$.$anonfun$supportedFeatures$1(Broker.scala:49)
	at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
	at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
	at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.map(JavaCollectionWrappers.scala:321)
	at kafka.cluster.Broker$.supportedFeatures(Broker.scala:48)
	at kafka.cluster.Broker$.fromBrokerRegistration(Broker.scala:58)
	at kafka.migration.MigrationControllerChannelContext.$anonfun$liveOrShuttingDownBrokers$1(MigrationControllerChannelContext.scala:78)
	at scala.collection.Iterator$$anon$9.next(Iterator.scala:584)
	at scala.collection.immutable.List.prependedAll(List.scala:153)
	at scala.collection.immutable.List$.from(List.scala:684)
	at scala.collection.immutable.List$.from(List.scala:681)
	at scala.collection.IterableFactory$Delegate.from(Factory.scala:288)
	at scala.collection.immutable.Iterable$.from(Iterable.scala:35)
	at scala.collection.immutable.Iterable$.from(Iterable.scala:32)
	at scala.collection.IterableFactory$Delegate.from(Factory.scala:288)
	at scala.collection.IterableOps.map(Iterable.scala:682)
	at scala.collection.IterableOps.map$(Iterable.scala:682)
	at scala.collection.AbstractIterable.map(Iterable.scala:933)
	at kafka.migration.MigrationControllerChannelContext.<init>(MigrationControllerChannelContext.scala:77)
	at kafka.migration.MigrationPropagator.metadataProvider(MigrationPropagator.scala:78)
	at kafka.migration.MigrationPropagator.$anonfun$requestBatch$1(MigrationPropagator.scala:71)
	at kafka.controller.AbstractControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:399)
	at kafka.migration.MigrationPropagator.sendRPCsToBrokersFromMetadataDelta(MigrationPropagator.scala:117)
	at org.apache.kafka.metadata.migration.KRaftMigrationDriver$MetadataChangeEvent.run(KRaftMigrationDriver.java:559)
	at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
	at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
	at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
	at java.base/java.lang.Thread.run(Thread.java:1583)

Steps:

  1. create a ZK broker cluster with this patch (3.8 branch)
  2. create a quorum controller with 3.6.2
  3. migrate the ZK brokers to KRaft with IBP=3.6 and unstable.feature (in order to enable group.version to produce min=0)

@chia7712 (Contributor) commented:

That may not be a common case

Producing the error requires enabling unstable.feature, so I feel that is not a common use case.

@jolshan (Contributor, Author) commented Jun 26, 2024

migrate the ZK brokers to KRaft with IBP=3.6 and unstable.feature

I see that the supported range is 0-1 in that case. Is it really the case you need to set that flag?

@chia7712 (Contributor) commented Jun 26, 2024

Just to clarify: this is not a real case I have encountered. I'm just over-engineering today 😄

Is it really the case you need to set that flag?

IBP=3.6 -> avoid UNSUPPORTED_VERSION
unstable.feature.versions.enable=true -> make the KRaft broker send a broker registration with group.version=1
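For reference, a hedged sketch of the broker overrides implied by those two points; only the two settings named above are shown, and the rest of the ZK-to-KRaft migration configuration is intentionally omitted:

    # Hedged sketch, not a complete migration configuration.
    # Keep the IBP at 3.6 to avoid UNSUPPORTED_VERSION:
    inter.broker.protocol.version=3.6
    # Allow the KRaft broker to send a broker registration with group.version=1:
    unstable.feature.versions.enable=true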

@cmccabe (Contributor) commented Jun 26, 2024

unstable feature versions should never be enabled outside JUnit tests (I have a KIP which formalized this). Exactly to avoid this kind of confusion.

@cmccabe (Contributor) left a comment:

LGTM

@chia7712 (Contributor) commented:

unstable feature versions should never be enabled outside JUnit tests (I have a KIP which formalized this). Exactly to avoid this kind of confusion.

Sorry, I should not have used unstable.feature.versions.enable, which can mislead this discussion. The broker registration encounters the BaseVersionRange issue (min version=0) even when unstable.feature.versions.enable is not defined (see the following error stack).

[2024-06-26 20:54:40,169] ERROR Encountered zk migration fault: Unhandled error in MetadataChangeEvent (org.apache.kafka.server.fault.LoggingFaultHandler)
java.lang.IllegalArgumentException: Expected minValue >= 1, maxValue >= 1 and maxValue >= minValue, but received minValue: 0, maxValue: 0
	at org.apache.kafka.common.feature.BaseVersionRange.<init>(BaseVersionRange.java:65)
	at org.apache.kafka.common.feature.SupportedVersionRange.<init>(SupportedVersionRange.java:32)
	at kafka.cluster.Broker$.$anonfun$supportedFeatures$1(Broker.scala:49)
	at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
	at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
	at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.map(JavaCollectionWrappers.scala:321)
	at kafka.cluster.Broker$.supportedFeatures(Broker.scala:48)
	at kafka.cluster.Broker$.fromBrokerRegistration(Broker.scala:58)
	at kafka.migration.MigrationControllerChannelContext.$anonfun$liveOrShuttingDownBrokers$1(MigrationControllerChannelContext.scala:78)
	at scala.collection.Iterator$$anon$9.next(Iterator.scala:584)
	at scala.collection.immutable.List.prependedAll(List.scala:153)
	at scala.collection.immutable.List$.from(List.scala:684)
	at scala.collection.immutable.List$.from(List.scala:681)
	at scala.collection.IterableFactory$Delegate.from(Factory.scala:288)
	at scala.collection.immutable.Iterable$.from(Iterable.scala:35)
	at scala.collection.immutable.Iterable$.from(Iterable.scala:32)
	at scala.collection.IterableFactory$Delegate.from(Factory.scala:288)
	at scala.collection.IterableOps.map(Iterable.scala:682)
	at scala.collection.IterableOps.map$(Iterable.scala:682)
	at scala.collection.AbstractIterable.map(Iterable.scala:933)
	at kafka.migration.MigrationControllerChannelContext.<init>(MigrationControllerChannelContext.scala:77)
	at kafka.migration.MigrationPropagator.metadataProvider(MigrationPropagator.scala:78)
	at kafka.migration.MigrationPropagator.$anonfun$requestBatch$1(MigrationPropagator.scala:71)
	at kafka.controller.AbstractControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:399)
	at kafka.migration.MigrationPropagator.sendRPCsToBrokersFromMetadataDelta(MigrationPropagator.scala:117)
	at org.apache.kafka.metadata.migration.KRaftMigrationDriver$MetadataChangeEvent.run(KRaftMigrationDriver.java:559)
	at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
	at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
	at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
	at java.base/java.lang.Thread.run(Thread.java:1583)

@jolshan (Contributor, Author) commented Jun 26, 2024

@chia7712 Yeah, that's what I meant. I'm considering some options to fix this so that we don't see the issue during migration. I won't merge until we come to a good solution. We may just have to revert the group.version change for this release. I need to confirm with @dajac

@cmccabe (Contributor) commented Jun 26, 2024

@chia7712 @jolshan : Yes, after some offline discussion I now understand what you were trying to say. I guess the bad case is:

  1. new broker registers with old controller.
  2. old controller attempts to send out the registration from #1 to brokers still in ZK mode.
  3. serde path crashes because of the 0-1 supported range in the broker's registration.

Just as with ApiVersionsResponse, we can probably fix this with a new RPC version for BrokerRegistrationRequest. I will add that to my 3.9 PR.

@dajac (Contributor) commented Jun 27, 2024

@chia7712 Yeah, that's what I meant. I'm considering some options to fix this so that we don't see the issue during migration. I won't merge until we come to a good solution. We may just have to revert the group.version change for this release. I need to confirm with @dajac

Reverting the group.version feature flag seems to be the best option and we can live with going back to the static config for the preview in 3.8. I wonder if we should also revert it in trunk in order to not have it in 3.9. Then, we can bring it back for 4.0 when we GA the feature.

@dajac (Contributor) commented Jun 27, 2024

unstable feature versions should never be enabled outside JUnit tests (I have a KIP which formalized this). Exactly to avoid this kind of confusion.

I think that it would be nice to have a public config to run the broker in experimental mode in order to test non-production features in development clusters. Otherwise, I suppose that we can achieve this by manually upgrading the MV to an unreleased version with kafka-features or do we disallow setting a non-production MV?

@dajac (Contributor) commented Jun 27, 2024

Discussed offline with a few folks and reverting group.version seems to be the way to go. I will prepare the PR as soon as possible. cc @jlprat

@jlprat (Contributor) commented Jun 27, 2024

Thanks @dajac !

@jolshan (Contributor, Author) commented Jun 27, 2024

I will close this PR.

@jolshan closed this on Jun 27, 2024
Labels: Blocker (This pull request is identified as solving a blocker for a release.)
Projects: None yet
7 participants