Fix check_integrity() to return Ok(true) when no repair was performed
cberner committed Mar 17, 2024
1 parent b691651 commit 2eaf1cf
Showing 4 changed files with 87 additions and 30 deletions.
19 changes: 14 additions & 5 deletions src/db.rs
@@ -341,24 +341,33 @@ impl Database {
         Ok(true)
     }
 
-    /// Check the integrity of the database file, and repair it if possible.
+    /// Force a check of the integrity of the database file, and repair it if possible.
     ///
     /// Note: Calling this function is unnecessary during normal operation. redb will automatically
     /// detect and recover from crashes, power loss, and other unclean shutdowns. This function is
     /// quite slow and should only be used when you suspect the database file may have been modified
     /// externally to redb, or that a redb bug may have left the database in a corrupted state.
     ///
+    /// Returns `Ok(true)` if the database passed integrity checks; `Ok(false)` if it failed but was repaired,
+    /// and `Err(Corrupted)` if the check failed and the file could not be repaired
     pub fn check_integrity(&mut self) -> Result<bool> {
-        self.mem.clear_cache_and_reload()?;
+        let allocator_hash = self.mem.allocator_hash();
+        let mut was_clean = self.mem.clear_cache_and_reload()?;
 
-        if !self.mem.needs_repair()? && Self::verify_primary_checksums(&self.mem)? {
-            return Ok(true);
+        if !Self::verify_primary_checksums(&self.mem)? {
+            was_clean = false;
         }
 
         Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
             DatabaseError::Storage(storage_err) => storage_err,
             _ => unreachable!(),
         })?;
+        if allocator_hash != self.mem.allocator_hash() {
+            was_clean = false;
+        }
         self.mem.begin_writable()?;
 
-        Ok(false)
+        Ok(was_clean)
     }
 
     /// Compacts the database file
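As context for the doc comment above, here is a minimal sketch of how a caller might act on the three documented outcomes. It is illustrative only: the path handling and error boxing are assumptions, not part of this commit.

    use redb::Database;

    fn verify(path: &std::path::Path) -> Result<(), Box<dyn std::error::Error>> {
        // Opening the database already runs redb's normal crash recovery;
        // check_integrity() additionally forces the full (slow) check.
        let mut db = Database::open(path)?;
        if db.check_integrity()? {
            println!("database passed integrity checks");
        } else {
            println!("database needed repair and was repaired");
        }
        // A check that fails and cannot be repaired surfaces as an Err above.
        Ok(())
    }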
60 changes: 35 additions & 25 deletions src/tree_store/page_store/page_manager.rs
@@ -276,27 +276,21 @@ impl TransactionalMemory {
         self.storage.invalidate_cache_all()
     }
 
-    pub(crate) fn clear_cache_and_reload(&mut self) -> Result {
+    pub(crate) fn clear_cache_and_reload(&mut self) -> Result<bool> {
         assert!(self.allocated_since_commit.lock().unwrap().is_empty());
 
         self.storage.flush(false)?;
         self.storage.invalidate_cache_all();
 
         let header_bytes = self.storage.read_direct(0, DB_HEADER_SIZE)?;
         let (mut header, repair_info) = DatabaseHeader::from_bytes(&header_bytes);
         // TODO: should probably consolidate this logic with Self::new()
-        // TODO: This ends up always being true because this is called from check_integrity() once the db is already open
-        // TODO: Also we should recheck the layout
+        let mut was_clean = true;
         if header.recovery_required {
-            let layout = header.layout();
-            let region_max_pages = layout.full_region_layout().num_pages();
-            let region_header_pages = layout.full_region_layout().get_header_pages();
-            header.set_layout(DatabaseLayout::recalculate(
-                self.storage.raw_file_len()?,
-                region_header_pages,
-                region_max_pages,
-                self.page_size,
-            ));
             if repair_info.primary_corrupted {
                 header.swap_primary_slot();
+                was_clean = false;
             } else {
                 // If the secondary is a valid commit, verify that the primary is newer. This handles an edge case where:
                 // * the primary bit is flipped to the secondary
@@ -305,6 +299,7 @@ impl TransactionalMemory {
                     header.secondary_slot().transaction_id > header.primary_slot().transaction_id;
                 if secondary_newer && !repair_info.secondary_corrupted {
                     header.swap_primary_slot();
+                    was_clean = false;
                 }
             }
             if repair_info.invalid_magic_number {
@@ -319,10 +314,9 @@ impl TransactionalMemory {
 
         self.needs_recovery
             .store(header.recovery_required, Ordering::Release);
-        let state = InMemoryState::from_bytes(header.clone(), &self.storage)?;
-        *self.state.lock().unwrap() = state;
+        self.state.lock().unwrap().header = header;
 
-        Ok(())
+        Ok(was_clean)
     }
 
     pub(crate) fn begin_writable(&self) -> Result {
@@ -337,6 +331,10 @@ impl TransactionalMemory {
         Ok(self.state.lock().unwrap().header.recovery_required)
     }
 
+    pub(crate) fn allocator_hash(&self) -> u128 {
+        self.state.lock().unwrap().allocators.xxh3_hash()
+    }
+
     // TODO: need a clearer distinction between this and needs_repair()
     pub(crate) fn storage_failure(&self) -> bool {
         self.needs_recovery.load(Ordering::Acquire)
@@ -385,20 +383,32 @@ impl TransactionalMemory {
         Ok(())
     }
 
-    pub(crate) fn end_repair(&mut self) -> Result<()> {
-        let state = self.state.lock().unwrap();
+    pub(crate) fn end_repair(&self) -> Result<()> {
+        let mut state = self.state.lock().unwrap();
         let tracker_len = state.allocators.region_tracker.to_vec().len();
-        drop(state);
-        // Allocate a new tracker page, since the old one will have been overwritten
-        let tracker_page = self
-            .allocate(tracker_len, CachePriority::High)?
-            .get_page_number();
+        let tracker_page = state.header.region_tracker();
 
-        let mut state = self.state.lock().unwrap();
-        state.header.set_region_tracker(tracker_page);
-        self.write_header(&state.header, false)?;
-        self.storage.flush(false)?;
+        let allocator = state.get_region_mut(tracker_page.region);
+        // Allocate a new tracker page, if the old one was overwritten or is too small
+        if allocator.is_allocated(tracker_page.page_index, tracker_page.page_order)
+            || tracker_page.page_size_bytes(self.page_size) < tracker_len as u64
+        {
+            drop(state);
+            let new_tracker_page = self
+                .allocate(tracker_len, CachePriority::High)?
+                .get_page_number();
+
+            let mut state = self.state.lock().unwrap();
+            state.header.set_region_tracker(new_tracker_page);
+            self.write_header(&state.header, false)?;
+            self.storage.flush(false)?;
+        } else {
+            allocator.record_alloc(tracker_page.page_index, tracker_page.page_order);
+            drop(state);
+        }
+
+        let mut state = self.state.lock().unwrap();
+        let tracker_page = state.header.region_tracker();
         state
             .allocators
            .flush_to(tracker_page, state.header.layout(), &self.storage)?;
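The allocator_hash() accessor added above exists so that check_integrity() can tell whether do_repair() actually changed anything. A stripped-down, hypothetical sketch of that pattern (generic names, not redb's API):

    // Fingerprint the state, run the repair path unconditionally, and report
    // "clean" only if verification passed and the fingerprint did not change.
    fn repair_and_report_clean<S>(
        state: &mut S,
        fingerprint: impl Fn(&S) -> u128,
        verify: impl Fn(&S) -> bool,
        mut repair: impl FnMut(&mut S),
    ) -> bool {
        let before = fingerprint(state);
        let verified = verify(state);
        repair(state);
        verified && fingerprint(state) == before
    }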
9 changes: 9 additions & 0 deletions src/tree_store/page_store/region.rs
@@ -4,6 +4,7 @@ use crate::tree_store::page_store::cached_file::{CachePriority, PagedCachedFile}
 use crate::tree_store::page_store::header::DatabaseHeader;
 use crate::tree_store::page_store::layout::DatabaseLayout;
 use crate::tree_store::page_store::page_manager::{INITIAL_REGIONS, MAX_MAX_PAGE_ORDER};
+use crate::tree_store::page_store::xxh3_checksum;
 use crate::tree_store::PageNumber;
 use crate::Result;
 use std::cmp;
@@ -137,6 +138,14 @@ impl Allocators {
         }
     }
 
+    pub(crate) fn xxh3_hash(&self) -> u128 {
+        let mut result = xxh3_checksum(&self.region_tracker.to_vec());
+        for allocator in self.region_allocators.iter() {
+            result ^= xxh3_checksum(&allocator.to_vec());
+        }
+        result
+    }
+
     pub(super) fn from_bytes(header: &DatabaseHeader, storage: &PagedCachedFile) -> Result<Self> {
         let page_size = header.page_size();
         let region_header_size =
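A note on the new xxh3_hash(): XOR-folding the per-region digests makes the combined value independent of the order in which the allocators are visited, which is all the before/after comparison in check_integrity() needs. A small illustration of that property (the digests are arbitrary example values, not real allocator contents):

    fn combine(digests: &[u128]) -> u128 {
        digests.iter().copied().fold(0, |acc, d| acc ^ d)
    }

    fn main() {
        let forward = [0x1111_u128, 0x2222, 0x3333];
        let reversed = [0x3333_u128, 0x2222, 0x1111];
        // XOR is commutative and associative, so order does not matter.
        assert_eq!(combine(&forward), combine(&reversed));
    }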
29 changes: 29 additions & 0 deletions tests/integration_tests.rs
@@ -881,6 +881,35 @@ fn regression21() {
     txn.commit().unwrap();
 }
 
+#[test]
+fn check_integrity_clean() {
+    let tmpfile = create_tempfile();
+
+    let table_def: TableDefinition<'static, u64, u64> = TableDefinition::new("x");
+
+    let mut db = Database::builder().create(tmpfile.path()).unwrap();
+    assert!(db.check_integrity().unwrap());
+
+    let txn = db.begin_write().unwrap();
+    let mut table = txn.open_table(table_def).unwrap();
+
+    for i in 0..10 {
+        table.insert(0, i).unwrap();
+    }
+    drop(table);
+
+    txn.commit().unwrap();
+    assert!(db.check_integrity().unwrap());
+    drop(db);
+
+    let mut db = Database::builder().create(tmpfile.path()).unwrap();
+    assert!(db.check_integrity().unwrap());
+    drop(db);
+
+    let mut db = Database::builder().open(tmpfile.path()).unwrap();
+    assert!(db.check_integrity().unwrap());
+}
+
 #[test]
 fn multimap_stats() {
     let tmpfile = create_tempfile();
