Skip to content

Commit

Permalink
add zkMigrationEnabled
Browse files Browse the repository at this point in the history
  • Loading branch information
showuon committed Apr 24, 2023
1 parent 23c39a1 commit 7f02852
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,7 @@ private void handleInitiateApiVersionRequests(long now) {
Map.Entry<String, ApiVersionsRequest.Builder> entry = iter.next();
String node = entry.getKey();
if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) {
log.debug("Initiating API versions fetch from node {}.", node);
log.info("!!! Initiating API versions fetch from node {}.", node);
// We transition the connection to the CHECKING_API_VERSIONS state only when
// the ApiVersionsRequest is queued up to be sent out. Without this, the client
// could remain in the CHECKING_API_VERSIONS state forever if the channel does
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ public static ApiVersionsResponse createApiVersionsResponse(
apiVersions,
latestSupportedFeatures,
Collections.emptyMap(),
UNKNOWN_FINALIZED_FEATURES_EPOCH);
UNKNOWN_FINALIZED_FEATURES_EPOCH,
false);
}

public static ApiVersionsResponse createApiVersionsResponse(
Expand All @@ -163,7 +164,8 @@ public static ApiVersionsResponse createApiVersionsResponse(
long finalizedFeaturesEpoch,
NodeApiVersions controllerApiVersions,
ListenerType listenerType,
boolean enableUnstableLastVersion
boolean enableUnstableLastVersion,
boolean zkMigrationEnabled
) {
ApiVersionCollection apiKeys;
if (controllerApiVersions != null) {
Expand All @@ -186,7 +188,8 @@ public static ApiVersionsResponse createApiVersionsResponse(
apiKeys,
latestSupportedFeatures,
finalizedFeatures,
finalizedFeaturesEpoch
finalizedFeaturesEpoch,
zkMigrationEnabled
);
}

Expand All @@ -195,7 +198,8 @@ public static ApiVersionsResponse createApiVersionsResponse(
ApiVersionCollection apiVersions,
Features<SupportedVersionRange> latestSupportedFeatures,
Map<String, Short> finalizedFeatures,
long finalizedFeaturesEpoch
long finalizedFeaturesEpoch,
boolean zkMigrationEnabled
) {
return new ApiVersionsResponse(
createApiVersionsResponseData(
Expand All @@ -204,7 +208,8 @@ public static ApiVersionsResponse createApiVersionsResponse(
apiVersions,
latestSupportedFeatures,
finalizedFeatures,
finalizedFeaturesEpoch
finalizedFeaturesEpoch,
zkMigrationEnabled
)
);
}
Expand Down Expand Up @@ -294,7 +299,8 @@ private static ApiVersionsResponseData createApiVersionsResponseData(
final ApiVersionCollection apiKeys,
final Features<SupportedVersionRange> latestSupportedFeatures,
final Map<String, Short> finalizedFeatures,
final long finalizedFeaturesEpoch
final long finalizedFeaturesEpoch,
final boolean zkMigrationEnabled
) {
final ApiVersionsResponseData data = new ApiVersionsResponseData();
data.setThrottleTimeMs(throttleTimeMs);
Expand All @@ -303,6 +309,7 @@ private static ApiVersionsResponseData createApiVersionsResponseData(
data.setSupportedFeatures(createSupportedFeatureKeys(latestSupportedFeatures));
data.setFinalizedFeatures(createFinalizedFeatureKeys(finalizedFeatures));
data.setFinalizedFeaturesEpoch(finalizedFeaturesEpoch);
data.setZkMigrationReady(zkMigrationEnabled);

return data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,8 @@ private static ApiVersionsResponse prepareApiVersionsResponseForDescribeFeatures
ApiVersionsResponse.filterApis(RecordVersion.current(), ApiMessageType.ListenerType.ZK_BROKER),
convertSupportedFeaturesMap(defaultFeatureMetadata().supportedFeatures()),
Collections.singletonMap("test_feature_1", (short) 2),
defaultFeatureMetadata().finalizedFeaturesEpoch().get()
defaultFeatureMetadata().finalizedFeaturesEpoch().get(),
false
);
}
return new ApiVersionsResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ public void shouldCreateApiResponseOnlyWithKeysSupportedByMagicValue() {
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null,
ListenerType.ZK_BROKER,
true
true,
false
);
verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
assertEquals(10, response.throttleTimeMs());
Expand All @@ -141,7 +142,8 @@ public void shouldReturnFeatureKeysWhenMagicIsCurrentValueAndThrottleMsIsDefault
10L,
null,
ListenerType.ZK_BROKER,
true
true,
false
);

verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
Expand Down Expand Up @@ -169,7 +171,8 @@ public void shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThro
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null,
ListenerType.ZK_BROKER,
true
true,
false
);
assertEquals(new HashSet<>(ApiKeys.zkBrokerApis()), apiKeysInResponse(response));
assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
Expand All @@ -188,7 +191,8 @@ public void testMetadataQuorumApisAreDisabled() {
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null,
ListenerType.ZK_BROKER,
true
true,
false
);

// Ensure that APIs needed for the KRaft mode are not exposed through ApiVersions until we are ready for them
Expand Down
15 changes: 10 additions & 5 deletions core/src/main/scala/kafka/server/ApiVersionManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ object ApiVersionManager {
forwardingManager,
supportedFeatures,
metadataCache,
config.unstableApiVersionsEnabled
config.unstableApiVersionsEnabled,
config.migrationEnabled
)
}
}
Expand All @@ -58,7 +59,8 @@ class SimpleApiVersionManager(
val listenerType: ListenerType,
val enabledApis: collection.Set[ApiKeys],
brokerFeatures: Features[SupportedVersionRange],
val enableUnstableLastVersion: Boolean
val enableUnstableLastVersion: Boolean,
val zkMigrationEnabled: Boolean
) extends ApiVersionManager {

def this(
Expand All @@ -69,7 +71,8 @@ class SimpleApiVersionManager(
listenerType,
ApiKeys.apisForListener(listenerType).asScala,
BrokerFeatures.defaultSupportedFeatures(),
enableUnstableLastVersion
enableUnstableLastVersion,
zkMigrationEnabled
)
}

Expand All @@ -85,7 +88,8 @@ class DefaultApiVersionManager(
forwardingManager: Option[ForwardingManager],
features: BrokerFeatures,
metadataCache: MetadataCache,
val enableUnstableLastVersion: Boolean
val enableUnstableLastVersion: Boolean,
val zkMigrationEnabled: Boolean
) extends ApiVersionManager {

val enabledApis = ApiKeys.apisForListener(listenerType).asScala
Expand All @@ -103,7 +107,8 @@ class DefaultApiVersionManager(
finalizedFeatures.epoch,
controllerApiVersions.orNull,
listenerType,
enableUnstableLastVersion
enableUnstableLastVersion,
zkMigrationEnabled
)
}
}

0 comments on commit 7f02852

Please sign in to comment.