KAFKA-16308 [1/N]: Create FeatureVersion interface and add --feature flag and handling to StorageTool #15685

Merged: 34 commits into apache:trunk, May 29, 2024

Conversation

@jolshan (Contributor) commented Apr 8, 2024

As part of KIP-1022, I have created an interface for all the new features, to be used when parsing the command-line arguments, doing validations, getting default versions, etc.

I've also added the --feature flag to the storage tool to show how it will be used.

Created a TestFeatureVersion to show an implementation of the interface (besides MetadataVersion, which is unique) and added tests using this new test feature.

I will add the unstable config and tests in a followup.
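
To sketch the shape of the interface (illustrative only; the exact signatures, constants, and package layout are in the diff):

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.server.common.MetadataVersion;

    // Sketch of the per-feature version interface added in this PR (illustrative,
    // not the exact merged code).
    public interface FeatureVersion {
        // The feature name used on the command line, e.g. "test.feature.version".
        String featureName();

        // The numeric level this value represents.
        short featureLevel();

        // The metadata version whose bootstrap defaults first include this level.
        MetadataVersion bootstrapMetadataVersion();

        // Other features (name -> minimum level) that this level depends on, if any.
        Map<String, Short> dependencies();
    }

    // A test-only implementation along the lines of TestFeatureVersion; the mapped
    // metadata versions here are placeholders, not the real ones.
    enum TestFeatureVersion implements FeatureVersion {
        TEST_0((short) 0, MetadataVersion.IBP_3_3_IV0),
        TEST_1((short) 1, MetadataVersion.LATEST_PRODUCTION);

        private final short level;
        private final MetadataVersion bootstrapVersion;

        TestFeatureVersion(short level, MetadataVersion bootstrapVersion) {
            this.level = level;
            this.bootstrapVersion = bootstrapVersion;
        }

        @Override public String featureName() { return "test.feature.version"; }
        @Override public short featureLevel() { return level; }
        @Override public MetadataVersion bootstrapMetadataVersion() { return bootstrapVersion; }
        @Override public Map<String, Short> dependencies() { return Collections.emptyMap(); }
    }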

I also tested setting the test.feature.version locally
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties --feature test.feature.version=1

I see the following in logs:
[2024-04-17 16:04:10,988] INFO [QuorumController id=1] Replayed a FeatureLevelRecord setting feature test.feature.version to 1 (org.apache.kafka.controller.FeatureControlManager)

When I queried ApiVersions:

Feature: metadata.version	SupportedMinVersion: 3.0-IV1	SupportedMaxVersion: 3.7-IV4	FinalizedVersionLevel: 3.7-IV4	Epoch: 49882
Feature: test.feature.version	SupportedMinVersion: 0	SupportedMaxVersion: 1	FinalizedVersionLevel: 1	Epoch: 49882

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@jolshan marked this pull request as ready for review April 15, 2024 18:01
@jolshan (Contributor, Author) commented Apr 18, 2024

I noticed I need to do the Quorum and Broker features, which are basically the same implementation. Stay tuned.

@jolshan (Contributor, Author) commented Apr 18, 2024

I may need to figure out how to deal with this when setting the version to 0:

Replayed a FeatureLevelRecord removing feature test.feature.version
Broker 1 registered with feature test.feature.version that is unknown to the controller (org.apache.kafka.controller.ClusterControlManager)
[2024-04-18 15:32:25,440] INFO [QuorumController id=1] Replayed RegisterBrokerRecord modifying the registration for broker 1: RegisterBrokerRecord(brokerId=1, isMigratingZkBroker=false, incarnationId=pihYtx_qSSKlJ2K31eGFOw, brokerEpoch=8, endPoints=[BrokerEndpoint(name='PLAINTEXT', host='localhost', port=9092, securityProtocol=0)], features=[BrokerFeature(name='metadata.version', minSupportedVersion=1, maxSupportedVersion=19), BrokerFeature(name='test.feature.version', minSupportedVersion=0, maxSupportedVersion=1)], rack=null, fenced=true, inControlledShutdown=false, logDirs=[T1fHZ5DjOC1Dk5CviOHPbg]) (org.apache.kafka.controller.ClusterControlManager)

@jolshan (Contributor, Author) commented Apr 25, 2024

I've cleaned up the code so it does not set the record in the storage tool when the version is 0. I also cleaned up the log message, since it is not always the case that the controller doesn't know the version. For now, 0 is a reasonable default.
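
Roughly, the bootstrap-record generation now behaves like this (a Java sketch for illustration; the real logic lives in the Scala storage tool code):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.common.metadata.FeatureLevelRecord;

    // Sketch: a feature finalized at level 0 produces no FeatureLevelRecord at all,
    // since level 0 (disabled) is represented by the absence of a record.
    final class BootstrapFeatureRecords {
        static List<FeatureLevelRecord> recordsFor(Map<String, Short> finalizedFeatures) {
            List<FeatureLevelRecord> records = new ArrayList<>();
            for (Map.Entry<String, Short> entry : finalizedFeatures.entrySet()) {
                if (entry.getValue() == 0) {
                    continue; // level 0 == disabled == no record written
                }
                records.add(new FeatureLevelRecord()
                    .setName(entry.getKey())
                    .setFeatureLevel(entry.getValue()));
            }
            return records;
        }
    }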

Here are the controller logs when I set the test version to 0 and to 1.

[2024-04-24 17:15:05,566] INFO [QuorumController id=1] Creating new QuorumController with clusterId F0eUpBlzS4yJYcsM9DNfXA. (org.apache.kafka.controller.QuorumController)
[2024-04-24 17:15:05,568] INFO [QuorumController id=1] Becoming the active controller at epoch 1, next write offset 1. (org.apache.kafka.controller.QuorumController)
[2024-04-24 17:15:05,570] WARN [QuorumController id=1] Performing controller activation. The metadata log appears to be empty. Appending 1 bootstrap record(s) in metadata transaction at metadata.version 3.7-IV4 from bootstrap source 'the binary bootstrap metadata file: /tmp/kraft-combined-logs/bootstrap.checkpoint'. Setting the ZK migration state to NONE since this is a de-novo KRaft cluster. (org.apache.kafka.controller.QuorumController)
[2024-04-24 17:15:05,571] INFO [QuorumController id=1] Replayed BeginTransactionRecord(name='Bootstrap records') at offset 1. (org.apache.kafka.controller.OffsetControlManager)
[2024-04-24 17:15:05,571] INFO [QuorumController id=1] Replayed a FeatureLevelRecord setting metadata.version to 3.7-IV4 (org.apache.kafka.controller.FeatureControlManager)
[2024-04-24 17:15:05,571] INFO [QuorumController id=1] Replayed EndTransactionRecord() at offset 4. (org.apache.kafka.controller.OffsetControlManager)
[2024-04-24 17:15:05,645] INFO [QuorumController id=1] Replayed RegisterControllerRecord contaning ControllerRegistration(id=1, incarnationId=h3WYlEEtTUCG6nOjFnIQxQ, zkMigrationReady=false, listeners=[Endpoint(listenerName='CONTROLLER', securityProtocol=PLAINTEXT, host='10.200.4.27', port=9093)], supportedFeatures={metadata.version: 1-19, test.feature.version: 0-1}). (org.apache.kafka.controller.ClusterControlManager)
[2024-04-24 17:15:05,686] INFO [QuorumController id=1] Replayed initial RegisterBrokerRecord for broker 1: RegisterBrokerRecord(brokerId=1, isMigratingZkBroker=false, incarnationId=Ivj6roa7QnmcRbx_P_hg0A, brokerEpoch=6, endPoints=[BrokerEndpoint(name='PLAINTEXT', host='localhost', port=9092, securityProtocol=0)], features=[BrokerFeature(name='metadata.version', minSupportedVersion=1, maxSupportedVersion=19), BrokerFeature(name='test.feature.version', minSupportedVersion=0, maxSupportedVersion=1)], rack=null, fenced=true, inControlledShutdown=false, logDirs=[j6NFYHN2xQ8wG1rUVXf7LA]) (org.apache.kafka.controller.ClusterControlManager)
[2024-04-24 17:15:05,745] INFO [QuorumController id=1] Replayed RegisterBrokerRecord modifying the registration for broker 1: RegisterBrokerRecord(brokerId=1, isMigratingZkBroker=false, incarnationId=Ivj6roa7QnmcRbx_P_hg0A, brokerEpoch=7, endPoints=[BrokerEndpoint(name='PLAINTEXT', host='localhost', port=9092, securityProtocol=0)], features=[BrokerFeature(name='metadata.version', minSupportedVersion=1, maxSupportedVersion=19), BrokerFeature(name='test.feature.version', minSupportedVersion=0, maxSupportedVersion=1)], rack=null, fenced=true, inControlledShutdown=false, logDirs=[j6NFYHN2xQ8wG1rUVXf7LA]) (org.apache.kafka.controller.ClusterControlManager)
[2024-04-24 17:15:05,786] INFO [QuorumController id=1] The request from broker 1 to unfence has been granted because it has caught up with the offset of its register broker record 7. (org.apache.kafka.controller.BrokerHeartbeatManager)
[2024-04-24 17:15:05,788] INFO [QuorumController id=1] Replayed BrokerRegistrationChangeRecord modifying the registration for broker 1: BrokerRegistrationChangeRecord(brokerId=1, brokerEpoch=7, fenced=-1, inControlledShutdown=0, logDirs=[]) (org.apache.kafka.controller.ClusterControlManager)

[2024-04-24 17:16:29,048] INFO [QuorumController id=1] Creating new QuorumController with clusterId F0eUpBlzS4yJYcsM9DNfXA. (org.apache.kafka.controller.QuorumController)
[2024-04-24 17:16:29,050] INFO [QuorumController id=1] Becoming the active controller at epoch 1, next write offset 1. (org.apache.kafka.controller.QuorumController)
[2024-04-24 17:16:29,051] WARN [QuorumController id=1] Performing controller activation. The metadata log appears to be empty. Appending 2 bootstrap record(s) in metadata transaction at metadata.version 3.7-IV4 from bootstrap source 'the binary bootstrap metadata file: /tmp/kraft-combined-logs/bootstrap.checkpoint'. Setting the ZK migration state to NONE since this is a de-novo KRaft cluster. (org.apache.kafka.controller.QuorumController)
[2024-04-24 17:16:29,052] INFO [QuorumController id=1] Replayed BeginTransactionRecord(name='Bootstrap records') at offset 1. (org.apache.kafka.controller.OffsetControlManager)
[2024-04-24 17:16:29,052] INFO [QuorumController id=1] Replayed a FeatureLevelRecord setting metadata.version to 3.7-IV4 (org.apache.kafka.controller.FeatureControlManager)
[2024-04-24 17:16:29,052] INFO [QuorumController id=1] Replayed a FeatureLevelRecord setting feature test.feature.version to 1 (org.apache.kafka.controller.FeatureControlManager)
[2024-04-24 17:16:29,053] INFO [QuorumController id=1] Replayed EndTransactionRecord() at offset 5. (org.apache.kafka.controller.OffsetControlManager)
[2024-04-24 17:16:29,123] INFO [QuorumController id=1] Replayed RegisterControllerRecord contaning ControllerRegistration(id=1, incarnationId=K_kgrPTRRlKZDq56_5dA6w, zkMigrationReady=false, listeners=[Endpoint(listenerName='CONTROLLER', securityProtocol=PLAINTEXT, host='10.200.4.27', port=9093)], supportedFeatures={metadata.version: 1-19, test.feature.version: 0-1}). (org.apache.kafka.controller.ClusterControlManager)
[2024-04-24 17:16:29,169] INFO [QuorumController id=1] Replayed initial RegisterBrokerRecord for broker 1: RegisterBrokerRecord(brokerId=1, isMigratingZkBroker=false, incarnationId=yfPvBsa3QhqbDwIPuyvc3A, brokerEpoch=7, endPoints=[BrokerEndpoint(name='PLAINTEXT', host='localhost', port=9092, securityProtocol=0)], features=[BrokerFeature(name='metadata.version', minSupportedVersion=1, maxSupportedVersion=19), BrokerFeature(name='test.feature.version', minSupportedVersion=0, maxSupportedVersion=1)], rack=null, fenced=true, inControlledShutdown=false, logDirs=[hIbM7AYPZPeiH8k53crBMg]) (org.apache.kafka.controller.ClusterControlManager)
[2024-04-24 17:16:29,214] INFO [QuorumController id=1] Replayed RegisterBrokerRecord modifying the registration for broker 1: RegisterBrokerRecord(brokerId=1, isMigratingZkBroker=false, incarnationId=yfPvBsa3QhqbDwIPuyvc3A, brokerEpoch=8, endPoints=[BrokerEndpoint(name='PLAINTEXT', host='localhost', port=9092, securityProtocol=0)], features=[BrokerFeature(name='metadata.version', minSupportedVersion=1, maxSupportedVersion=19), BrokerFeature(name='test.feature.version', minSupportedVersion=0, maxSupportedVersion=1)], rack=null, fenced=true, inControlledShutdown=false, logDirs=[hIbM7AYPZPeiH8k53crBMg]) (org.apache.kafka.controller.ClusterControlManager)
[2024-04-24 17:16:29,256] INFO [QuorumController id=1] The request from broker 1 to unfence has been granted because it has caught up with the offset of its register broker record 8. (org.apache.kafka.controller.BrokerHeartbeatManager)
[2024-04-24 17:16:29,259] INFO [QuorumController id=1] Replayed BrokerRegistrationChangeRecord modifying the registration for broker 1: BrokerRegistrationChangeRecord(brokerId=1, brokerEpoch=8, fenced=-1, inControlledShutdown=0, logDirs=[]) (org.apache.kafka.controller.ClusterControlManager)

@jolshan (Contributor, Author) commented Apr 25, 2024

Separately, I need to fix the update tool. It will always say the finalized version is 0 if the tool knows of the feature, even if the broker doesn't include it in the list of finalized features.

Feature: metadata.version	SupportedMinVersion: 3.0-IV1	SupportedMaxVersion: 3.7-IV4	FinalizedVersionLevel: 3.7-IV4	Epoch: 17
Feature: test.feature.version	SupportedMinVersion: 0	SupportedMaxVersion: 1	FinalizedVersionLevel: 0	Epoch: 17
"supportedFeatures":[{"name":"metadata.version","minVersion":1,"maxVersion":19},{"name":"test.feature.version","minVersion":0,"maxVersion":1}],"finalizedFeaturesEpoch":17,"finalizedFeatures":[{"name":"metadata.version","maxVersionLevel":19,"minVersionLevel":19}]}

@dajac (Contributor) left a comment:

@jolshan Thanks for the patch. I made a first pass on it and left some questions/comments to start with.

* The next metadata version to be released when the feature became production ready.
* (Ie, if the current production MV is 17 when a feature is released, its mapping should be to MV 18)
*/
MetadataVersion metadataVersionMapping();
Contributor:
If I understand this one correctly, this is basically the minimum required metadata version for this feature version to be allowed. Is my understanding correct? If it is, it may be better to call it minimumRequiredMetadataVersion or something like this.

My second question is about the production readiness for a feature. My understanding is that a given version won't be allowed unless the MV is set to the right version. So if the MV is "unreleased" and we allow it via the configuration, then I can set a feature version depending on it. Is my understanding correct?

@jolshan (Contributor, Author) commented May 16, 2024:

No. This is not the minimum version. This is the first image version that supports the feature.
I wanted to have a mapping to image version, but that was heavily argued against on the mailing thread so I have to use MV as a proxy.

This value is only a DEFAULT if no value is set during bootstrapping but MV is set.

As long as the feature is in the image, you can set it regardless of the MV set (production ready or not). Setting the version is completely independent from the MV set.

core/src/main/scala/kafka/tools/StorageTool.scala (outdated review thread, resolved)
// As more features roll out, it may be common to leave a feature disabled, so this log is debug level in the case
// an intended feature is not being set.
if (finalized == 0)
log.debug("Broker {} registered with feature {} that is either unknown or version 0 on the controller",
Contributor:

I wonder whether we should just keep it as a warning. Or could we differentiate between a disabled feature and an unknown one?

Contributor Author:

It's really hard to differentiate between disabled and unknown because the protocol when setting a feature to 0 is to remove it. You will not have a record when you set it to 0 because of this.

Contributor:

We can validate the feature vs. Features.PRODUCTION_FEATURES; if it's not present there, we can log a warning.

@dajac (Contributor) left a comment:

@jolshan Thanks for the update and for renaming those classes. Overall, it looks pretty good to me. I left a few nits/questions.

Comment on lines 36 to 40
/**
* The next metadata version to be released when the feature became production ready.
* (Ie, if the current production MV is 17 when a feature is released, its mapping should be to MV 18)
*/
MetadataVersion metadataVersionMapping();
Contributor:

I am still confused by this one. Based on our offline discussion, my understanding is that this is only used during bootstrapping. We should try to make this clear in the name and in the javadoc.

(Ie, if the current production MV is 17 when a feature is released, its mapping should be to MV 18)

For my understanding, why do we require to be the next one? Requiring the current seems more natural but I may be missing something.

Contributor Author:

I would like the mapping to remain consistent for MV. If we do the current mapping, some versions/images with MV X will have feature version Y and some will not (since it wasn't created when MV X was first released)

Sorry I forgot to change the name, I will do that today 👍

@dajac (Contributor) commented May 22, 2024:

I would like the mapping to remain consistent for MV. If we do the current mapping, some versions/images with MV X will have feature version Y and some will not (since it wasn't created when MV X was first released)

Isn't it something that will happen anyway? For instance, I will add group.version=1 soon and it will require an old MV, likely the oldest one supported by KRaft.

I am not too opinionated on this one though. If you believe that this is the right approach, I trust you.

Contributor Author:

Hmm, there are a lot of components that are all getting mixed around here.

  1. All features require IBP-3.3.0-IV0. This is because this is the minimum MV to bootstrap and write a feature record :) This is encoded in Features#validateVersion and doesn't need to be specified for all features since it will be required by all.
  2. Some features may require a specific other feature version in order to be set. None of the features we proposed (transaction/group coordinator) need this but it was requested in the KIP, so I have implemented a framework to do so.
  3. We need to set a reasonable default feature for when folks bootstrap using only metadata. There are a few options here. One is to set all features to 0. We ended up deciding to take the latest features as default. There is a small wrinkle here in that for a given MV, we may introduce a feature version after the code that releases the MV. If folks are running off trunk, they will have different features for the same MV if we choose the current MV, and that's why I suggest the next one.

This bootstrap method is only used for (3). (1) and (2) are covered by the validateVersion method and by enumerating non-3.3 dependencies when defining the FeatureVersion.
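
Concretely, (3) amounts to something like this (a sketch reusing the FeatureVersion shape sketched in the description; not the exact merged code):

    // Sketch: the bootstrap default for a feature is the highest level whose mapped
    // ("bootstrap") metadata version is at or below the MV being bootstrapped;
    // 0 (disabled) if no level qualifies yet.
    static short defaultLevel(MetadataVersion bootstrapMv, List<? extends FeatureVersion> orderedLevels) {
        short def = 0;
        for (FeatureVersion fv : orderedLevels) {   // assumed sorted by featureLevel()
            if (bootstrapMv.isAtLeast(fv.bootstrapMetadataVersion())) {
                def = fv.featureLevel();
            }
        }
        return def;
    }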

core/src/main/scala/kafka/tools/StorageTool.scala (outdated review thread, resolved)
core/src/main/scala/kafka/server/BrokerFeatures.scala (outdated review thread, resolved)
// As more features roll out, it may be common to leave a feature disabled, so this log is debug level in the case
// an intended feature is not being set.
if (finalized == 0)
log.debug("Broker {} registered with feature {} that is either unknown or version 0 on the controller",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can validate the feature vs. Features.PRODUCTION_FEATURES; if it's not present there, we can log a warning.

core/src/main/scala/kafka/server/BrokerFeatures.scala (outdated review thread, resolved)
core/src/main/scala/kafka/tools/StorageTool.scala (outdated review thread, resolved)
* The minimum metadata version which sets this feature version as default. When bootstrapping using only
* a metadata version, a reasonable default for all other features is chosen based on this value.
* This should be defined as the next metadata version to be released when the feature version becomes production ready.
* (Ie, if the current production MV is 17 when a feature version is released, its mapping should be to MV 18)
Contributor:

How do we know the release version when we create the feature?

My understanding is that the new version of the feature will start as an unstable one. When we promote it to production ready, we can attach it to the correct MV (the latest one available in the release, I suppose).

@junrao (Contributor) left a comment:

@jolshan : Thanks for the PR. Just a few minor comments.

}
)))
}))
FeatureVersion.PRODUCTION_FEATURES.forEach { feature =>
Contributor:

The KIP says to generalize unstableMetadataVersionsEnabled to unstableFeatureVersionsEnabled. Do we plan to add it in this PR?

Contributor Author:

This will be in a followup. I'm working on it in the background. :)

core/src/main/scala/kafka/tools/StorageTool.scala (outdated review thread, resolved)
allFeatures: List[Features],
usesVersionDefault: Boolean): Unit = {
// If we are using --version-default, the default is based on the metadata version.
val metadataVersionForDefault = if (usesVersionDefault) Optional.of(metadataVersion) else Optional.empty[MetadataVersion]()
Contributor:

I am wondering why we need metadataVersionForDefault. If --release_version is not specified, metadataVersion defaults to latest production version. Do we get the same result by passing in that metadataVersion to feature.defaultValue()?

@jolshan (Contributor, Author) commented May 23, 2024:

We do not. This was changed yesterday and the code was as you say in the comment.

Do we get the same result by passing in that metadataVersion to feature.defaultValue()

If we follow the protocol of creating a new MV for each new feature and making them production ready at the same time then the answer to your question is yes. If we want to codify this and require a new MV (used only for mapping a default) for every new feature to be created when we mark the feature as production ready, I can switch it back to how it was yesterday.

I originally changed it in the case where we want to piggyback on the next MV and we may mark the feature as production ready but not the MV.

Contributor Author:

If we want to change it back, I will also update the comments in the metadataVersionMapping method as it will not be correct.

Contributor Author:

Ok sorry to be a little all over the place here. I think we should have semantics where on each new feature version released to production, we have a MV released to production.

In this case, whether we specify latest production (as it did before) or use an empty optional to specify the latest production feature SHOULD be equivalent. The only case it is not is if someone doesn't set the metadataMapping correctly. There isn't a great way to enforce that (I can do so via a test), but I guess the question is whether we prefer the empty approach that ensures the latest features are provided OR if we prefer the latest production metadata approach that is simpler but may risk not picking up features if folks implement the method incorrectly/don't update the MV.

Contributor:

I think it would be confusing if a user specifies the latest MV version and the result would be different from when nothing is specified (implied assumption is that nothing is shortcut for latest MV known to the tool). It would also be confusing if by default (nothing is specified) we don't have all features set to the latest versions known to the tool. We can provide guidance to new features developers in the comments (and the test feature example) and add a unit test that enforces the equivalence.

Contributor:

If we follow the protocol of creating a new MV for each new feature and making them production ready at the same time then the answer to your question is yes.

I thought that's the model being implemented since each FeatureVersion needs to define bootstrapMetadataVersion()?

Another thing is that if we follow this model, it would be inconvenient for each feature to maintain its own latestProductionVersion and make sure that it's consistent with the LATEST_PRODUCTION in MV. Perhaps it's simpler to just maintain the latestProductionVersion in MV? Then through bootstrapMetadataVersion(), we will know whether a feature level is production ready or not.

@jolshan (Contributor, Author) commented May 24, 2024:

Ok -- so if I understand correctly, the request is to remove latest production per feature and to simply mark as production ready if the MV that corresponds to it is production ready?

The only case where this is tricky is when we use the --feature flag and we need to find the latest production version. To do that, we will need to get the latest production Metadata and map that to a feature. It's doable though, so I can proceed with that.

As for

If we follow the protocol of creating a new MV for each new feature and making them production ready at the same time then the answer to your question is yes.

Originally this did not need to be a production ready MV even if the feature is production, but I think we are now flipping this around and saying the feature is production ready iff the MV is production ready.
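
In other words, roughly (building on the defaultLevel sketch above; illustrative only, not the merged code):

    // Sketch: with production readiness tracked only on MV, a feature's latest
    // production level is whatever the latest production MV maps to.
    static short latestProductionLevel(List<? extends FeatureVersion> orderedLevels) {
        return defaultLevel(MetadataVersion.LATEST_PRODUCTION, orderedLevels);
    }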

Contributor:

Originally this did not need to be a production ready MV even if the feature is production, but I think we are now flipping this around and saying the feature is production ready iff the MV is production ready.

If there is a 1-to-1 mapping from feature to MV, reasoning about production readiness in one place seems simpler?

Contributor Author:

Yup. I will push the code soon, but tried to write a clear comment about this.

allFeatures: List[Features],
usesVersionDefault: Boolean): Unit = {
// If we are using --version-default, the default is based on the metadata version.
val metadataVersionForDefault = if (usesVersionDefault) Optional.of(metadataVersion) else Optional.empty[MetadataVersion]()
Contributor:

I think it would be confusing if a user specifies the latest MV version and the result would be different from when nothing is specified (implied assumption is that nothing is shortcut for latest MV known to the tool). It would also be confusing if by default (nothing is specified) we don't have all features set to the latest versions known to the tool. We can provide guidance to new features developers in the comments (and the test feature example) and add a unit test that enforces the equivalence.

@junrao (Contributor) left a comment:

@jolshan : Thanks for the updated PR. A couple of more comments.

allFeatures: List[Features],
usesVersionDefault: Boolean): Unit = {
// If we are using --version-default, the default is based on the metadata version.
val metadataVersionForDefault = if (usesVersionDefault) Optional.of(metadataVersion) else Optional.empty[MetadataVersion]()
Contributor:

If we follow the protocol of creating a new MV for each new feature and making them production ready at the same time then the answer to your question is yes.

I thought that's the model being implemented since each FeatureVersion needs to define bootstrapMetadataVersion()?

Another thing is that if we follow this model, it would be inconvenient for each feature to maintain its own latestProductionVersion and make sure that it's consistent with the LATEST_PRODUCTION in MV. Perhaps it's simpler to just maintain the latestProductionVersion in MV? Then through bootstrapMetadataVersion(), we will know whether a feature level is production ready or not.

metadataVersion,
featureNamesAndLevelsMap,
Features.PRODUCTION_FEATURES.asScala.toList,
Option(namespace.getString("release_version")).isEmpty
Contributor:

Should it be !isEmpty?

Contributor Author:

I should probably add a test for this 🤦‍♀️

@artemlivshits (Contributor) left a comment:

LGTM

@dajac (Contributor) left a comment:

LGTM, thanks @jolshan! Can we also add the unstable config as a follow-up? It would be great to also have it in 3.8. I'll let @junrao do a final pass on it.

@jolshan (Contributor, Author) commented May 28, 2024

Yes @dajac! The code for unstable versions is mostly ready but I need to refactor based on some changes here.

@junrao (Contributor) left a comment:

@jolshan : Thanks for the updated PR. A few more comments.

metadataVersion,
featureNamesAndLevelsMap,
Features.PRODUCTION_FEATURES.asScala.toList,
!Option(namespace.getString("release_version")).isEmpty
Contributor:

I am wondering why we need to pass in usesVersionDefault? Earlier in getMetadataVersion, we already resolve metadataVersion to LATEST_PRODUCTION if it's not explicitly specified.

Contributor Author:

If we use feature flags to specify MV, we should not default based on the MV but instead use the latest defaults for non-specified features.

The difference is specifying MV using --version-default vs. the --feature flag.

Contributor:

Got it. If --feature is used, but doesn't include MV, we use MetadataVersion.LATEST_PRODUCTION. However, if neither --feature nor --release-version is specified, we use ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG first and use MetadataVersion.LATEST_PRODUCTION as the fallback. Should we keep them consistent?

Contributor Author:

I wouldn't particularly split hairs here since I think both are reasonable. However, I can make it consistent.

Contributor Author:

Sorry I thought about this more. I don't think this is correct.
If we don't specify --release-version we will use latest production. Where do you see we use the replication configs?

Contributor Author:

I've updated this to make it clearer, but I think the original code is correct.

Contributor:

Thanks for the reply. Got it now. We pass in INTER_BROKER_PROTOCOL_VERSION_CONFIG as the default when calling getMetadataVersion. But that config shouldn't impact the MV used for selecting other features.

          val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap,
            Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))

@jolshan (Contributor, Author) commented May 29, 2024:

We only use the passed in metadata version for defaults if --release-version is specified. If release version is specified, we don't use the replication configs.

@jolshan (Contributor, Author) commented May 29, 2024:

    val defaultValue = defaultVersionString match {
      case Some(versionString) => MetadataVersion.fromVersionString(versionString)
      case None => MetadataVersion.LATEST_PRODUCTION
    }

    val releaseVersionTag = Option(namespace.getString("release_version"))
    val featureTag = featureNamesAndLevelsMap.get(MetadataVersion.FEATURE_NAME)

    (releaseVersionTag, featureTag) match {
      case (Some(_), Some(_)) => // We should throw an error before we hit this case, but include for completeness
        throw new IllegalArgumentException("Both --release_version and --feature were set. Only one of the two flags can be set.")
      case (Some(version), None) =>
        MetadataVersion.fromVersionString(version)
      case (None, Some(level)) =>
        MetadataVersion.fromFeatureLevel(level)
      case (None, None) =>
        defaultValue
    }

* version should be made production ready as well.
*
* @param metadataVersion the metadata version we want to use to set the default.
* @return the default version level for the feature and potential metadata version
Contributor:

We are not returning potential metadata version, right?

Contributor Author:

I was meaning given the input of MV. I can clarify this since it is confusing.

for (Features feature : Features.PRODUCTION_FEATURES) {
features.put(feature.featureName(), VersionRange.of(
0,
feature.latestProduction()
Contributor:

Should this take enableUnstable into consideration?

Contributor Author:

This is in the followup. :) It changes a ton of files so I would prefer to do it separately.

core/src/test/scala/unit/kafka/tools/StorageToolTest.scala (outdated review thread, resolved)
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala (outdated review thread, resolved)
FEATURES = Arrays.copyOf(enumValues, enumValues.length);

PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
feature.name != TEST_VERSION.featureName()).collect(Collectors.toList());
Contributor:

There is a small bug here. We should use equals, I think.

Contributor Author:

fixed
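
The corrected filter is along these lines (sketch only):

    // Compare the feature names with equals() rather than reference equality.
    PRODUCTION_FEATURES = Arrays.stream(FEATURES)
        .filter(feature -> !feature.featureName().equals(TEST_VERSION.featureName()))
        .collect(Collectors.toList());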

@junrao (Contributor) left a comment:

@jolshan : Thanks for the updated PR. A few more followup comments.

metadataVersion,
featureNamesAndLevelsMap,
Features.PRODUCTION_FEATURES.asScala.toList,
!Option(namespace.getString("release_version")).isEmpty
Contributor:

Got it. If --feature is used, but doesn't include MV, we use MetadataVersion.LATEST_PRODUCTION. However, if neither --feature nor --release-version is specified, we use ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG first and use MetadataVersion.LATEST_PRODUCTION as the fallback. Should we keep them consistent?

metadataVersion: MetadataVersion,
specifiedFeatures: Map[String, java.lang.Short],
allFeatures: List[Features],
usesVersionDefault: Boolean): Unit = {
Contributor:

usesVersionDefault => releaseVersionSpecified ?

Contributor Author:

I have to change this because of the other comment you mentioned anyway 👍

// Ensure the minimum bootstrap metadata version is included if no metadata version dependency.
Map<String, Short> deps = new HashMap<>();
deps.putAll(featureImpl.dependencies());
if (!deps.containsKey(MetadataVersion.FEATURE_NAME)) {
Contributor:

Should we require each feature to include a dependency on MV? Otherwise, we need to add this logic in all places where Features.validateVersion is called.

Contributor Author:

No. Features should not require a dependency on MV. I don't think we need to add this logic to all places where validation is called.

Contributor Author:

I added this to the test so it has reasonable features passed in. But when this is called, we are passing in ALL features including metadata version.
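
For context, the check being discussed is roughly shaped like this (a sketch that folds the metadata.version floor into the helper itself; in the PR that step sits at the call site, as the snippet above shows):

    // Sketch: validate a requested feature level against the other finalized features,
    // treating a missing metadata.version dependency as an implicit dependency on the
    // minimum bootstrap MV (IBP 3.3), per the discussion above.
    static void validateVersion(FeatureVersion featureImpl, Map<String, Short> finalizedFeatures) {
        Map<String, Short> deps = new HashMap<>(featureImpl.dependencies());
        deps.putIfAbsent(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV0.featureLevel());
        for (Map.Entry<String, Short> dep : deps.entrySet()) {
            short finalized = finalizedFeatures.getOrDefault(dep.getKey(), (short) 0);
            if (finalized < dep.getValue()) {
                throw new IllegalArgumentException(featureImpl.featureName() + " level " +
                    featureImpl.featureLevel() + " requires " + dep.getKey() +
                    " at least " + dep.getValue());
            }
        }
    }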

@junrao (Contributor) left a comment:

@jolshan : Thanks for the updated PR. LGTM assuming the tests pass.

@jolshan (Contributor, Author) commented May 29, 2024

Test failures are unrelated. Merging 🎉

@jolshan merged commit 5e3df22 into apache:trunk on May 29, 2024
1 check failed
chiacyu pushed a commit to chiacyu/kafka that referenced this pull request Jun 1, 2024
wernerdv pushed a commit to wernerdv/kafka that referenced this pull request Jun 3, 2024

Reviewers: David Mao <dmao@confluent.io>, David Jacot <djacot@confluent.io>, Artem Livshits <alivshits@confluent.io>, Jun Rao <junrao@apache.org>