fix(buffers): deadlock when seeking after entire write fails to be flushed (vectordotdev#17657)

## Context

In vectordotdev#17644, a user reported disk buffers getting stuck in an infinite
loop, and thus deadlocking, when restarting after a crash. They provided
some very useful debug information, going so far as to add logging and capture
the values of the reader's internal state.

When a disk buffer is initialized -- either for the first time or after
Vector is restarted and the buffer must resume where it left off -- both
the reader and writer perform a catch-up phase. The writer checks the
current data file and tries to figure out whether the last record written
matches where it believes it left off. The reader actually has to seek to
where it left off within the given data file, since we can't just open the
file and start from the beginning: data files are append-only.

As part of the seek logic, there's a loop where we just call
`Reader::next` until we read the record we supposedly left off on, at which
point we know we're caught up. This loop only exits under two conditions:

- the loop condition `self.last_reader_record_id < ledger_last` stops holding,
which means we've finally read the last record we left off on
(`self.last_reader_record_id` has caught up to `ledger_last`)
- `maybe_record.is_none() && self.last_reader_record_id == 0`, which
tells us that we reached EOF on the data file (no more records) but
nothing was in the file at all (`last_reader_record_id` still being 0)

While the first condition is correct, the second one is not. The user
that originally reported the issue [said as much](vectordotdev#17644 (comment)),
and dropping the `&& self.last_reader_record_id == 0` check is what fixes the issue.

In this case, there can exist a scenario where Vector crashes and writes
that the reader had already read and acknowledged never actually make it to
disk. Both the reader and writer are able to outpace the data on disk
because the reader can read yet-to-be-flushed records: they exist
as dirty pages in the page cache.
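
This page-cache behavior is easy to demonstrate outside of Vector. Below is a minimal
standalone sketch -- plain `std::fs`, not Vector's buffer code, and the file name is made
up -- where a second handle reads back bytes that were written but never fsynced:

```rust
use std::fs::{File, OpenOptions};
use std::io::{Read, Write};

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("page_cache_demo.dat");

    // Write a "record" but never call sync_all()/fsync: the bytes land in the
    // kernel's page cache as dirty pages, not necessarily on the physical device.
    let mut writer = OpenOptions::new().create(true).write(true).open(&path)?;
    writer.write_all(b"record 1")?;

    // A separate reader handle still sees the data immediately, because reads
    // are served from the page cache.
    let mut contents = String::new();
    File::open(&path)?.read_to_string(&mut contents)?;
    println!("reader sees {contents:?} before any fsync");

    std::fs::remove_file(&path)?;
    Ok(())
}
```

If the machine crashed between that write and a later fsync, the bytes could vanish
even though a reader had already consumed (and acknowledged) them, which is exactly
the situation the buffer found itself in.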

When this happens, the reader may have indicated to the ledger that it,
for example, has read up to record ID 10 while the last record _on disk_
when Vector starts up is record ID 5. When the seek logic runs, it knows
the last read record ID was 10. It will do some number of reads while
seeking, eventually reading record ID 5, and updating
`self.last_reader_record_id` accordingly. On the next iteration of the
loop, it tries to read but hits EOF: the data file indeed has nothing
left. However, `self.last_reader_record_id < ledger_last` is still true
while `maybe_record.is_none() && self.last_reader_record_id == 0` is
not, as `self.last_reader_record_id` is set to `5`.

Alas, deadlock.
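
To make the stuck state concrete, here's a toy simulation of the pre-fix catch-up
loop -- not Vector's actual reader, just the record IDs and variable names from the
example above:

```rust
fn main() {
    let ledger_last: u64 = 10; // the ledger believes we read and acked up to record ID 10
    let mut last_reader_record_id: u64 = 0;

    // The truncated data file: records 1 through 5 are readable, then EOF forever.
    let mut reads = (1u64..=5).map(Some).chain(std::iter::repeat(None));

    let mut iterations = 0u32;
    while last_reader_record_id < ledger_last {
        match reads.next().unwrap() {
            Some(id) => last_reader_record_id = id,
            // Pre-fix break condition: EOF only matters if we never read anything at all.
            None if last_reader_record_id == 0 => break,
            // EOF with last_reader_record_id == 5: neither exit condition can ever fire.
            None => {}
        }

        iterations += 1;
        if iterations == 1_000 {
            println!("still looping; last_reader_record_id = {last_reader_record_id}");
            break; // bail out so this sketch terminates; the real loop never would
        }
    }
}
```

The `while` condition stays true (5 < 10), the inner EOF check never fires (5 != 0),
and every subsequent read just returns EOF again, so the loop spins forever.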

## Solution

The solution is painfully simple, and the user that originally reported
the issue [said as
much](vectordotdev#17644 (comment)):
drop `&& self.last_reader_record_id == 0`.

Given the loop's own condition, the inner check for
`self.last_reader_record_id == 0` was redundant... but it was also
logically incorrect in the case where writes went missing. I'm
still not entirely sure how existing tests didn't already catch this,
but the error was easy enough to spot once I knew where to look, and
the unit test I added convincingly demonstrated the broken behavior
before the change and the fixed behavior after it.
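
For completeness, here's the same toy scenario from above with the check dropped,
mirroring what the new unit test verifies at the buffer level -- again, a sketch rather
than the real reader:

```rust
fn main() {
    let ledger_last: u64 = 10;
    let mut last_reader_record_id: u64 = 0;
    let mut reads = (1u64..=5).map(Some).chain(std::iter::repeat(None));

    while last_reader_record_id < ledger_last {
        match reads.next().unwrap() {
            Some(id) => last_reader_record_id = id,
            // Post-fix break condition: hitting EOF on the current data file always ends catch-up.
            None => break,
        }
    }

    // The loop exits as soon as the truncated file is exhausted, leaving the reader at
    // record ID 5; the existing gap-detection logic then reports the missing records.
    assert_eq!(last_reader_record_id, 5);
    println!("caught up as far as possible: last_reader_record_id = {last_reader_record_id}");
}
```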

## Reviewer Note(s)

I added two unit tests: one for the fix as shown, and one for what I
thought was another bug. It turns out the "other bug" wasn't a bug, so
that second test isn't _strictly_ required, but it's a simple variation
of other tests that demonstrates a more straightforward invariant, so I
just left it in.

Fixes vectordotdev#17644.
tobz committed Jun 13, 2023
1 parent e1b3357 commit 37a662a
Showing 4 changed files with 224 additions and 7 deletions.
8 changes: 2 additions & 6 deletions lib/vector-buffers/src/variants/disk_v2/reader.rs
@@ -901,12 +901,8 @@ where
         while self.last_reader_record_id < ledger_last {
             match self.next().await {
                 Ok(maybe_record) => {
-                    if maybe_record.is_none() && self.last_reader_record_id == 0 {
-                        // We've hit a point where there's no more data to read. If our "last reader record
-                        // ID" hasn't moved at all, that means the buffer was already empty and we're caught
-                        // up, so we just pin ourselves to where the ledger says we left off, and we're good
-                        // to go.
-                        self.last_reader_record_id = ledger_last;
+                    if maybe_record.is_none() {
+                        // We've hit the end of the current data file so we've gone as far as we can.
                         break;
                     }
                 }
95 changes: 95 additions & 0 deletions lib/vector-buffers/src/variants/disk_v2/tests/initialization.rs
@@ -87,3 +87,98 @@ async fn reader_doesnt_block_from_partial_write_on_last_record() {
    let parent = trace_span!("reader_doesnt_block_from_partial_write_on_last_record");
    fut.instrument(parent.or_current()).await;
}

#[tokio::test]
async fn reader_doesnt_block_when_ahead_of_last_record_in_current_data_file() {
    // When initializing, the reader will be catching up to the last record it read, which involves
    // reading individual records in the current reader data file until a record is returned whose
    // record ID matches the "last record ID read" field from the ledger.
    //
    // If the current data file contains a valid last record when we initialize, but that last
    // record is _behind_ the last record read as tracked by the ledger, then we need to ensure we
    // can break out of the catch-up loop when we get to the end of the current data file.
    //
    // Our existing logic for corrupted event detection, and the writer's own initialization logic,
    // will emit an error message when we realize that data is missing based on record ID gaps.
    let _a = install_tracing_helpers();

    let fut = with_temp_dir(|dir| {
        let data_dir = dir.to_path_buf();

        async move {
            // Create a regular buffer, no customizations required.
            let (mut writer, mut reader, ledger) = create_default_buffer_v2(data_dir.clone()).await;

            // Write two records, and then read and acknowledge both.
            //
            // This puts the buffer into a state where there's data in the current data file, and
            // the ledger has a non-zero record ID for where it thinks the reader needs to be. This
            // ensures that the reader actually does at least two calls to `Reader::next` during
            // `Reader::seek_to_next_record`, which is necessary to ensure that the reader leaves
            // the default state of `self.last_reader_record_id == 0`.
            let first_bytes_written = writer
                .write_record(SizedRecord::new(64))
                .await
                .expect("should not fail to write");
            writer.flush().await.expect("flush should not fail");

            let second_bytes_written = writer
                .write_record(SizedRecord::new(68))
                .await
                .expect("should not fail to write");
            writer.flush().await.expect("flush should not fail");

            writer.close();

            let first_read = reader
                .next()
                .await
                .expect("should not fail to read record")
                .expect("should contain first record");
            assert_eq!(SizedRecord::new(64), first_read);
            acknowledge(first_read).await;

            let second_read = reader
                .next()
                .await
                .expect("should not fail to read record")
                .expect("should contain second record");
            assert_eq!(SizedRecord::new(68), second_read);
            acknowledge(second_read).await;

            let third_read = reader.next().await.expect("should not fail to read record");
            assert!(third_read.is_none());

            ledger.flush().expect("should not fail to flush ledger");

            // Grab the current writer data file path before dropping the buffer.
            let data_file_path = ledger.get_current_writer_data_file_path();
            drop(reader);
            drop(writer);
            drop(ledger);

            // Open the data file and truncate the second record. This will ensure that the reader
            // hits EOF after the first read, which we need to do in order to exercise the logic
            // that breaks out of the loop.
            let initial_len = first_bytes_written as u64 + second_bytes_written as u64;
            let target_len = first_bytes_written as u64;
            set_file_length(&data_file_path, initial_len, target_len)
                .await
                .expect("should not fail to truncate data file");

            // Now reopen the buffer, which should complete in a timely fashion without an immediate error.
            let reopen = timeout(
                Duration::from_millis(500),
                create_default_buffer_v2::<_, SizedRecord>(data_dir),
            )
            .await;
            assert!(
                reopen.is_ok(),
                "failed to reopen buffer in a timely fashion; likely deadlock"
            );
        }
    });

    let parent = trace_span!("reader_doesnt_block_when_ahead_of_last_record_in_current_data_file");
    fut.instrument(parent.or_current()).await;
}
110 changes: 109 additions & 1 deletion lib/vector-buffers/src/variants/disk_v2/tests/invariants.rs
@@ -5,7 +5,8 @@ use tracing::Instrument;
 use super::{create_buffer_v2_with_max_data_file_size, read_next, read_next_some};
 use crate::{
     assert_buffer_is_empty, assert_buffer_records, assert_buffer_size, assert_enough_bytes_written,
-    assert_reader_writer_v2_file_positions, await_timeout, set_data_file_length,
+    assert_reader_last_writer_next_positions, assert_reader_writer_v2_file_positions,
+    await_timeout, set_data_file_length,
     test::{acknowledge, install_tracing_helpers, with_temp_dir, MultiEventRecord, SizedRecord},
     variants::disk_v2::{
         common::{DEFAULT_FLUSH_INTERVAL, MAX_FILE_ID},
@@ -820,3 +821,110 @@ async fn writer_updates_ledger_when_buffered_writer_reports_implicit_flush() {
    })
    .await;
}

#[tokio::test]
async fn reader_writer_positions_aligned_through_multiple_files_and_records() {
    // This test ensures that the reader and writer positions stay aligned through multiple records
    // and data files. That is to say, if we write 5 records, each with 10 events, and then read and
    // acknowledge all of those events... the writer's next record ID should be 51 (the 50th event
    // would correspond to ID 50, so the next ID would be 51) and the reader's last read record ID
    // should be 50.
    //
    // Testing this across multiple data files isn't super germane to the position logic, but it
    // just ensures we're also testing that aspect.

    let _a = install_tracing_helpers();
    let fut = with_temp_dir(|dir| {
        let data_dir = dir.to_path_buf();

        async move {
            // Create our buffer with an arbitrarily low maximum data file size. We'll use this to
            // control how many records make it into a given data file. Just another way to ensure
            // we're testing the position logic with multiple writes to one data file, one write to
            // a data file, etc.
            let (mut writer, mut reader, ledger) =
                create_buffer_v2_with_max_data_file_size(data_dir, 256).await;

            // We'll write multi-event records with N events based on these sizes, and as we do so,
            // we'll assert that our writer position moves as expected after the write, and that
            // after reading and acknowledging, the reader position also moves as expected.
            let record_sizes = &[176, 52, 91, 137, 54, 87];

            let mut expected_writer_position = ledger.state().get_next_writer_record_id();
            let mut expected_reader_position = ledger.state().get_last_reader_record_id();
            let mut trailing_reader_position_delta = 0;

            for record_size in record_sizes {
                // Initial check before writing/reading the next record.
                assert_reader_last_writer_next_positions!(
                    ledger,
                    expected_reader_position,
                    expected_writer_position
                );

                let record = MultiEventRecord::new(*record_size);
                assert_eq!(
                    record.event_count(),
                    usize::try_from(*record_size).unwrap_or(usize::MAX)
                );

                writer
                    .write_record(record)
                    .await
                    .expect("write should not fail");
                writer.flush().await.expect("flush should not fail");

                expected_writer_position += u64::from(*record_size);

                // Make sure the writer position advanced after flushing.
                assert_reader_last_writer_next_positions!(
                    ledger,
                    expected_reader_position,
                    expected_writer_position
                );

                let record_via_read = read_next_some(&mut reader).await;
                assert_eq!(record_via_read, MultiEventRecord::new(*record_size));
                acknowledge(record_via_read).await;

                // Increment the expected reader position by the trailing reader position delta;
                // now that we've done another read, we should see the reader's position actually
                // move forward.
                expected_reader_position += trailing_reader_position_delta;
                assert_reader_last_writer_next_positions!(
                    ledger,
                    expected_reader_position,
                    expected_writer_position
                );

                // Set the trailing reader position delta to the event count of the record we just
                // read.
                //
                // We do it this way because reads themselves have to drive acknowledgement logic to
                // then drive updates to the ledger, so we will only see the change in the reader's
                // position the _next_ time we do a read.
                trailing_reader_position_delta = u64::from(*record_size);
            }

            // Close the writer and do a final read, thus driving the acknowledgement logic, and
            // position update logic, before we do our final position check.
            writer.close();
            assert_eq!(reader.next().await, Ok(None));

            // Calculate the absolute reader/writer positions we would expect based on all of the
            // records/events written and read. This is to double check our work and make sure that
            // the "expected" positions didn't hide any bugs from us.
            let expected_final_reader_position =
                record_sizes.iter().copied().map(u64::from).sum::<u64>();
            let expected_final_writer_position = expected_final_reader_position + 1;

            assert_reader_last_writer_next_positions!(
                ledger,
                expected_final_reader_position,
                expected_final_writer_position
            );
        }
    });

    let parent = trace_span!("reader_writer_positions_aligned_through_multiple_files_and_records");
    fut.instrument(parent.or_current()).await;
}
18 changes: 18 additions & 0 deletions lib/vector-buffers/src/variants/disk_v2/tests/mod.rs
@@ -134,6 +134,24 @@ macro_rules! assert_reader_writer_v2_file_positions {
    }};
}

#[macro_export]
macro_rules! assert_reader_last_writer_next_positions {
    ($ledger:expr, $reader_expected:expr, $writer_expected:expr) => {{
        let reader_actual = $ledger.state().get_last_reader_record_id();
        let writer_actual = $ledger.state().get_next_writer_record_id();
        assert_eq!(
            $reader_expected, reader_actual,
            "expected reader last read record ID of {}, got {} instead",
            $reader_expected, reader_actual,
        );
        assert_eq!(
            $writer_expected, writer_actual,
            "expected writer next record ID of {}, got {} instead",
            $writer_expected, writer_actual,
        );
    }};
}

#[macro_export]
macro_rules! assert_enough_bytes_written {
    ($written:expr, $record_type:ty, $record_payload_size:expr) => {
