Skip to content

Commit

Permalink
refctor and add more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
showuon committed Apr 25, 2023
1 parent 019e276 commit c5f490b
Show file tree
Hide file tree
Showing 9 changed files with 216 additions and 88 deletions.
3 changes: 3 additions & 0 deletions checkstyle/import-control-metadata.xml
Expand Up @@ -153,6 +153,9 @@
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.common.internals" />
</subpackage>
<subpackage name="migration">
<allow pkg="org.apache.kafka.controller" />
</subpackage>
<subpackage name="bootstrap">
<allow pkg="org.apache.kafka.snapshot" />
</subpackage>
Expand Down
Expand Up @@ -64,8 +64,4 @@ public synchronized byte maxUsableProduceMagic() {
return maxUsableProduceMagic;
}

// check if all nodes are ZK Migration ready
public boolean isAllNodeZkMigrationReady() {
return nodeApiVersions.values().stream().allMatch(ver -> ver.zkMigrationEnabled());
}
}
Expand Up @@ -24,8 +24,6 @@
import java.util.Collections;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ApiVersionsTest {

Expand Down Expand Up @@ -57,21 +55,4 @@ public void testMaxUsableProduceMagicWithRaftController() {
.setMaxVersion((short) 2))));
assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, apiVersions.maxUsableProduceMagic());
}

@Test
public void testZkMigrationReady() {
ApiVersions apiVersions = new ApiVersions();

apiVersions.update("0", NodeApiVersions.create());
assertFalse(apiVersions.isAllNodeZkMigrationReady());

apiVersions.update("0", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
assertTrue(apiVersions.isAllNodeZkMigrationReady());

apiVersions.update("1", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
assertTrue(apiVersions.isAllNodeZkMigrationReady());

apiVersions.update("2", NodeApiVersions.create());
assertFalse(apiVersions.isAllNodeZkMigrationReady());
}
}
1 change: 1 addition & 0 deletions config/kraft/controller.properties
Expand Up @@ -28,6 +28,7 @@ node.id=1

# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9093

############################# Socket Server Settings #############################

# The address the socket server listens on.
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ControllerServer.scala
Expand Up @@ -272,7 +272,7 @@ class ControllerServer(
fatal = false,
() => {}
),
sharedServer.raftManager.apiVersions
quorumFeatures
)
migrationDriver.start()
migrationSupport = Some(ControllerMigrationSupport(zkClient, migrationDriver, propagator))
Expand Down
Expand Up @@ -128,4 +128,27 @@ VersionRange localSupportedFeature(String featureName) {
boolean isControllerId(int nodeId) {
return quorumNodeIds.contains(nodeId);
}

// check if all controller nodes are ZK Migration ready
public boolean isAllControllersZkMigrationReady() {
List<String> missingApiVers = new ArrayList<>();
List<String> zkMigrationNotReady = new ArrayList<>();
for (int id : quorumNodeIds) {
NodeApiVersions nodeVersions = apiVersions.get(Integer.toString(id));
if (nodeVersions == null) {
missingApiVers.add(String.valueOf(id));
} else if (!nodeVersions.zkMigrationEnabled()) {
zkMigrationNotReady.add(String.valueOf(id));
}
}

boolean isReady = missingApiVers.isEmpty() && zkMigrationNotReady.isEmpty();
if (!isReady) {
String missingApiVersionMsg = missingApiVers.isEmpty() ? "" : "Missing apiVersion from nodes: " + missingApiVers;
String zkMigrationNotReadyMsg = zkMigrationNotReady.isEmpty() ? "" : "Nodes don't enable `zookeeper.metadata.migration.enable`: " + zkMigrationNotReady;
log.debug("Not all controller nodes ZK migration are ready. {}. {}", zkMigrationNotReadyMsg, missingApiVersionMsg);
}

return isReady;
}
}
Expand Up @@ -16,14 +16,13 @@
*/
package org.apache.kafka.metadata.migration;

import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.QuorumFeatures;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
Expand Down Expand Up @@ -83,7 +82,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
private volatile MigrationDriverState migrationState;
private volatile ZkMigrationLeadershipState migrationLeadershipState;
private volatile MetadataImage image;
private volatile ApiVersions apiVersions;
private volatile QuorumFeatures quorumFeatures;

