Skip to content

Commit

Permalink
buildable
Browse files Browse the repository at this point in the history
  • Loading branch information
showuon committed Apr 24, 2023
1 parent 7f02852 commit 4298fc6
Show file tree
Hide file tree
Showing 17 changed files with 37 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public synchronized byte maxUsableProduceMagic() {
return maxUsableProduceMagic;
}

// check all nodes are ZK Migration ready
// check if all nodes are ZK Migration ready
public boolean isAllNodeZkMigrationReady() {
return nodeApiVersions.values().stream().allMatch(ver -> ver.zkMigrationEnabled());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -960,9 +960,9 @@ private void handleApiVersionsResponse(List<ClientResponse> responses,
apiVersionsResponse.data().zkMigrationReady());
apiVersions.update(node, nodeVersionInfo);
this.connectionStates.ready(node);
log.info("!!! Node {} has finalized features epoch: {}, finalized features: {}, supported features: {}, ZK migration ready: {}, API versions: {}.",
log.debug("Node {} has finalized features epoch: {}, finalized features: {}, supported features: {}, ZK migration ready: {}, API versions: {}.",
node, apiVersionsResponse.data().finalizedFeaturesEpoch(), apiVersionsResponse.data().finalizedFeatures(),
apiVersionsResponse.data().supportedFeatures(), nodeVersionInfo);
apiVersionsResponse.data().supportedFeatures(), apiVersionsResponse.data().zkMigrationReady(), nodeVersionInfo);
}

