Skip to content

Commit

Permalink
fix: e2e test to handle new retention logic
Browse files Browse the repository at this point in the history
3.7 has changed how local retention impacts the rotation of active segment.
  • Loading branch information
jeqo committed May 20, 2024
1 parent 2a3bcc9 commit c66e109
Showing 1 changed file with 16 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ private void validateReadOverBatchBorders() {
@Test
@Order(3)
void remoteManualDelete() throws Exception {
// Reduce records in half by manually deletion affecting only topic t0
// Reduce records in half by manual deletion affecting only topic t1
final long newStartOffset = RECORDS_TO_PRODUCE / 2;

// Collect segments to be deleted for T1
Expand All @@ -495,7 +495,7 @@ void remoteManualDelete() throws Exception {
.filter(rs -> rs.endOffset() < newStartOffset)
.collect(Collectors.toList());

TopicPartition t1p0 = TP_1_0;
final TopicPartition t1p0 = TP_1_0;

adminClient.deleteRecords(Map.of(t1p0, RecordsToDelete.beforeOffset(newStartOffset)))
.all().get(5, TimeUnit.SECONDS);
Expand Down Expand Up @@ -523,7 +523,7 @@ void remoteManualDelete() throws Exception {
// Check what we can now consume.
consumer.assign(List.of(t1p0));
consumer.seekToBeginning(List.of(t1p0));
List<ConsumerRecord<byte[], byte[]>> records = consumer.poll(Duration.ofSeconds(5)).records(t1p0);
final List<ConsumerRecord<byte[], byte[]>> records = consumer.poll(Duration.ofSeconds(5)).records(t1p0);
assertThat(records).isNotEmpty();
final ConsumerRecord<byte[], byte[]> record = records.get(0);
assertThat(record.offset()).isEqualTo(newStartOffset);
Expand All @@ -533,11 +533,14 @@ void remoteManualDelete() throws Exception {
@Test
@Order(4)
void remoteCleanupDueToRetention() throws Exception {
LOG.info("Forcing cleanup by setting bytes retention to 1 byte");
LOG.info("Forcing cleanup by setting bytes retention to 1 ms");

final var alterConfigs = List.of(
new AlterConfigOp(
new ConfigEntry("retention.bytes", "1"),
new ConfigEntry("retention.ms", "1"),
AlterConfigOp.OpType.SET),
new AlterConfigOp(
new ConfigEntry("local.retention.ms", "1"),
AlterConfigOp.OpType.SET)
);
final Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
Expand All @@ -553,15 +556,12 @@ void remoteCleanupDueToRetention() throws Exception {
try (
// start a consumer to read from earliest
final var consumer = new KafkaConsumer<>(
Map.of(
"bootstrap.servers", kafka.getBootstrapServers(),
"auto.offset.reset", "earliest"
),
Map.of("bootstrap.servers", kafka.getBootstrapServers()),
new ByteArrayDeserializer(),
new ByteArrayDeserializer())
) {
// Get earliest offset available locally
final long newStartOffset = localLogFiles(TP_0_0).stream()
final long nextOffset = localLogFiles(TP_0_0).stream()
.mapToLong(f -> Long.parseLong(f.getName().replace(".log", "")))
.max()
.getAsLong();
Expand All @@ -571,8 +571,8 @@ void remoteCleanupDueToRetention() throws Exception {
.atMost(Duration.ofSeconds(30))
.until(() -> {
final var beginningOffset = consumer.beginningOffsets(List.of(TP_0_0)).get(TP_0_0);
LOG.info("Beginning offset found {}, expecting {}", beginningOffset, newStartOffset);
return beginningOffset.equals(newStartOffset);
LOG.info("Beginning offset found {}, expecting {}", beginningOffset, nextOffset);
return beginningOffset.equals(nextOffset);
});

final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(List.of(TP_0_0));
Expand All @@ -581,10 +581,11 @@ void remoteCleanupDueToRetention() throws Exception {
// TODO check for EARLIEST_LOCAL_TIMESTAMP when available in client

consumer.assign(List.of(TP_0_0));
consumer.seek(TP_0_0, 0);
consumer.seekToBeginning(List.of(TP_0_0));

final ConsumerRecord<byte[], byte[]> record = consumer.poll(Duration.ofSeconds(1)).records(TP_0_0).get(0);
assertThat(record.offset()).isEqualTo(newStartOffset);
// with current retention logic, active segment is rotated, and no data is available locally.
final List<ConsumerRecord<byte[], byte[]>> records = consumer.poll(Duration.ofSeconds(1)).records(TP_0_0);
assertThat(records).isEmpty();

// Collect all remote segments, as after changing retention, all should be deleted.
final var remoteSegmentsBefore = remoteLogMetadataTracker.remoteSegments();
Expand Down

0 comments on commit c66e109

Please sign in to comment.