Skip to content

Commit

Permalink
Fix DeadLetterQueueWriter unable to finalize segment error (#15233) (#…
Browse files Browse the repository at this point in the history
…15241)

This commit moves the Files.size(...) call into the try catch block, that way, when the oldest segment is deleted by the DeadLetterQueueReader, no NoSuchFileException will be thrown up, and the writer will gracefully update the oldest segment on the next updateOldestSegmentReference invocation (scheduled flush, entry write, delete expired, etc).
It also adds the volatile keyword to the shared mutable variables, making sure that all the changes will be instantly visible among all the running threads (scheduler & writer).

(cherry picked from commit 88f42f3)

Co-authored-by: Edmo Vamerlatti Costa <11836452+edmocosta@users.noreply.github.com>
  • Loading branch information
github-actions[bot] and edmocosta committed Aug 8, 2023
1 parent 36c75c1 commit de4241a
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 11 deletions.
Expand Up @@ -108,22 +108,22 @@ public String toString() {
private final long maxSegmentSize;
private final long maxQueueSize;
private final QueueStorageType storageType;
private AtomicLong currentQueueSize;
private final AtomicLong currentQueueSize;
private final Path queuePath;
private final FileLock fileLock;
private volatile RecordIOWriter currentWriter;
private int currentSegmentIndex;
private Timestamp lastEntryTimestamp;
private Duration flushInterval;
private volatile int currentSegmentIndex;
private volatile Timestamp lastEntryTimestamp;
private final Duration flushInterval;
private Instant lastWrite;
private final AtomicBoolean open = new AtomicBoolean(true);
private ScheduledExecutorService flushScheduler;
private final LongAdder droppedEvents = new LongAdder();
private final LongAdder expiredEvents = new LongAdder();
private String lastError = "no errors";
private volatile String lastError = "no errors";
private final Clock clock;
private Optional<Timestamp> oldestSegmentTimestamp;
private Optional<Path> oldestSegmentPath = Optional.empty();
private volatile Optional<Timestamp> oldestSegmentTimestamp;
private volatile Optional<Path> oldestSegmentPath = Optional.empty();
private final TemporalAmount retentionTime;

public static final class Builder {
Expand Down Expand Up @@ -405,7 +405,8 @@ private long deleteTailSegment(Path segment, String motivation) throws IOExcepti
}
}

private void updateOldestSegmentReference() throws IOException {
// package-private for testing
void updateOldestSegmentReference() throws IOException {
final Optional<Path> previousOldestSegmentPath = oldestSegmentPath;
oldestSegmentPath = listSegmentPaths(this.queuePath)
.filter(p -> p.toFile().length() > 1) // take the files that have content to process
Expand Down Expand Up @@ -433,15 +434,19 @@ private void updateOldestSegmentReference() throws IOException {
oldestSegmentTimestamp = foundTimestamp;
}

// package-private for testing
Optional<Path> getOldestSegmentPath() {
return oldestSegmentPath;
}

/**
* Extract the timestamp from the last DLQEntry it finds in the given segment.
* Start from the end of the latest block, and going backward try to read the next event from its start.
* */
private static Optional<Timestamp> readTimestampOfLastEventInSegment(Path segmentPath) throws IOException {
final int lastBlockId = (int) Math.ceil(((Files.size(segmentPath) - VERSION_SIZE) / (double) BLOCK_SIZE)) - 1;
static Optional<Timestamp> readTimestampOfLastEventInSegment(Path segmentPath) throws IOException {
byte[] eventBytes = null;
try (RecordIOReader recordReader = new RecordIOReader(segmentPath)) {
int blockId = lastBlockId;
int blockId = (int) Math.ceil(((Files.size(segmentPath) - VERSION_SIZE) / (double) BLOCK_SIZE)) - 1;;
while (eventBytes == null && blockId >= 0) { // no event present in last block, try with the one before
recordReader.seekToBlock(blockId);
eventBytes = recordReader.readEvent();
Expand Down
Expand Up @@ -29,6 +29,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -42,6 +43,7 @@
import org.logstash.DLQEntry;
import org.logstash.Event;
import org.logstash.LockException;
import org.logstash.Timestamp;

import static junit.framework.TestCase.assertFalse;
import static org.hamcrest.CoreMatchers.is;
Expand Down Expand Up @@ -345,6 +347,91 @@ public void testRemoveSegmentsOrder() throws IOException {
}
}

@Test
public void testUpdateOldestSegmentReference() throws IOException {
try (DeadLetterQueueWriter sut = DeadLetterQueueWriter
.newBuilderWithoutFlusher(dir, 10 * MB, 20 * MB)
.build()) {

final byte[] eventBytes = new DLQEntry(new Event(), "", "", "").serialize();

try(RecordIOWriter writer = new RecordIOWriter(dir.resolve("1.log"))){
writer.writeEvent(eventBytes);
}

try(RecordIOWriter writer = new RecordIOWriter(dir.resolve("2.log"))){
writer.writeEvent(eventBytes);
}

try(RecordIOWriter writer = new RecordIOWriter(dir.resolve("3.log"))){
writer.writeEvent(eventBytes);
}

// Exercise
sut.updateOldestSegmentReference();

// Verify
final Optional<Path> oldestSegmentPath = sut.getOldestSegmentPath();
assertTrue(oldestSegmentPath.isPresent());
assertEquals("1.log", oldestSegmentPath.get().getFileName().toString());
}
}

@Test
public void testUpdateOldestSegmentReferenceWithDeletedSegment() throws IOException {
try (DeadLetterQueueWriter sut = DeadLetterQueueWriter
.newBuilderWithoutFlusher(dir, 10 * MB, 20 * MB)
.build()) {

final byte[] eventBytes = new DLQEntry(new Event(), "", "", "").serialize();
try(RecordIOWriter writer = new RecordIOWriter(dir.resolve("1.log"))){
writer.writeEvent(eventBytes);
}

try(RecordIOWriter writer = new RecordIOWriter(dir.resolve("2.log"))){
writer.writeEvent(eventBytes);
}

// Exercise
sut.updateOldestSegmentReference();

// Delete 1.log (oldest)
Files.delete(sut.getOldestSegmentPath().get());

sut.updateOldestSegmentReference();

// Verify
assertEquals("2.log",sut.getOldestSegmentPath().get().getFileName().toString());
}
}

@Test
public void testReadTimestampOfLastEventInSegment() throws IOException {
final Timestamp expectedTimestamp = Timestamp.now();
final byte[] eventBytes = new DLQEntry(new Event(), "", "", "", expectedTimestamp).serialize();

final Path segmentPath = dir.resolve("1.log");
try (RecordIOWriter writer = new RecordIOWriter(segmentPath)) {
writer.writeEvent(eventBytes);
}

// Exercise
Optional<Timestamp> timestamp = DeadLetterQueueWriter.readTimestampOfLastEventInSegment(segmentPath);

// Verify
assertTrue(timestamp.isPresent());
assertEquals(expectedTimestamp, timestamp.get());
}

@Test
public void testReadTimestampOfLastEventInSegmentWithDeletedSegment() throws IOException {
// Exercise
Optional<Timestamp> timestamp = DeadLetterQueueWriter.readTimestampOfLastEventInSegment(Path.of("non_existing_file.txt"));

// Verify
assertTrue(timestamp.isEmpty());
}

@Test
public void testDropEventCountCorrectlyNotEnqueuedEvents() throws IOException, InterruptedException {
Event blockAlmostFullEvent = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap());
Expand Down

0 comments on commit de4241a

Please sign in to comment.