Merge #1270: Improve performance of bdk_file_store::EntryIter
51bd01b fix(file_store): recover file offset after read (志宇)
66dc34e refactor(file_store): Use BufReader but simplify (LLFourn)
c871764 test(file_store): `last_write_is_short` (志宇)
a3aa8b6 feat(file_store)!: optimize `EntryIter` by reducing syscalls (志宇)

Pull request description:

  ### Description

  `EntryIter` performance is improved by reducing syscalls: the underlying file reader is now wrapped in a `BufReader`, which cuts down the number of `read` and `seek` calls made against the file.
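
  For illustration, a minimal sketch of the buffering idea (this is not the crate's `EntryIter` code; the fixed-size record format and file name are made up):

  ```rust
  use std::fs::File;
  use std::io::{self, BufReader, Read};

  // Reading many small records straight from a `File` issues roughly one
  // `read` syscall per record. Wrapping the file in `BufReader` serves most
  // of those reads from an in-memory buffer instead.
  fn count_u64_records(path: &str) -> io::Result<usize> {
      let mut reader = BufReader::new(File::open(path)?);
      let mut count = 0;
      loop {
          let mut buf = [0u8; 8];
          match reader.read_exact(&mut buf) {
              Ok(()) => count += 1,
              // `UnexpectedEof` here means the file ended (possibly mid-record).
              Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
              Err(e) => return Err(e),
          }
      }
      Ok(count)
  }

  fn main() {
      println!("{:?}", count_u64_records("records.dat"));
  }
  ```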

  Two new tests are introduced. One ensures correct behavior when the last changeset write is too short. The other ensures the next write position is correct after a short read.

  ### Notes to the reviewers

  This is extracted from #1172, as suggested in a review of #1172.

  ### Changelog notice

  Changed
  * `EntryIter` performance is improved by reducing syscalls.

  ### Checklists

  #### All Submissions:

  * [x] I've signed all my commits
  * [x] I followed the [contribution guidelines](https://github.com/bitcoindevkit/bdk/blob/master/CONTRIBUTING.md)
  * [x] I ran `cargo fmt` and `cargo clippy` before committing

  #### New Features:

  * [x] I've added tests for the new feature
  * [x] I've added docs for the new feature

ACKs for top commit:
  LLFourn:
    ACK 51bd01b

Tree-SHA512: 9c25f9f2032cb2d551f3fe4ac62b856ceeb69a388f037b34674af366c55629a2eaa2b90b1ae4fbd425415ea8d02f44493a6c643b4b1a57f4507e87aa7ade3736
evanlinjin committed Jan 25, 2024
2 parents 48b28e3 + 51bd01b commit 07116df
Showing 2 changed files with 158 additions and 34 deletions.
70 changes: 39 additions & 31 deletions crates/file_store/src/entry_iter.rs
```diff
@@ -1,7 +1,7 @@
 use bincode::Options;
 use std::{
     fs::File,
-    io::{self, Seek},
+    io::{self, BufReader, Seek},
     marker::PhantomData,
 };
 
@@ -14,8 +14,9 @@ use crate::bincode_options
 ///
 /// [`next`]: Self::next
 pub struct EntryIter<'t, T> {
-    db_file: Option<&'t mut File>,
-
+    /// Buffered reader around the file
+    db_file: BufReader<&'t mut File>,
+    finished: bool,
     /// The file position for the first read of `db_file`.
     start_pos: Option<u64>,
     types: PhantomData<T>,
@@ -24,8 +25,9 @@ pub struct EntryIter<'t, T> {
 impl<'t, T> EntryIter<'t, T> {
     pub fn new(start_pos: u64, db_file: &'t mut File) -> Self {
         Self {
-            db_file: Some(db_file),
+            db_file: BufReader::new(db_file),
             start_pos: Some(start_pos),
+            finished: false,
             types: PhantomData,
         }
     }
@@ -38,44 +40,44 @@ where
     type Item = Result<T, IterError>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        // closure which reads a single entry starting from `self.pos`
-        let read_one = |f: &mut File, start_pos: Option<u64>| -> Result<Option<T>, IterError> {
-            let pos = match start_pos {
-                Some(pos) => f.seek(io::SeekFrom::Start(pos))?,
-                None => f.stream_position()?,
-            };
+        if self.finished {
+            return None;
+        }
+        (|| {
+            if let Some(start) = self.start_pos.take() {
+                self.db_file.seek(io::SeekFrom::Start(start))?;
+            }
 
-            match bincode_options().deserialize_from(&*f) {
-                Ok(changeset) => {
-                    f.stream_position()?;
-                    Ok(Some(changeset))
-                }
+            let pos_before_read = self.db_file.stream_position()?;
+            match bincode_options().deserialize_from(&mut self.db_file) {
+                Ok(changeset) => Ok(Some(changeset)),
                 Err(e) => {
+                    self.finished = true;
+                    let pos_after_read = self.db_file.stream_position()?;
                     // allow unexpected EOF if 0 bytes were read
                     if let bincode::ErrorKind::Io(inner) = &*e {
-                        if inner.kind() == io::ErrorKind::UnexpectedEof {
-                            let eof = f.seek(io::SeekFrom::End(0))?;
-                            if pos == eof {
-                                return Ok(None);
-                            }
+                        if inner.kind() == io::ErrorKind::UnexpectedEof
+                            && pos_after_read == pos_before_read
+                        {
+                            return Ok(None);
                         }
                     }
-                    f.seek(io::SeekFrom::Start(pos))?;
+                    self.db_file.seek(io::SeekFrom::Start(pos_before_read))?;
                     Err(IterError::Bincode(*e))
                 }
            }
-        };
-
-        let result = read_one(self.db_file.as_mut()?, self.start_pos.take());
-        if result.is_err() {
-            self.db_file = None;
-        }
-        result.transpose()
+        })()
+        .transpose()
    }
 }
 
-impl From<io::Error> for IterError {
-    fn from(value: io::Error) -> Self {
-        IterError::Io(value)
+impl<'t, T> Drop for EntryIter<'t, T> {
+    fn drop(&mut self) {
+        // This syncs the underlying file's offset with the buffer's position. This way, we
+        // maintain the correct position to start the next read/write.
+        if let Ok(pos) = self.db_file.stream_position() {
+            let _ = self.db_file.get_mut().seek(io::SeekFrom::Start(pos));
+        }
     }
 }
 
@@ -97,4 +99,10 @@ impl core::fmt::Display for IterError {
     }
 }
 
+impl From<io::Error> for IterError {
+    fn from(value: io::Error) -> Self {
+        IterError::Io(value)
+    }
+}
+
 impl std::error::Error for IterError {}
```
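
The `Drop` impl above exists because `BufReader` reads ahead: after a buffered read, the underlying `File`'s OS-level offset is usually further along than the position the caller has logically consumed, and the store's next write lands at whatever offset is current. A standalone sketch of that effect and of the re-seek that fixes it (hypothetical scratch file; not the crate's code):

```rust
use std::fs::File;
use std::io::{self, BufReader, Read, Seek, SeekFrom, Write};

fn main() -> io::Result<()> {
    // Hypothetical scratch file containing 16 bytes.
    let path = "scratch.dat";
    File::create(path)?.write_all(&[0u8; 16])?;

    let mut file = File::open(path)?;
    {
        let mut reader = BufReader::new(&mut file);
        let mut four = [0u8; 4];
        reader.read_exact(&mut four)?; // logically consumed 4 bytes...

        // ...but the BufReader may have pulled many more bytes from the OS,
        // so the inner file's offset can be ahead of the logical position.
        let logical = reader.stream_position()?; // 4

        // Re-seek the inner file to the logical position before the reader
        // goes away, mirroring what the Drop impl above does.
        reader.get_mut().seek(SeekFrom::Start(logical))?;
    }
    // A follow-up write through `file` would now land right after byte 4.
    assert_eq!(file.stream_position()?, 4);
    Ok(())
}
```
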
122 changes: 119 additions & 3 deletions crates/file_store/src/store.rs
```diff
@@ -219,6 +219,7 @@ mod test
 
     use bincode::DefaultOptions;
     use std::{
+        collections::BTreeSet,
         io::{Read, Write},
         vec::Vec,
     };
@@ -228,7 +229,7 @@ mod test
     const TEST_MAGIC_BYTES: [u8; TEST_MAGIC_BYTES_LEN] =
         [98, 100, 107, 102, 115, 49, 49, 49, 49, 49, 49, 49];
 
-    type TestChangeSet = Vec<String>;
+    type TestChangeSet = BTreeSet<String>;
 
     #[derive(Debug)]
     struct TestTracker;
@@ -253,7 +254,7 @@ mod test {
     fn open_or_create_new() {
         let temp_dir = tempfile::tempdir().unwrap();
         let file_path = temp_dir.path().join("db_file");
-        let changeset = vec!["hello".to_string(), "world".to_string()];
+        let changeset = BTreeSet::from(["hello".to_string(), "world".to_string()]);
 
         {
             let mut db = Store::<TestChangeSet>::open_or_create_new(&TEST_MAGIC_BYTES, &file_path)
@@ -304,7 +305,7 @@ mod test {
         let mut data = [255_u8; 2000];
         data[..TEST_MAGIC_BYTES_LEN].copy_from_slice(&TEST_MAGIC_BYTES);
 
-        let changeset = vec!["one".into(), "two".into(), "three!".into()];
+        let changeset = TestChangeSet::from(["one".into(), "two".into(), "three!".into()]);
 
         let mut file = NamedTempFile::new().unwrap();
         file.write_all(&data).expect("should write");
@@ -340,4 +341,119 @@ mod test {
 
         assert_eq!(got_bytes, expected_bytes);
     }
+
+    #[test]
+    fn last_write_is_short() {
+        let temp_dir = tempfile::tempdir().unwrap();
+
+        let changesets = [
+            TestChangeSet::from(["1".into()]),
+            TestChangeSet::from(["2".into(), "3".into()]),
+            TestChangeSet::from(["4".into(), "5".into(), "6".into()]),
+        ];
+        let last_changeset = TestChangeSet::from(["7".into(), "8".into(), "9".into()]);
+        let last_changeset_bytes = bincode_options().serialize(&last_changeset).unwrap();
+
+        for short_write_len in 1..last_changeset_bytes.len() - 1 {
+            let file_path = temp_dir.path().join(format!("{}.dat", short_write_len));
+            println!("Test file: {:?}", file_path);
+
+            // simulate creating a file, writing data where the last write is incomplete
+            {
+                let mut db =
+                    Store::<TestChangeSet>::create_new(&TEST_MAGIC_BYTES, &file_path).unwrap();
+                for changeset in &changesets {
+                    db.append_changeset(changeset).unwrap();
+                }
+                // this is the incomplete write
+                db.db_file
+                    .write_all(&last_changeset_bytes[..short_write_len])
+                    .unwrap();
+            }
+
+            // load file again and aggregate changesets
+            // write the last changeset again (this time it succeeds)
+            {
+                let mut db = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path).unwrap();
+                let err = db
+                    .aggregate_changesets()
+                    .expect_err("should return error as last read is short");
+                assert_eq!(
+                    err.changeset,
+                    changesets.iter().cloned().reduce(|mut acc, cs| {
+                        Append::append(&mut acc, cs);
+                        acc
+                    }),
+                    "should recover all changesets that are written in full",
+                );
+                db.db_file.write_all(&last_changeset_bytes).unwrap();
+            }
+
+            // load file again - this time we should successfully aggregate all changesets
+            {
+                let mut db = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path).unwrap();
+                let aggregated_changesets = db
+                    .aggregate_changesets()
+                    .expect("aggregating all changesets should succeed");
+                assert_eq!(
+                    aggregated_changesets,
+                    changesets
+                        .iter()
+                        .cloned()
+                        .chain(core::iter::once(last_changeset.clone()))
+                        .reduce(|mut acc, cs| {
+                            Append::append(&mut acc, cs);
+                            acc
+                        }),
+                    "should recover all changesets",
+                );
+            }
+        }
+    }
+
+    #[test]
+    fn write_after_short_read() {
+        let temp_dir = tempfile::tempdir().unwrap();
+
+        let changesets = (0..20)
+            .map(|n| TestChangeSet::from([format!("{}", n)]))
+            .collect::<Vec<_>>();
+        let last_changeset = TestChangeSet::from(["last".into()]);
+
+        for read_count in 0..changesets.len() {
+            let file_path = temp_dir.path().join(format!("{}.dat", read_count));
+            println!("Test file: {:?}", file_path);
+
+            // First, we create the file with all the changesets!
+            let mut db = Store::<TestChangeSet>::create_new(&TEST_MAGIC_BYTES, &file_path).unwrap();
+            for changeset in &changesets {
+                db.append_changeset(changeset).unwrap();
+            }
+            drop(db);
+
+            // We re-open the file and read `read_count` number of changesets.
+            let mut db = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path).unwrap();
+            let mut exp_aggregation = db
+                .iter_changesets()
+                .take(read_count)
+                .map(|r| r.expect("must read valid changeset"))
+                .fold(TestChangeSet::default(), |mut acc, v| {
+                    Append::append(&mut acc, v);
+                    acc
+                });
+            // We write after a short read.
+            db.write_changes(&last_changeset)
+                .expect("last write must succeed");
+            Append::append(&mut exp_aggregation, last_changeset.clone());
+            drop(db);
+
+            // We open the file again and check whether aggregate changeset is expected.
+            let aggregation = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path)
+                .unwrap()
+                .aggregate_changesets()
+                .expect("must aggregate changesets")
+                .unwrap_or_default();
+            assert_eq!(aggregation, exp_aggregation);
+        }
+    }
 }
```