/**
Expand Down Expand Up @@ -1004,7 +1004,7 @@ private void handleInitiateApiVersionRequests(long now) {
Map.Entry<String, ApiVersionsRequest.Builder> entry = iter.next();
String node = entry.getKey();
if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) {
log.info("!!! Initiating API versions fetch from node {}.", node);
log.debug("Initiating API versions fetch from node {}.", node);
// We transition the connection to the CHECKING_API_VERSIONS state only when
// the ApiVersionsRequest is queued up to be sent out. Without this, the client
// could remain in the CHECKING_API_VERSIONS state forever if the channel does
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ public static ApiVersionsResponse defaultApiVersionsResponse(
return createApiVersionsResponse(
throttleTimeMs,
filterApis(RecordVersion.current(), listenerType, true),
Features.emptySupportedFeatures()
Features.emptySupportedFeatures(),
false
);
}

Expand All @@ -131,29 +132,31 @@ public static ApiVersionsResponse defaultApiVersionsResponse(
return createApiVersionsResponse(
throttleTimeMs,
filterApis(RecordVersion.current(), listenerType, enableUnstableLastVersion),
Features.emptySupportedFeatures()
Features.emptySupportedFeatures(),
false
);
}

public static ApiVersionsResponse createApiVersionsResponse(
int throttleTimeMs,
ApiVersionCollection apiVersions
) {
return createApiVersionsResponse(throttleTimeMs, apiVersions, Features.emptySupportedFeatures());
return createApiVersionsResponse(throttleTimeMs, apiVersions, Features.emptySupportedFeatures(), false);
}

public static ApiVersionsResponse createApiVersionsResponse(
int throttleTimeMs,
ApiVersionCollection apiVersions,
Features<SupportedVersionRange> latestSupportedFeatures
Features<SupportedVersionRange> latestSupportedFeatures,
boolean zkMigrationEnabled
) {
return createApiVersionsResponse(
throttleTimeMs,
apiVersions,
latestSupportedFeatures,
Collections.emptyMap(),
UNKNOWN_FINALIZED_FEATURES_EPOCH,
false);
zkMigrationEnabled);
}

public static ApiVersionsResponse createApiVersionsResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class NodeApiVersionsTest {

@Test
public void testUnsupportedVersionsToString() {
NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList());
NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList(), false);
StringBuilder bld = new StringBuilder();
String prefix = "(";
for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
Expand Down Expand Up @@ -68,7 +68,7 @@ public void testVersionsToString() {
.setMaxVersion((short) 10001));
} else versionList.add(ApiVersionsResponse.toApiVersion(apiKey));
}
NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList());
NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList(), false);
StringBuilder bld = new StringBuilder();
String prefix = "(";
for (ApiKeys apiKey : ApiKeys.values()) {
Expand Down Expand Up @@ -125,7 +125,7 @@ public void testLatestUsableVersionOutOfRangeHigh() {

@Test
public void testUsableVersionCalculationNoKnownVersions() {
NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList());
NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList(), false);
assertThrows(UnsupportedVersionException.class,
() -> versions.latestUsableVersion(ApiKeys.FETCH));
}
Expand All @@ -147,7 +147,7 @@ public void testUsableVersionLatestVersions(ApiMessageType.ListenerType scope) {
.setApiKey((short) 100)
.setMinVersion((short) 0)
.setMaxVersion((short) 1));
NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList());
NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList(), false);
for (ApiKeys apiKey: ApiKeys.apisForListener(scope)) {
assertEquals(apiKey.latestVersion(), versions.latestUsableVersion(apiKey));
}
Expand All @@ -157,7 +157,7 @@ public void testUsableVersionLatestVersions(ApiMessageType.ListenerType scope) {
@EnumSource(ApiMessageType.ListenerType.class)
public void testConstructionFromApiVersionsResponse(ApiMessageType.ListenerType scope) {
ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(scope);
NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys(), Collections.emptyList());
NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys(), Collections.emptyList(), false);

for (ApiVersion apiVersionKey : apiVersionsResponse.data().apiKeys()) {
ApiVersion apiVersion = versions.apiVersion(ApiKeys.forId(apiVersionKey.apiKey()));
Expand Down
3 changes: 1 addition & 2 deletions config/kraft/controller.properties
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ process.roles=controller
node.id=1

# The connect string for the controller quorum
#controller.quorum.voters=1@localhost:9093
controller.quorum.voters=1@localhost:9093,4@localhost:9094,5@localhost:9095
controller.quorum.voters=1@localhost:9093
############################# Socket Server Settings #############################

# The address the socket server listens on.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ object BrokerApiVersionsCommand {
private def getNodeApiVersions(node: Node): NodeApiVersions = {
val response = send(node, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse]
Errors.forCode(response.data.errorCode).maybeThrow()
new NodeApiVersions(response.data.apiKeys, response.data.supportedFeatures)
new NodeApiVersions(response.data.apiKeys, response.data.supportedFeatures, response.data.zkMigrationReady)
}

/**
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/scala/kafka/server/ApiVersionManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ class SimpleApiVersionManager(

def this(
listenerType: ListenerType,
enableUnstableLastVersion: Boolean
enableUnstableLastVersion: Boolean,
zkMigrationEnabled: Boolean
) = {
this(
listenerType,
Expand All @@ -79,7 +80,7 @@ class SimpleApiVersionManager(
private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava, enableUnstableLastVersion)

override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = {
ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures)
ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures, zkMigrationEnabled)
}
}

Expand All @@ -89,7 +90,7 @@ class DefaultApiVersionManager(
features: BrokerFeatures,
metadataCache: MetadataCache,
val enableUnstableLastVersion: Boolean,
val zkMigrationEnabled: Boolean
val zkMigrationEnabled: Boolean = false
) extends ApiVersionManager {

val enabledApis = ApiKeys.apisForListener(listenerType).asScala
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ class ControllerServer(

val apiVersionManager = new SimpleApiVersionManager(
ListenerType.CONTROLLER,
config.unstableApiVersionsEnabled
config.unstableApiVersionsEnabled,
config.migrationEnabled
)

tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/tools/TestRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class TestRaftServer(
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)

val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true)
val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true, false)
socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)

val metaProperties = MetaProperties(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class SocketServerTest {
// Clean-up any metrics left around by previous tests
TestUtils.clearYammerMetrics()

private val apiVersionManager = new SimpleApiVersionManager(ListenerType.ZK_BROKER, true)
private val apiVersionManager = new SimpleApiVersionManager(ListenerType.ZK_BROKER, true, false)
val server = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager)
server.enableRequestProcessing(Map.empty).get(1, TimeUnit.MINUTES)
val sockets = new ArrayBuffer[Socket]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class ControllerApisTest {
new KafkaConfig(props),
MetaProperties("JgxuGe9URy-E-ceaL04lEw", nodeId = nodeId),
Seq.empty,
new SimpleApiVersionManager(ListenerType.CONTROLLER, true)
new SimpleApiVersionManager(ListenerType.CONTROLLER, true, false)
)
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class KafkaApisTest {
} else {
ApiKeys.apisForListener(listenerType).asScala.toSet
}
val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures(), true)
val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures(), true, false)

new KafkaApis(
requestChannel = requestChannel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ private KafkaApis createKafkaApis() {
setClusterId("clusterId").
setTime(Time.SYSTEM).
setTokenManager(null).
setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.BROKER, false)).
setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.BROKER, false, false)).
build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private KafkaApis createKafkaApis() {
setClusterId("clusterId").
setTime(Time.SYSTEM).
setTokenManager(null).
setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.ZK_BROKER, false)).
setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.ZK_BROKER, false, false)).
build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ public void run() throws Exception {
switch (migrationState) {
case WAIT_FOR_CONTROLLER_QUORUM:
if (isControllerQuorumReadyForMigration()) {
log.debug("Controller Quorum is ready for Zk to KRaft migration");
log.info("Controller Quorum is ready for Zk to KRaft migration");
// Note that leadership would not change here. Hence we do not need to
// `apply` any leadership state change.
transitionTo(MigrationDriverState.WAIT_FOR_BROKERS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private static NodeApiVersions nodeApiVersions(List<Entry<String, VersionRange>>
setMinVersion(entry.getValue().min()).
setMaxVersion(entry.getValue().max()));
});
return new NodeApiVersions(Collections.emptyList(), features);
return new NodeApiVersions(Collections.emptyList(), features, false);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ public void testOnlySendNeededRPCsToBrokers() throws Exception {
migrationClient,
metadataPropagator,
metadataPublisher -> { },
new MockFaultHandler("test")
new MockFaultHandler("test"),
null
);

MetadataImage image = MetadataImage.EMPTY;
Expand Down Expand Up @@ -365,7 +366,8 @@ public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershi
migrationClient,
metadataPropagator,
metadataPublisher -> { },
faultHandler
faultHandler,
null
)) {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);
Expand Down

0 comments on commit 4298fc6

Please sign in to comment.