Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14909: check zkMigrationReady tag before migration #13631

Merged
merged 10 commits into from Apr 28, 2023
3 changes: 3 additions & 0 deletions checkstyle/import-control-metadata.xml
Expand Up @@ -153,6 +153,9 @@
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.common.internals" />
</subpackage>
<subpackage name="migration">
<allow pkg="org.apache.kafka.controller" />
</subpackage>
<subpackage name="bootstrap">
<allow pkg="org.apache.kafka.snapshot" />
</subpackage>
Expand Down
Expand Up @@ -956,12 +956,13 @@ private void handleApiVersionsResponse(List<ClientResponse> responses,
}
NodeApiVersions nodeVersionInfo = new NodeApiVersions(
apiVersionsResponse.data().apiKeys(),
apiVersionsResponse.data().supportedFeatures());
apiVersionsResponse.data().supportedFeatures(),
apiVersionsResponse.data().zkMigrationReady());
apiVersions.update(node, nodeVersionInfo);
this.connectionStates.ready(node);
log.debug("Node {} has finalized features epoch: {}, finalized features: {}, supported features: {}, API versions: {}.",
log.debug("Node {} has finalized features epoch: {}, finalized features: {}, supported features: {}, ZK migration ready: {}, API versions: {}.",
node, apiVersionsResponse.data().finalizedFeaturesEpoch(), apiVersionsResponse.data().finalizedFeatures(),
apiVersionsResponse.data().supportedFeatures(), nodeVersionInfo);
apiVersionsResponse.data().supportedFeatures(), apiVersionsResponse.data().zkMigrationReady(), nodeVersionInfo);
}

