From c66e109a298726bfe93e5546a0d0b92779faee0f Mon Sep 17 00:00:00 2001
From: Jorge Esteban Quilcate Otoya
Date: Fri, 10 May 2024 17:33:59 +0300
Subject: [PATCH] fix: e2e test to handle new retention logic

Kafka 3.7 changed how local retention impacts the rotation of the
active segment.

---
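Note: the cleanup exercised by remoteCleanupDueToRetention is driven
purely by topic configs. Below is a minimal standalone sketch of the same
config change applied through the Admin API; the bootstrap address
"localhost:9092", the topic name "t0", and the timeout are illustrative
placeholders, not values taken from this patch.

    import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class ForceRemoteCleanup {
        public static void main(final String[] args) throws Exception {
            // Placeholder bootstrap address; the e2e test gets it from a container.
            try (Admin admin = Admin.create(Map.of("bootstrap.servers", "localhost:9092"))) {
                final Collection<AlterConfigOp> ops = List.of(
                    // Expire all data almost immediately, remote copies included...
                    new AlterConfigOp(new ConfigEntry("retention.ms", "1"),
                        AlterConfigOp.OpType.SET),
                    // ...and the local copies too, which under the 3.7 logic
                    // also rotates the active segment.
                    new AlterConfigOp(new ConfigEntry("local.retention.ms", "1"),
                        AlterConfigOp.OpType.SET));
                final ConfigResource topic =
                    new ConfigResource(ConfigResource.Type.TOPIC, "t0");
                admin.incrementalAlterConfigs(Map.of(topic, ops))
                    .all().get(30, TimeUnit.SECONDS);
            }
        }
    }

Setting both configs is the point of the patch: retention.ms expires the
remote segments, while local.retention.ms expires the local ones; the new
assertions in the diff below expect no locally readable data afterwards.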
 .../tieredstorage/e2e/SingleBrokerTest.java | 31 ++++++++++---------
 1 file changed, 16 insertions(+), 15 deletions(-)

diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/SingleBrokerTest.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/SingleBrokerTest.java
index a5f6c018d..97741d388 100644
--- a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/SingleBrokerTest.java
+++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/SingleBrokerTest.java
@@ -485,7 +485,7 @@ private void validateReadOverBatchBorders() {
     @Test
     @Order(3)
     void remoteManualDelete() throws Exception {
-        // Reduce records in half by manually deletion affecting only topic t0
+        // Reduce records by half via manual deletion, affecting only topic t1
         final long newStartOffset = RECORDS_TO_PRODUCE / 2;
 
         // Collect segments to be deleted for T1
@@ -495,7 +495,7 @@ void remoteManualDelete() throws Exception {
             .filter(rs -> rs.endOffset() < newStartOffset)
             .collect(Collectors.toList());
 
-        TopicPartition t1p0 = TP_1_0;
+        final TopicPartition t1p0 = TP_1_0;
         adminClient.deleteRecords(Map.of(t1p0, RecordsToDelete.beforeOffset(newStartOffset)))
             .all().get(5, TimeUnit.SECONDS);
 
@@ -523,7 +523,7 @@ void remoteManualDelete() throws Exception {
         // Check what we can now consume.
         consumer.assign(List.of(t1p0));
         consumer.seekToBeginning(List.of(t1p0));
-        List<ConsumerRecord<byte[], byte[]>> records = consumer.poll(Duration.ofSeconds(5)).records(t1p0);
+        final List<ConsumerRecord<byte[], byte[]>> records = consumer.poll(Duration.ofSeconds(5)).records(t1p0);
         assertThat(records).isNotEmpty();
         final ConsumerRecord<byte[], byte[]> record = records.get(0);
         assertThat(record.offset()).isEqualTo(newStartOffset);
@@ -533,11 +533,14 @@
     @Test
     @Order(4)
     void remoteCleanupDueToRetention() throws Exception {
-        LOG.info("Forcing cleanup by setting bytes retention to 1 byte");
+        LOG.info("Forcing cleanup by setting time retention to 1 ms");
 
         final var alterConfigs = List.of(
             new AlterConfigOp(
-                new ConfigEntry("retention.bytes", "1"),
+                new ConfigEntry("retention.ms", "1"),
+                AlterConfigOp.OpType.SET),
+            new AlterConfigOp(
+                new ConfigEntry("local.retention.ms", "1"),
                 AlterConfigOp.OpType.SET)
         );
         final Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
@@ -553,15 +556,12 @@ void remoteCleanupDueToRetention() throws Exception {
         try (
             // start a consumer to read from earliest
            final var consumer = new KafkaConsumer<>(
-                Map.of(
-                    "bootstrap.servers", kafka.getBootstrapServers(),
-                    "auto.offset.reset", "earliest"
-                ),
+                Map.of("bootstrap.servers", kafka.getBootstrapServers()),
                 new ByteArrayDeserializer(),
                 new ByteArrayDeserializer())
         ) {
             // Get earliest offset available locally
-            final long newStartOffset = localLogFiles(TP_0_0).stream()
+            final long nextOffset = localLogFiles(TP_0_0).stream()
                 .mapToLong(f -> Long.parseLong(f.getName().replace(".log", "")))
                 .max()
                 .getAsLong();
@@ -571,8 +571,8 @@ void remoteCleanupDueToRetention() throws Exception {
                 .atMost(Duration.ofSeconds(30))
                 .until(() -> {
                     final var beginningOffset = consumer.beginningOffsets(List.of(TP_0_0)).get(TP_0_0);
-                    LOG.info("Beginning offset found {}, expecting {}", beginningOffset, newStartOffset);
-                    return beginningOffset.equals(newStartOffset);
+                    LOG.info("Beginning offset found {}, expecting {}", beginningOffset, nextOffset);
+                    return beginningOffset.equals(nextOffset);
                 });
 
             final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(List.of(TP_0_0));
@@ -581,10 +581,11 @@ void remoteCleanupDueToRetention() throws Exception {
 
             // TODO check for EARLIEST_LOCAL_TIMESTAMP when available in client
             consumer.assign(List.of(TP_0_0));
-            consumer.seek(TP_0_0, 0);
+            consumer.seekToBeginning(List.of(TP_0_0));
 
-            final ConsumerRecord<byte[], byte[]> record = consumer.poll(Duration.ofSeconds(1)).records(TP_0_0).get(0);
-            assertThat(record.offset()).isEqualTo(newStartOffset);
+            // With the current retention logic the active segment is rotated, so no data is available locally.
+            final List<ConsumerRecord<byte[], byte[]>> records = consumer.poll(Duration.ofSeconds(1)).records(TP_0_0);
+            assertThat(records).isEmpty();
 
             // Collect all remote segments, as after changing retention, all should be deleted.
             final var remoteSegmentsBefore = remoteLogMetadataTracker.remoteSegments();
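For completeness, the updated test detects the cleanup by polling
Consumer#beginningOffsets with Awaitility until the log start offset
advances. A minimal sketch of that pattern follows; the broker address,
topic, partition, and expected offset are again placeholders.

    import java.time.Duration;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    import static org.awaitility.Awaitility.await;

    public class AwaitBeginningOffset {
        public static void main(final String[] args) {
            final TopicPartition tp = new TopicPartition("t0", 0);
            // Placeholder: the first offset expected to survive retention.
            final long expected = 42L;
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                    Map.of("bootstrap.servers", "localhost:9092"),
                    new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
                // Poll until the broker reports the expected log start offset.
                await().atMost(Duration.ofSeconds(30))
                    .until(() -> consumer.beginningOffsets(List.of(tp)).get(tp) == expected);
            }
        }
    }

The test's own version of this loop compares against the highest offset
derived from the local .log file names rather than a constant.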