Skip to content

MINOR: Rewrite FetchFromFollowerIntegrationTest from Scala to Java and move to server module#22252

Open
cychiu8 wants to merge 11 commits into
apache:trunkfrom
cychiu8:move-FetchRequestTest-FetchFromFollowerIntegrationTest-to-server-module
Open

MINOR: Rewrite FetchFromFollowerIntegrationTest from Scala to Java and move to server module#22252
cychiu8 wants to merge 11 commits into
apache:trunkfrom
cychiu8:move-FetchRequestTest-FetchFromFollowerIntegrationTest-to-server-module

Conversation

@cychiu8
Copy link
Copy Markdown
Contributor

@cychiu8 cychiu8 commented May 11, 2026

Summary
This PR consolidates FetchRequest related tests into the server module by merging FetchFromFollowerIntegrationTest
into FetchRequestTest.

Key changes include:

  • Moving FetchRequestTest from core to the server module.
  • Rewriting FetchFromFollowerIntegrationTest in Java and subsequently merging its integration tests into
    FetchRequestTest.
  • Adding necessary utility methods to server/src/test/java/org/apache/kafka/server/TestUtils.java to support the
    integration tests.

@github-actions github-actions Bot added triage PRs from the community core Kafka Broker tests Test fixes (including flaky tests) clients labels May 11, 2026
Copy link
Copy Markdown
Collaborator

@m1a2st m1a2st left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also delete FetchFromFollowerIntegrationTest.scala?

@cychiu8
Copy link
Copy Markdown
Contributor Author

cychiu8 commented May 11, 2026

Can we also delete FetchFromFollowerIntegrationTest.scala?

@m1a2st Thanks for reviewing. You're right! We should delete the scala one.

