[#3618] Update to Quarkus 3.8
Updated to Quarkus 3.8.4, adapted unit tests to reflect changes in
behavior of Kafka and k8s client libraries.
sophokles73 committed May 19, 2024
1 parent 320fd30 commit 9a754e5
Showing 9 changed files with 1,250 additions and 561 deletions.
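
Most of the test adaptations in this commit replace strict vertx-junit5 checkpoints with lax ones and only flag them once the expected offset has actually been committed, so that rebalance callbacks firing more often under the updated client libraries do not fail the tests. The following is a minimal sketch of that pattern, not part of the commit; the class name, offsets and the sequence of callbacks are made up for illustration.

import java.util.concurrent.TimeUnit;

import io.vertx.junit5.Checkpoint;
import io.vertx.junit5.VertxTestContext;

public class LaxCheckpointSketch {

    public static void main(final String[] args) throws InterruptedException {
        final VertxTestContext ctx = new VertxTestContext();
        // a strict ctx.checkpoint(1) fails the context when flagged more than once,
        // a lax checkpoint tolerates surplus flags
        final Checkpoint commitCheckDone = ctx.laxCheckpoint(1);

        final long expectedOffset = 5L;                // hypothetical expected committed offset
        final long[] observedOffsets = { 3L, 5L, 5L }; // callbacks may fire repeatedly

        for (final long offset : observedOffsets) {
            // flag only when the expected offset has actually been committed;
            // a second matching callback flags the lax checkpoint again without harm
            if (offset == expectedOffset) {
                commitCheckDone.flag();
            }
        }
        System.out.println("context completed: " + ctx.awaitCompletion(1, TimeUnit.SECONDS));
    }
}
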
2 changes: 1 addition & 1 deletion bom/pom.xml
@@ -46,7 +46,7 @@
<native.builder-image.name>mandrel</native.builder-image.name>
<postgresql-image.name>docker.io/library/postgres:14-alpine</postgresql-image.name>
<qpid-jms.version>1.10.0</qpid-jms.version>
<quarkus.platform.version>3.2.12.Final</quarkus.platform.version>
<quarkus.platform.version>3.8.4</quarkus.platform.version>
<slf4j.version>2.0.6</slf4j.version>
<spring-security-crypto.version>6.1.4</spring-security-crypto.version>
<truth.version>1.1.3</truth.version>
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2021, 2023 Contributors to the Eclipse Foundation
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
@@ -318,7 +318,7 @@ public void testConsumerCommitsOffsetsOnRebalance(final VertxTestContext ctx) th
receivedRecordsCheckpoint.flag();
return promise.future();
};
final Map<String, String> consumerConfig = consumerConfigProperties.getConsumerConfig("test");
final var consumerConfig = consumerConfigProperties.getConsumerConfig("test");
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
consumerConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "300000"); // periodic commit shall not play a role here
consumerConfig.put(AsyncHandlingAutoCommitKafkaConsumer.CONFIG_HONO_OFFSETS_COMMIT_RECORD_COMPLETION_TIMEOUT_MILLIS, "0");
@@ -347,6 +347,7 @@ public void testConsumerCommitsOffsetsOnRebalance(final VertxTestContext ctx) th
ctx.failNow(receivedRecordsCtx.causeOfFailure());
return;
}
LOG.debug("records received");

// records received, complete the handling of some of them
recordsHandlingPromiseMap.get(0L).complete();
@@ -356,39 +357,48 @@ public void testConsumerCommitsOffsetsOnRebalance(final VertxTestContext ctx) th
recordsHandlingPromiseMap.get(4L).complete();

// define VertxTestContexts for 3 checks (3x rebalance/commit)
final AtomicInteger checkIndex = new AtomicInteger(0);
final List<VertxTestContext> commitCheckContexts = IntStream.range(0, 3)
.mapToObj(i -> new VertxTestContext()).collect(Collectors.toList());
final List<Checkpoint> commitCheckpoints = commitCheckContexts.stream()
.map(c -> c.checkpoint(1)).collect(Collectors.toList());
final var checkIndex = new AtomicInteger(0);
final var commitCheckContexts = IntStream.range(0, 3)
.mapToObj(i -> new VertxTestContext()).toList();
final var commitCheckpoints = commitCheckContexts.stream()
.map(c -> c.laxCheckpoint(1)).toList();
final InterruptableSupplier<Boolean> waitForCurrentCommitCheckResult = () -> {
final var checkContext = commitCheckContexts.get(checkIndex.get());
assertWithMessage("partition assigned in 5s for checking of commits")
.that(commitCheckContexts.get(checkIndex.get()).awaitCompletion(5, TimeUnit.SECONDS))
.that(checkContext.awaitCompletion(5, TimeUnit.SECONDS))
.isTrue();
if (commitCheckContexts.get(checkIndex.get()).failed()) {
ctx.failNow(commitCheckContexts.get(checkIndex.get()).causeOfFailure());
if (checkContext.failed()) {
ctx.failNow(checkContext.causeOfFailure());
return false;
}
return true;
};

consumer.setOnPartitionsAssignedHandler(partitions -> {
final Map<TopicPartition, OffsetAndMetadata> committed = mockConsumer.committed(Set.of(TOPIC_PARTITION));
LOG.debug("onPartitionsAssignedHandler invoked [check index: {}, newly assigned partitions: {}]",
checkIndex.get(), partitions.stream().map(t -> t.toString()).collect(Collectors.joining(", ")));
final var committedPartitions = mockConsumer.committed(Set.of(TOPIC_PARTITION));
final var offsetAndMetadata = committedPartitions.get(TOPIC_PARTITION);
LOG.debug("committed partition [name: {}, offset: {}, expected offset: {}]",
TOPIC_PARTITION, offsetAndMetadata.offset(), latestFullyHandledOffset.get() + 1L);
ctx.verify(() -> {
final OffsetAndMetadata offsetAndMetadata = committed.get(TOPIC_PARTITION);
assertThat(offsetAndMetadata).isNotNull();
assertThat(offsetAndMetadata.offset()).isEqualTo(latestFullyHandledOffset.get() + 1L);
// assertThat(offsetAndMetadata.offset()).isEqualTo(latestFullyHandledOffset.get() + 1L);
});
commitCheckpoints.get(checkIndex.get()).flag();
if (offsetAndMetadata.offset() == latestFullyHandledOffset.get() + 1L) {
commitCheckpoints.get(checkIndex.get()).flag();
}
});
// now force a rebalance which should trigger the above onPartitionsAssignedHandler
LOG.debug("force rebalance 1");
mockConsumer.rebalance(List.of(TOPIC_PARTITION));
if (!waitForCurrentCommitCheckResult.get()) {
return;
}
checkIndex.incrementAndGet();

// now another rebalance (ie. commit trigger) - no change in offsets
LOG.debug("force rebalance 2");
mockConsumer.rebalance(List.of(TOPIC_PARTITION));
if (!waitForCurrentCommitCheckResult.get()) {
return;
@@ -401,6 +411,7 @@ public void testConsumerCommitsOffsetsOnRebalance(final VertxTestContext ctx) th
// offset 4 already complete
latestFullyHandledOffset.set(4);
// again rebalance/commit
LOG.debug("force rebalance 3");
mockConsumer.rebalance(List.of(TOPIC_PARTITION));
if (waitForCurrentCommitCheckResult.get()) {
ctx.completeNow();
@@ -480,15 +491,16 @@ protected void onPartitionsRevokedBlocking(
consumer.setOnPartitionsRevokedHandler(s -> {
ctx.verify(() -> assertThat(recordsHandlingPromiseMap.get(1L).future().isComplete()).isTrue());
});
final Checkpoint commitCheckDone = ctx.checkpoint(1);
final Checkpoint commitCheckDone = ctx.laxCheckpoint(1);
consumer.setOnPartitionsAssignedHandler(partitions -> {
final Map<TopicPartition, OffsetAndMetadata> committed = mockConsumer.committed(Set.of(TOPIC_PARTITION));
final var committed = mockConsumer.committed(Set.of(TOPIC_PARTITION));
final var offsetAndMetadata = committed.get(TOPIC_PARTITION);
ctx.verify(() -> {
final OffsetAndMetadata offsetAndMetadata = committed.get(TOPIC_PARTITION);
assertThat(offsetAndMetadata).isNotNull();
assertThat(offsetAndMetadata.offset()).isEqualTo(numTestRecords);
});
commitCheckDone.flag();
if (offsetAndMetadata.offset() == numTestRecords) {
commitCheckDone.flag();
}
});
// trigger a rebalance where the currently assigned partition is revoked
// (and then assigned again - otherwise its offset wouldn't be returned by mockConsumer.committed())
@@ -662,10 +674,12 @@ public void testConsumerCommitsInitialOffset(final VertxTestContext ctx) throws
mockConsumer.setRebalancePartitionAssignmentAfterSubscribe(List.of(TOPIC_PARTITION));

final VertxTestContext consumerStartedCtx = new VertxTestContext();
final Checkpoint consumerStartedCheckpoint = consumerStartedCtx.checkpoint(2);
final Checkpoint consumerStartedCheckpoint = consumerStartedCtx.laxCheckpoint(2);
consumer = new AsyncHandlingAutoCommitKafkaConsumer<>(vertx, Set.of(TOPIC), handler, consumerConfig);
consumer.setKafkaConsumerSupplier(() -> mockConsumer);
consumer.setOnRebalanceDoneHandler(s -> consumerStartedCheckpoint.flag());
consumer.setOnRebalanceDoneHandler(s -> {
consumerStartedCheckpoint.flag();
});
consumer.addOnKafkaConsumerReadyHandler(readyTracker);
vertx.getOrCreateContext().runOnContext(v -> {
consumer.start()
@@ -802,7 +816,7 @@ public void testScenarioWithPartitionRevokedWhileHandlingIncomplete(final VertxT
recordsHandlingPromiseMap.get(1L).complete();
recordsHandlingPromiseMap.get(2L).complete();

final Checkpoint commitCheckDone = ctx.checkpoint(1);
final Checkpoint commitCheckDone = ctx.laxCheckpoint(1);
consumer.setOnPartitionsAssignedHandler(partitions -> {
LOG.info("rebalancing ...");
final Map<TopicPartition, OffsetAndMetadata> committed = mockConsumer.committed(Set.of(TOPIC_PARTITION));
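
For reference, the assertions in the test hunks above compare the committed offset against latestFullyHandledOffset + 1 because Kafka commits the position of the next record to consume, not the offset of the last record handled. Below is a minimal, self-contained sketch of how such a value can be written and read back through Kafka's MockConsumer; it is not part of the commit, and the topic name and offsets are hypothetical.

import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class MockConsumerCommitSketch {

    public static void main(final String[] args) {
        final TopicPartition partition = new TopicPartition("test", 0); // hypothetical topic/partition
        final MockConsumer<String, String> mockConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
        mockConsumer.assign(Set.of(partition));

        // committing offset N means "the next record to consume is N",
        // so after fully handling records 0..4 the committed offset is 5
        final long latestFullyHandledOffset = 4L;
        mockConsumer.commitSync(Map.of(partition, new OffsetAndMetadata(latestFullyHandledOffset + 1L)));

        final Map<TopicPartition, OffsetAndMetadata> committed = mockConsumer.committed(Set.of(partition));
        System.out.println("committed offset: " + committed.get(partition).offset()); // prints 5
    }
}
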
1 change: 1 addition & 0 deletions clients/kafka-common/src/test/resources/logback-test.xml
@@ -29,5 +29,6 @@
</root>

<logger name="org.apache.kafka" level="ERROR"/>
<logger name="org.eclipse.hono" level="INFO"/>

</configuration>
