Skip to content

Commit

Permalink
[#3618] Update to Quarkus 3.8
Browse files Browse the repository at this point in the history
Updated to Quarkus 3.8.4
  • Loading branch information
sophokles73 committed May 9, 2024
1 parent f7178e8 commit fb1c797
Show file tree
Hide file tree
Showing 9 changed files with 620 additions and 562 deletions.
2 changes: 1 addition & 1 deletion bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
<native.builder-image.name>mandrel</native.builder-image.name>
<postgresql-image.name>docker.io/library/postgres:14-alpine</postgresql-image.name>
<qpid-jms.version>1.10.0</qpid-jms.version>
<quarkus.platform.version>3.2.12.Final</quarkus.platform.version>
<quarkus.platform.version>3.8.4</quarkus.platform.version>
<slf4j.version>2.0.6</slf4j.version>
<spring-security-crypto.version>6.1.4</spring-security-crypto.version>
<truth.version>1.1.3</truth.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public void testConsumerCommitsOffsetsOnRebalance(final VertxTestContext ctx) th
receivedRecordsCheckpoint.flag();
return promise.future();
};
final Map<String, String> consumerConfig = consumerConfigProperties.getConsumerConfig("test");
final var consumerConfig = consumerConfigProperties.getConsumerConfig("test");
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
consumerConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "300000"); // periodic commit shall not play a role here
consumerConfig.put(AsyncHandlingAutoCommitKafkaConsumer.CONFIG_HONO_OFFSETS_COMMIT_RECORD_COMPLETION_TIMEOUT_MILLIS, "0");
Expand Down Expand Up @@ -347,6 +347,7 @@ public void testConsumerCommitsOffsetsOnRebalance(final VertxTestContext ctx) th
ctx.failNow(receivedRecordsCtx.causeOfFailure());
return;
}
LOG.debug("records received");

// records received, complete the handling of some of them
recordsHandlingPromiseMap.get(0L).complete();
Expand All @@ -356,39 +357,48 @@ public void testConsumerCommitsOffsetsOnRebalance(final VertxTestContext ctx) th
recordsHandlingPromiseMap.get(4L).complete();

