diff --git a/README.md b/README.md index 85c6f89..dd94b82 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ This project is under the MIT license. | --- | --- | | REPL for testing | Yes | | Append Only Log Structured Storage | Yes | -| Log Rotation | No | +| Log Rotation | Yes | | CRC Validation | Yes | | Startup KeyDir rebuilding | Yes | | Log Compaction | No | @@ -34,7 +34,7 @@ Note that at runtime the entire keyspace is maintained in memory so enough memor The data on disk is organized into files with a capped size that is configurable using the environment variables listed below. Files are rotated as they reach this size. -The previously stored and compacted files are named `<timestamp>`, where `<timestamp>` is the string representation of seconds since epoch. The file currently being written is named `0`. +The previously stored and compacted files are named `<timestamp>`, where `<timestamp>` is the string representation of milliseconds since epoch. The file currently being written is named with the most recent timestamp value. ## Binary Entry File Format @@ -134,7 +134,7 @@ The following environment variables provide configuration values for the databas #### Optional Environment Variables -`RUSTCASK_MAX_FILE_SIZE_MB` - The maximum file size cap, in MB. Defaults to 4 MB if not present. +`RUSTCASK_ROTATE_ACTIVE_FILE_AFTER_BYTES` - The number of bytes after which to rotate the active file. Defaults to 4 MiB if not present. 
## Running the Application @@ -142,7 +142,7 @@ First, configure a local `.env` file to contain the following environment variab ``` RUSTCASK_FILE_PATH= -RUSTCASK_MAX_FILE_SIZE_MB= +RUSTCASK_ROTATE_ACTIVE_FILE_AFTER_BYTES= RUST_LOG= ``` diff --git a/src/main.rs b/src/main.rs index aa19f22..7256368 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,7 +19,7 @@ use std::{ }; use tracing_subscriber::EnvFilter; -const DEFAULT_RUSTCASK_MAX_FILE_SIZE_MB: &str = "4"; +const DEFAULT_RUSTCASK_ROTATE_ACTIVE_FILE_AFTER_BYTES: u64 = 4194304; #[derive(Debug)] enum ReplError { @@ -45,7 +45,7 @@ fn main() -> Result<(), ReplError> { #[cfg(feature = "mock")] let db = MockDatabase::open(); #[cfg(not(feature = "mock"))] - let db = RustcaskDatabase::open(settings.file_path).map_err(|e| ReplError::DatabaseError(e))?; + let db = RustcaskDatabase::open(settings.file_path, settings.rotate_active_file_after_bytes).map_err(|e| ReplError::DatabaseError(e))?; cmd_loop(db) } @@ -64,12 +64,11 @@ fn load_settings() -> Result { ))); } - let max_file_size_env_value = env::var("RUSTCASK_MAX_FILE_SIZE_MB") - .unwrap_or(DEFAULT_RUSTCASK_MAX_FILE_SIZE_MB.into()) - .parse::() - .expect("RUSTCASK_MAX_FILE_SIZE_MB did not parse into a u8"); + let rotate_active_file_after_bytes_env_value = env::var("RUSTCASK_ROTATE_ACTIVE_FILE_AFTER_BYTES") + .map_or(Ok(DEFAULT_RUSTCASK_ROTATE_ACTIVE_FILE_AFTER_BYTES), |v| v.parse::()) + .expect("RUSTCASK_ROTATE_ACTIVE_FILE_AFTER_BYTES did not parse into a u64"); - Ok(Settings::new(file_path, max_file_size_env_value)) + Ok(Settings::new(file_path, rotate_active_file_after_bytes_env_value)) } #[tracing::instrument(skip(db))] diff --git a/src/rustcask_database.rs b/src/rustcask_database.rs index 3d50d3a..3c5ccb0 100644 --- a/src/rustcask_database.rs +++ b/src/rustcask_database.rs @@ -2,14 +2,11 @@ use std::{ collections::HashMap, fs::{self, File, OpenOptions}, io::{BufReader, BufWriter, ErrorKind, Read, Seek, SeekFrom, Write}, - path::{Path, PathBuf}, + path::{Path, PathBuf}, 
thread, time::Duration, }; use crate::database::*; -const ACTIVE_FILE_NAME: &str = "0"; -const ACTIVE_FILE_NAME_ID: u32 = 0u32; - const FILE_HEADER_IDENTIFIER_BYTES: &[u8] = b"rustcask"; const FILE_HEADER_VERSION_BYTES: &[u8] = &[0x01, 0x00]; const FILE_HEADER_RESERVED_BYTES: &[u8] = &[0x00, 0x00, 0x00, 0x00, 0x00, 0x00]; @@ -33,13 +30,37 @@ const FILE_ENTRY_HEADER_FLAGS_OFFSET: usize = FILE_ENTRY_HEADER_VALUE_SIZE_OFFSET + std::mem::size_of::(); pub struct RustcaskDatabase { - reader: BufReader, + // Max file size in bytes, after which files will be rotated + max_file_size_bytes: u64, + + // Base path storing all Rustcask database files + path: PathBuf, + + // Readers of files by FileId + readers: HashMap>, + + // FileId of file currently being written to + active_file_id: FileId, + + // Writer to active file writer: BufWriter, + + // Map of key byte vec to metadata regarding value location in files keydir: HashMap, KeyDirEntry>, } -#[derive(Debug, Clone, Copy, Eq, PartialEq)] -struct FileId(u32); +#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)] +struct FileId(u64); + +impl FileId { + pub fn next() -> Result { + use std::time::{SystemTime, UNIX_EPOCH}; + + Ok(FileId(SystemTime::now() + .duration_since(UNIX_EPOCH)? + .as_millis() as u64)) + } +} #[derive(Debug, Clone, Copy, Eq, PartialEq)] struct ValuePosition(u64); @@ -51,6 +72,21 @@ struct ValueSize(u32); #[derive(Debug, Clone, Copy, Eq, PartialEq)] struct Timestamp(u64); +impl Timestamp { + pub fn get_next_timestamp() -> Result { + Ok( + Timestamp( + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map_err(Error::TimestampError)? + .as_millis() + .try_into() + .map_err(Error::TimestampOverflow)? 
+ ) + ) + } +} + #[derive(Debug, Clone, Copy, Eq, PartialEq)] struct KeyDirEntry { file_id: FileId, // name of the file in the data directory, timestamp seconds since epoch, or zero for the currently active file @@ -151,11 +187,27 @@ impl Database for RustcaskDatabase { let entry = self.keydir.get(key).ok_or(Error::KeyMissing)?; - let _ = self.reader.seek(SeekFrom::Start(entry.value_position.0))?; + // Get existing reader or create a new one to the desired file if needed + let reader = + self.readers + .entry(entry.file_id) + .or_insert( + { + let read_path = self.path.join(entry.file_id.0.to_string()); + + let read_file = OpenOptions::new() + .read(true) + .open(&read_path)?; + + BufReader::new(read_file) + } + ); + + let _ = reader.seek(SeekFrom::Start(entry.value_position.0))?; let mut value = vec![0; entry.value_size.0 as usize]; - let _ = self.reader.read_exact(&mut value)?; + let _ = reader.read_exact(&mut value)?; Ok(value) } @@ -170,17 +222,12 @@ impl Database for RustcaskDatabase { return Err(Error::EmptyValue); } - let timestamp = Timestamp( - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .map_err(Error::TimestampError)? 
- .as_millis() - .try_into() - .map_err(Error::TimestampOverflow)?, - ); + let timestamp = Timestamp::get_next_timestamp()?; let buffer = Self::build_file_entry_buffer(key, value, timestamp); + let _ = self.maybe_rotate_active_file()?; + let position = self.writer.seek(SeekFrom::End(0))?; let _ = self.writer.write_all(&buffer)?; let _ = self.writer.flush()?; @@ -190,7 +237,7 @@ impl Database for RustcaskDatabase { let _ = self.keydir.insert( Vec::from(key), KeyDirEntry { - file_id: FileId(ACTIVE_FILE_NAME_ID), // using only the active file for now + file_id: self.active_file_id, value_position: ValuePosition(value_position), value_size: ValueSize(value.len() as u32), timestamp, @@ -206,17 +253,12 @@ impl Database for RustcaskDatabase { return Err(Error::EmptyKey); } - let timestamp = Timestamp( - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .map_err(Error::TimestampError)? - .as_millis() - .try_into() - .map_err(Error::TimestampOverflow)?, - ); + let timestamp = Timestamp::get_next_timestamp()?; let buffer = Self::build_file_entry_buffer(key, &Vec::new(), timestamp); + let _ = self.maybe_rotate_active_file()?; + let _ = self.writer.seek(SeekFrom::End(0))?; let _ = self.writer.write_all(&buffer)?; let _ = self.writer.flush()?; @@ -228,6 +270,38 @@ impl Database for RustcaskDatabase { } impl RustcaskDatabase { + fn maybe_rotate_active_file(&mut self) -> Result<(), Error> { + self.writer.flush()?; + let writer_len = self.writer.get_ref().metadata()?.len(); + if writer_len > self.max_file_size_bytes { + tracing::debug!("File size {writer_len} exceeded limit {}, rotating", self.max_file_size_bytes); + + let mut next_file_id = FileId::next()?; + + // In the unlikely event that the next generated file id is the same as the active one, + // increment next_file_id to resolve the conflict + while next_file_id == self.active_file_id { + next_file_id.0 += 1; + } + + self.active_file_id = next_file_id; + + let active_file_path = 
self.path.join(self.active_file_id.0.to_string()); + + let mut write_file = OpenOptions::new() + .create(true) + .read(true) + .append(true) + .open(&active_file_path)?; + + let _ = Self::write_header(&mut write_file)?; + + self.writer = BufWriter::new(write_file); + } + + Ok(()) + } + fn write_header(writer: &mut impl Write) -> Result<(), Error> { writer.write_all(FILE_HEADER_IDENTIFIER_BYTES)?; writer.write_all(FILE_HEADER_VERSION_BYTES)?; @@ -287,11 +361,14 @@ impl RustcaskDatabase { buffer } - fn rebuild_keydir(r: &mut R) -> Result, KeyDirEntry>, Error> { + // Returns keydir hashmap built from r, and a Vec of keys that have been deleted while reading r + fn rebuild_keydir(r: &mut R, file_id: FileId) -> Result<(HashMap, KeyDirEntry>, Vec>), Error> { let _file_header = Self::parse_file_header(r)?; let mut keydir = HashMap::new(); + let mut deletes = Vec::new(); + let mut value_offset = FILE_HEADER_SIZE as u64; loop { @@ -334,7 +411,7 @@ impl RustcaskDatabase { // Construct KeyDirEntry with value offset let keydir_entry = KeyDirEntry { - file_id: FileId(ACTIVE_FILE_NAME_ID), + file_id, timestamp: entry.timestamp, value_position: ValuePosition(value_offset as u64), value_size: entry.value_size, @@ -345,59 +422,93 @@ impl RustcaskDatabase { } else { // Remove from keydir let _ = keydir.remove(&key_buffer); + + // Add to deletes list to remove from keydirs built from other files, in case delete is absent from keydir built here + deletes.push(key_buffer); } // Increment value offset to next entry value_offset += entry.value_size.0 as u64; } - Ok(keydir) + Ok((keydir, deletes)) } - fn build_database(active_file_path: PathBuf) -> Result { - let exists = active_file_path.exists(); + fn build_database(path: &Path, max_file_size_bytes: u64) -> Result { + // Read path to gather all files named as the u64 milliseconds since unix epoch + let entries: Vec<_> = fs::read_dir(&path)?.collect::, _>>()?; + + let mut entries: Vec<_> = entries + .into_iter() + .filter_map(|e| { + 
e.file_name() + .to_str() + .and_then(|s| s.parse::().ok()) + .map(|n| (n, e.path())) + }) + .collect(); + + // Sort files numerically by name so we consume them in chronological order + entries.sort_by_key(|&(n, _)| n); - let mut read_file = OpenOptions::new() - .create(!active_file_path.exists()) + // Read through entries in order and rebuild keydir on each, merging maps together in order + let mut keydir = HashMap::new(); + let mut active_file_id = FileId(u64::MIN); + for (millis, path) in entries { + let mut read_file = OpenOptions::new() .read(true) - .truncate(false) - .append(true) - .open(&active_file_path)?; + .open(&path)?; + + let (current_keydir, deletes) = Self::rebuild_keydir(&mut read_file, FileId(millis))?; + + for key in &deletes { + keydir.remove(key); + } + + keydir.extend(current_keydir); + + active_file_id = FileId(millis); + } + + let db_exists = active_file_id != FileId(u64::MIN); + let active_file_path; + + if !db_exists { + active_file_id = FileId::next()?; + } + + active_file_path = path.join(active_file_id.0.to_string()); let mut write_file = OpenOptions::new() - .create(!active_file_path.exists()) + .create(!db_exists) .read(true) - .truncate(false) .append(true) .open(&active_file_path)?; - let keydir; + if !db_exists { + // Assumes if no active file exists, this is a new directory for a new database + // TODO: Implement more robust error handling of directory left in some invalid state, if possible - if exists { - keydir = Self::rebuild_keydir(&mut read_file)?; - } else { let _ = Self::write_header(&mut write_file)?; - keydir = HashMap::new(); } - let reader = BufReader::new(read_file); - let writer = BufWriter::new(write_file); Ok(Self { - reader, + path: path.into(), + active_file_id, + max_file_size_bytes, + readers: HashMap::new(), writer, keydir, }) } - pub fn open(path: impl AsRef) -> Result { + pub fn open(path: impl AsRef, max_file_size_bytes: u64) -> Result { let path = path.as_ref(); - // It is assumed that `path` contains 
only rustcask data files. - // The files are named either `0` for the current actively written file, - // or filenames consisting of a string representation of the unix epoch in seconds, - // when that file was created after file rotation. + // It is assumed that `path` contains rustcask data files named with the unix epoch timestamp in milliseconds. + // The active file is considered to be the file whose name is the most recent unix epoch milliseconds timestamp. let path_stat = fs::metadata(path)?; @@ -408,9 +519,7 @@ impl RustcaskDatabase { ))); } - let active_file_path = path.join(ACTIVE_FILE_NAME); - - Self::build_database(active_file_path) + Self::build_database(path, max_file_size_bytes) } } @@ -546,20 +655,23 @@ mod tests { fn test_rebuild_keydir() { let data: &[u8] = include_bytes!("../tests/fixtures/sample.rustcask"); let mut cursor = Cursor::new(data); - let keydir = RustcaskDatabase::rebuild_keydir(&mut cursor).unwrap(); + let file_id = FileId(42); + let (keydir, deletes) = RustcaskDatabase::rebuild_keydir(&mut cursor, file_id).unwrap(); + + assert_eq!(deletes, vec![b"abc".as_slice(), b"foo".as_slice()]); assert_eq!(keydir.len(), 2); assert!(keydir.get(b"foo".as_slice()).is_none()); let abc = keydir.get(b"abc".as_slice()).unwrap(); - assert_eq!(abc.file_id, FileId(0)); + assert_eq!(abc.file_id, file_id); assert_eq!(abc.value_size, ValueSize(3)); assert_eq!(abc.timestamp, Timestamp(1776294722721)); assert_eq!(abc.value_position, ValuePosition(145)); let ghi = keydir.get(b"ghi".as_slice()).unwrap(); - assert_eq!(ghi.file_id, FileId(0)); + assert_eq!(ghi.file_id, file_id); assert_eq!(ghi.value_size, ValueSize(3)); assert_eq!(ghi.timestamp, Timestamp(1776294290866)); assert_eq!(ghi.value_position, ValuePosition(94)); diff --git a/src/settings.rs b/src/settings.rs index 8455654..b355e69 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -3,14 +3,14 @@ use std::path::PathBuf; #[derive(Debug)] pub struct Settings { pub file_path: PathBuf, - pub max_file_size: u8, // 
Max file size in MB, defaults to 4 MB + pub rotate_active_file_after_bytes: u64, } impl Settings { - pub fn new(file_path: PathBuf, max_file_size: u8) -> Self { + pub fn new(file_path: PathBuf, rotate_active_file_after_bytes: u64) -> Self { Self { file_path, - max_file_size, + rotate_active_file_after_bytes, } } }