Commit
refactor: switch manual delete phase to T1
Let the next phase test topic T0 and avoid an additional dependency between phases.
jeqo committed May 20, 2024
1 parent 6686fef commit 2a3bcc9
Showing 1 changed file with 26 additions and 16 deletions.
@@ -485,45 +485,55 @@ private void validateReadOverBatchBorders() {
     @Test
     @Order(3)
     void remoteManualDelete() throws Exception {
-        // Reduce records in half by manually deletion affecting only topic t0
         final long newStartOffset = RECORDS_TO_PRODUCE / 2;
 
-        final List<RemoteSegment> remoteSegmentsBefore = remoteLogMetadataTracker.remoteSegments()
-            .get(t0p0);
-        final List<RemoteSegment> segmentsToBeDeleted = remoteSegmentsBefore.stream()
+        // Collect segments to be deleted for T1
+        final List<RemoteSegment> segmentsToBeDeleted = remoteLogMetadataTracker.remoteSegments()
+            .get(t1p0)
+            .stream()
             .filter(rs -> rs.endOffset() < newStartOffset)
             .collect(Collectors.toList());
 
-        adminClient.deleteRecords(Map.of(TP_0_0, RecordsToDelete.beforeOffset(newStartOffset)))
+        TopicPartition t1p0 = TP_1_0;
+
+        adminClient.deleteRecords(Map.of(t1p0, RecordsToDelete.beforeOffset(newStartOffset)))
             .all().get(5, TimeUnit.SECONDS);
 
         remoteLogMetadataTracker.waitUntilSegmentsAreDeleted(segmentsToBeDeleted);
 
-        try (final var consumer = new KafkaConsumer<>(Map.of(
-            "bootstrap.servers", kafka.getBootstrapServers(),
-            "auto.offset.reset", "earliest"
-        ), new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+        // Check reduced records available in partition
+        try (final var consumer = new KafkaConsumer<>(
+            Map.of(
+                "bootstrap.servers", kafka.getBootstrapServers()
+            ),
+            new ByteArrayDeserializer(),
+            new ByteArrayDeserializer()
+        )) {
+
+            // Check the beginning and end offsets.
-            final Map<TopicPartition, Long> startOffsets = consumer.beginningOffsets(List.of(TP_0_0));
-            assertThat(startOffsets).containsEntry(TP_0_0, newStartOffset);
-            final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(List.of(TP_0_0));
-            assertThat(endOffsets).containsEntry(TP_0_0, RECORDS_TO_PRODUCE + 1);
+            final Map<TopicPartition, Long> startOffsets = consumer.beginningOffsets(List.of(t1p0));
+            assertThat(startOffsets).containsEntry(t1p0, newStartOffset);
+            final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(List.of(t1p0));
+            assertThat(endOffsets).containsEntry(t1p0, RECORDS_TO_PRODUCE + 1);
             // TODO check for EARLIEST_LOCAL_TIMESTAMP when available in client
 
             // TODO check segments deleted on the remote
 
+            // Check what we can now consume.
-            consumer.assign(List.of(TP_0_0));
-            consumer.seek(TP_0_0, 0);
-            final ConsumerRecord<byte[], byte[]> record = consumer.poll(Duration.ofSeconds(1)).records(TP_0_0).get(0);
+            consumer.assign(List.of(t1p0));
+            consumer.seekToBeginning(List.of(t1p0));
+            List<ConsumerRecord<byte[], byte[]>> records = consumer.poll(Duration.ofSeconds(5)).records(t1p0);
+            assertThat(records).isNotEmpty();
+            final ConsumerRecord<byte[], byte[]> record = records.get(0);
             assertThat(record.offset()).isEqualTo(newStartOffset);
         }
     }
 
     @Test
     @Order(4)
     void remoteCleanupDueToRetention() throws Exception {
-        LOG.info("Forcing cleanup by setting bytes retention to 1");
+        LOG.info("Forcing cleanup by setting bytes retention to 1 byte");
 
         final var alterConfigs = List.of(
             new AlterConfigOp(
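The phase relies on the standard Kafka Admin/Consumer contract: deleteRecords with RecordsToDelete.beforeOffset advances the partition's log start offset, after which beginningOffsets reports the new start, and a consumer that assigns the partition and seeks to the beginning reads its first record at exactly that offset. Below is a minimal, self-contained sketch of that contract. The broker address, topic name, and offset are hypothetical stand-ins; the test itself takes the bootstrap servers and topics from its own kafka fixture.

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class DeleteRecordsSketch {

    public static void main(final String[] args) throws Exception {
        // Hypothetical broker and topic; assumes "t1" already holds records past offset 50.
        final String bootstrap = "localhost:9092";
        final TopicPartition tp = new TopicPartition("t1", 0);
        final long newStartOffset = 50L;

        try (final Admin admin = Admin.create(Map.of("bootstrap.servers", bootstrap))) {
            // Advances the log start offset: records before offset 50 become unreadable.
            admin.deleteRecords(Map.of(tp, RecordsToDelete.beforeOffset(newStartOffset)))
                .all().get(5, TimeUnit.SECONDS);
        }

        try (final var consumer = new KafkaConsumer<>(
            Map.of("bootstrap.servers", bootstrap),
            new ByteArrayDeserializer(),
            new ByteArrayDeserializer())) {
            // beginningOffsets now reflects the truncation.
            final Map<TopicPartition, Long> start = consumer.beginningOffsets(List.of(tp));
            System.out.println(start.get(tp)); // 50

            // seekToBeginning targets whatever the current start offset is,
            // so polling never falls back to the auto.offset.reset policy.
            consumer.assign(List.of(tp));
            consumer.seekToBeginning(List.of(tp));
            final var records = consumer.poll(Duration.ofSeconds(5)).records(tp);
            if (!records.isEmpty()) {
                System.out.println(records.get(0).offset()); // 50, the new start offset
            }
        }
    }
}

The sketch also shows why the refactor can drop auto.offset.reset from the consumer config: the old code seeked to the fixed offset 0, which is out of range once earlier records are deleted and therefore needs a reset policy, while seekToBeginning always yields a valid position.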