public KRaftMigrationDriver(
int nodeId,
Expand All @@ -92,13 +91,14 @@ public KRaftMigrationDriver(
LegacyPropagator propagator,
Consumer<MetadataPublisher> initialZkLoadHandler,
FaultHandler faultHandler,
ApiVersions apiVersions
QuorumFeatures quorumFeatures,
Time time
) {
this.nodeId = nodeId;
this.zkRecordConsumer = zkRecordConsumer;
this.zkMigrationClient = zkMigrationClient;
this.propagator = propagator;
this.time = Time.SYSTEM;
this.time = time;
LogContext logContext = new LogContext("[KRaftMigrationDriver id=" + nodeId + "] ");
this.log = logContext.logger(KRaftMigrationDriver.class);
this.migrationState = MigrationDriverState.UNINITIALIZED;
Expand All @@ -108,9 +108,22 @@ public KRaftMigrationDriver(
this.leaderAndEpoch = LeaderAndEpoch.UNKNOWN;
this.initialZkLoadHandler = initialZkLoadHandler;
this.faultHandler = faultHandler;
this.apiVersions = apiVersions;
this.quorumFeatures = quorumFeatures;
}

public KRaftMigrationDriver(
int nodeId,
ZkRecordConsumer zkRecordConsumer,
MigrationClient zkMigrationClient,
LegacyPropagator propagator,
Consumer<MetadataPublisher> initialZkLoadHandler,
FaultHandler faultHandler,
QuorumFeatures quorumFeatures
) {
this(nodeId, zkRecordConsumer, zkMigrationClient, propagator, initialZkLoadHandler, faultHandler, quorumFeatures, Time.SYSTEM);
}


public void start() {
eventQueue.prepend(new PollEvent());
}
Expand Down Expand Up @@ -139,7 +152,11 @@ private void initializeMigrationState() {
}

private boolean isControllerQuorumReadyForMigration() {
return this.apiVersions.isAllNodeZkMigrationReady();
if (!this.quorumFeatures.isAllControllersZkMigrationReady()) {
log.info("Still waiting for all controller nodes ready to begin the migration.");
return false;
}
return true;
}

private boolean imageDoesNotContainAllBrokers(MetadataImage image, Set<Integer> brokerIds) {
Expand Down Expand Up @@ -403,7 +420,7 @@ public void run() throws Exception {
switch (migrationState) {
case WAIT_FOR_CONTROLLER_QUORUM:
if (isControllerQuorumReadyForMigration()) {
log.info("Controller Quorum is ready for Zk to KRaft migration");
log.debug("Controller Quorum is ready for Zk to KRaft migration");
// Note that leadership would not change here. Hence we do not need to
// `apply` any leadership state change.
transitionTo(MigrationDriverState.WAIT_FOR_BROKERS);
Expand Down
Expand Up @@ -100,4 +100,30 @@ public void testIsControllerId() {
assertTrue(quorumFeatures.isControllerId(2));
assertFalse(quorumFeatures.isControllerId(3));
}

@Test
public void testZkMigrationReady() {
ApiVersions apiVersions = new ApiVersions();
QuorumFeatures quorumFeatures = new QuorumFeatures(0, apiVersions, LOCAL, Arrays.asList(0, 1, 2));

// create apiVersion with zkMigrationEnabled flag set for node 0, the other 2 nodes have no apiVersions info
apiVersions.update("0", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
assertFalse(quorumFeatures.isAllControllersZkMigrationReady());

// create apiVersion with zkMigrationEnabled flag set for node 1, the other 1 node have no apiVersions info
apiVersions.update("1", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
assertFalse(quorumFeatures.isAllControllersZkMigrationReady());

// create apiVersion with zkMigrationEnabled flag disabled for node 2, should still be not ready
apiVersions.update("2", NodeApiVersions.create());
assertFalse(quorumFeatures.isAllControllersZkMigrationReady());

// update zkMigrationEnabled flag to enabled for node 2, should be ready now
apiVersions.update("2", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
assertTrue(quorumFeatures.isAllControllersZkMigrationReady());

// create apiVersion with zkMigrationEnabled flag disabled for a non-controller, and expect we fill filter it out
apiVersions.update("3", NodeApiVersions.create());
assertTrue(quorumFeatures.isAllControllersZkMigrationReady());
}
}

0 comments on commit c5f490b

Please sign in to comment.