// define VertxTestContexts for 3 checks (3x rebalance/commit)
final AtomicInteger checkIndex = new AtomicInteger(0);
final List<VertxTestContext> commitCheckContexts = IntStream.range(0, 3)
.mapToObj(i -> new VertxTestContext()).collect(Collectors.toList());
final List<Checkpoint> commitCheckpoints = commitCheckContexts.stream()
.map(c -> c.checkpoint(1)).collect(Collectors.toList());
final var checkIndex = new AtomicInteger(0);
final var commitCheckContexts = IntStream.range(0, 3)
.mapToObj(i -> new VertxTestContext()).toList();
final var commitCheckpoints = commitCheckContexts.stream()
.map(c -> c.laxCheckpoint(1)).toList();
final InterruptableSupplier<Boolean> waitForCurrentCommitCheckResult = () -> {
final var checkContext = commitCheckContexts.get(checkIndex.get());
assertWithMessage("partition assigned in 5s for checking of commits")
.that(commitCheckContexts.get(checkIndex.get()).awaitCompletion(5, TimeUnit.SECONDS))
.that(checkContext.awaitCompletion(5, TimeUnit.SECONDS))
.isTrue();
if (commitCheckContexts.get(checkIndex.get()).failed()) {
ctx.failNow(commitCheckContexts.get(checkIndex.get()).causeOfFailure());
if (checkContext.failed()) {
ctx.failNow(checkContext.causeOfFailure());
return false;
}
return true;
};

consumer.setOnPartitionsAssignedHandler(partitions -> {
final Map<TopicPartition, OffsetAndMetadata> committed = mockConsumer.committed(Set.of(TOPIC_PARTITION));
LOG.debug("onPartitionsAssignedHandler invoked [check index: {}, newly assigned partitions: {}]",
checkIndex.get(), partitions.stream().map(t -> t.toString()).collect(Collectors.joining(", ")));
final var committedPartitions = mockConsumer.committed(Set.of(TOPIC_PARTITION));
final var offsetAndMetadata = committedPartitions.get(TOPIC_PARTITION);
LOG.debug("committed partition [name: {}, offset: {}, expected offset: {}]",
TOPIC_PARTITION, offsetAndMetadata.offset(), latestFullyHandledOffset.get() + 1L);
ctx.verify(() -> {
final OffsetAndMetadata offsetAndMetadata = committed.get(TOPIC_PARTITION);
assertThat(offsetAndMetadata).isNotNull();
assertThat(offsetAndMetadata.offset()).isEqualTo(latestFullyHandledOffset.get() + 1L);
// assertThat(offsetAndMetadata.offset()).isEqualTo(latestFullyHandledOffset.get() + 1L);
});
commitCheckpoints.get(checkIndex.get()).flag();
if (offsetAndMetadata.offset() == latestFullyHandledOffset.get() + 1L) {
commitCheckpoints.get(checkIndex.get()).flag();
}
});
// now force a rebalance which should trigger the above onPartitionsAssignedHandler
LOG.debug("force rebalance 1");
mockConsumer.rebalance(List.of(TOPIC_PARTITION));
if (!waitForCurrentCommitCheckResult.get()) {
return;
}
checkIndex.incrementAndGet();

// now another rebalance (ie. commit trigger) - no change in offsets
LOG.debug("force rebalance 2");
mockConsumer.rebalance(List.of(TOPIC_PARTITION));
if (!waitForCurrentCommitCheckResult.get()) {
return;
Expand All @@ -401,6 +411,7 @@ public void testConsumerCommitsOffsetsOnRebalance(final VertxTestContext ctx) th
// offset 4 already complete
latestFullyHandledOffset.set(4);
// again rebalance/commit
LOG.debug("force rebalance 3");
mockConsumer.rebalance(List.of(TOPIC_PARTITION));
if (waitForCurrentCommitCheckResult.get()) {
ctx.completeNow();
Expand Down Expand Up @@ -480,15 +491,16 @@ protected void onPartitionsRevokedBlocking(
consumer.setOnPartitionsRevokedHandler(s -> {
ctx.verify(() -> assertThat(recordsHandlingPromiseMap.get(1L).future().isComplete()).isTrue());
});
final Checkpoint commitCheckDone = ctx.checkpoint(1);
final Checkpoint commitCheckDone = ctx.laxCheckpoint(1);
consumer.setOnPartitionsAssignedHandler(partitions -> {
final Map<TopicPartition, OffsetAndMetadata> committed = mockConsumer.committed(Set.of(TOPIC_PARTITION));
final var committed = mockConsumer.committed(Set.of(TOPIC_PARTITION));
final var offsetAndMetadata = committed.get(TOPIC_PARTITION);
ctx.verify(() -> {
final OffsetAndMetadata offsetAndMetadata = committed.get(TOPIC_PARTITION);
assertThat(offsetAndMetadata).isNotNull();
assertThat(offsetAndMetadata.offset()).isEqualTo(numTestRecords);
});
commitCheckDone.flag();
if (offsetAndMetadata.offset() == numTestRecords) {
commitCheckDone.flag();
}
});
// trigger a rebalance where the currently assigned partition is revoked
// (and then assigned again - otherwise its offset wouldn't be returned by mockConsumer.committed())
Expand Down Expand Up @@ -662,10 +674,12 @@ public void testConsumerCommitsInitialOffset(final VertxTestContext ctx) throws
mockConsumer.setRebalancePartitionAssignmentAfterSubscribe(List.of(TOPIC_PARTITION));

final VertxTestContext consumerStartedCtx = new VertxTestContext();
final Checkpoint consumerStartedCheckpoint = consumerStartedCtx.checkpoint(2);
final Checkpoint consumerStartedCheckpoint = consumerStartedCtx.laxCheckpoint(2);
consumer = new AsyncHandlingAutoCommitKafkaConsumer<>(vertx, Set.of(TOPIC), handler, consumerConfig);
consumer.setKafkaConsumerSupplier(() -> mockConsumer);
consumer.setOnRebalanceDoneHandler(s -> consumerStartedCheckpoint.flag());
consumer.setOnRebalanceDoneHandler(s -> {
consumerStartedCheckpoint.flag();
});
consumer.addOnKafkaConsumerReadyHandler(readyTracker);
vertx.getOrCreateContext().runOnContext(v -> {
consumer.start()
Expand Down
1 change: 1 addition & 0 deletions clients/kafka-common/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@
</root>

<logger name="org.apache.kafka" level="ERROR"/>
<logger name="org.eclipse.hono" level="INFO"/>

</configuration>

0 comments on commit fb1c797

Please sign in to comment.