/**
Expand Down
Expand Up @@ -48,6 +48,8 @@ public class NodeApiVersions {

private final Map<String, SupportedVersionRange> supportedFeatures;

private final boolean zkMigrationEnabled;

/**
* Create a NodeApiVersions object with the current ApiVersions.
*
Expand Down Expand Up @@ -76,7 +78,7 @@ public static NodeApiVersions create(Collection<ApiVersion> overrides) {
}
if (!exists) apiVersions.add(ApiVersionsResponse.toApiVersion(apiKey));
}
return new NodeApiVersions(apiVersions, Collections.emptyList());
return new NodeApiVersions(apiVersions, Collections.emptyList(), false);
}


Expand All @@ -95,7 +97,7 @@ public static NodeApiVersions create(short apiKey, short minVersion, short maxVe
.setMaxVersion(maxVersion)));
}

public NodeApiVersions(Collection<ApiVersion> nodeApiVersions, Collection<SupportedFeatureKey> nodeSupportedFeatures) {
public NodeApiVersions(Collection<ApiVersion> nodeApiVersions, Collection<SupportedFeatureKey> nodeSupportedFeatures, boolean zkMigrationEnabled) {
for (ApiVersion nodeApiVersion : nodeApiVersions) {
if (ApiKeys.hasId(nodeApiVersion.apiKey())) {
ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey());
Expand All @@ -112,6 +114,7 @@ public NodeApiVersions(Collection<ApiVersion> nodeApiVersions, Collection<Suppor
new SupportedVersionRange(supportedFeature.minVersion(), supportedFeature.maxVersion()));
}
this.supportedFeatures = Collections.unmodifiableMap(supportedFeaturesBuilder);
this.zkMigrationEnabled = zkMigrationEnabled;
}

/**
Expand Down Expand Up @@ -236,4 +239,8 @@ public Map<ApiKeys, ApiVersion> allSupportedApiVersions() {
public Map<String, SupportedVersionRange> supportedFeatures() {
return supportedFeatures;
}

public boolean zkMigrationEnabled() {
return zkMigrationEnabled;
}
}
Expand Up @@ -119,7 +119,8 @@ public static ApiVersionsResponse defaultApiVersionsResponse(
return createApiVersionsResponse(
throttleTimeMs,
filterApis(RecordVersion.current(), listenerType, true),
Features.emptySupportedFeatures()
Features.emptySupportedFeatures(),
false
);
}

Expand All @@ -131,28 +132,31 @@ public static ApiVersionsResponse defaultApiVersionsResponse(
return createApiVersionsResponse(
throttleTimeMs,
filterApis(RecordVersion.current(), listenerType, enableUnstableLastVersion),
Features.emptySupportedFeatures()
Features.emptySupportedFeatures(),
false
);
}

public static ApiVersionsResponse createApiVersionsResponse(
int throttleTimeMs,
ApiVersionCollection apiVersions
) {
return createApiVersionsResponse(throttleTimeMs, apiVersions, Features.emptySupportedFeatures());
return createApiVersionsResponse(throttleTimeMs, apiVersions, Features.emptySupportedFeatures(), false);
}

public static ApiVersionsResponse createApiVersionsResponse(
int throttleTimeMs,
ApiVersionCollection apiVersions,
Features<SupportedVersionRange> latestSupportedFeatures
Features<SupportedVersionRange> latestSupportedFeatures,
boolean zkMigrationEnabled
) {
return createApiVersionsResponse(
throttleTimeMs,
apiVersions,
latestSupportedFeatures,
Collections.emptyMap(),
UNKNOWN_FINALIZED_FEATURES_EPOCH);
UNKNOWN_FINALIZED_FEATURES_EPOCH,
zkMigrationEnabled);
}

public static ApiVersionsResponse createApiVersionsResponse(
Expand All @@ -163,7 +167,8 @@ public static ApiVersionsResponse createApiVersionsResponse(
long finalizedFeaturesEpoch,
NodeApiVersions controllerApiVersions,
ListenerType listenerType,
boolean enableUnstableLastVersion
boolean enableUnstableLastVersion,
boolean zkMigrationEnabled
) {
ApiVersionCollection apiKeys;
if (controllerApiVersions != null) {
Expand All @@ -186,7 +191,8 @@ public static ApiVersionsResponse createApiVersionsResponse(
apiKeys,
latestSupportedFeatures,
finalizedFeatures,
finalizedFeaturesEpoch
finalizedFeaturesEpoch,
zkMigrationEnabled
);
}

Expand All @@ -195,7 +201,8 @@ public static ApiVersionsResponse createApiVersionsResponse(
ApiVersionCollection apiVersions,
Features<SupportedVersionRange> latestSupportedFeatures,
Map<String, Short> finalizedFeatures,
long finalizedFeaturesEpoch
long finalizedFeaturesEpoch,
boolean zkMigrationEnabled
) {
return new ApiVersionsResponse(
createApiVersionsResponseData(
Expand All @@ -204,7 +211,8 @@ public static ApiVersionsResponse createApiVersionsResponse(
apiVersions,
latestSupportedFeatures,
finalizedFeatures,
finalizedFeaturesEpoch
finalizedFeaturesEpoch,
zkMigrationEnabled
)
);
}
Expand Down Expand Up @@ -294,7 +302,8 @@ private static ApiVersionsResponseData createApiVersionsResponseData(
final ApiVersionCollection apiKeys,
final Features<SupportedVersionRange> latestSupportedFeatures,
final Map<String, Short> finalizedFeatures,
final long finalizedFeaturesEpoch
final long finalizedFeaturesEpoch,
final boolean zkMigrationEnabled
) {
final ApiVersionsResponseData data = new ApiVersionsResponseData();
data.setThrottleTimeMs(throttleTimeMs);
Expand All @@ -303,6 +312,7 @@ private static ApiVersionsResponseData createApiVersionsResponseData(
data.setSupportedFeatures(createSupportedFeatureKeys(latestSupportedFeatures));
data.setFinalizedFeatures(createFinalizedFeatureKeys(finalizedFeatures));
data.setFinalizedFeaturesEpoch(finalizedFeaturesEpoch);
data.setZkMigrationReady(zkMigrationEnabled);

return data;
}
Expand Down
Expand Up @@ -39,7 +39,7 @@ public class NodeApiVersionsTest {

@Test
public void testUnsupportedVersionsToString() {
NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList());
NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList(), false);
StringBuilder bld = new StringBuilder();
String prefix = "(";
for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
Expand Down Expand Up @@ -68,7 +68,7 @@ public void testVersionsToString() {
.setMaxVersion((short) 10001));
} else versionList.add(ApiVersionsResponse.toApiVersion(apiKey));
}
NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList());
NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList(), false);
StringBuilder bld = new StringBuilder();
String prefix = "(";
for (ApiKeys apiKey : ApiKeys.values()) {
Expand Down Expand Up @@ -125,7 +125,7 @@ public void testLatestUsableVersionOutOfRangeHigh() {

@Test
public void testUsableVersionCalculationNoKnownVersions() {
NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList());
NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList(), false);
assertThrows(UnsupportedVersionException.class,
() -> versions.latestUsableVersion(ApiKeys.FETCH));
}
Expand All @@ -147,7 +147,7 @@ public void testUsableVersionLatestVersions(ApiMessageType.ListenerType scope) {
.setApiKey((short) 100)
.setMinVersion((short) 0)
.setMaxVersion((short) 1));
NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList());
NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList(), false);
for (ApiKeys apiKey: ApiKeys.apisForListener(scope)) {
assertEquals(apiKey.latestVersion(), versions.latestUsableVersion(apiKey));
}
Expand All @@ -157,7 +157,7 @@ public void testUsableVersionLatestVersions(ApiMessageType.ListenerType scope) {
@EnumSource(ApiMessageType.ListenerType.class)
public void testConstructionFromApiVersionsResponse(ApiMessageType.ListenerType scope) {
ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(scope);
NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys(), Collections.emptyList());
NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys(), Collections.emptyList(), false);

for (ApiVersion apiVersionKey : apiVersionsResponse.data().apiKeys()) {
ApiVersion apiVersion = versions.apiVersion(ApiKeys.forId(apiVersionKey.apiKey()));
Expand Down
Expand Up @@ -634,7 +634,8 @@ private static ApiVersionsResponse prepareApiVersionsResponseForDescribeFeatures
ApiVersionsResponse.filterApis(RecordVersion.current(), ApiMessageType.ListenerType.ZK_BROKER),
convertSupportedFeaturesMap(defaultFeatureMetadata().supportedFeatures()),
Collections.singletonMap("test_feature_1", (short) 2),
defaultFeatureMetadata().finalizedFeaturesEpoch().get()
defaultFeatureMetadata().finalizedFeaturesEpoch().get(),
false
);
}
return new ApiVersionsResponse(
Expand Down
Expand Up @@ -121,7 +121,8 @@ public void shouldCreateApiResponseOnlyWithKeysSupportedByMagicValue() {
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null,
ListenerType.ZK_BROKER,
true
true,
false
);
verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
assertEquals(10, response.throttleTimeMs());
Expand All @@ -141,7 +142,8 @@ public void shouldReturnFeatureKeysWhenMagicIsCurrentValueAndThrottleMsIsDefault
10L,
null,
ListenerType.ZK_BROKER,
true
true,
false
);

verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
Expand Down Expand Up @@ -169,7 +171,8 @@ public void shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThro
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null,
ListenerType.ZK_BROKER,
true
true,
false
);
assertEquals(new HashSet<>(ApiKeys.zkBrokerApis()), apiKeysInResponse(response));
assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
Expand All @@ -188,7 +191,8 @@ public void testMetadataQuorumApisAreDisabled() {
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null,
ListenerType.ZK_BROKER,
true
true,
false
);

// Ensure that APIs needed for the KRaft mode are not exposed through ApiVersions until we are ready for them
Expand Down
Expand Up @@ -158,7 +158,7 @@ object BrokerApiVersionsCommand {
private def getNodeApiVersions(node: Node): NodeApiVersions = {
val response = send(node, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse]
Errors.forCode(response.data.errorCode).maybeThrow()
new NodeApiVersions(response.data.apiKeys, response.data.supportedFeatures)
new NodeApiVersions(response.data.apiKeys, response.data.supportedFeatures, response.data.zkMigrationReady)
}

/**
Expand Down
20 changes: 13 additions & 7 deletions core/src/main/scala/kafka/server/ApiVersionManager.scala
Expand Up @@ -49,7 +49,8 @@ object ApiVersionManager {
forwardingManager,
supportedFeatures,
metadataCache,
config.unstableApiVersionsEnabled
config.unstableApiVersionsEnabled,
config.migrationEnabled
)
}
}
Expand All @@ -58,25 +59,28 @@ class SimpleApiVersionManager(
val listenerType: ListenerType,
val enabledApis: collection.Set[ApiKeys],
brokerFeatures: Features[SupportedVersionRange],
val enableUnstableLastVersion: Boolean
val enableUnstableLastVersion: Boolean,
val zkMigrationEnabled: Boolean
) extends ApiVersionManager {

def this(
listenerType: ListenerType,
enableUnstableLastVersion: Boolean
enableUnstableLastVersion: Boolean,
zkMigrationEnabled: Boolean
) = {
this(
listenerType,
ApiKeys.apisForListener(listenerType).asScala,
BrokerFeatures.defaultSupportedFeatures(),
enableUnstableLastVersion
enableUnstableLastVersion,
zkMigrationEnabled
)
}

private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava, enableUnstableLastVersion)

override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = {
ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures)
ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures, zkMigrationEnabled)
}
}

Expand All @@ -85,7 +89,8 @@ class DefaultApiVersionManager(
forwardingManager: Option[ForwardingManager],
features: BrokerFeatures,
metadataCache: MetadataCache,
val enableUnstableLastVersion: Boolean
val enableUnstableLastVersion: Boolean,
val zkMigrationEnabled: Boolean = false
) extends ApiVersionManager {

val enabledApis = ApiKeys.apisForListener(listenerType).asScala
Expand All @@ -103,7 +108,8 @@ class DefaultApiVersionManager(
finalizedFeatures.epoch,
controllerApiVersions.orNull,
listenerType,
enableUnstableLastVersion
enableUnstableLastVersion,
zkMigrationEnabled
)
}
}
6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Expand Up @@ -175,7 +175,8 @@ class ControllerServer(

val apiVersionManager = new SimpleApiVersionManager(
ListenerType.CONTROLLER,
config.unstableApiVersionsEnabled
config.unstableApiVersionsEnabled,
config.migrationEnabled
)

tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
Expand Down Expand Up @@ -270,7 +271,8 @@ class ControllerServer(
"zk migration",
fatal = false,
() => {}
)
),
quorumFeatures
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

feed quorumFeatures into KRaftMigrationDriver now

)
migrationDriver.start()
migrationSupport = Some(ControllerMigrationSupport(zkClient, migrationDriver, propagator))
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/tools/TestRaftServer.scala
Expand Up @@ -74,7 +74,7 @@ class TestRaftServer(
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)

val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true)
val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true, false)
socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)

val metaProperties = MetaProperties(
Expand Down
Expand Up @@ -77,7 +77,7 @@ class SocketServerTest {
// Clean-up any metrics left around by previous tests
TestUtils.clearYammerMetrics()

private val apiVersionManager = new SimpleApiVersionManager(ListenerType.ZK_BROKER, true)
private val apiVersionManager = new SimpleApiVersionManager(ListenerType.ZK_BROKER, true, false)
val server = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager)
server.enableRequestProcessing(Map.empty).get(1, TimeUnit.MINUTES)
val sockets = new ArrayBuffer[Socket]
Expand Down
Expand Up @@ -154,7 +154,7 @@ class ControllerApisTest {
new KafkaConfig(props),
MetaProperties("JgxuGe9URy-E-ceaL04lEw", nodeId = nodeId),
Seq.empty,
new SimpleApiVersionManager(ListenerType.CONTROLLER, true)
new SimpleApiVersionManager(ListenerType.CONTROLLER, true, false)
)
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Expand Up @@ -184,7 +184,7 @@ class KafkaApisTest {
} else {
ApiKeys.apisForListener(listenerType).asScala.toSet
}
val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures(), true)
val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures(), true, false)

new KafkaApis(
requestChannel = requestChannel,
Expand Down