From 2837532db597a23da91847af637ee3680206d351 Mon Sep 17 00:00:00 2001 From: Christopher Hotchkiss Date: Wed, 6 Oct 2021 20:59:42 -0400 Subject: [PATCH] This is a mass commit of where I've gotten with the current I/O+Cache+Locking approach. I'm about to do some mass changes but I don't want to lose this. --- benches/feophant_benchmark.rs | 4 +- src/engine.rs | 4 +- src/engine/analyzer/definition_lookup.rs | 3 +- src/engine/io.rs | 13 +- src/engine/io/block_layer.rs | 36 ++ .../io/{ => block_layer}/file_manager.rs | 6 +- .../file_manager/file_executor.rs | 4 +- .../file_manager/file_operations.rs | 6 +- .../file_manager/request_type.rs | 9 +- .../file_manager/resource_formatter.rs | 0 .../{ => block_layer}/free_space_manager.rs | 37 +- .../{ => block_layer}/lock_cache_manager.rs | 89 +++- src/engine/io/block_layer/lock_manager.rs | 3 + src/engine/io/format_traits/serializable.rs | 76 ++-- src/engine/io/index_formats/btree_branch.rs | 59 ++- .../io/index_formats/btree_first_page.rs | 60 ++- src/engine/io/index_formats/btree_node.rs | 22 +- src/engine/io/index_manager.rs | 390 +++++++++--------- src/engine/io/index_manager/find_leaf.rs | 171 ++++++++ src/engine/io/index_manager/split_leaf.rs | 150 +++++++ src/engine/io/page_formats/page_data.rs | 8 +- src/engine/io/page_formats/page_id.rs | 3 +- src/engine/io/row_manager.rs | 37 +- src/feophant.rs | 2 +- tests/visibility_tests.rs | 6 +- 25 files changed, 818 insertions(+), 380 deletions(-) create mode 100644 src/engine/io/block_layer.rs rename src/engine/io/{ => block_layer}/file_manager.rs (97%) rename src/engine/io/{ => block_layer}/file_manager/file_executor.rs (99%) rename src/engine/io/{ => block_layer}/file_manager/file_operations.rs (96%) rename src/engine/io/{ => block_layer}/file_manager/request_type.rs (70%) rename src/engine/io/{ => block_layer}/file_manager/resource_formatter.rs (100%) rename src/engine/io/{ => block_layer}/free_space_manager.rs (86%) rename src/engine/io/{ => 
block_layer}/lock_cache_manager.rs (71%) create mode 100644 src/engine/io/block_layer/lock_manager.rs create mode 100644 src/engine/io/index_manager/find_leaf.rs create mode 100644 src/engine/io/index_manager/split_leaf.rs diff --git a/benches/feophant_benchmark.rs b/benches/feophant_benchmark.rs index f75f36d..439d327 100644 --- a/benches/feophant_benchmark.rs +++ b/benches/feophant_benchmark.rs @@ -3,9 +3,9 @@ use criterion::Criterion; use criterion::{criterion_group, criterion_main}; use feophantlib::engine::get_row; use feophantlib::engine::get_table; +use feophantlib::engine::io::block_layer::file_manager::FileManager; +use feophantlib::engine::io::block_layer::lock_cache_manager::LockCacheManager; use feophantlib::engine::io::row_formats::RowData; -use feophantlib::engine::io::FileManager; -use feophantlib::engine::io::LockCacheManager; use feophantlib::engine::io::RowManager; use feophantlib::engine::transactions::TransactionId; use futures::pin_mut; diff --git a/src/engine.rs b/src/engine.rs index 16e1d6c..967403b 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -33,10 +33,10 @@ pub use test_objects::get_table; pub mod transactions; use transactions::{TransactionId, TransactionManager}; +use self::io::block_layer::file_manager::FileManager; +use self::io::block_layer::lock_cache_manager::LockCacheManager; use self::io::ConstraintManager; -use self::io::FileManager; use self::io::IndexManager; -use self::io::LockCacheManager; use self::objects::QueryResult; use std::ops::Deref; use std::sync::Arc; diff --git a/src/engine/analyzer/definition_lookup.rs b/src/engine/analyzer/definition_lookup.rs index 7d1992a..ae94c5e 100644 --- a/src/engine/analyzer/definition_lookup.rs +++ b/src/engine/analyzer/definition_lookup.rs @@ -322,7 +322,8 @@ pub enum DefinitionLookupError { mod tests { use tempfile::TempDir; - use crate::engine::io::{FileManager, LockCacheManager}; + use crate::engine::io::block_layer::file_manager::FileManager; + use 
crate::engine::io::block_layer::lock_cache_manager::LockCacheManager; // Note this useful idiom: importing names from outer (for mod tests) scope. use super::super::super::io::RowManager; diff --git a/src/engine/io.rs b/src/engine/io.rs index 1096029..eac8c02 100644 --- a/src/engine/io.rs +++ b/src/engine/io.rs @@ -1,3 +1,5 @@ +pub mod block_layer; + mod constraint_manager; pub use constraint_manager::ConstraintManager; pub use constraint_manager::ConstraintManagerError; @@ -15,17 +17,6 @@ pub use index_manager::IndexManager; mod index_row_manager; pub use index_row_manager::IndexRowManager; -mod file_manager; -pub use file_manager::FileManager; -pub use file_manager::FileManagerError; - -mod free_space_manager; -pub use free_space_manager::FreeSpaceManager; - -mod lock_cache_manager; -pub use lock_cache_manager::LockCacheManager; -pub use lock_cache_manager::LockCacheManagerError; - pub mod page_formats; pub mod row_formats; diff --git a/src/engine/io/block_layer.rs b/src/engine/io/block_layer.rs new file mode 100644 index 0000000..e4778de --- /dev/null +++ b/src/engine/io/block_layer.rs @@ -0,0 +1,36 @@ +/* + + Addressing: + Uuid / Page Type / Page Offset + + Locking: + Reading + Writing + + Free Space: + In Use + Free + + I am most concerned about lost writes. + + Caching can move into the file layer, but locking stays out. + + File Manager handles I/O operations + + Free Space Manager guides what pages are usable + + Lock Cache Manager Handles locking + + + Process: + + let page = get_page_for_read() +*/ + +pub mod file_manager; + +pub mod free_space_manager; + +pub mod lock_cache_manager; + +pub mod lock_manager; diff --git a/src/engine/io/file_manager.rs b/src/engine/io/block_layer/file_manager.rs similarity index 97% rename from src/engine/io/file_manager.rs rename to src/engine/io/block_layer/file_manager.rs index bb93847..07a1b28 100644 --- a/src/engine/io/file_manager.rs +++ b/src/engine/io/block_layer/file_manager.rs @@ -1,7 +1,7 @@ //! 
This is a different approach than I had done before. This file manager runs its own loop based on a spawned task //! since the prior approach was too lock heavy and I couldn't figure out an approach that didn't starve resources. -use super::page_formats::{PageId, PageOffset, UInt12, UInt12Error}; -use bytes::{Bytes, BytesMut}; +use super::super::page_formats::{PageId, PageOffset, UInt12, UInt12Error}; +use bytes::Bytes; use std::convert::TryFrom; use std::ffi::OsString; use std::num::TryFromIntError; @@ -83,7 +83,7 @@ impl FileManager { &self, page_id: &PageId, offset: &PageOffset, - ) -> Result, FileManagerError> { + ) -> Result, FileManagerError> { let (res_request, res_receiver) = oneshot::channel(); self.request_queue diff --git a/src/engine/io/file_manager/file_executor.rs b/src/engine/io/block_layer/file_manager/file_executor.rs similarity index 99% rename from src/engine/io/file_manager/file_executor.rs rename to src/engine/io/block_layer/file_manager/file_executor.rs index 9f76fc3..7cab0bd 100644 --- a/src/engine/io/file_manager/file_executor.rs +++ b/src/engine/io/block_layer/file_manager/file_executor.rs @@ -2,8 +2,8 @@ use super::file_operations::{FileOperations, FileOperationsError}; /// Inner type that implements the actual I/O operations so the outter type can /// handle queue management. 
use super::request_type::RequestType; +use super::ResourceFormatter; use crate::constants::PAGE_SIZE; -use crate::engine::io::file_manager::ResourceFormatter; use crate::engine::io::page_formats::{PageId, PageOffset}; use bytes::{Bytes, BytesMut}; use lru::LruCache; @@ -599,7 +599,7 @@ mod tests { use crate::{ constants::PAGES_PER_FILE, - engine::io::{page_formats::PageType, FileManager}, + engine::io::{block_layer::file_manager::FileManager, page_formats::PageType}, }; use super::*; diff --git a/src/engine/io/file_manager/file_operations.rs b/src/engine/io/block_layer/file_manager/file_operations.rs similarity index 96% rename from src/engine/io/file_manager/file_operations.rs rename to src/engine/io/block_layer/file_manager/file_operations.rs index fcdd991..290a92f 100644 --- a/src/engine/io/file_manager/file_operations.rs +++ b/src/engine/io/block_layer/file_manager/file_operations.rs @@ -14,7 +14,7 @@ use tokio::{ }; use crate::constants::PAGE_SIZE; -use crate::engine::io::file_manager::ResourceFormatter; +use crate::engine::io::block_layer::file_manager::ResourceFormatter; use crate::engine::io::page_formats::{PageId, PageOffset}; pub struct FileOperations {} @@ -74,7 +74,7 @@ impl FileOperations { pub async fn read_chunk( mut file: File, page_offset: &PageOffset, - ) -> Result<(File, Option), FileOperationsError> { + ) -> Result<(File, Option), FileOperationsError> { let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize); let file_meta = file.metadata().await?; @@ -94,7 +94,7 @@ impl FileOperations { } } - Ok((file, Some(buffer))) + Ok((file, Some(buffer.freeze()))) } pub async fn update_chunk( diff --git a/src/engine/io/file_manager/request_type.rs b/src/engine/io/block_layer/file_manager/request_type.rs similarity index 70% rename from src/engine/io/file_manager/request_type.rs rename to src/engine/io/block_layer/file_manager/request_type.rs index b1a0f10..bff4ce4 100644 --- a/src/engine/io/file_manager/request_type.rs +++ 
b/src/engine/io/block_layer/file_manager/request_type.rs @@ -1,5 +1,5 @@ use crate::engine::io::page_formats::PageOffset; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use tokio::sync::oneshot::Sender; use super::file_executor::FileExecutorError; @@ -8,11 +8,6 @@ use super::file_executor::FileExecutorError; pub enum RequestType { GetOffset(Sender>), Add((PageOffset, Bytes, Sender>)), - Read( - ( - PageOffset, - Sender, FileExecutorError>>, - ), - ), + Read((PageOffset, Sender, FileExecutorError>>)), Update((PageOffset, Bytes, Sender>)), } diff --git a/src/engine/io/file_manager/resource_formatter.rs b/src/engine/io/block_layer/file_manager/resource_formatter.rs similarity index 100% rename from src/engine/io/file_manager/resource_formatter.rs rename to src/engine/io/block_layer/file_manager/resource_formatter.rs diff --git a/src/engine/io/free_space_manager.rs b/src/engine/io/block_layer/free_space_manager.rs similarity index 86% rename from src/engine/io/free_space_manager.rs rename to src/engine/io/block_layer/free_space_manager.rs index 23be17f..1431ac3 100644 --- a/src/engine/io/free_space_manager.rs +++ b/src/engine/io/block_layer/free_space_manager.rs @@ -3,11 +3,11 @@ //! page to say the space is availible. This means each page here can cover 134MB of free space. 
use super::{ - page_formats::{PageId, PageOffset, PageType}, - LockCacheManager, LockCacheManagerError, + super::page_formats::{PageId, PageOffset, PageType}, + lock_cache_manager::{LockCacheManager, LockCacheManagerError}, }; use crate::constants::PAGE_SIZE; -use bytes::{Buf, BytesMut}; +use bytes::{Buf, Bytes, BytesMut}; use thiserror::Error; #[derive(Clone, Debug)] @@ -33,7 +33,7 @@ impl FreeSpaceManager { let page_handle = self.lock_cache_manager.get_page(free_id, &offset).await?; match page_handle.as_ref() { Some(s) => { - let mut page_frozen = s.clone().freeze(); + let mut page_frozen = s.clone(); match Self::find_first_free_page_in_page(&mut page_frozen) { Some(s) => { let full_offset = PageOffset(s) @@ -59,7 +59,7 @@ impl FreeSpaceManager { let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize); let new_page = vec![FreeStat::Free as u8; PAGE_SIZE as usize]; buffer.extend_from_slice(&new_page); - new_page_handle.replace(buffer); + new_page_handle.replace(buffer.freeze()); self.lock_cache_manager .add_page(free_id, next_po, new_page_handle) @@ -86,10 +86,11 @@ impl FreeSpaceManager { .lock_cache_manager .get_page_for_update(free_id, &po) .await?; - let mut page = page_handle + let page = page_handle .as_mut() .ok_or(FreeSpaceManagerError::PageDoesNotExist(page_id))?; - Self::set_status_inside_page(&mut page, offset, status); + let new_page = Self::set_status_inside_page(page, offset, status); + page_handle.replace(new_page); Ok(self .lock_cache_manager @@ -118,7 +119,9 @@ impl FreeSpaceManager { /// Sets the status of a field inside a page, you MUST pass an offset /// that fits in the buffer. 
- fn set_status_inside_page(buffer: &mut BytesMut, offset: usize, status: FreeStat) { + fn set_status_inside_page(src: &Bytes, offset: usize, status: FreeStat) -> Bytes { + let mut buffer = BytesMut::with_capacity(src.len()); + buffer.extend_from_slice(&src[..]); let offset_index = offset / 8; let offset_subindex = offset % 8; @@ -136,6 +139,8 @@ impl FreeSpaceManager { } buffer[offset_index] = new_value; + + buffer.freeze() } } @@ -161,14 +166,14 @@ mod tests { use tempfile::TempDir; use uuid::Uuid; - use crate::engine::io::FileManager; + use crate::engine::io::block_layer::file_manager::FileManager; use super::*; /// Gets the status of a field inside a page, you MUST pass an offset /// that fits in the buffer. //This was in the implementation, I just only needed it for unit tests - fn get_status_inside_page(buffer: &BytesMut, offset: usize) -> FreeStat { + fn get_status_inside_page(buffer: &Bytes, offset: usize) -> FreeStat { let offset_index = offset / 8; let offset_subindex = offset % 8; @@ -187,11 +192,13 @@ mod tests { let mut test = BytesMut::with_capacity(2); test.put_u16(0x0); - for i in 0..test.capacity() * 8 { + let mut test = test.freeze(); + + for i in 0..test.len() * 8 { assert_eq!(get_status_inside_page(&test, i), FreeStat::Free); - FreeSpaceManager::set_status_inside_page(&mut test, i, FreeStat::InUse); + test = FreeSpaceManager::set_status_inside_page(&test, i, FreeStat::InUse); assert_eq!(get_status_inside_page(&test, i), FreeStat::InUse); - FreeSpaceManager::set_status_inside_page(&mut test, i, FreeStat::Free); + test = FreeSpaceManager::set_status_inside_page(&test, i, FreeStat::Free); assert_eq!(get_status_inside_page(&test, i), FreeStat::Free); } @@ -204,11 +211,13 @@ mod tests { test.put_u8(0x0); test.put_u8(0x0); + let mut test = test.freeze(); + for i in 0..test.len() * 8 { let free_page = FreeSpaceManager::find_first_free_page_in_page(&mut test.clone()); assert_eq!(free_page, Some(i)); - FreeSpaceManager::set_status_inside_page(&mut test, 
i, FreeStat::InUse); + test = FreeSpaceManager::set_status_inside_page(&test, i, FreeStat::InUse); } assert_eq!( FreeSpaceManager::find_first_free_page_in_page(&mut test), diff --git a/src/engine/io/lock_cache_manager.rs b/src/engine/io/block_layer/lock_cache_manager.rs similarity index 71% rename from src/engine/io/lock_cache_manager.rs rename to src/engine/io/block_layer/lock_cache_manager.rs index cb06563..cb3107e 100644 --- a/src/engine/io/lock_cache_manager.rs +++ b/src/engine/io/block_layer/lock_cache_manager.rs @@ -1,8 +1,6 @@ -use super::{ - page_formats::{PageId, PageOffset}, - FileManager, FileManagerError, -}; -use bytes::BytesMut; +use super::file_manager::{FileManager, FileManagerError}; +use crate::engine::io::page_formats::{PageId, PageOffset}; +use bytes::Bytes; use lru::LruCache; use std::sync::Arc; use thiserror::Error; @@ -13,7 +11,7 @@ pub struct LockCacheManager { //TODO I don't like these massive single hashes protected with a single lock // Long term I need to make a fixed hashmap and evict them myself. // Holding on this since I might be able to work around it - cache: Arc>>>>>, + cache: Arc>>>>>, file_manager: Arc, } @@ -45,7 +43,7 @@ impl LockCacheManager { &self, page_id: PageId, offset: &PageOffset, - ) -> Result>, LockCacheManagerError> { + ) -> Result>, LockCacheManagerError> { Ok(self .get_page_internal(page_id, offset) .await? @@ -57,7 +55,7 @@ impl LockCacheManager { &self, page_id: PageId, offset: &PageOffset, - ) -> Result>, LockCacheManagerError> { + ) -> Result>, LockCacheManagerError> { Ok(self .get_page_internal(page_id, offset) .await? 
@@ -69,7 +67,7 @@ impl LockCacheManager { &self, page_id: PageId, offset: &PageOffset, - ) -> Result>>, LockCacheManagerError> { + ) -> Result>>, LockCacheManagerError> { let mut cache = self.cache.lock().await; match cache.get(&(page_id, *offset)) { Some(s) => Ok(s.clone()), @@ -95,7 +93,7 @@ impl LockCacheManager { &self, page_id: PageId, offset: PageOffset, - guard: OwnedRwLockWriteGuard>, + guard: OwnedRwLockWriteGuard>, ) -> Result<(), LockCacheManagerError> { let page = match guard.as_ref() { Some(s) => s.clone(), @@ -105,7 +103,7 @@ impl LockCacheManager { }; Ok(self .file_manager - .update_page(&page_id, &offset, page.freeze()) + .update_page(&page_id, &offset, page) .await?) } @@ -113,7 +111,7 @@ impl LockCacheManager { &self, page_id: PageId, offset: PageOffset, - guard: OwnedRwLockWriteGuard>, + guard: OwnedRwLockWriteGuard>, ) -> Result<(), LockCacheManagerError> { let page = match guard.as_ref() { Some(s) => s.clone(), @@ -121,10 +119,7 @@ impl LockCacheManager { return Err(LockCacheManagerError::PageMissing()); } }; - Ok(self - .file_manager - .add_page(&page_id, &offset, page.freeze()) - .await?) + Ok(self.file_manager.add_page(&page_id, &offset, page).await?) 
} } @@ -138,18 +133,26 @@ pub enum LockCacheManagerError { #[cfg(test)] mod tests { + use bytes::BytesMut; use tempfile::TempDir; use uuid::Uuid; - use crate::{constants::PAGE_SIZE, engine::io::page_formats::PageType}; + use crate::{ + constants::PAGE_SIZE, + engine::io::{ + format_traits::{Parseable, Serializable}, + index_formats::BTreeFirstPage, + page_formats::PageType, + }, + }; use super::*; - fn get_test_page(fill: u8) -> BytesMut { + fn get_test_page(fill: u8) -> Bytes { let mut test_page = BytesMut::with_capacity(PAGE_SIZE as usize); let free_space = vec![fill; PAGE_SIZE as usize]; test_page.extend_from_slice(&free_space); - test_page + test_page.freeze() } #[tokio::test] @@ -198,10 +201,9 @@ mod tests { let fifth_page = fifth_handle .as_mut() .ok_or(LockCacheManagerError::PageMissing())?; - fifth_page.clear(); let page4 = get_test_page(3); - fifth_page.extend_from_slice(&page4[0..page4.len()]); + fifth_handle.replace(page4); lm.update_page(page_id, fourth_offset, fifth_handle).await?; let mut sixth_handle = lm.get_page_for_update(page_id, &fourth_offset).await?; @@ -233,4 +235,49 @@ mod tests { Ok(()) } + + /// This is reproducing an interesting bug that the cache seems to be remembering + /// that someone had previously read data out of the buffer. I think a clone is + /// missing. 
+ #[tokio::test] + async fn test_repeated_read() -> Result<(), Box> { + let tmp = TempDir::new()?; + let tmp_dir = tmp.path().as_os_str().to_os_string(); + + let fm = Arc::new(FileManager::new(tmp_dir)?); + let lm = LockCacheManager::new(fm); + + let page_id = PageId { + resource_key: Uuid::new_v4(), + page_type: PageType::Data, + }; + + let offset = lm.get_offset_non_zero(page_id).await?; + let mut page = lm.get_page_for_update(page_id, &offset).await?; + + let first = BTreeFirstPage { + root_offset: PageOffset(4000), + }; + first.serialize_and_pad(&mut page); + assert_eq!(page.as_ref().unwrap().len(), PAGE_SIZE as usize); + lm.update_page(page_id, offset, page).await?; + + let mut page2 = lm.get_page_for_update(page_id, &offset).await?; + assert_eq!(page2.as_ref().unwrap().len(), PAGE_SIZE as usize); + match page2.as_mut() { + Some(mut s) => { + let mut change = BTreeFirstPage::parse(&mut s)?; + } + None => panic!("Foo"), + } + drop(page2); + + let mut page3 = lm.get_page_for_update(page_id, &offset).await?; + assert_eq!(page3.as_ref().unwrap().len(), PAGE_SIZE as usize); + let node3 = BTreeFirstPage::parse(&mut page3.as_ref().unwrap().clone())?; + assert_eq!(node3.root_offset, PageOffset(4000)); + drop(page3); + + Ok(()) + } } diff --git a/src/engine/io/block_layer/lock_manager.rs b/src/engine/io/block_layer/lock_manager.rs new file mode 100644 index 0000000..8a879dc --- /dev/null +++ b/src/engine/io/block_layer/lock_manager.rs @@ -0,0 +1,3 @@ +pub struct LockManager; + +impl LockManager {} diff --git a/src/engine/io/format_traits/serializable.rs b/src/engine/io/format_traits/serializable.rs index 021f593..8f83380 100644 --- a/src/engine/io/format_traits/serializable.rs +++ b/src/engine/io/format_traits/serializable.rs @@ -1,6 +1,7 @@ //! 
Serializes a given struct to a given ByteMut -use bytes::{BufMut, BytesMut}; +use bytes::{BufMut, Bytes, BytesMut}; +use tokio::sync::OwnedRwLockWriteGuard; use crate::constants::PAGE_SIZE; @@ -8,37 +9,26 @@ pub trait Serializable { /// Transforms the structure to a byte stream fn serialize(&self, buffer: &mut impl BufMut); - /// Handles updating the page from the I/O sub system - fn serialize_and_pad(&self, page: &mut Option) { - match page.as_mut() { - Some(mut s) => { - s.clear(); + /// Produces a new page to support the change to how the I/O subsystem works + fn serialize_and_pad(&self, buffer: &mut OwnedRwLockWriteGuard>) { + let mut page = BytesMut::with_capacity(PAGE_SIZE as usize); + self.serialize(&mut page); - self.serialize(&mut s); - - if s.len() != PAGE_SIZE as usize { - let padding = vec![0; PAGE_SIZE as usize - s.len()]; - s.extend_from_slice(&padding); - } - } - None => { - let mut new_page = BytesMut::with_capacity(PAGE_SIZE as usize); - self.serialize(&mut new_page); - - if new_page.len() != PAGE_SIZE as usize { - let padding = vec![0; PAGE_SIZE as usize - new_page.len()]; - new_page.extend_from_slice(&padding); - } - - page.replace(new_page); - } + if page.len() != PAGE_SIZE as usize { + let padding = vec![0; PAGE_SIZE as usize - page.len()]; + page.extend_from_slice(&padding); } + + buffer.replace(page.freeze()); } } #[cfg(test)] mod tests { + use std::sync::Arc; + use bytes::Buf; + use tokio::sync::RwLock; use super::*; @@ -51,34 +41,24 @@ mod tests { } } - #[test] - fn test_none() -> Result<(), Box> { - let test = Test { inner: 2000 }; - - let mut page = None; - test.serialize_and_pad(&mut page); - - assert!(page.is_some()); - - let mut page = page.unwrap(); - assert_eq!(page.len(), PAGE_SIZE as usize); - assert_eq!(test.inner, page.get_u32_le()); - - Ok(()) - } - - #[test] - fn test_some() -> Result<(), Box> { + #[tokio::test] + async fn test_roundtrip() -> Result<(), Box> { let test = Test { inner: 2000 }; - let mut page = 
Some(BytesMut::with_capacity(PAGE_SIZE as usize)); - test.serialize_and_pad(&mut page); + let page_lock = Arc::new(RwLock::new(None)); + let mut guard = page_lock.clone().write_owned().await; - assert!(page.is_some()); + test.serialize_and_pad(&mut guard); + drop(guard); - let mut page = page.unwrap(); - assert_eq!(page.len(), PAGE_SIZE as usize); - assert_eq!(test.inner, page.get_u32_le()); + let page = page_lock.read_owned().await; + if let Some(s) = page.as_ref() { + let mut s = s.clone(); + assert_eq!(s.len(), PAGE_SIZE as usize); + assert_eq!(test.inner, s.get_u32_le()); + } else { + panic!("None found!"); + } Ok(()) } diff --git a/src/engine/io/index_formats/btree_branch.rs b/src/engine/io/index_formats/btree_branch.rs index d2f43fa..1aa0120 100644 --- a/src/engine/io/index_formats/btree_branch.rs +++ b/src/engine/io/index_formats/btree_branch.rs @@ -8,6 +8,7 @@ use crate::{ engine::{ io::{ encode_size, expected_encoded_size, + format_traits::Serializable, page_formats::{ItemIdData, ItemIdDataError, PageOffset}, row_formats::{NullMask, NullMaskError}, ConstEncodedSize, EncodedSize, SelfEncodedSize, SizeError, @@ -15,8 +16,8 @@ use crate::{ objects::{types::BaseSqlTypesError, SqlTuple}, }, }; -use bytes::{BufMut, Bytes, BytesMut}; -use std::{convert::TryFrom, num::TryFromIntError, ops::RangeBounds}; +use bytes::BufMut; +use std::{num::TryFromIntError, ops::RangeBounds}; use thiserror::Error; #[derive(Clone, Debug, PartialEq)] @@ -42,7 +43,6 @@ impl BTreeBranch { pub fn add( &mut self, - old_pointer: PageOffset, left_pointer: PageOffset, key: SqlTuple, right_pointer: PageOffset, @@ -78,7 +78,6 @@ impl BTreeBranch { /// **WARNING** If this function fails the branch should be considered poisoned and not used. pub fn add_and_split( &mut self, - old_pointer: PageOffset, left_pointer: PageOffset, key: SqlTuple, right_pointer: PageOffset, @@ -141,32 +140,6 @@ impl BTreeBranch { Ok(index_search_start(&self.keys, &self.pointers, range)?) 
} - - pub fn serialize(&self) -> Result { - let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize); - buffer.put_u8(NodeType::Branch as u8); - - BTreeNode::write_node(&mut buffer, Some(self.parent_node))?; - - encode_size(&mut buffer, self.keys.len()); - - for key in self.keys.iter() { - BTreeNode::write_sql_tuple(&mut buffer, key); - } - - for pointer in self.pointers.iter() { - let pointer_u64 = u64::try_from(pointer.0)?; - buffer.put_uint_le(pointer_u64, PageOffset::encoded_size()); - } - - //Zero pad to page size - if buffer.len() < PAGE_SIZE as usize { - let free_space = vec![0; PAGE_SIZE as usize - buffer.len()]; - buffer.extend_from_slice(&free_space); - } - - Ok(buffer.freeze()) - } } impl SelfEncodedSize for BTreeBranch { @@ -179,14 +152,28 @@ impl SelfEncodedSize for BTreeBranch { new_size += tup.encoded_size(); } - for point in self.pointers.iter() { - new_size += PageOffset::encoded_size(); - } + new_size += self.pointers.len() * PageOffset::encoded_size(); new_size } } +impl Serializable for BTreeBranch { + fn serialize(&self, buffer: &mut impl BufMut) { + buffer.put_u8(NodeType::Branch as u8); + + BTreeNode::write_node(buffer, Some(self.parent_node)); + + encode_size(buffer, self.keys.len()); + + for key in self.keys.iter() { + BTreeNode::write_sql_tuple(buffer, key); + } + + self.pointers.iter().for_each(|p| p.serialize(buffer)); + } +} + #[derive(Debug, Error)] pub enum BTreeBranchError { #[error(transparent)] @@ -233,6 +220,7 @@ mod tests { Attribute, Index, }, }; + use bytes::BytesMut; use uuid::Uuid; fn get_index() -> Index { @@ -278,8 +266,9 @@ mod tests { pointers, }; - let mut test_serial = test.serialize()?; - let test_parse = BTreeNode::parse(&mut test_serial, &get_index())?; + let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize); + test.serialize(&mut buffer); + let test_parse = BTreeNode::parse(&mut buffer, &get_index())?; match test_parse { BTreeNode::Branch(b) => assert_eq!(test, b), diff --git 
a/src/engine/io/index_formats/btree_first_page.rs b/src/engine/io/index_formats/btree_first_page.rs index c5eba72..da5023c 100644 --- a/src/engine/io/index_formats/btree_first_page.rs +++ b/src/engine/io/index_formats/btree_first_page.rs @@ -33,9 +33,19 @@ pub enum BTreeFirstPageError { #[cfg(test)] mod tests { + use std::sync::Arc; + use bytes::BytesMut; + use tempfile::TempDir; + use uuid::Uuid; - use crate::constants::PAGE_SIZE; + use crate::{ + constants::PAGE_SIZE, + engine::io::{ + block_layer::{file_manager::FileManager, lock_cache_manager::LockCacheManager}, + page_formats::{PageId, PageType}, + }, + }; use super::*; @@ -54,4 +64,52 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_on_disk() -> Result<(), Box> { + let tmp = TempDir::new()?; + let tmp_dir = tmp.path().as_os_str().to_os_string(); + + let fm = Arc::new(FileManager::new(tmp_dir)?); + let lm = LockCacheManager::new(fm.clone()); + + let page_id = PageId { + resource_key: Uuid::new_v4(), + page_type: PageType::Data, + }; + + let first_offset = lm.get_offset(page_id).await?; + assert_eq!(first_offset, PageOffset(0)); + + let mut first_page = lm.get_page_for_update(page_id, &first_offset).await?; + let root_offset = lm.get_offset_non_zero(page_id).await?; + assert_ne!(root_offset, PageOffset(0)); + + let btfp = BTreeFirstPage { root_offset }; + btfp.serialize_and_pad(&mut first_page); + lm.update_page(page_id, first_offset, first_page).await?; + + // Okay now its time to actually test, without drop + let mut new_first_page = lm.get_page(page_id, &PageOffset(0)).await?.clone(); + if let Some(s) = new_first_page.as_mut() { + let btfp2 = BTreeFirstPage::parse(&mut s.clone())?; + assert_ne!(btfp2.root_offset, PageOffset(0)); + } else { + panic!("That page should exist!"); + } + + // Test again with a drop + drop(lm); + let lm2 = LockCacheManager::new(fm); + + let mut new_first_page2 = lm2.get_page(page_id, &PageOffset(0)).await?.clone(); + if let Some(s) = new_first_page2.as_mut() { + let btfp2 
= BTreeFirstPage::parse(&mut s.clone())?; + assert_ne!(btfp2.root_offset, PageOffset(0)); + } else { + panic!("That page should exist!"); + } + + Ok(()) + } } diff --git a/src/engine/io/index_formats/btree_node.rs b/src/engine/io/index_formats/btree_node.rs index cc9ed4e..ef0b551 100644 --- a/src/engine/io/index_formats/btree_node.rs +++ b/src/engine/io/index_formats/btree_node.rs @@ -20,7 +20,8 @@ //! Note: Min size for all indexes is 2x PAGE_SIZE since the root page is used to mean None. This will change //! since the root page will have a pointer so we can lock and split the root node. -use crate::engine::io::page_formats::PageOffset; +use crate::engine::io::format_traits::{Parseable, Serializable}; +use crate::engine::io::page_formats::{PageOffset, PageOffsetError}; use crate::engine::io::row_formats::{ItemPointer, ItemPointerError, NullMaskError}; use crate::engine::io::{parse_size, ConstEncodedSize, SizeError}; use crate::engine::objects::types::{BaseSqlTypes, BaseSqlTypesError}; @@ -49,18 +50,11 @@ pub enum NodeType { } impl BTreeNode { - pub fn write_node( - buffer: &mut impl BufMut, - node: Option, - ) -> Result<(), BTreeNodeError> { + pub fn write_node(buffer: &mut impl BufMut, node: Option) { match node { - Some(pn) => { - let pn_u64 = u64::try_from(pn.0)?; - buffer.put_uint_le(pn_u64, size_of::()) - } + Some(pn) => pn.serialize(buffer), None => buffer.put_uint_le(0, size_of::()), } - Ok(()) } pub fn write_sql_tuple(buffer: &mut impl BufMut, tuple: &SqlTuple) { @@ -79,7 +73,7 @@ impl BTreeNode { } let node_type = buffer.get_u8(); - let parent_node = Self::parse_page(buffer)?; + let parent_node = PageOffset::parse(buffer)?; if node_type == NodeType::Leaf as u8 { let left_node = Self::parse_page(buffer)?; @@ -108,8 +102,6 @@ impl BTreeNode { nodes: buckets, })); } else { - let parent = parent_node.ok_or_else(BTreeNodeError::ParentNull)?; - let keys_count = parse_size(buffer)?; let mut keys = Vec::with_capacity(keys_count); @@ -135,7 +127,7 @@ impl 
BTreeNode { } return Ok(BTreeNode::Branch(BTreeBranch { - parent_node: parent, + parent_node, keys, pointers, })); @@ -192,6 +184,8 @@ pub enum BTreeNodeError { MissingPointerData(usize, usize), #[error(transparent)] NullMaskError(#[from] NullMaskError), + #[error(transparent)] + PageOffsetError(#[from] PageOffsetError), #[error("Parent cannot be 0")] ParentNull(), #[error(transparent)] diff --git a/src/engine/io/index_manager.rs b/src/engine/io/index_manager.rs index 65c2b49..bd9c2ef 100644 --- a/src/engine/io/index_manager.rs +++ b/src/engine/io/index_manager.rs @@ -2,32 +2,40 @@ // Okay so more thinking, my approach needs to change /* - For adds, I'll find the leaf page using write locks, dropping as I go. + lock leaf - Once found, I'll add and then follow the parents up until everything fits + split _ cow + lock left and right leaves + + check up splitting as we go + + write down, unlocking as we go */ -use super::index_formats::{BTreeBranchError, BTreeLeafError, BTreeNode, BTreeNodeError}; +use super::block_layer::lock_cache_manager::{LockCacheManager, LockCacheManagerError}; +use super::format_traits::Parseable; +use super::index_formats::{ + BTreeBranchError, BTreeFirstPage, BTreeFirstPageError, BTreeLeafError, BTreeNode, + BTreeNodeError, +}; use super::page_formats::PageOffset; -use super::page_formats::{ItemIdData, PageId, PageType}; +use super::page_formats::{PageId, PageType}; use super::row_formats::ItemPointer; -use super::{LockCacheManager, LockCacheManagerError, SelfEncodedSize}; -use crate::{ - constants::PAGE_SIZE, - engine::{ - io::index_formats::BTreeLeaf, - objects::{Index, SqlTuple}, - }, -}; -use bytes::{Buf, BufMut, BytesMut}; -use std::convert::TryFrom; -use std::mem::size_of; +use crate::engine::io::format_traits::Serializable; +use crate::engine::io::index_formats::BTreeBranch; +use crate::engine::objects::{Index, SqlTuple}; use std::num::TryFromIntError; -use std::ops::Range; use thiserror::Error; -use tokio::sync::OwnedRwLockWriteGuard; 
+ +mod find_leaf; +use find_leaf::find_leaf; +use find_leaf::FindLeafError; + +mod split_leaf; +use split_leaf::split_leaf; +use split_leaf::SplitLeafError; //TODO Support something other than btrees //TODO Support searching on a non primary index column @@ -53,213 +61,199 @@ impl IndexManager { page_type: PageType::Data, }; - //Initial Special Case of an Empty Root - let (mut current_page, mut current_offset) = - self.get_root_page_for_write(index_def).await?; - if let None = current_page.as_mut() { - let root = BTreeLeaf::new(); - if !root.can_fit(&new_key) { - return Err(IndexManagerError::KeyTooLarge(new_key.encoded_size())); - } + debug!("Adding {:?}", new_key); + + //Find the target leaf + let (mut page, page_offset, mut leaf) = + find_leaf(&self.lock_cache_manager, index_def, &new_key).await?; + + debug!("target offset {0}", page_offset); + //If the key fits in the leaf, we add it and are done + if leaf.can_fit(&new_key) { + debug!("fits"); + + leaf.add(new_key, item_ptr)?; + + leaf.serialize_and_pad(&mut page); - let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize); - root.serialize(&mut buffer); - current_page.replace(buffer); self.lock_cache_manager - .update_page(page_id, current_offset, current_page) + .update_page(page_id, page_offset, page) .await?; return Ok(()); } - //Next the goal is to get to the leaf - if let Some(s) = current_page.as_mut() { - let mut current_node = BTreeNode::parse(s, index_def)?; + debug!("expand"); - let mut found_leaf; - loop { - match current_node { - BTreeNode::Branch(b) => { - let next_page_offset = b.search(&new_key..&new_key)?; - current_page = self - .lock_cache_manager - .get_page_for_update(page_id, next_page_offset) - .await?; - current_offset = *next_page_offset; + //Lock the leafs left and right if they exist + let left_neighbor = leaf.left_node; + let left_page = match left_neighbor { + Some(s) => Some( + self.lock_cache_manager + .get_page_for_update(page_id, &s) + .await?, + ), + None => None, + }; - 
let s = current_page - .as_mut() - .ok_or_else(|| IndexManagerError::NodeEmpty(current_offset))?; - current_node = BTreeNode::parse(s, index_def)?; - continue; - } - BTreeNode::Leaf(mut l) => { - found_leaf = l; - break; - } + let right_neighbor = leaf.right_node; + let right_page = match right_neighbor { + Some(s) => Some( + self.lock_cache_manager + .get_page_for_update(page_id, &s) + .await?, + ), + None => None, + }; + + //Doesn't fit so we have to split and work back up to the loop + let (mut split_key, mut parent_node_offset, new_left_offset, new_right_offset) = + split_leaf(&self.lock_cache_manager, index_def, leaf, new_key, item_ptr).await?; + + if let Some(mut s) = left_page { + if let Some(s2) = s.as_mut() { + if let BTreeNode::Leaf(mut l) = BTreeNode::parse(&mut s2.clone(), index_def)? { + l.right_node = Some(new_left_offset); + l.serialize_and_pad(&mut s); + self.lock_cache_manager + .update_page(page_id, left_neighbor.unwrap(), s) + .await?; + } else { + return Err(IndexManagerError::UnexpectedBranch(left_neighbor.unwrap())); } } + } - //If the key fits in the leaf, we add it and are done - if found_leaf.can_fit(&new_key) { - found_leaf.add(new_key, item_ptr); + if let Some(mut s) = right_page { + if let Some(s2) = s.as_mut() { + if let BTreeNode::Leaf(mut l) = BTreeNode::parse(&mut s2.clone(), index_def)? 
{ + l.left_node = Some(new_right_offset); + l.serialize_and_pad(&mut s); + self.lock_cache_manager + .update_page(page_id, right_neighbor.unwrap(), s) + .await?; + } else { + return Err(IndexManagerError::UnexpectedBranch(right_neighbor.unwrap())); + } + } + } - let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize); - found_leaf.serialize(&mut buffer); + //Now its time to fix the tree + loop { + let mut parent_page = self + .lock_cache_manager + .get_page_for_update(page_id, &parent_node_offset) + .await?; + if parent_node_offset == PageOffset(0) { + //We've hit the top of the system so we'll have to remake the root page + let new_root_offset = self.lock_cache_manager.get_offset_non_zero(page_id).await?; - current_page.replace(buffer); + let mut new_root_page = self + .lock_cache_manager + .get_page_for_update(page_id, &new_root_offset) + .await?; + + let new_root = + BTreeBranch::new(PageOffset(0), new_left_offset, split_key, new_right_offset); + + new_root.serialize_and_pad(&mut new_root_page); + self.lock_cache_manager + .update_page(page_id, new_root_offset, new_root_page) + .await?; + + let first_page = BTreeFirstPage { + root_offset: new_root_offset, + }; + first_page.serialize_and_pad(&mut parent_page); self.lock_cache_manager - .update_page(page_id, current_offset, current_page) + .update_page(page_id, PageOffset(0), parent_page) .await?; + return Ok(()); } + if let Some(s) = parent_page.as_mut() { + if let BTreeNode::Branch(mut b) = BTreeNode::parse(&mut s.clone(), index_def)? 
{ + if b.can_fit(&split_key) { + b.add(new_left_offset, split_key, new_right_offset)?; - //Doesn't fit so we have to split and work back up to the loop - let left_node_offset = self.lock_cache_manager.get_offset_non_zero(page_id).await?; - let right_node_offset = self.lock_cache_manager.get_offset_non_zero(page_id).await?; + b.serialize_and_pad(&mut parent_page); - let mut left_node_page = self - .lock_cache_manager - .get_page_for_update(page_id, &left_node_offset) - .await?; - let mut right_node_page = self - .lock_cache_manager - .get_page_for_update(page_id, &right_node_offset) - .await?; - - let (new_split, new_right_node) = - found_leaf.add_and_split(left_node_offset, right_node_offset, new_key, item_ptr)?; + self.lock_cache_manager + .update_page(page_id, parent_node_offset, parent_page) + .await?; - let mut parent_node_offset = found_leaf - .parent_node - .ok_or_else(IndexManagerError::ParentNodeEmpty)?; + return Ok(()); + } else { + //Need to split the branch and move up a level + let (middle_key, new_right) = + b.add_and_split(new_left_offset, split_key, new_right_offset)?; - let mut left_node_buffer = BytesMut::with_capacity(PAGE_SIZE as usize); - found_leaf.serialize(&mut left_node_buffer); + let new_right_offset = + self.lock_cache_manager.get_offset_non_zero(page_id).await?; + let mut new_right_page = self + .lock_cache_manager + .get_page_for_update(page_id, &new_right_offset) + .await?; + new_right.serialize_and_pad(&mut new_right_page); + self.lock_cache_manager + .update_page(page_id, new_right_offset, new_right_page) + .await?; - let mut right_node_buffer = BytesMut::with_capacity(PAGE_SIZE as usize); - new_right_node.serialize(&mut right_node_buffer); + b.serialize_and_pad(&mut parent_page); + self.lock_cache_manager + .update_page(page_id, PageOffset(0), parent_page) + .await?; - left_node_page.replace(left_node_buffer); - right_node_page.replace(right_node_buffer); + parent_node_offset = b.parent_node; + split_key = middle_key; - 
self.lock_cache_manager - .update_page(page_id, left_node_offset, left_node_page) - .await?; - self.lock_cache_manager - .update_page(page_id, right_node_offset, right_node_page) - .await?; - - //Now its time to fix the tree - loop { - let parent_page = self - .lock_cache_manager - .get_page_for_update(page_id, &parent_node_offset) - .await?; - if parent_node_offset == PageOffset(0) { - //We've hit the top of the system so we'll have to remake the root page + continue; + } + } else { + return Err(IndexManagerError::UnexpectedLeaf(parent_node_offset)); } + } else { + return Err(IndexManagerError::NodeEmpty(parent_node_offset)); } } - Ok(()) } - async fn get_root_page_for_write( + pub async fn search_for_key( &self, index_def: &Index, - ) -> Result<(OwnedRwLockWriteGuard>, PageOffset), IndexManagerError> { + key: &SqlTuple, + ) -> Result>, IndexManagerError> { let page_id = PageId { resource_key: index_def.id, page_type: PageType::Data, }; - - let mut first_page_handle = self + let first_page = self .lock_cache_manager - .get_page_for_update(page_id, &PageOffset(0)) + .get_page(page_id, &PageOffset(0)) .await?; - let (root_offset, changed) = match first_page_handle.as_mut() { - Some(mut s) => { - let root_offset = usize::try_from(s.get_uint_le(size_of::()))?; - if root_offset == 0 { - //This is wrong, recreate it - let root_offset = self.lock_cache_manager.get_offset_non_zero(page_id).await?; - - s.clear(); - root_offset.serialize(&mut s); + if let Some(s) = first_page.as_ref() { + let first_node = BTreeFirstPage::parse(&mut s.clone())?; - (root_offset, true) - } else { - (PageOffset(root_offset), false) + let mut current_offset = first_node.root_offset; + loop { + debug!("scan {0}", current_offset); + let node = self.get_node(index_def, ¤t_offset).await?; + match node { + BTreeNode::Branch(b) => { + current_offset = *b.search(key..key)?; + continue; + } + BTreeNode::Leaf(l) => match l.nodes.get(key) { + Some(s) => return Ok(Some(s.clone())), + None => { + return 
Ok(None); + } + }, } } - None => { - let root_offset = self.lock_cache_manager.get_offset_non_zero(page_id).await?; - - let mut first_page_buffer = BytesMut::with_capacity(PAGE_SIZE as usize); - root_offset.serialize(&mut first_page_buffer); - let new_page = vec![0; PAGE_SIZE as usize - size_of::()]; - first_page_buffer.extend_from_slice(&new_page); - - first_page_handle.replace(first_page_buffer); - - (root_offset, true) - } - }; - - //Now we know where root is, let's get it - let root_page_handle = self - .lock_cache_manager - .get_page_for_update(page_id, &root_offset) - .await?; - - if changed { - self.lock_cache_manager - .update_page(page_id, PageOffset(0), first_page_handle) - .await? - } - - Ok((root_page_handle, root_offset)) - } - /* - pub async fn search_for_key( - &self, - index_def: &Index, - key: &SqlTuple, - ) -> Result>, IndexManagerError> { - let (root_node, root_offset) = self.get_root_node(index_def).await?; - match root_node { - BTreeNode::Branch(b) => { - todo!("blah") - } - BTreeNode::Leaf(l) => match l.nodes.get(key) { - Some(s) => Ok(Some(s.clone())), - None => Ok(None), - }, - } - }*/ - - ///This function provides a mapping given an oversized bucket of how the leaf should be split - /// Returns: - /// * Left node range - /// * Node for lifting up to the parent (will be the same as the last left entry in the list) - /// * Right node range - fn map_split_node( - old_nodes_count: usize, - ) -> Result<(Range, usize, Range), IndexManagerError> { - if old_nodes_count < 2 { - return Err(IndexManagerError::UnableToSplit(old_nodes_count)); + } else { + Ok(None) } - - let mut midpoint = old_nodes_count / 2; - if old_nodes_count % 2 == 0 { - midpoint += 1; - } - - Ok(( - (0..midpoint - 1), - midpoint - 1, - (midpoint..old_nodes_count - 1), - )) } /// This provides the requested node based on the page, if it exists @@ -277,7 +271,7 @@ impl IndexManager { let page_buffer = page_handle.clone(); match page_buffer { - Some(page) => Ok(BTreeNode::parse(&mut 
page.freeze(), index_def)?), + Some(mut page) => Ok(BTreeNode::parse(&mut page, index_def)?), None => Err(IndexManagerError::NoSuchNode(*offset)), } } @@ -288,6 +282,8 @@ pub enum IndexManagerError { #[error(transparent)] BTreeBranchError(#[from] BTreeBranchError), #[error(transparent)] + BTreeFirstPageError(#[from] BTreeFirstPageError), + #[error(transparent)] BTreeLeafError(#[from] BTreeLeafError), #[error(transparent)] BTreeNodeError(#[from] BTreeNodeError), @@ -295,6 +291,8 @@ pub enum IndexManagerError { "Another process made the root index page first, maybe the developer should make locking." )] ConcurrentCreationError(), + #[error(transparent)] + FindLeafError(#[from] FindLeafError), #[error("Key too large size: {0}, maybe the developer should fix this.")] KeyTooLarge(usize), #[error(transparent)] @@ -307,12 +305,18 @@ pub enum IndexManagerError { ParentNodeEmpty(), #[error("Root Node Empty")] RootNodeEmpty(), + #[error(transparent)] + SplitLeafError(#[from] SplitLeafError), #[error("Unable to search, the stack is empty")] StackEmpty(), #[error(transparent)] TryFromIntError(#[from] TryFromIntError), #[error("Unable to split a node of size {0}")] UnableToSplit(usize), + #[error("Unexpect Branch at offset {0}")] + UnexpectedBranch(PageOffset), + #[error("Unexpect Leaf at offset {0}")] + UnexpectedLeaf(PageOffset), } #[cfg(test)] @@ -325,7 +329,7 @@ mod tests { use crate::{ constants::Nullable, engine::{ - io::{page_formats::UInt12, FileManager}, + io::{block_layer::file_manager::FileManager, page_formats::UInt12}, objects::{ types::{BaseSqlTypes, BaseSqlTypesMapper, SqlTypeDefinition}, Attribute, @@ -334,6 +338,8 @@ mod tests { }; use super::*; + use log::LevelFilter; + use simplelog::{ColorChoice, CombinedLogger, Config, TermLogger, TerminalMode}; fn get_key_and_ptr(num: usize) -> (SqlTuple, ItemPointer) { ( @@ -347,11 +353,18 @@ mod tests { #[tokio::test] async fn test_roundtrip() -> Result<(), Box> { + CombinedLogger::init(vec![TermLogger::new( + 
LevelFilter::Debug, + Config::default(), + TerminalMode::Mixed, + ColorChoice::Auto, + )])?; + let tmp = TempDir::new()?; let tmp_dir = tmp.path().as_os_str().to_os_string(); let fm = Arc::new(FileManager::new(tmp_dir)?); - let lm = LockCacheManager::new(fm); + let lm = LockCacheManager::new(fm.clone()); let im = IndexManager::new(lm); let index = Index { @@ -374,11 +387,16 @@ mod tests { unique: true, }; - for i in 0..5000 { + for i in 0..1000 { let (key, ptr) = get_key_and_ptr(i); im.add(&index, key, ptr).await?; } + let (key, ptr) = get_key_and_ptr(999); + assert_eq!(Some(vec![ptr]), im.search_for_key(&index, &key).await?); + + fm.shutdown().await?; + Ok(()) } } diff --git a/src/engine/io/index_manager/find_leaf.rs b/src/engine/io/index_manager/find_leaf.rs new file mode 100644 index 0000000..05430bd --- /dev/null +++ b/src/engine/io/index_manager/find_leaf.rs @@ -0,0 +1,171 @@ +use crate::engine::{ + io::{ + block_layer::lock_cache_manager::{LockCacheManager, LockCacheManagerError}, + format_traits::{Parseable, Serializable}, + index_formats::{ + BTreeBranchError, BTreeFirstPage, BTreeFirstPageError, BTreeLeaf, BTreeNode, + BTreeNodeError, + }, + page_formats::{PageId, PageOffset, PageType}, + }, + objects::{Index, SqlTuple}, +}; +use bytes::{Bytes, BytesMut}; +use thiserror::Error; +use tokio::sync::OwnedRwLockWriteGuard; + +pub async fn find_leaf( + lcm: &LockCacheManager, + index_def: &Index, + new_key: &SqlTuple, +) -> Result<(OwnedRwLockWriteGuard>, PageOffset, BTreeLeaf), FindLeafError> { + let page_id = PageId { + resource_key: index_def.id, + page_type: PageType::Data, + }; + + let mut prior_offset = PageOffset(0); + let mut offset = PageOffset(0); + + loop { + let mut page = lcm.get_page_for_update(page_id, &offset).await?; + + //Handle the first page + if offset == PageOffset(0) { + offset = match page.as_mut() { + Some(s) => { + let mut page_node = BTreeFirstPage::parse(&mut s.clone())?; + if page_node.root_offset == PageOffset(0) { + debug!("root is 
zero"); + page_node.root_offset = lcm.get_offset_non_zero(page_id).await?; + + page_node.serialize_and_pad(&mut page); + lcm.update_page(page_id, offset, page).await?; + } + page_node.root_offset + } + None => { + debug!("page doesn't exist"); + let root_offset = lcm.get_offset_non_zero(page_id).await?; + let page_node = BTreeFirstPage { root_offset }; + + page_node.serialize_and_pad(&mut page); + lcm.update_page(page_id, offset, page).await?; + + page_node.root_offset + } + }; + continue; + } + + match page.as_mut() { + None => { + //Special case, should only be due to root not existing + return Ok((page, offset, BTreeLeaf::new(prior_offset))); + } + Some(s) => { + let node = BTreeNode::parse(s, index_def)?; + + prior_offset = offset; + match node { + BTreeNode::Branch(b) => { + offset = *b.search(new_key..new_key)?; + continue; + } + BTreeNode::Leaf(l) => { + return Ok((page, offset, l)); + } + } + } + } + } +} + +#[derive(Debug, Error)] +pub enum FindLeafError { + #[error(transparent)] + BTreeBranchError(#[from] BTreeBranchError), + #[error(transparent)] + BTreeFirstPageError(#[from] BTreeFirstPageError), + #[error(transparent)] + BTreeNodeError(#[from] BTreeNodeError), + #[error(transparent)] + LockCacheManagerError(#[from] LockCacheManagerError), +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use tempfile::TempDir; + use uuid::Uuid; + + use crate::engine::{ + io::{ + block_layer::file_manager::FileManager, index_manager::find_leaf, page_formats::UInt12, + row_formats::ItemPointer, + }, + objects::types::{BaseSqlTypes, BaseSqlTypesMapper, SqlTypeDefinition}, + }; + + use super::*; + + //Super unsafe function to get test data, just don't count too high + fn get_key(index: usize) -> (SqlTuple, ItemPointer) { + ( + SqlTuple(vec![Some(BaseSqlTypes::Integer(index as u32))]), + ItemPointer::new(PageOffset(index), UInt12::new(index as u16).unwrap()), + ) + } + + #[tokio::test] + async fn test_find() -> Result<(), Box> { + let tmp = TempDir::new()?; + let 
tmp_dir = tmp.path().as_os_str().to_os_string(); + + let fm = Arc::new(FileManager::new(tmp_dir)?); + let lm = LockCacheManager::new(fm.clone()); + + let index = Index { + id: Uuid::new_v4(), + name: "test".to_string(), + columns: Arc::new(SqlTypeDefinition(vec![( + "foo".to_string(), + BaseSqlTypesMapper::Integer, + )])), + unique: false, + }; + let page_id = PageId { + resource_key: index.id, + page_type: PageType::Data, + }; + + let first_offset = lm.get_offset(page_id).await?; + assert_eq!(first_offset, PageOffset(0)); + + let mut first_page = lm.get_page_for_update(page_id, &first_offset).await?; + let root_offset = lm.get_offset_non_zero(page_id).await?; + assert_ne!(root_offset, PageOffset(0)); + let mut root_page = lm.get_page_for_update(page_id, &root_offset).await?; + + let btfp = BTreeFirstPage { root_offset }; + btfp.serialize_and_pad(&mut first_page); + lm.update_page(page_id, first_offset, first_page).await?; + + let mut root = BTreeLeaf::new(first_offset); + let (key, ptr) = get_key(42); + root.add(key.clone(), ptr)?; + root.serialize_and_pad(&mut root_page); + lm.update_page(page_id, root_offset, root_page).await?; + + // Okay now its time to actually test + let (_, offset, leaf) = find_leaf(&lm, &index, &key).await?; + assert_eq!(leaf, root); + assert_ne!(offset, PageOffset(0)); + + let (_, offset2, leaf2) = find_leaf(&lm, &index, &key).await?; + assert_eq!(leaf2, root); + assert_eq!(offset, offset2); + Ok(()) + } +} diff --git a/src/engine/io/index_manager/split_leaf.rs b/src/engine/io/index_manager/split_leaf.rs new file mode 100644 index 0000000..5d8da11 --- /dev/null +++ b/src/engine/io/index_manager/split_leaf.rs @@ -0,0 +1,150 @@ +use crate::engine::{ + io::{ + block_layer::lock_cache_manager::{LockCacheManager, LockCacheManagerError}, + format_traits::Serializable, + index_formats::{BTreeLeaf, BTreeLeafError}, + page_formats::{PageId, PageOffset, PageType}, + row_formats::ItemPointer, + }, + objects::{Index, SqlTuple}, +}; +use 
thiserror::Error; + +/// Takes a leaf node and produces a new right node +pub async fn split_leaf( + lcm: &LockCacheManager, + index_def: &Index, + mut leaf: BTreeLeaf, + new_key: SqlTuple, + item_ptr: ItemPointer, +) -> Result<(SqlTuple, PageOffset, PageOffset, PageOffset), SplitLeafError> { + let page_id = PageId { + resource_key: index_def.id, + page_type: PageType::Data, + }; + + let left_node_offset = lcm.get_offset_non_zero(page_id).await?; + let right_node_offset = lcm.get_offset_non_zero(page_id).await?; + + let mut left_node_page = lcm.get_page_for_update(page_id, &left_node_offset).await?; + let mut right_node_page = lcm.get_page_for_update(page_id, &right_node_offset).await?; + + let (new_split_key, new_right_node) = + leaf.add_and_split(left_node_offset, right_node_offset, new_key, item_ptr)?; + + let parent_node_offset = leaf.parent_node; + + leaf.serialize_and_pad(&mut left_node_page); + new_right_node.serialize_and_pad(&mut right_node_page); + + lcm.update_page(page_id, left_node_offset, left_node_page) + .await?; + lcm.update_page(page_id, right_node_offset, right_node_page) + .await?; + + Ok(( + new_split_key, + parent_node_offset, + left_node_offset, + right_node_offset, + )) +} + +#[derive(Debug, Error)] +pub enum SplitLeafError { + #[error(transparent)] + BTreeLeafError(#[from] BTreeLeafError), + #[error(transparent)] + LockCacheManagerError(#[from] LockCacheManagerError), +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use tempfile::TempDir; + use uuid::Uuid; + + use crate::engine::{ + io::{ + block_layer::file_manager::FileManager, index_formats::BTreeNode, page_formats::UInt12, + }, + objects::types::{BaseSqlTypes, BaseSqlTypesMapper, SqlTypeDefinition}, + }; + + use super::*; + + //Super unsafe function to get test data, just don't count too high + fn get_key(index: usize) -> (SqlTuple, ItemPointer) { + ( + SqlTuple(vec![Some(BaseSqlTypes::Integer(index as u32))]), + ItemPointer::new(PageOffset(index), UInt12::new(index as 
u16).unwrap()), + ) + } + + #[tokio::test] + async fn test_split_leaf() -> Result<(), Box> { + let tmp = TempDir::new()?; + let tmp_dir = tmp.path().as_os_str().to_os_string(); + + let fm = Arc::new(FileManager::new(tmp_dir)?); + let lcm = LockCacheManager::new(fm); + + let index = Index { + id: Uuid::new_v4(), + name: "test".to_string(), + columns: Arc::new(SqlTypeDefinition(vec![( + "foo".to_string(), + BaseSqlTypesMapper::Integer, + )])), + unique: false, + }; + let page_id = PageId { + resource_key: index.id, + page_type: PageType::Data, + }; + + let parent_offset = lcm.get_offset_non_zero(page_id).await?; + let mut leaf = BTreeLeaf::new(parent_offset); + let leaf_size = leaf.nodes.len(); + + for i in 0..10 { + let (key, ptr) = get_key(i); + if leaf.can_fit(&key) { + leaf.add(key, ptr)?; + } + } + + let (key, ptr) = get_key(11); + + let (split_key, parent_node, left_offset, right_offset) = + split_leaf(&lcm, &index, leaf, key, ptr).await?; + + let left_page = lcm.get_page(page_id, &left_offset).await?; + let mut left_buffer = left_page.as_ref().unwrap().clone(); + let left_node = match BTreeNode::parse(&mut left_buffer, &index)? { + BTreeNode::Branch(_) => panic!("Unexpected branch"), + BTreeNode::Leaf(l) => l, + }; + + let right_page = lcm.get_page(page_id, &right_offset).await?; + let mut right_buffer = right_page.as_ref().unwrap().clone(); + let right_node = match BTreeNode::parse(&mut right_buffer, &index)? 
{ + BTreeNode::Branch(_) => panic!("Unexpected branch"), + BTreeNode::Leaf(l) => l, + }; + + assert_eq!(parent_node, left_node.parent_node); + assert_eq!(parent_node, right_node.parent_node); + + for n in left_node.nodes { + assert!(n.0 <= split_key); + } + + for n in right_node.nodes { + assert!(n.0 > split_key); + } + + Ok(()) + } +} diff --git a/src/engine/io/page_formats/page_data.rs b/src/engine/io/page_formats/page_data.rs index c1d297e..500d51b 100644 --- a/src/engine/io/page_formats/page_data.rs +++ b/src/engine/io/page_formats/page_data.rs @@ -9,7 +9,7 @@ use super::{ ItemIdData, ItemIdDataError, PageHeader, PageHeaderError, PageOffset, UInt12, UInt12Error, }; use async_stream::stream; -use bytes::{BufMut, BytesMut}; +use bytes::{BufMut, Bytes}; use futures::stream::Stream; use std::convert::TryFrom; use std::sync::Arc; @@ -99,7 +99,7 @@ impl PageData { pub fn parse( table: Arc, page: PageOffset, - buffer: &BytesMut, + buffer: &Bytes, ) -> Result { //Note since we need random access, everything MUST work off slices otherwise counts will be off @@ -202,7 +202,7 @@ mod tests { pd.serialize(&mut serial); assert_eq!(PAGE_SIZE as usize, serial.len()); - let pg_parsed = PageData::parse(table.clone(), PageOffset(0), &serial).unwrap(); + let pg_parsed = PageData::parse(table.clone(), PageOffset(0), &serial.freeze()).unwrap(); pin_mut!(pg_parsed); let result_rows: Vec = pg_parsed.get_stream().collect().await; @@ -241,7 +241,7 @@ mod tests { } let mut serial = BytesMut::with_capacity(PAGE_SIZE as usize); pd.serialize(&mut serial); - let pg_parsed = PageData::parse(table.clone(), PageOffset(0), &serial).unwrap(); + let pg_parsed = PageData::parse(table.clone(), PageOffset(0), &serial.freeze()).unwrap(); pin_mut!(pg_parsed); let result_rows: Vec = pg_parsed.get_stream().collect().await; diff --git a/src/engine/io/page_formats/page_id.rs b/src/engine/io/page_formats/page_id.rs index 5ee8b7f..187da1c 100644 --- a/src/engine/io/page_formats/page_id.rs +++ 
b/src/engine/io/page_formats/page_id.rs @@ -1,5 +1,6 @@ //! A struct to uniquely identify a page in all operations. This replaces adding additional arguments everywhere. +use crate::engine::io::block_layer::file_manager::ResourceFormatter; use nom::{ bytes::complete::tag_no_case, error::{convert_error, make_error, ContextError, ErrorKind, ParseError, VerboseError}, @@ -12,8 +13,6 @@ use std::{ use thiserror::Error; use uuid::Uuid; -use crate::engine::io::file_manager::ResourceFormatter; - #[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] pub struct PageId { pub resource_key: Uuid, diff --git a/src/engine/io/row_manager.rs b/src/engine/io/row_manager.rs index 5e830f5..f3c8540 100644 --- a/src/engine/io/row_manager.rs +++ b/src/engine/io/row_manager.rs @@ -1,16 +1,14 @@ use super::super::objects::Table; use super::super::transactions::TransactionId; +use super::block_layer::file_manager::FileManagerError; +use super::block_layer::free_space_manager::{FreeSpaceManager, FreeSpaceManagerError, FreeStat}; +use super::block_layer::lock_cache_manager::{LockCacheManager, LockCacheManagerError}; use super::format_traits::Serializable; -use super::free_space_manager::{FreeSpaceManagerError, FreeStat}; use super::page_formats::{PageData, PageDataError, PageId, PageOffset, PageType, UInt12}; use super::row_formats::{ItemPointer, RowData, RowDataError}; -use super::{ - EncodedSize, FileManagerError, FreeSpaceManager, LockCacheManager, LockCacheManagerError, -}; -use crate::constants::PAGE_SIZE; +use super::EncodedSize; use crate::engine::objects::SqlTuple; use async_stream::try_stream; -use bytes::BytesMut; use futures::stream::Stream; use std::sync::Arc; use thiserror::Error; @@ -60,9 +58,10 @@ impl RowManager { .await?; let page_buffer = page_handle .as_mut() - .ok_or(RowManagerError::NonExistentPage(row_pointer.page))?; + .ok_or(RowManagerError::NonExistentPage(row_pointer.page))? 
+ .clone(); - let mut page = PageData::parse(table, row_pointer.page, page_buffer)?; + let mut page = PageData::parse(table, row_pointer.page, &page_buffer)?; let mut row = page .get_row(row_pointer.count) .ok_or(RowManagerError::NonExistentRow( @@ -81,8 +80,7 @@ impl RowManager { row.max = Some(current_tran_id); page.update(row, row_pointer.count)?; - page_buffer.clear(); - page.serialize(page_buffer); + page.serialize_and_pad(&mut page_handle); self.lock_cache_manager .update_page(page_id, row_pointer.page, page_handle) @@ -110,9 +108,10 @@ impl RowManager { .await?; let old_page_buffer = old_page_handle .as_mut() - .ok_or(RowManagerError::NonExistentPage(row_pointer.page))?; + .ok_or(RowManagerError::NonExistentPage(row_pointer.page))? + .clone(); - let mut old_page = PageData::parse(table.clone(), row_pointer.page, old_page_buffer)?; + let mut old_page = PageData::parse(table.clone(), row_pointer.page, &old_page_buffer)?; let mut old_row = old_page .get_row(row_pointer.count) @@ -145,9 +144,7 @@ impl RowManager { old_row.max = Some(current_tran_id); old_row.item_pointer = new_row_pointer; old_page.update(old_row, row_pointer.count)?; - - old_page_buffer.clear(); - old_page.serialize(old_page_buffer); + old_page.serialize_and_pad(&mut old_page_handle); self.lock_cache_manager .update_page(page_id, row_pointer.page, old_page_handle) @@ -239,9 +236,8 @@ impl RowManager { Some(p) => { let mut page = PageData::parse(table.clone(), next_free_page, p)?; if page.can_fit(RowData::encoded_size(&user_data)) { - p.clear(); //We're going to reuse the buffer let new_row_pointer = page.insert(current_tran_id, &table, user_data)?; - page.serialize(p); + page.serialize_and_pad(&mut page_bytes); self.lock_cache_manager .update_page(page_id, next_free_page, page_bytes) .await?; @@ -266,10 +262,7 @@ impl RowManager { let mut new_page = PageData::new(new_page_offset); let new_row_pointer = new_page.insert(current_tran_id, &table, user_data)?; //TODO Will NOT handle overly large 
rows - let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize); - new_page.serialize(&mut buffer); - - new_page_handle.replace(buffer); + new_page.serialize_and_pad(&mut new_page_handle); self.lock_cache_manager .add_page(page_id, new_page_offset, new_page_handle) @@ -308,7 +301,7 @@ mod tests { use super::*; use crate::engine::get_row; use crate::engine::get_table; - use crate::engine::io::FileManager; + use crate::engine::io::block_layer::file_manager::FileManager; use futures::pin_mut; use tempfile::TempDir; use tokio_stream::StreamExt; diff --git a/src/feophant.rs b/src/feophant.rs index 66ca3ed..44a8281 100644 --- a/src/feophant.rs +++ b/src/feophant.rs @@ -1,7 +1,7 @@ use crate::{ codec::{NetworkFrame, PgCodec}, engine::{ - io::{FileManager, FileManagerError}, + io::block_layer::file_manager::{FileManager, FileManagerError}, transactions::TransactionManager, Engine, }, diff --git a/tests/visibility_tests.rs b/tests/visibility_tests.rs index 2a63d06..54d49a3 100644 --- a/tests/visibility_tests.rs +++ b/tests/visibility_tests.rs @@ -1,6 +1,10 @@ use feophantlib::engine::{ get_row, get_table, - io::{row_formats::RowData, FileManager, LockCacheManager, RowManager, VisibleRowManager}, + io::{ + block_layer::{file_manager::FileManager, lock_cache_manager::LockCacheManager}, + row_formats::RowData, + RowManager, VisibleRowManager, + }, transactions::TransactionManager, }; use futures::stream::StreamExt;