Skip to content

Commit

Permalink
KAFKA-14909: check zkMigrationReady tag before migration (#13631)
Browse files Browse the repository at this point in the history
1. add ZkMigrationReady in apiVersionsResponse
2. check all nodes if ZkMigrationReady are ready before moving to next migration state

Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
  • Loading branch information
showuon committed Apr 28, 2023
1 parent c708f7b commit d796480
Show file tree
Hide file tree
Showing 20 changed files with 310 additions and 106 deletions.
3 changes: 3 additions & 0 deletions checkstyle/import-control-metadata.xml
Expand Up @@ -153,6 +153,9 @@
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.common.internals" />
</subpackage>
<subpackage name="migration">
<allow pkg="org.apache.kafka.controller" />
</subpackage>
<subpackage name="bootstrap">
<allow pkg="org.apache.kafka.snapshot" />
</subpackage>
Expand Down
Expand Up @@ -956,12 +956,13 @@ private void handleApiVersionsResponse(List<ClientResponse> responses,
}
NodeApiVersions nodeVersionInfo = new NodeApiVersions(
apiVersionsResponse.data().apiKeys(),
apiVersionsResponse.data().supportedFeatures());
apiVersionsResponse.data().supportedFeatures(),
apiVersionsResponse.data().zkMigrationReady());
apiVersions.update(node, nodeVersionInfo);
this.connectionStates.ready(node);
log.debug("Node {} has finalized features epoch: {}, finalized features: {}, supported features: {}, API versions: {}.",
log.debug("Node {} has finalized features epoch: {}, finalized features: {}, supported features: {}, ZK migration ready: {}, API versions: {}.",
node, apiVersionsResponse.data().finalizedFeaturesEpoch(), apiVersionsResponse.data().finalizedFeatures(),
apiVersionsResponse.data().supportedFeatures(), nodeVersionInfo);
apiVersionsResponse.data().supportedFeatures(), apiVersionsResponse.data().zkMigrationReady(), nodeVersionInfo);
}

