Skip to content

Commit

Permalink
MINOR: rewrite TopicBasedRemoteLogMetadataManagerTest by ClusterTestE…
Browse files Browse the repository at this point in the history
…xtensions (apache#15917)

Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Luke Chen <showuon@gmail.com>
  • Loading branch information
chia7712 authored and TaiJuWu committed Jun 8, 2024
1 parent ea2eac5 commit 57a16ab
Show file tree
Hide file tree
Showing 11 changed files with 292 additions and 302 deletions.
1 change: 1 addition & 0 deletions checkstyle/import-control-storage.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
<allow pkg="com.fasterxml.jackson" />
<allow pkg="kafka.api" />
<allow pkg="kafka.utils" />
<allow pkg="kafka.test" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.config" />
Expand Down
77 changes: 57 additions & 20 deletions core/src/test/java/kafka/test/ClusterInstance.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
package kafka.test;

import kafka.network.SocketServer;
import kafka.server.BrokerFeatures;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.Type;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.test.TestUtils;

import java.util.Collection;
import java.util.Collections;
Expand All @@ -32,6 +35,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
Expand All @@ -46,6 +50,10 @@ default boolean isKRaftTest() {
return type() == Type.KRAFT || type() == Type.CO_KRAFT;
}

Map<Integer, KafkaBroker> brokers();

Map<Integer, ControllerServer> controllers();

/**
* The immutable cluster configuration used to create this cluster.
*/
Expand All @@ -61,7 +69,9 @@ default boolean isKRaftTest() {
/**
* Return the set of all broker IDs configured for this test.
*/
Set<Integer> brokerIds();
default Set<Integer> brokerIds() {
return brokers().keySet();
}

/**
* The listener for this cluster as configured by {@link ClusterTest} or by {@link ClusterConfig}. If
Expand Down Expand Up @@ -97,7 +107,11 @@ default Optional<ListenerName> controlPlaneListenerName() {
* A collection of all brokers in the cluster. In ZK-based clusters this will also include the broker which is
* acting as the controller (since ZK controllers serve both broker and controller roles).
*/
Collection<SocketServer> brokerSocketServers();
default Collection<SocketServer> brokerSocketServers() {
return brokers().values().stream()
.map(KafkaBroker::socketServer)
.collect(Collectors.toList());
}

/**
* A collection of all controllers in the cluster. For ZK-based clusters, this will return the broker which is also
Expand All @@ -108,17 +122,20 @@ default Optional<ListenerName> controlPlaneListenerName() {
/**
* Return any one of the broker servers. Throw an error if none are found
*/
SocketServer anyBrokerSocketServer();
default SocketServer anyBrokerSocketServer() {
return brokerSocketServers().stream()
.findFirst()
.orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
}

/**
* Return any one of the controller servers. Throw an error if none are found
*/
SocketServer anyControllerSocketServer();

/**
* Return a mapping of the underlying broker IDs to their supported features
*/
Map<Integer, BrokerFeatures> brokerFeatures();
default SocketServer anyControllerSocketServer() {
return controllerSocketServers().stream()
.findFirst()
.orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
}

String clusterId();

Expand All @@ -137,16 +154,6 @@ default Admin createAdminClient() {
return createAdminClient(new Properties());
}

void start();

void stop();

void shutdownBroker(int brokerId);

void startBroker(int brokerId);

void waitForReadyBrokers() throws InterruptedException;

default Set<GroupProtocol> supportedGroupProtocols() {
Map<String, String> serverProperties = config().serverProperties();
Set<GroupProtocol> supportedGroupProtocols = new HashSet<>();
Expand All @@ -160,4 +167,34 @@ default Set<GroupProtocol> supportedGroupProtocols() {

return Collections.unmodifiableSet(supportedGroupProtocols);
}

//---------------------------[modify]---------------------------//

void start();

void stop();

void shutdownBroker(int brokerId);

void startBroker(int brokerId);

//---------------------------[wait]---------------------------//

void waitForReadyBrokers() throws InterruptedException;

default void waitForTopic(String topic, int partitions) throws InterruptedException {
// wait for metadata
TestUtils.waitForCondition(
() -> brokers().values().stream().allMatch(broker -> partitions == 0 ?
broker.metadataCache().numPartitions(topic).isEmpty() :
broker.metadataCache().numPartitions(topic).contains(partitions)
), 60000L, topic + " metadata not propagated after 60000 ms");

for (ControllerServer controller : controllers().values()) {
long controllerOffset = controller.raftManager().replicatedLog().endOffset().offset - 1;
TestUtils.waitForCondition(
() -> brokers().values().stream().allMatch(broker -> ((BrokerServer) broker).sharedServer().loader().lastAppliedOffset() >= controllerOffset),
60000L, "Timeout waiting for controller metadata propagating to brokers");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package kafka.test.junit;

import kafka.network.SocketServer;
import kafka.server.BrokerFeatures;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
import kafka.test.annotation.Type;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
Expand All @@ -39,6 +39,7 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -148,64 +149,31 @@ public String bootstrapControllers() {
return clusterReference.get().bootstrapControllers();
}

@Override
public Collection<SocketServer> brokerSocketServers() {
return brokers()
.map(BrokerServer::socketServer)
.collect(Collectors.toList());
}

@Override
public ListenerName clientListener() {
return ListenerName.normalised("EXTERNAL");
}

@Override
public Optional<ListenerName> controllerListenerName() {
return OptionConverters.toJava(controllers().findAny().get().config().controllerListenerNames().headOption().map(ListenerName::new));
return controllers().values().stream()
.findAny()
.flatMap(s -> OptionConverters.toJava(s.config().controllerListenerNames().headOption()))
.map(ListenerName::new);
}

@Override
public Collection<SocketServer> controllerSocketServers() {
return controllers()
return controllers().values().stream()
.map(ControllerServer::socketServer)
.collect(Collectors.toList());
}

@Override
public SocketServer anyBrokerSocketServer() {
return brokers()
.map(BrokerServer::socketServer)
.findFirst()
.orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
}

@Override
public SocketServer anyControllerSocketServer() {
return controllers()
.map(ControllerServer::socketServer)
.findFirst()
.orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
}

@Override
public Map<Integer, BrokerFeatures> brokerFeatures() {
return brokers().collect(Collectors.toMap(
brokerServer -> brokerServer.config().nodeId(),
BrokerServer::brokerFeatures
));
}

@Override
public String clusterId() {
return controllers().findFirst().map(ControllerServer::clusterId).orElse(
brokers().findFirst().map(BrokerServer::clusterId).orElseThrow(
() -> new RuntimeException("No controllers or brokers!"))
);
}

public Collection<ControllerServer> controllerServers() {
return controllers().collect(Collectors.toList());
return Stream.concat(controllers().values().stream().map(ControllerServer::clusterId),
brokers().values().stream().map(KafkaBroker::clusterId)).findFirst()
.orElseThrow(() -> new RuntimeException("No controllers or brokers!"));
}

@Override
Expand All @@ -220,16 +188,7 @@ public ClusterConfig config() {

@Override
public Set<Integer> controllerIds() {
return controllers()
.map(controllerServer -> controllerServer.config().nodeId())
.collect(Collectors.toSet());
}

@Override
public Set<Integer> brokerIds() {
return brokers()
.map(brokerServer -> brokerServer.config().nodeId())
.collect(Collectors.toSet());
return controllers().keySet();
}

@Override
Expand Down Expand Up @@ -292,12 +251,16 @@ private BrokerServer findBrokerOrThrow(int brokerId) {
.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
}

public Stream<BrokerServer> brokers() {
return clusterReference.get().brokers().values().stream();
@Override
public Map<Integer, KafkaBroker> brokers() {
return clusterReference.get().brokers().entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

public Stream<ControllerServer> controllers() {
return clusterReference.get().controllers().values().stream();
@Override
public Map<Integer, ControllerServer> controllers() {
return Collections.unmodifiableMap(clusterReference.get().controllers());
}

}
Expand Down
65 changes: 17 additions & 48 deletions core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

import kafka.api.IntegrationTestHarness;
import kafka.network.SocketServer;
import kafka.server.BrokerFeatures;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
import kafka.server.KafkaServer;
import kafka.test.annotation.Type;
import kafka.test.ClusterConfig;
Expand Down Expand Up @@ -52,7 +53,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Objects.requireNonNull;
import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG;
Expand Down Expand Up @@ -129,13 +129,6 @@ public String bootstrapControllers() {
throw new RuntimeException("Cannot use --bootstrap-controller with ZK-based clusters.");
}

@Override
public Collection<SocketServer> brokerSocketServers() {
return servers()
.map(KafkaServer::socketServer)
.collect(Collectors.toList());
}

@Override
public ListenerName clientListener() {
return clusterReference.get().listenerName();
Expand All @@ -149,40 +142,15 @@ public Optional<ListenerName> controlPlaneListenerName() {

@Override
public Collection<SocketServer> controllerSocketServers() {
return servers()
.filter(broker -> broker.kafkaController().isActive())
.map(KafkaServer::socketServer)
return brokers().values().stream()
.filter(s -> ((KafkaServer) s).kafkaController().isActive())
.map(KafkaBroker::socketServer)
.collect(Collectors.toList());
}

@Override
public SocketServer anyBrokerSocketServer() {
return servers()
.map(KafkaServer::socketServer)
.findFirst()
.orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
}

@Override
public SocketServer anyControllerSocketServer() {
return servers()
.filter(broker -> broker.kafkaController().isActive())
.map(KafkaServer::socketServer)
.findFirst()
.orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
}

@Override
public Map<Integer, BrokerFeatures> brokerFeatures() {
return servers().collect(Collectors.toMap(
brokerServer -> brokerServer.config().nodeId(),
KafkaServer::brokerFeatures
));
}

@Override
public String clusterId() {
return servers().findFirst().map(KafkaServer::clusterId).orElseThrow(
return brokers().values().stream().findFirst().map(KafkaBroker::clusterId).orElseThrow(
() -> new RuntimeException("No broker instances found"));
}

Expand All @@ -201,13 +169,6 @@ public Set<Integer> controllerIds() {
return brokerIds();
}

@Override
public Set<Integer> brokerIds() {
return servers()
.map(brokerServer -> brokerServer.config().nodeId())
.collect(Collectors.toSet());
}

@Override
public IntegrationTestHarness getUnderlying() {
return clusterReference.get();
Expand Down Expand Up @@ -271,14 +232,22 @@ public void waitForReadyBrokers() throws InterruptedException {
}

private KafkaServer findBrokerOrThrow(int brokerId) {
return servers()
return brokers().values().stream()
.filter(server -> server.config().brokerId() == brokerId)
.map(s -> (KafkaServer) s)
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
}

public Stream<KafkaServer> servers() {
return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream();
@Override
public Map<Integer, ControllerServer> controllers() {
return Collections.emptyMap();
}

@Override
public Map<Integer, KafkaBroker> brokers() {
return JavaConverters.asJavaCollection(clusterReference.get().servers())
.stream().collect(Collectors.toMap(s -> s.config().brokerId(), s -> s));
}
}

Expand Down
Loading

0 comments on commit 57a16ab

Please sign in to comment.