Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport PR #15233 to 8.9: Fix DeadLetterQueueWriter unable to finalize segment error #15241

Merged
merged 1 commit into from Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -108,22 +108,22 @@ public String toString() {
private final long maxSegmentSize;
private final long maxQueueSize;
private final QueueStorageType storageType;
private AtomicLong currentQueueSize;
private final AtomicLong currentQueueSize;
private final Path queuePath;
private final FileLock fileLock;
private volatile RecordIOWriter currentWriter;
private int currentSegmentIndex;
private Timestamp lastEntryTimestamp;
private Duration flushInterval;
private volatile int currentSegmentIndex;
private volatile Timestamp lastEntryTimestamp;
private final Duration flushInterval;
private Instant lastWrite;
private final AtomicBoolean open = new AtomicBoolean(true);
private ScheduledExecutorService flushScheduler;
private final LongAdder droppedEvents = new LongAdder();
private final LongAdder expiredEvents = new LongAdder();
private String lastError = "no errors";
private volatile String lastError = "no errors";
private final Clock clock;
private Optional<Timestamp> oldestSegmentTimestamp;
private Optional<Path> oldestSegmentPath = Optional.empty();
private volatile Optional<Timestamp> oldestSegmentTimestamp;
private volatile Optional<Path> oldestSegmentPath = Optional.empty();
private final TemporalAmount retentionTime;

public static final class Builder {
Expand Down Expand Up @@ -405,7 +405,8 @@ private long deleteTailSegment(Path segment, String motivation) throws IOExcepti
}
}

private void updateOldestSegmentReference() throws IOException {
// package-private for testing
void updateOldestSegmentReference() throws IOException {
final Optional<Path> previousOldestSegmentPath = oldestSegmentPath;
oldestSegmentPath = listSegmentPaths(this.queuePath)
.filter(p -> p.toFile().length() > 1) // take the files that have content to process
Expand Down Expand Up @@ -433,15 +434,19 @@ private void updateOldestSegmentReference() throws IOException {
oldestSegmentTimestamp = foundTimestamp;
}

// package-private for testing
Optional<Path> getOldestSegmentPath() {
return oldestSegmentPath;
}

/**
* Extract the timestamp from the last DLQEntry it finds in the given segment.
* Start from the end of the latest block, and going backward try to read the next event from its start.
* */
private static Optional<Timestamp> readTimestampOfLastEventInSegment(Path segmentPath) throws IOException {
final int lastBlockId = (int) Math.ceil(((Files.size(segmentPath) - VERSION_SIZE) / (double) BLOCK_SIZE)) - 1;
static Optional<Timestamp> readTimestampOfLastEventInSegment(Path segmentPath) throws IOException {
byte[] eventBytes = null;
try (RecordIOReader recordReader = new RecordIOReader(segmentPath)) {
int blockId = lastBlockId;
int blockId = (int) Math.ceil(((Files.size(segmentPath) - VERSION_SIZE) / (double) BLOCK_SIZE)) - 1;;
while (eventBytes == null && blockId >= 0) { // no event present in last block, try with the one before
recordReader.seekToBlock(blockId);
eventBytes = recordReader.readEvent();
Expand Down
Expand Up @@ -29,6 +29,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -42,6 +43,7 @@
import org.logstash.DLQEntry;
import org.logstash.Event;
import org.logstash.LockException;
import org.logstash.Timestamp;

import static junit.framework.TestCase.assertFalse;
import static org.hamcrest.CoreMatchers.is;
Expand Down Expand Up @@ -345,6 +347,91 @@ public void testRemoveSegmentsOrder() throws IOException {
}
}

@Test
public void testUpdateOldestSegmentReference() throws IOException {
try (DeadLetterQueueWriter sut = DeadLetterQueueWriter
.newBuilderWithoutFlusher(dir, 10 * MB, 20 * MB)
.build()) {

final byte[] eventBytes = new DLQEntry(new Event(), "", "", "").serialize();

try(RecordIOWriter writer = new RecordIOWriter(dir.resolve("1.log"))){
writer.writeEvent(eventBytes);
}

try(RecordIOWriter writer = new RecordIOWriter(dir.resolve("2.log"))){
writer.writeEvent(eventBytes);
}

try(RecordIOWriter writer = new RecordIOWriter(dir.resolve("3.log"))){
writer.writeEvent(eventBytes);
}

// Exercise
sut.updateOldestSegmentReference();

// Verify
final Optional<Path> oldestSegmentPath = sut.getOldestSegmentPath();
assertTrue(oldestSegmentPath.isPresent());
assertEquals("1.log", oldestSegmentPath.get().getFileName().toString());
}
}

@Test
public void testUpdateOldestSegmentReferenceWithDeletedSegment() throws IOException {
try (DeadLetterQueueWriter sut = DeadLetterQueueWriter
.newBuilderWithoutFlusher(dir, 10 * MB, 20 * MB)
.build()) {

final byte[] eventBytes = new DLQEntry(new Event(), "", "", "").serialize();
try(RecordIOWriter writer = new RecordIOWriter(dir.resolve("1.log"))){
writer.writeEvent(eventBytes);
}

try(RecordIOWriter writer = new RecordIOWriter(dir.resolve("2.log"))){
writer.writeEvent(eventBytes);
}

// Exercise
sut.updateOldestSegmentReference();

// Delete 1.log (oldest)
Files.delete(sut.getOldestSegmentPath().get());

sut.updateOldestSegmentReference();

// Verify
assertEquals("2.log",sut.getOldestSegmentPath().get().getFileName().toString());
}
}

@Test
public void testReadTimestampOfLastEventInSegment() throws IOException {
final Timestamp expectedTimestamp = Timestamp.now();
final byte[] eventBytes = new DLQEntry(new Event(), "", "", "", expectedTimestamp).serialize();

final Path segmentPath = dir.resolve("1.log");
try (RecordIOWriter writer = new RecordIOWriter(segmentPath)) {
writer.writeEvent(eventBytes);
}

// Exercise
Optional<Timestamp> timestamp = DeadLetterQueueWriter.readTimestampOfLastEventInSegment(segmentPath);

// Verify
assertTrue(timestamp.isPresent());
assertEquals(expectedTimestamp, timestamp.get());
}

@Test
public void testReadTimestampOfLastEventInSegmentWithDeletedSegment() throws IOException {
// Exercise
Optional<Timestamp> timestamp = DeadLetterQueueWriter.readTimestampOfLastEventInSegment(Path.of("non_existing_file.txt"));

// Verify
assertTrue(timestamp.isEmpty());
}

@Test
public void testDropEventCountCorrectlyNotEnqueuedEvents() throws IOException, InterruptedException {
Event blockAlmostFullEvent = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap());
Expand Down