/**
Expand Down
Expand Up @@ -48,6 +48,8 @@ public class NodeApiVersions {

private final Map<String, SupportedVersionRange> supportedFeatures;

private final boolean zkMigrationEnabled;

/**
* Create a NodeApiVersions object with the current ApiVersions.
*
Expand Down Expand Up @@ -76,7 +78,7 @@ public static NodeApiVersions create(Collection<ApiVersion> overrides) {
}
if (!exists) apiVersions.add(ApiVersionsResponse.toApiVersion(apiKey));
}
return new NodeApiVersions(apiVersions, Collections.emptyList());
return new NodeApiVersions(apiVersions, Collections.emptyList(), false);
}


Expand All @@ -95,7 +97,7 @@ public static NodeApiVersions create(short apiKey, short minVersion, short maxVe
.setMaxVersion(maxVersion)));
}

public NodeApiVersions(Collection<ApiVersion> nodeApiVersions, Collection<SupportedFeatureKey> nodeSupportedFeatures) {
public NodeApiVersions(Collection<ApiVersion> nodeApiVersions, Collection<SupportedFeatureKey> nodeSupportedFeatures, boolean zkMigrationEnabled) {
for (ApiVersion nodeApiVersion : nodeApiVersions) {
if (ApiKeys.hasId(nodeApiVersion.apiKey())) {
ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey());
Expand All @@ -112,6 +114,7 @@ public NodeApiVersions(Collection<ApiVersion> nodeApiVersions, Collection<Suppor
new SupportedVersionRange(supportedFeature.minVersion(), supportedFeature.maxVersion()));
}
this.supportedFeatures = Collections.unmodifiableMap(supportedFeaturesBuilder);
this.zkMigrationEnabled = zkMigrationEnabled;
}

/**
Expand Down Expand Up @@ -236,4 +239,8 @@ public Map<ApiKeys, ApiVersion> allSupportedApiVersions() {
public Map<String, SupportedVersionRange> supportedFeatures() {
return supportedFeatures;
}

public boolean zkMigrationEnabled() {
return zkMigrationEnabled;
}
}
Expand Up @@ -119,7 +119,8 @@ public static ApiVersionsResponse defaultApiVersionsResponse(
return createApiVersionsResponse(
throttleTimeMs,
filterApis(RecordVersion.current(), listenerType, true),
Features.emptySupportedFeatures()
Features.emptySupportedFeatures(),
false
);
}

Expand All @@ -131,28 +132,31 @@ public static ApiVersionsResponse defaultApiVersionsResponse(
return createApiVersionsResponse(
throttleTimeMs,
filterApis(RecordVersion.current(), listenerType, enableUnstableLastVersion),
Features.emptySupportedFeatures()
Features.emptySupportedFeatures(),
false
);
}

public static ApiVersionsResponse createApiVersionsResponse(
int throttleTimeMs,
ApiVersionCollection apiVersions
) {
return createApiVersionsResponse(throttleTimeMs, apiVersions, Features.emptySupportedFeatures());
return createApiVersionsResponse(throttleTimeMs, apiVersions, Features.emptySupportedFeatures(), false);
}

public static ApiVersionsResponse createApiVersionsResponse(
int throttleTimeMs,
ApiVersionCollection apiVersions,
Features<SupportedVersionRange> latestSupportedFeatures
Features<SupportedVersionRange> latestSupportedFeatures,
boolean zkMigrationEnabled
) {
return createApiVersionsResponse(
throttleTimeMs,
apiVersions,
latestSupportedFeatures,
Collections.emptyMap(),
UNKNOWN_FINALIZED_FEATURES_EPOCH);
UNKNOWN_FINALIZED_FEATURES_EPOCH,
zkMigrationEnabled);
}

public static ApiVersionsResponse createApiVersionsResponse(
Expand All @@ -163,7 +167,8 @@ public static ApiVersionsResponse createApiVersionsResponse(
long finalizedFeaturesEpoch,
NodeApiVersions controllerApiVersions,
ListenerType listenerType,
boolean enableUnstableLastVersion
boolean enableUnstableLastVersion,
boolean zkMigrationEnabled
) {
ApiVersionCollection apiKeys;
if (controllerApiVersions != null) {
Expand All @@ -186,7 +191,8 @@ public static ApiVersionsResponse createApiVersionsResponse(
apiKeys,
latestSupportedFeatures,
finalizedFeatures,
finalizedFeaturesEpoch
finalizedFeaturesEpoch,
zkMigrationEnabled
);
}

Expand All @@ -195,7 +201,8 @@ public static ApiVersionsResponse createApiVersionsResponse(
ApiVersionCollection apiVersions,
Features<SupportedVersionRange> latestSupportedFeatures,
Map<String, Short> finalizedFeatures,
long finalizedFeaturesEpoch
long finalizedFeaturesEpoch,
boolean zkMigrationEnabled
) {
return new ApiVersionsResponse(
createApiVersionsResponseData(
Expand All @@ -204,7 +211,8 @@ public static ApiVersionsResponse createApiVersionsResponse(
apiVersions,
latestSupportedFeatures,
finalizedFeatures,
finalizedFeaturesEpoch
finalizedFeaturesEpoch,
zkMigrationEnabled
)
);
}
Expand Down Expand Up @@ -294,7 +302,8 @@ private static ApiVersionsResponseData createApiVersionsResponseData(
final ApiVersionCollection apiKeys,
final Features<SupportedVersionRange> latestSupportedFeatures,
final Map<String, Short> finalizedFeatures,
final long finalizedFeaturesEpoch
final long finalizedFeaturesEpoch,
final boolean zkMigrationEnabled
) {
final ApiVersionsResponseData data = new ApiVersionsResponseData();
data.setThrottleTimeMs(throttleTimeMs);
Expand All @@ -303,6 +312,7 @@ private static ApiVersionsResponseData createApiVersionsResponseData(
data.setSupportedFeatures(createSupportedFeatureKeys(latestSupportedFeatures));
data.setFinalizedFeatures(createFinalizedFeatureKeys(finalizedFeatures));
data.setFinalizedFeaturesEpoch(finalizedFeaturesEpoch);
data.setZkMigrationReady(zkMigrationEnabled);

return data;
}
Expand Down
Expand Up @@ -39,7 +39,7 @@ public class NodeApiVersionsTest {

@Test
public void testUnsupportedVersionsToString() {
NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList());
NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList(), false);
StringBuilder bld = new StringBuilder();
String prefix = "(";
for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
Expand Down Expand Up @@ -68,7 +68,7 @@ public void testVersionsToString() {
.setMaxVersion((short) 10001));
} else versionList.add(ApiVersionsResponse.toApiVersion(apiKey));
}
NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList());
NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList(), false);
StringBuilder bld = new StringBuilder();
String prefix = "(";
for (ApiKeys apiKey : ApiKeys.values()) {
Expand Down Expand Up @@ -125,7 +125,7 @@ public void testLatestUsableVersionOutOfRangeHigh() {

@Test
public void testUsableVersionCalculationNoKnownVersions() {
NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList());
NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList(), false);
assertThrows(UnsupportedVersionException.class,
() -> versions.latestUsableVersion(ApiKeys.FETCH));
}
Expand All @@ -147,7 +147,7 @@ public void testUsableVersionLatestVersions(ApiMessageType.ListenerType scope) {
.setApiKey((short) 100)
.setMinVersion((short) 0)
.setMaxVersion((short) 1));
NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList());
NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList(), false);
for (ApiKeys apiKey: ApiKeys.apisForListener(scope)) {
assertEquals(apiKey.latestVersion(), versions.latestUsableVersion(apiKey));
}
Expand All @@ -157,7 +157,7 @@ public void testUsableVersionLatestVersions(ApiMessageType.ListenerType scope) {
@EnumSource(ApiMessageType.ListenerType.class)
public void testConstructionFromApiVersionsResponse(ApiMessageType.ListenerType scope) {
ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(scope);
NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys(), Collections.emptyList());
NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys(), Collections.emptyList(), false);

for (ApiVersion apiVersionKey : apiVersionsResponse.data().apiKeys()) {
ApiVersion apiVersion = versions.apiVersion(ApiKeys.forId(apiVersionKey.apiKey()));
Expand Down
Expand Up @@ -634,7 +634,8 @@ private static ApiVersionsResponse prepareApiVersionsResponseForDescribeFeatures
ApiVersionsResponse.filterApis(RecordVersion.current(), ApiMessageType.ListenerType.ZK_BROKER),
convertSupportedFeaturesMap(defaultFeatureMetadata().supportedFeatures()),
Collections.singletonMap("test_feature_1", (short) 2),
defaultFeatureMetadata().finalizedFeaturesEpoch().get()
defaultFeatureMetadata().finalizedFeaturesEpoch().get(),
false
);
}
return new ApiVersionsResponse(
Expand Down
Expand Up @@ -121,7 +121,8 @@ public void shouldCreateApiResponseOnlyWithKeysSupportedByMagicValue() {
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null,
ListenerType.ZK_BROKER,
true
true,
false
);
verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
assertEquals(10, response.throttleTimeMs());
Expand All @@ -141,7 +142,8 @@ public void shouldReturnFeatureKeysWhenMagicIsCurrentValueAndThrottleMsIsDefault
10L,
null,
ListenerType.ZK_BROKER,
true
true,
false
);

verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
Expand Down Expand Up @@ -169,7 +171,8 @@ public void shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThro
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null,
ListenerType.ZK_BROKER,
true
true,
false
);
assertEquals(new HashSet<>(ApiKeys.zkBrokerApis()), apiKeysInResponse(response));
assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
Expand All @@ -188,7 +191,8 @@ public void testMetadataQuorumApisAreDisabled() {
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null,
ListenerType.ZK_BROKER,
true
true,
false
);

// Ensure that APIs needed for the KRaft mode are not exposed through ApiVersions until we are ready for them
Expand Down
Expand Up @@ -158,7 +158,7 @@ object BrokerApiVersionsCommand {
private def getNodeApiVersions(node: Node): NodeApiVersions = {
val response = send(node, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse]
Errors.forCode(response.data.errorCode).maybeThrow()
new NodeApiVersions(response.data.apiKeys, response.data.supportedFeatures)
new NodeApiVersions(response.data.apiKeys, response.data.supportedFeatures, response.data.zkMigrationReady)
}

/**
Expand Down
20 changes: 13 additions & 7 deletions core/src/main/scala/kafka/server/ApiVersionManager.scala
Expand Up @@ -49,7 +49,8 @@ object ApiVersionManager {
forwardingManager,
supportedFeatures,
metadataCache,
config.unstableApiVersionsEnabled
config.unstableApiVersionsEnabled,
config.migrationEnabled
)
}
}
Expand All @@ -58,25 +59,28 @@ class SimpleApiVersionManager(
val listenerType: ListenerType,
val enabledApis: collection.Set[ApiKeys],
brokerFeatures: Features[SupportedVersionRange],
val enableUnstableLastVersion: Boolean
val enableUnstableLastVersion: Boolean,
val zkMigrationEnabled: Boolean
) extends ApiVersionManager {

def this(
listenerType: ListenerType,
enableUnstableLastVersion: Boolean
enableUnstableLastVersion: Boolean,
zkMigrationEnabled: Boolean
) = {
this(
listenerType,
ApiKeys.apisForListener(listenerType).asScala,
BrokerFeatures.defaultSupportedFeatures(),
enableUnstableLastVersion
enableUnstableLastVersion,
zkMigrationEnabled
)
}

private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava, enableUnstableLastVersion)

override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = {
ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures)
ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures, zkMigrationEnabled)
}
}

Expand All @@ -85,7 +89,8 @@ class DefaultApiVersionManager(
forwardingManager: Option[ForwardingManager],
features: BrokerFeatures,
metadataCache: MetadataCache,
val enableUnstableLastVersion: Boolean
val enableUnstableLastVersion: Boolean,
val zkMigrationEnabled: Boolean = false
) extends ApiVersionManager {

val enabledApis = ApiKeys.apisForListener(listenerType).asScala
Expand All @@ -103,7 +108,8 @@ class DefaultApiVersionManager(
finalizedFeatures.epoch,
controllerApiVersions.orNull,
listenerType,
enableUnstableLastVersion
enableUnstableLastVersion,
zkMigrationEnabled
)
}
}
6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Expand Up @@ -175,7 +175,8 @@ class ControllerServer(

val apiVersionManager = new SimpleApiVersionManager(
ListenerType.CONTROLLER,
config.unstableApiVersionsEnabled
config.unstableApiVersionsEnabled,
config.migrationEnabled
)

tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
Expand Down Expand Up @@ -270,7 +271,8 @@ class ControllerServer(
"zk migration",
fatal = false,
() => {}
)
),
quorumFeatures
)
migrationDriver.start()
migrationSupport = Some(ControllerMigrationSupport(zkClient, migrationDriver, propagator))
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/tools/TestRaftServer.scala
Expand Up @@ -74,7 +74,7 @@ class TestRaftServer(
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)

val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true)
val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true, false)
socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)

val metaProperties = MetaProperties(
Expand Down
Expand Up @@ -77,7 +77,7 @@ class SocketServerTest {
// Clean-up any metrics left around by previous tests
TestUtils.clearYammerMetrics()

private val apiVersionManager = new SimpleApiVersionManager(ListenerType.ZK_BROKER, true)
private val apiVersionManager = new SimpleApiVersionManager(ListenerType.ZK_BROKER, true, false)
val server = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager)
server.enableRequestProcessing(Map.empty).get(1, TimeUnit.MINUTES)
val sockets = new ArrayBuffer[Socket]
Expand Down
Expand Up @@ -154,7 +154,7 @@ class ControllerApisTest {
new KafkaConfig(props),
MetaProperties("JgxuGe9URy-E-ceaL04lEw", nodeId = nodeId),
Seq.empty,
new SimpleApiVersionManager(ListenerType.CONTROLLER, true)
new SimpleApiVersionManager(ListenerType.CONTROLLER, true, false)
)
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Expand Up @@ -184,7 +184,7 @@ class KafkaApisTest {
} else {
ApiKeys.apisForListener(listenerType).asScala.toSet
}
val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures(), true)
val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures(), true, false)

new KafkaApis(
requestChannel = requestChannel,
Expand Down

0 comments on commit d796480

Please sign in to comment.