@github-actions github-actions Bot removed the triage PRs from the community label May 12, 2026
* @param timeoutMs The duration in ms to wait for the leader
* @return The leader broker id
*/
public static int waitUntilLeaderIsKnown(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is used solely by FetchRequestTest, would you mind moving it into the test class instead?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. waitUntilLeaderIsKnown has been moved into FetchRequestTest as a protected static method.


@ClusterTest
@Timeout(60)
public void testFetchFromFollowerWithRoll(ClusterInstance cluster) throws Exception {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should test both consumers, right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are correct! testFetchFromFollowerWithRoll now creates and uses both followerConsumer and leaderConsumer.

}

@ClusterTest
@Timeout(60)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is redundant annotation. It is already included by ClusterTest

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. All @timeout annotations have been removed.


@ClusterTest
@Timeout(15)
public void testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable(ClusterInstance cluster) throws Exception {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Map<String, Uuid> topicIds;

@ClusterTest
@Timeout(15)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


private void verifyRackAwareAssignments(
ExecutorService executor,
List<?> consumers,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you use Object or byte[] instead?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. The parameter is now List<Consumer<byte[], byte[]>>.

topics.add(topic);
topics.addAll(List.of(additionalTopics));

for (Object c : consumers) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consumers.forEach(Consumer::close);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

for (Object c : consumers) {
recordFutures.add(executor.submit(() -> {
try {
@SuppressWarnings("unchecked")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SuppressWarnings("unchecked") smells fishy to me ...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Resolved as a consequence of switching from List<?> to List<Consumer<byte[], byte[]>>. No cast or suppression is needed anymore.

Copy link
Copy Markdown
Collaborator

@m1a2st m1a2st left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this patch, left some comments

@Timeout(15)
public void testFollowerCompleteDelayedFetchesOnReplication(ClusterInstance cluster) throws Exception {
this.cluster = cluster;
try (Admin admin = cluster.admin()) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this Admin client created but never used?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My mistake. The unused try (Admin admin = ...) block has been removed.

verifyRackAwareAssignments(executor, consumers, producer, partitionList, topicWithAllPartitionsOnAllRacks, partitionList, topicWithSingleRackPartitions);

} finally {
executor.shutdown();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be executor.shutdownNow().

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Thank you for reminding me the difference between executor.shutdown() and executor.shutdownNow()!

public static final int FOLLOWER_BROKER_ID = 1;

private ClusterInstance cluster;
Map<String, Uuid> topicIds;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field can be a local variable since it’s only used in one method.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

List<Integer> partitionList,
String topic,
List<Integer> expectedPartitionOrder,
String... additionalTopics) throws Exception {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use List<String> instead?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

.toList();

List<Consumer<byte[], byte[]>> consumers = consumerConfigs.stream()
.map(config -> cluster.<byte[], byte[]>consumer(config))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

        List<Consumer<byte[], byte[]>> consumers = consumerConfigs.stream()
                .map(cluster::<byte[], byte[]>consumer)
                .toList();

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you for the review! updated.

private ClusterInstance cluster;

@ClusterTest
public void testFollowerCompleteDelayedFetchesOnReplication(ClusterInstance cluster) throws Exception {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should test both consumers, right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! Thank you for reminding me.


Supplier<Optional<Integer>> newLeaderExists = () -> {
if (expectedLeaderOpt.isPresent()) {
LOG.debug("Checking leader that has changed to {}", expectedLeaderOpt.get());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering whether we should keep these debug messages in this migration.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could remove them because the waitForCondition timeout message already identifies the partition and timeoutMs.

Copy link
Copy Markdown
Collaborator

@m1a2st m1a2st left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update, few more comments

});

String expectedTopic = version >= 13 ? null : tp.topic();
topicIdPartitionMap = fetchRequest.fetchData(Collections.emptyMap());
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
topicIdPartitionMap = fetchRequest.fetchData(Collections.emptyMap());
topicIdPartitionMap = fetchRequest.fetchData(Map.of());

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the review and catch the old usage part. Updated.

Optional<Integer> currentLeaderEpoch = Optional.of(121);
FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch);
FetchRequest fetchRequest = createFetchRequestByVersion(version, tp, partitionData);
Map<Uuid, String> topicNames = Collections.singletonMap(topicId, tp.topic());
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Map<Uuid, String> topicNames = Collections.singletonMap(topicId, tp.topic());
Map<Uuid, String> topicNames = Map.of(topicId, tp.topic());

} else {
return FetchRequest.Builder
.forReplica(version, 0, 1, 1, 1, partitionDataMap)
.removed(Collections.singletonList(tp))
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.removed(Collections.singletonList(tp))
.removed(List.of(tp))

if (version >= 13) {
return FetchRequest.Builder
.forReplica(version, 0, 1, 1, 1, partitionDataMap)
.replaced(Collections.singletonList(tp))
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.replaced(Collections.singletonList(tp))
.replaced(List.of(tp))


private FetchRequest createFetchRequestByVersion(short version, TopicIdPartition tp,
FetchRequest.PartitionData partitionData) {
Map<TopicPartition, FetchRequest.PartitionData> partitionDataMap = Collections.singletonMap(tp.topicPartition(), partitionData);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Map<TopicPartition, FetchRequest.PartitionData> partitionDataMap = Collections.singletonMap(tp.topicPartition(), partitionData);
Map<TopicPartition, FetchRequest.PartitionData> partitionDataMap = Map.of(tp.topicPartition(), partitionData);

boolean fetchRequestUsesTopicIds = version >= 13;

FetchRequest fetchRequest = FetchRequest.parse(FetchRequest.Builder
.forReplica(version, 0, 1, 1, 1, Collections.emptyMap())
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.forReplica(version, 0, 1, 1, 1, Collections.emptyMap())
.forReplica(version, 0, 1, 1, 1, Map.of())

Uuid topicId0 = Uuid.randomUuid();
Uuid topicId1 = Uuid.randomUuid();
// Only include topic IDs for the first topic partition.
Map<Uuid, String> topicNames = Collections.singletonMap(topicId0, topicPartition0.topic());
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Map<Uuid, String> topicNames = Collections.singletonMap(topicId0, topicPartition0.topic());
Map<Uuid, String> topicNames = Map.of(topicId0, topicPartition0.topic());

Comment on lines +507 to +508
.removed(Collections.emptyList())
.replaced(Collections.emptyList())
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.removed(Collections.emptyList())
.replaced(Collections.emptyList())
.removed(List.of())
.replaced(List.of())

Uuid topicId1 = Uuid.randomUuid();

// Only include topic IDs for the first topic partition.
Map<Uuid, String> topicNames = Collections.singletonMap(topicId0, topicPartition0.topic());
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Map<Uuid, String> topicNames = Collections.singletonMap(topicId0, topicPartition0.topic());
Map<Uuid, String> topicNames = Map.of(topicId0, topicPartition0.topic());

Comment on lines +458 to +460
Map<TopicPartition, FetchRequest.PartitionData> partitionData = Collections.singletonMap(tp.topicPartition(),
new FetchRequest.PartitionData(topicId, 0, 0, 0, Optional.empty()));
List<TopicIdPartition> toReplace = Collections.singletonList(tp);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

cychiu8 added 2 commits May 18, 2026 22:23
…n createPartitionMap - replaced the manual connect + serialize + send + receive block with IntegrationTestUtils.connectAndReceive
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ci-approved clients core Kafka Broker tests Test fixes (including flaky tests)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants