This is a mass commit of where I've gotten with the current I/O+Cache+Locking approach.

I'm about to do some mass changes but I don't want to lose this.
chotchki committed Oct 7, 2021
1 parent 36735e6 commit 2837532
Showing 25 changed files with 818 additions and 380 deletions.
4 changes: 2 additions & 2 deletions benches/feophant_benchmark.rs
@@ -3,9 +3,9 @@ use criterion::Criterion;
 use criterion::{criterion_group, criterion_main};
 use feophantlib::engine::get_row;
 use feophantlib::engine::get_table;
+use feophantlib::engine::io::block_layer::file_manager::FileManager;
+use feophantlib::engine::io::block_layer::lock_cache_manager::LockCacheManager;
 use feophantlib::engine::io::row_formats::RowData;
-use feophantlib::engine::io::FileManager;
-use feophantlib::engine::io::LockCacheManager;
 use feophantlib::engine::io::RowManager;
 use feophantlib::engine::transactions::TransactionId;
 use futures::pin_mut;
4 changes: 2 additions & 2 deletions src/engine.rs
@@ -33,10 +33,10 @@ pub use test_objects::get_table;
 pub mod transactions;
 use transactions::{TransactionId, TransactionManager};
 
+use self::io::block_layer::file_manager::FileManager;
+use self::io::block_layer::lock_cache_manager::LockCacheManager;
 use self::io::ConstraintManager;
-use self::io::FileManager;
 use self::io::IndexManager;
-use self::io::LockCacheManager;
 use self::objects::QueryResult;
 use std::ops::Deref;
 use std::sync::Arc;
3 changes: 2 additions & 1 deletion src/engine/analyzer/definition_lookup.rs
@@ -322,7 +322,8 @@ pub enum DefinitionLookupError {
 mod tests {
     use tempfile::TempDir;
 
-    use crate::engine::io::{FileManager, LockCacheManager};
+    use crate::engine::io::block_layer::file_manager::FileManager;
+    use crate::engine::io::block_layer::lock_cache_manager::LockCacheManager;
 
     // Note this useful idiom: importing names from outer (for mod tests) scope.
     use super::super::super::io::RowManager;
13 changes: 2 additions & 11 deletions src/engine/io.rs
@@ -1,3 +1,5 @@
+pub mod block_layer;
+
 mod constraint_manager;
 pub use constraint_manager::ConstraintManager;
 pub use constraint_manager::ConstraintManagerError;
@@ -15,17 +17,6 @@ pub use index_manager::IndexManager;
 mod index_row_manager;
 pub use index_row_manager::IndexRowManager;
 
-mod file_manager;
-pub use file_manager::FileManager;
-pub use file_manager::FileManagerError;
-
-mod free_space_manager;
-pub use free_space_manager::FreeSpaceManager;
-
-mod lock_cache_manager;
-pub use lock_cache_manager::LockCacheManager;
-pub use lock_cache_manager::LockCacheManagerError;
-
 pub mod page_formats;
 
 pub mod row_formats;
36 changes: 36 additions & 0 deletions src/engine/io/block_layer.rs
@@ -0,0 +1,36 @@
+/*
+Addressing:
+    Uuid / Page Type / Page Offset
+Locking:
+    Reading
+    Writing
+Free Space:
+    In Use
+    Free
+
+I am most concerned about lost writes.
+
+Caching can move into the file layer, but locking stays out.
+
+File Manager handles I/O operations
+Free Space Manager guides what pages are usable
+Lock Cache Manager Handles locking
+
+Process:
+let page = get_page_for_read()
+*/
+
+pub mod file_manager;
+
+pub mod free_space_manager;
+
+pub mod lock_cache_manager;
+
+pub mod lock_manager;
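
The module comment above sketches the intended call flow. Based on the LockCacheManager calls visible in the FreeSpaceManager hunks further down, a minimal read-modify-write sketch could look like this; the `update_page` name, the exact handle type, and the error plumbing are assumptions, not shown verbatim in this commit:

```rust
use bytes::{Bytes, BytesMut};

// Hypothetical block_layer caller, pieced together from the FreeSpaceManager
// hunks below. Assumes the handle holds Option<Bytes> and that update_page
// writes the page back and releases the write lock.
async fn mark_first_byte(
    lcm: &LockCacheManager,
    page_id: PageId,
    offset: PageOffset,
) -> Result<(), LockCacheManagerError> {
    // Take the page under a write lock.
    let mut handle = lcm.get_page_for_update(page_id, &offset).await?;
    let old: Bytes = handle.as_ref().cloned().expect("page should exist");

    // Pages are immutable Bytes now, so copy out, mutate, and re-freeze.
    let mut new = BytesMut::with_capacity(old.len());
    new.extend_from_slice(&old[..]);
    new[0] = 1;
    handle.replace(new.freeze());

    // Write back and unlock (assumed API).
    lcm.update_page(page_id, offset, handle).await
}
```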
src/engine/io/block_layer/file_manager.rs
@@ -1,7 +1,7 @@
 //! This is a different approach than I had done before. This file manager runs its own loop based on a spawned task
 //! since the prior approach was too lock heavy and I couldn't figure out an approach that didn't starve resources.
-use super::page_formats::{PageId, PageOffset, UInt12, UInt12Error};
-use bytes::{Bytes, BytesMut};
+use super::super::page_formats::{PageId, PageOffset, UInt12, UInt12Error};
+use bytes::Bytes;
 use std::convert::TryFrom;
 use std::ffi::OsString;
 use std::num::TryFromIntError;
@@ -83,7 +83,7 @@ impl FileManager {
         &self,
         page_id: &PageId,
         offset: &PageOffset,
-    ) -> Result<Option<BytesMut>, FileManagerError> {
+    ) -> Result<Option<Bytes>, FileManagerError> {
        let (res_request, res_receiver) = oneshot::channel();
 
        self.request_queue
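
For context on the "runs its own loop" note above, a caller-side sketch of the queue + oneshot handshake might look like the following. The request_queue channel type is an assumption; only the RequestType shape (further down in this commit) appears verbatim:

```rust
use bytes::Bytes;
use tokio::sync::{mpsc, oneshot};

// Caller side of the request/response handshake (channel type assumed).
async fn read_page(
    request_queue: &mpsc::UnboundedSender<RequestType>,
    offset: PageOffset,
) -> Result<Option<Bytes>, FileExecutorError> {
    let (res_request, res_receiver) = oneshot::channel();
    request_queue
        .send(RequestType::Read((offset, res_request)))
        .expect("file executor task has stopped");
    // No lock is held while waiting; the spawned task serializes the I/O.
    res_receiver
        .await
        .expect("executor dropped the reply channel")
}
```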
src/engine/io/block_layer/file_manager/file_executor.rs
@@ -2,8 +2,8 @@ use super::file_operations::{FileOperations, FileOperationsError};
 /// Inner type that implements the actual I/O operations so the outter type can
 /// handle queue management.
 use super::request_type::RequestType;
+use super::ResourceFormatter;
 use crate::constants::PAGE_SIZE;
-use crate::engine::io::file_manager::ResourceFormatter;
 use crate::engine::io::page_formats::{PageId, PageOffset};
 use bytes::{Bytes, BytesMut};
 use lru::LruCache;
@@ -599,7 +599,7 @@ mod tests {
 
     use crate::{
         constants::PAGES_PER_FILE,
-        engine::io::{page_formats::PageType, FileManager},
+        engine::io::{block_layer::file_manager::FileManager, page_formats::PageType},
     };
 
     use super::*;
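
The executor keeps importing lru::LruCache, which lines up with the block_layer note that caching can move into the file layer. A toy sketch of that idea; the capacity, the lru 0.7-era constructor, and PageOffset implementing Hash + Eq are assumptions:

```rust
use bytes::Bytes;
use lru::LruCache;

// Toy page cache: hits hand back a cheap refcounted clone of the page.
fn cache_demo(po: PageOffset, page: Bytes) {
    let mut cache: LruCache<PageOffset, Bytes> = LruCache::new(128);
    cache.put(po, page);
    if let Some(hit) = cache.get(&po) {
        let _shared: Bytes = hit.clone(); // refcount bump, no copy
    }
}
```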
src/engine/io/block_layer/file_manager/file_operations.rs
@@ -14,7 +14,7 @@ use tokio::{
 };
 
 use crate::constants::PAGE_SIZE;
-use crate::engine::io::file_manager::ResourceFormatter;
+use crate::engine::io::block_layer::file_manager::ResourceFormatter;
 use crate::engine::io::page_formats::{PageId, PageOffset};
 pub struct FileOperations {}
 
@@ -74,7 +74,7 @@ impl FileOperations {
     pub async fn read_chunk(
         mut file: File,
         page_offset: &PageOffset,
-    ) -> Result<(File, Option<BytesMut>), FileOperationsError> {
+    ) -> Result<(File, Option<Bytes>), FileOperationsError> {
         let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
 
         let file_meta = file.metadata().await?;
@@ -94,7 +94,7 @@ impl FileOperations {
             }
         }
 
-        Ok((file, Some(buffer)))
+        Ok((file, Some(buffer.freeze())))
     }
 
     pub async fn update_chunk(
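
The read_chunk change above is the heart of this commit's Bytes migration: freezing the buffer before returning it. A standalone sketch of what freeze buys (bytes crate semantics, not code from this repo):

```rust
use bytes::{BufMut, Bytes, BytesMut};

fn main() {
    let mut buffer = BytesMut::with_capacity(8);
    buffer.put_u64(0xdead_beef_feed_c0de);

    let page: Bytes = buffer.freeze(); // consumes the buffer, no copy
    let alias: Bytes = page.clone();   // refcount bump, still no copy

    // Readers can share the page freely but can no longer mutate it in
    // place, which suits the lost-write concern noted in block_layer.rs.
    assert_eq!(page, alias);
}
```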
src/engine/io/block_layer/file_manager/request_type.rs
@@ -1,5 +1,5 @@
 use crate::engine::io::page_formats::PageOffset;
-use bytes::{Bytes, BytesMut};
+use bytes::Bytes;
 use tokio::sync::oneshot::Sender;
 
 use super::file_executor::FileExecutorError;
@@ -8,11 +8,6 @@ use super::file_executor::FileExecutorError;
 pub enum RequestType {
     GetOffset(Sender<Result<PageOffset, FileExecutorError>>),
     Add((PageOffset, Bytes, Sender<Result<(), FileExecutorError>>)),
-    Read(
-        (
-            PageOffset,
-            Sender<Result<Option<BytesMut>, FileExecutorError>>,
-        ),
-    ),
+    Read((PageOffset, Sender<Result<Option<Bytes>, FileExecutorError>>)),
     Update((PageOffset, Bytes, Sender<Result<(), FileExecutorError>>)),
 }
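
To make the enum concrete, here is a toy executor loop draining such a queue; a HashMap stands in for the data file, since the real FileExecutor body is not shown in full by this commit, and PageOffset is assumed to be Copy + Hash + Eq with a public tuple field:

```rust
use std::collections::HashMap;

use bytes::Bytes;
use tokio::sync::mpsc::UnboundedReceiver;

// Toy executor loop: every request carries its own oneshot reply sender,
// so callers never share a lock, only a queue.
async fn run_executor(mut rx: UnboundedReceiver<RequestType>) {
    let mut store: HashMap<PageOffset, Bytes> = HashMap::new();
    let mut next = PageOffset(0);
    while let Some(req) = rx.recv().await {
        match req {
            RequestType::GetOffset(reply) => {
                let _ = reply.send(Ok(next));
                next = PageOffset(next.0 + 1);
            }
            RequestType::Read((po, reply)) => {
                // Bytes clones are cheap, so a cache hit costs no copy.
                let _ = reply.send(Ok(store.get(&po).cloned()));
            }
            RequestType::Add((po, bytes, reply))
            | RequestType::Update((po, bytes, reply)) => {
                store.insert(po, bytes);
                let _ = reply.send(Ok(()));
            }
        }
    }
}
```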
src/engine/io/block_layer/free_space_manager.rs
@@ -3,11 +3,11 @@
 //! page to say the space is availible. This means each page here can cover 134MB of free space.
 
 use super::{
-    page_formats::{PageId, PageOffset, PageType},
-    LockCacheManager, LockCacheManagerError,
+    super::page_formats::{PageId, PageOffset, PageType},
+    lock_cache_manager::{LockCacheManager, LockCacheManagerError},
 };
 use crate::constants::PAGE_SIZE;
-use bytes::{Buf, BytesMut};
+use bytes::{Buf, Bytes, BytesMut};
 use thiserror::Error;
 
 #[derive(Clone, Debug)]
@@ -33,7 +33,7 @@ impl FreeSpaceManager {
             let page_handle = self.lock_cache_manager.get_page(free_id, &offset).await?;
             match page_handle.as_ref() {
                 Some(s) => {
-                    let mut page_frozen = s.clone().freeze();
+                    let mut page_frozen = s.clone();
                     match Self::find_first_free_page_in_page(&mut page_frozen) {
                         Some(s) => {
                             let full_offset = PageOffset(s)
@@ -59,7 +59,7 @@ impl FreeSpaceManager {
             let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
             let new_page = vec![FreeStat::Free as u8; PAGE_SIZE as usize];
             buffer.extend_from_slice(&new_page);
-            new_page_handle.replace(buffer);
+            new_page_handle.replace(buffer.freeze());
 
             self.lock_cache_manager
                 .add_page(free_id, next_po, new_page_handle)
@@ -86,10 +86,11 @@ impl FreeSpaceManager {
             .lock_cache_manager
             .get_page_for_update(free_id, &po)
             .await?;
-        let mut page = page_handle
+        let page = page_handle
             .as_mut()
             .ok_or(FreeSpaceManagerError::PageDoesNotExist(page_id))?;
-        Self::set_status_inside_page(&mut page, offset, status);
+        let new_page = Self::set_status_inside_page(page, offset, status);
+        page_handle.replace(new_page);
 
         Ok(self
             .lock_cache_manager
@@ -118,7 +119,9 @@ impl FreeSpaceManager {
 
     /// Sets the status of a field inside a page, you MUST pass an offset
     /// that fits in the buffer.
-    fn set_status_inside_page(buffer: &mut BytesMut, offset: usize, status: FreeStat) {
+    fn set_status_inside_page(src: &Bytes, offset: usize, status: FreeStat) -> Bytes {
+        let mut buffer = BytesMut::with_capacity(src.len());
+        buffer.extend_from_slice(&src[..]);
         let offset_index = offset / 8;
         let offset_subindex = offset % 8;
 
@@ -136,6 +139,8 @@ impl FreeSpaceManager {
         }
 
         buffer[offset_index] = new_value;
+
+        buffer.freeze()
    }
}
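The doc comment's "134MB" figure checks out, assuming the crate's PAGE_SIZE is 4,096 bytes: one bitmap page holds PAGE_SIZE * 8 = 32,768 status bits, and 32,768 tracked pages of 4,096 bytes each is 134,217,728 bytes. The same byte/bit split drives set_status_inside_page; a worked check (the 0-bit-means-Free convention is inferred from the tests below):

```rust
// Verifying the "134MB" comment and the offset math, assuming PAGE_SIZE
// is 4096 and that a 0 bit means Free (as the tests imply).
const PAGE_SIZE: usize = 4096;

fn main() {
    assert_eq!(PAGE_SIZE * 8 * PAGE_SIZE, 134_217_728); // ~134 MB covered

    // set_status_inside_page's indexing: byte first, then bit within it.
    let offset = 12_345;
    let (index, subindex) = (offset / 8, offset % 8);
    let mut page = vec![0u8; PAGE_SIZE];
    page[index] |= 1u8 << subindex; // mark InUse
    assert_ne!(page[index] & (1u8 << subindex), 0);
    page[index] &= !(1u8 << subindex); // back to Free
    assert_eq!(page[index], 0);
}
```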

@@ -161,14 +166,14 @@ mod tests {
     use tempfile::TempDir;
     use uuid::Uuid;
 
-    use crate::engine::io::FileManager;
+    use crate::engine::io::block_layer::file_manager::FileManager;
 
     use super::*;
 
     /// Gets the status of a field inside a page, you MUST pass an offset
     /// that fits in the buffer.
     //This was in the implementation, I just only needed it for unit tests
-    fn get_status_inside_page(buffer: &BytesMut, offset: usize) -> FreeStat {
+    fn get_status_inside_page(buffer: &Bytes, offset: usize) -> FreeStat {
         let offset_index = offset / 8;
         let offset_subindex = offset % 8;
 
@@ -187,11 +192,13 @@
         let mut test = BytesMut::with_capacity(2);
         test.put_u16(0x0);
 
-        for i in 0..test.capacity() * 8 {
+        let mut test = test.freeze();
+
+        for i in 0..test.len() * 8 {
             assert_eq!(get_status_inside_page(&test, i), FreeStat::Free);
-            FreeSpaceManager::set_status_inside_page(&mut test, i, FreeStat::InUse);
+            test = FreeSpaceManager::set_status_inside_page(&test, i, FreeStat::InUse);
             assert_eq!(get_status_inside_page(&test, i), FreeStat::InUse);
-            FreeSpaceManager::set_status_inside_page(&mut test, i, FreeStat::Free);
+            test = FreeSpaceManager::set_status_inside_page(&test, i, FreeStat::Free);
             assert_eq!(get_status_inside_page(&test, i), FreeStat::Free);
         }
 
@@ -204,11 +211,13 @@
         test.put_u8(0x0);
         test.put_u8(0x0);
 
+        let mut test = test.freeze();
+
         for i in 0..test.len() * 8 {
             let free_page = FreeSpaceManager::find_first_free_page_in_page(&mut test.clone());
             assert_eq!(free_page, Some(i));
 
-            FreeSpaceManager::set_status_inside_page(&mut test, i, FreeStat::InUse);
+            test = FreeSpaceManager::set_status_inside_page(&test, i, FreeStat::InUse);
         }
         assert_eq!(
             FreeSpaceManager::find_first_free_page_in_page(&mut test),
