diff --git a/AGENTS.md b/AGENTS.md index 8bbfe00a..424d9453 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -19,10 +19,15 @@ - Make sure to run `cargo +nightly fmt` after making changes to apply default formatting rules. - Use pattern matching with if-let and match expressions for error handling +## Documentation Guidelines +- Avoid redundant documentation for the sake of convention. For example + - Don't include an Errors section if the only errors are generic failures. + - Don't include an Arguments section if the arguments are obvious based on the function signature. + ## Test File Conventions 1. Test files should be placed adjacent to the implementation file they're testing 2. Test files should be named with a `_test.rs` suffix (e.g., `network_quality_test.rs`) -3. Link test files in the implementation file using: +3. Link test files in the implementation file using the following pattern at the top of the file, right below the license header and optional module-level docs. ```rust #[cfg(test)] #[path = "./file_name_test.rs"] @@ -30,6 +35,11 @@ ``` 4. Tests in the same file as the implementation code should be avoided 5. Test names should *not* start with `test_`, as this is redundant +6. Use module-level clippy allow blocks instead of per-test allows: + ```rust + #![allow(clippy::unwrap_used)] + ``` + This should be placed at the top of the test file, after the license header and before imports. ## Code Quality Checks - After generating or modifying code, always run clippy to check for static lint violations: @@ -37,4 +47,4 @@ - For automatic fixing of some linting issues, use the `--fix` flag: `SKIP_PROTO_GEN=1 cargo clippy --workspace --bins --examples --tests --fix -- --no-deps` - Fix any remaining warnings before committing code -- Running clippy is especially important after code generation or modification to catch any potential issues \ No newline at end of file +- Running clippy is especially important after code generation or modification to catch any potential issues diff --git a/Cargo.lock b/Cargo.lock index ac6857f3..8013e89e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1106,10 +1106,20 @@ dependencies = [ "anyhow", "bd-bonjson", "bd-client-common", + "bd-log", + "bd-proto", + "bd-time", "bd-workspace-hack", "bytes", + "crc32fast", + "ctor", + "flate2", + "log", "memmap2", + "protobuf 4.0.0-alpha.0", "tempfile", + "time", + "tokio", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index de8de5e7..ed36fca4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,7 @@ arbitrary = { version = "1.4.2", features = ["derive"] } arc-swap = "1.7.1" assert_matches = "1.5.0" assert_no_alloc = "1.1.2" +async-compression = { version = "0.4.20", features = ["tokio", "zlib"] } async-trait = "0.1.89" axum = { version = "0.8.6", features = ["http2", "macros"] } axum-server = { version = "0.7.2", features = ["tls-rustls-no-provider"] } diff --git a/api b/api index 9f59c7f4..53334bc3 160000 --- a/api +++ b/api @@ -1 +1 @@ -Subproject commit 9f59c7f4d855a1aaec85a4647524353262f6cd58 +Subproject commit 53334bc3b224c81d1e39bcb5c1dfced7a5302a4a diff --git a/bd-proto/build.rs b/bd-proto/build.rs index 55b7af83..1626e0a0 100644 --- a/bd-proto/build.rs +++ b/bd-proto/build.rs @@ -117,6 +117,20 @@ fn main() { .out_dir("src/protos/logging/") .capture_stderr() .run_from_script(); + std::fs::create_dir_all("src/protos/state").unwrap(); + protobuf_codegen::Codegen::new() + .protoc() + .customize( + Customize::default() + .gen_mod_rs(false) + .oneofs_non_exhaustive(false) + 
.file_header(GENERATED_HEADER.to_string()), + ) + .includes(["../api/thirdparty", "../api/src"]) + .inputs(["../api/src/bitdrift_public/protobuf/state/v1/payload.proto"]) + .out_dir("src/protos/state/") + .capture_stderr() + .run_from_script(); std::fs::create_dir_all("src/protos/log_matcher").unwrap(); protobuf_codegen::Codegen::new() .protoc() diff --git a/bd-proto/src/protos/mod.rs b/bd-proto/src/protos/mod.rs index abec4e3d..4ea21e92 100644 --- a/bd-proto/src/protos/mod.rs +++ b/bd-proto/src/protos/mod.rs @@ -17,4 +17,5 @@ pub mod logging; pub mod mme; pub mod prometheus; pub mod pulse; +pub mod state; pub mod workflow; diff --git a/bd-proto/src/protos/state/mod.rs b/bd-proto/src/protos/state/mod.rs new file mode 100644 index 00000000..fbb091fe --- /dev/null +++ b/bd-proto/src/protos/state/mod.rs @@ -0,0 +1 @@ +pub mod payload; diff --git a/bd-proto/src/protos/state/payload.rs b/bd-proto/src/protos/state/payload.rs new file mode 100644 index 00000000..31ae479b --- /dev/null +++ b/bd-proto/src/protos/state/payload.rs @@ -0,0 +1,550 @@ +// proto - bitdrift's client/server API definitions +// Copyright Bitdrift, Inc. All rights reserved. +// +// Use of this source code and APIs are governed by a source available license that can be found in +// the LICENSE file or at: +// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt + +// This file is generated by rust-protobuf 4.0.0-alpha.0. Do not edit +// .proto file is parsed by protoc 33.0 +// @generated + +// https://github.com/rust-lang/rust-clippy/issues/702 +#![allow(unknown_lints)] +#![allow(clippy::all)] + +#![allow(unused_attributes)] +#![cfg_attr(rustfmt, rustfmt::skip)] + +#![allow(dead_code)] +#![allow(missing_docs)] +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] +#![allow(non_upper_case_globals)] +#![allow(trivial_casts)] +#![allow(unused_results)] +#![allow(unused_mut)] + +//! Generated file from `bitdrift_public/protobuf/state/v1/payload.proto` + +/// Generated files are compatible only with the same version +/// of protobuf runtime. +const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_4_0_0_ALPHA_0; + +// @@protoc_insertion_point(message:bitdrift_public.protobuf.state.v1.StateValue) +#[derive(PartialEq,Clone,Default,Debug)] +pub struct StateValue { + // message oneof groups + pub value_type: ::std::option::Option, + // special fields + // @@protoc_insertion_point(special_field:bitdrift_public.protobuf.state.v1.StateValue.special_fields) + pub special_fields: ::protobuf::SpecialFields, +} + +impl<'a> ::std::default::Default for &'a StateValue { + fn default() -> &'a StateValue { + ::default_instance() + } +} + +impl StateValue { + pub fn new() -> StateValue { + ::std::default::Default::default() + } + + // string string_value = 1; + + pub fn string_value(&self) -> &str { + match self.value_type { + ::std::option::Option::Some(state_value::Value_type::StringValue(ref v)) => v, + _ => "", + } + } + + pub fn clear_string_value(&mut self) { + self.value_type = ::std::option::Option::None; + } + + pub fn has_string_value(&self) -> bool { + match self.value_type { + ::std::option::Option::Some(state_value::Value_type::StringValue(..)) => true, + _ => false, + } + } + + // Param is passed by value, moved + pub fn set_string_value(&mut self, v: ::std::string::String) { + self.value_type = ::std::option::Option::Some(state_value::Value_type::StringValue(v)) + } + + // Mutable pointer to the field. 
+ pub fn mut_string_value(&mut self) -> &mut ::std::string::String { + if let ::std::option::Option::Some(state_value::Value_type::StringValue(_)) = self.value_type { + } else { + self.value_type = ::std::option::Option::Some(state_value::Value_type::StringValue(::std::string::String::new())); + } + match self.value_type { + ::std::option::Option::Some(state_value::Value_type::StringValue(ref mut v)) => v, + _ => panic!(), + } + } + + // Take field + pub fn take_string_value(&mut self) -> ::std::string::String { + if self.has_string_value() { + match self.value_type.take() { + ::std::option::Option::Some(state_value::Value_type::StringValue(v)) => v, + _ => panic!(), + } + } else { + ::std::string::String::new() + } + } + + // int64 int_value = 2; + + pub fn int_value(&self) -> i64 { + match self.value_type { + ::std::option::Option::Some(state_value::Value_type::IntValue(v)) => v, + _ => 0, + } + } + + pub fn clear_int_value(&mut self) { + self.value_type = ::std::option::Option::None; + } + + pub fn has_int_value(&self) -> bool { + match self.value_type { + ::std::option::Option::Some(state_value::Value_type::IntValue(..)) => true, + _ => false, + } + } + + // Param is passed by value, moved + pub fn set_int_value(&mut self, v: i64) { + self.value_type = ::std::option::Option::Some(state_value::Value_type::IntValue(v)) + } + + // double double_value = 3; + + pub fn double_value(&self) -> f64 { + match self.value_type { + ::std::option::Option::Some(state_value::Value_type::DoubleValue(v)) => v, + _ => 0., + } + } + + pub fn clear_double_value(&mut self) { + self.value_type = ::std::option::Option::None; + } + + pub fn has_double_value(&self) -> bool { + match self.value_type { + ::std::option::Option::Some(state_value::Value_type::DoubleValue(..)) => true, + _ => false, + } + } + + // Param is passed by value, moved + pub fn set_double_value(&mut self, v: f64) { + self.value_type = ::std::option::Option::Some(state_value::Value_type::DoubleValue(v)) + } + + // bool bool_value = 4; + + pub fn bool_value(&self) -> bool { + match self.value_type { + ::std::option::Option::Some(state_value::Value_type::BoolValue(v)) => v, + _ => false, + } + } + + pub fn clear_bool_value(&mut self) { + self.value_type = ::std::option::Option::None; + } + + pub fn has_bool_value(&self) -> bool { + match self.value_type { + ::std::option::Option::Some(state_value::Value_type::BoolValue(..)) => true, + _ => false, + } + } + + // Param is passed by value, moved + pub fn set_bool_value(&mut self, v: bool) { + self.value_type = ::std::option::Option::Some(state_value::Value_type::BoolValue(v)) + } + + fn generated_message_descriptor_data() -> ::protobuf::reflect::GeneratedMessageDescriptorData { + let mut fields = ::std::vec::Vec::with_capacity(4); + let mut oneofs = ::std::vec::Vec::with_capacity(1); + fields.push(::protobuf::reflect::rt::v2::make_oneof_deref_has_get_set_simpler_accessor::<_, _>( + "string_value", + StateValue::has_string_value, + StateValue::string_value, + StateValue::set_string_value, + )); + fields.push(::protobuf::reflect::rt::v2::make_oneof_copy_has_get_set_simpler_accessors::<_, _>( + "int_value", + StateValue::has_int_value, + StateValue::int_value, + StateValue::set_int_value, + )); + fields.push(::protobuf::reflect::rt::v2::make_oneof_copy_has_get_set_simpler_accessors::<_, _>( + "double_value", + StateValue::has_double_value, + StateValue::double_value, + StateValue::set_double_value, + )); + fields.push(::protobuf::reflect::rt::v2::make_oneof_copy_has_get_set_simpler_accessors::<_, 
_>( + "bool_value", + StateValue::has_bool_value, + StateValue::bool_value, + StateValue::set_bool_value, + )); + oneofs.push(state_value::Value_type::generated_oneof_descriptor_data()); + ::protobuf::reflect::GeneratedMessageDescriptorData::new_2::( + "StateValue", + fields, + oneofs, + ) + } +} + +impl ::protobuf::Message for StateValue { + const NAME: &'static str = "StateValue"; + + fn is_initialized(&self) -> bool { + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::Result<()> { + while let Some(tag) = is.read_raw_tag_or_eof()? { + match tag { + 10 => { + self.value_type = ::std::option::Option::Some(state_value::Value_type::StringValue(is.read_string()?)); + }, + 16 => { + self.value_type = ::std::option::Option::Some(state_value::Value_type::IntValue(is.read_int64()?)); + }, + 25 => { + self.value_type = ::std::option::Option::Some(state_value::Value_type::DoubleValue(is.read_double()?)); + }, + 32 => { + self.value_type = ::std::option::Option::Some(state_value::Value_type::BoolValue(is.read_bool()?)); + }, + tag => { + ::protobuf::rt::read_unknown_or_skip_group(tag, is, self.special_fields.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u64 { + let mut my_size = 0; + if let ::std::option::Option::Some(ref v) = self.value_type { + match v { + &state_value::Value_type::StringValue(ref v) => { + my_size += ::protobuf::rt::string_size(1, &v); + }, + &state_value::Value_type::IntValue(v) => { + my_size += ::protobuf::rt::int64_size(2, v); + }, + &state_value::Value_type::DoubleValue(v) => { + my_size += 1 + 8; + }, + &state_value::Value_type::BoolValue(v) => { + my_size += 1 + 1; + }, + }; + } + my_size += ::protobuf::rt::unknown_fields_size(self.special_fields.unknown_fields()); + self.special_fields.cached_size().set(my_size as u32); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::Result<()> { + if let ::std::option::Option::Some(ref v) = self.value_type { + match v { + &state_value::Value_type::StringValue(ref v) => { + os.write_string(1, v)?; + }, + &state_value::Value_type::IntValue(v) => { + os.write_int64(2, v)?; + }, + &state_value::Value_type::DoubleValue(v) => { + os.write_double(3, v)?; + }, + &state_value::Value_type::BoolValue(v) => { + os.write_bool(4, v)?; + }, + }; + } + os.write_unknown_fields(self.special_fields.unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn special_fields(&self) -> &::protobuf::SpecialFields { + &self.special_fields + } + + fn mut_special_fields(&mut self) -> &mut ::protobuf::SpecialFields { + &mut self.special_fields + } + + fn new() -> StateValue { + StateValue::new() + } + + fn clear(&mut self) { + self.value_type = ::std::option::Option::None; + self.value_type = ::std::option::Option::None; + self.value_type = ::std::option::Option::None; + self.value_type = ::std::option::Option::None; + self.special_fields.clear(); + } + + fn default_instance() -> &'static StateValue { + static instance: StateValue = StateValue { + value_type: ::std::option::Option::None, + special_fields: ::protobuf::SpecialFields::new(), + }; + &instance + } +} + +impl ::protobuf::MessageFull for StateValue { + fn descriptor() -> ::protobuf::reflect::MessageDescriptor { + static descriptor: ::protobuf::rt::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::Lazy::new(); + descriptor.get(|| 
file_descriptor().message_by_package_relative_name("StateValue").unwrap()).clone() + } +} + +impl ::std::fmt::Display for StateValue { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for StateValue { + type RuntimeType = ::protobuf::reflect::rt::RuntimeTypeMessage; +} + +/// Nested message and enums of message `StateValue` +pub mod state_value { + + #[derive(Clone,PartialEq,Debug)] + // @@protoc_insertion_point(oneof:bitdrift_public.protobuf.state.v1.StateValue.value_type) + pub enum Value_type { + // @@protoc_insertion_point(oneof_field:bitdrift_public.protobuf.state.v1.StateValue.string_value) + StringValue(::std::string::String), + // @@protoc_insertion_point(oneof_field:bitdrift_public.protobuf.state.v1.StateValue.int_value) + IntValue(i64), + // @@protoc_insertion_point(oneof_field:bitdrift_public.protobuf.state.v1.StateValue.double_value) + DoubleValue(f64), + // @@protoc_insertion_point(oneof_field:bitdrift_public.protobuf.state.v1.StateValue.bool_value) + BoolValue(bool), + } + + impl ::protobuf::Oneof for Value_type { + } + + impl ::protobuf::OneofFull for Value_type { + fn descriptor() -> ::protobuf::reflect::OneofDescriptor { + static descriptor: ::protobuf::rt::Lazy<::protobuf::reflect::OneofDescriptor> = ::protobuf::rt::Lazy::new(); + descriptor.get(|| ::descriptor().oneof_by_name("value_type").unwrap()).clone() + } + } + + impl Value_type { + pub(in super) fn generated_oneof_descriptor_data() -> ::protobuf::reflect::GeneratedOneofDescriptorData { + ::protobuf::reflect::GeneratedOneofDescriptorData::new::("value_type") + } + } +} + +// @@protoc_insertion_point(message:bitdrift_public.protobuf.state.v1.StateKeyValuePair) +#[derive(PartialEq,Clone,Default,Debug)] +pub struct StateKeyValuePair { + // message fields + // @@protoc_insertion_point(field:bitdrift_public.protobuf.state.v1.StateKeyValuePair.key) + pub key: ::std::string::String, + // @@protoc_insertion_point(field:bitdrift_public.protobuf.state.v1.StateKeyValuePair.value) + pub value: ::protobuf::MessageField, + // special fields + // @@protoc_insertion_point(special_field:bitdrift_public.protobuf.state.v1.StateKeyValuePair.special_fields) + pub special_fields: ::protobuf::SpecialFields, +} + +impl<'a> ::std::default::Default for &'a StateKeyValuePair { + fn default() -> &'a StateKeyValuePair { + ::default_instance() + } +} + +impl StateKeyValuePair { + pub fn new() -> StateKeyValuePair { + ::std::default::Default::default() + } + + fn generated_message_descriptor_data() -> ::protobuf::reflect::GeneratedMessageDescriptorData { + let mut fields = ::std::vec::Vec::with_capacity(2); + let mut oneofs = ::std::vec::Vec::with_capacity(0); + fields.push(::protobuf::reflect::rt::v2::make_simpler_field_accessor::<_, _>( + "key", + |m: &StateKeyValuePair| { &m.key }, + |m: &mut StateKeyValuePair| { &mut m.key }, + )); + fields.push(::protobuf::reflect::rt::v2::make_message_field_accessor::<_, StateValue>( + "value", + |m: &StateKeyValuePair| { &m.value }, + |m: &mut StateKeyValuePair| { &mut m.value }, + )); + ::protobuf::reflect::GeneratedMessageDescriptorData::new_2::( + "StateKeyValuePair", + fields, + oneofs, + ) + } +} + +impl ::protobuf::Message for StateKeyValuePair { + const NAME: &'static str = "StateKeyValuePair"; + + fn is_initialized(&self) -> bool { + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::Result<()> { + while let Some(tag) = 
is.read_raw_tag_or_eof()? { + match tag { + 10 => { + self.key = is.read_string()?; + }, + 18 => { + ::protobuf::rt::read_singular_message_into_field(is, &mut self.value)?; + }, + tag => { + ::protobuf::rt::read_unknown_or_skip_group(tag, is, self.special_fields.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u64 { + let mut my_size = 0; + if !self.key.is_empty() { + my_size += ::protobuf::rt::string_size(1, &self.key); + } + if let Some(v) = self.value.as_ref() { + let len = v.compute_size(); + my_size += 1 + ::protobuf::rt::compute_raw_varint64_size(len) + len; + } + my_size += ::protobuf::rt::unknown_fields_size(self.special_fields.unknown_fields()); + self.special_fields.cached_size().set(my_size as u32); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::Result<()> { + if !self.key.is_empty() { + os.write_string(1, &self.key)?; + } + if let Some(v) = self.value.as_ref() { + ::protobuf::rt::write_message_field_with_cached_size(2, v, os)?; + } + os.write_unknown_fields(self.special_fields.unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn special_fields(&self) -> &::protobuf::SpecialFields { + &self.special_fields + } + + fn mut_special_fields(&mut self) -> &mut ::protobuf::SpecialFields { + &mut self.special_fields + } + + fn new() -> StateKeyValuePair { + StateKeyValuePair::new() + } + + fn clear(&mut self) { + self.key.clear(); + self.value.clear(); + self.special_fields.clear(); + } + + fn default_instance() -> &'static StateKeyValuePair { + static instance: StateKeyValuePair = StateKeyValuePair { + key: ::std::string::String::new(), + value: ::protobuf::MessageField::none(), + special_fields: ::protobuf::SpecialFields::new(), + }; + &instance + } +} + +impl ::protobuf::MessageFull for StateKeyValuePair { + fn descriptor() -> ::protobuf::reflect::MessageDescriptor { + static descriptor: ::protobuf::rt::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::Lazy::new(); + descriptor.get(|| file_descriptor().message_by_package_relative_name("StateKeyValuePair").unwrap()).clone() + } +} + +impl ::std::fmt::Display for StateKeyValuePair { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for StateKeyValuePair { + type RuntimeType = ::protobuf::reflect::rt::RuntimeTypeMessage; +} + +static file_descriptor_proto_data: &'static [u8] = b"\ + \n/bitdrift_public/protobuf/state/v1/payload.proto\x12!bitdrift_public.p\ + rotobuf.state.v1\"\xa4\x01\n\nStateValue\x12#\n\x0cstring_value\x18\x01\ + \x20\x01(\tH\0R\x0bstringValue\x12\x1d\n\tint_value\x18\x02\x20\x01(\x03\ + H\0R\x08intValue\x12#\n\x0cdouble_value\x18\x03\x20\x01(\x01H\0R\x0bdoub\ + leValue\x12\x1f\n\nbool_value\x18\x04\x20\x01(\x08H\0R\tboolValueB\x0c\n\ + \nvalue_type\"j\n\x11StateKeyValuePair\x12\x10\n\x03key\x18\x01\x20\x01(\ + \tR\x03key\x12C\n\x05value\x18\x02\x20\x01(\x0b2-.bitdrift_public.protob\ + uf.state.v1.StateValueR\x05valueb\x06proto3\ +"; + +/// `FileDescriptorProto` object which was a source for this generated file +fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto { + static file_descriptor_proto_lazy: ::protobuf::rt::Lazy<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::Lazy::new(); + file_descriptor_proto_lazy.get(|| { + 
::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap() + }) +} + +/// `FileDescriptor` object which allows dynamic access to files +pub fn file_descriptor() -> &'static ::protobuf::reflect::FileDescriptor { + static generated_file_descriptor_lazy: ::protobuf::rt::Lazy<::protobuf::reflect::GeneratedFileDescriptor> = ::protobuf::rt::Lazy::new(); + static file_descriptor: ::protobuf::rt::Lazy<::protobuf::reflect::FileDescriptor> = ::protobuf::rt::Lazy::new(); + file_descriptor.get(|| { + let generated_file_descriptor = generated_file_descriptor_lazy.get(|| { + let mut deps = ::std::vec::Vec::with_capacity(0); + let mut messages = ::std::vec::Vec::with_capacity(2); + messages.push(StateValue::generated_message_descriptor_data()); + messages.push(StateKeyValuePair::generated_message_descriptor_data()); + let mut enums = ::std::vec::Vec::with_capacity(0); + ::protobuf::reflect::GeneratedFileDescriptor::new_generated( + file_descriptor_proto(), + deps, + messages, + enums, + ) + }); + ::protobuf::reflect::FileDescriptor::new_generated_2(generated_file_descriptor) + }) +} diff --git a/bd-resilient-kv/CLAUDE.md b/bd-resilient-kv/AGENTS.md similarity index 66% rename from bd-resilient-kv/CLAUDE.md rename to bd-resilient-kv/AGENTS.md index 8cf65344..989b2b72 100644 --- a/bd-resilient-kv/CLAUDE.md +++ b/bd-resilient-kv/AGENTS.md @@ -4,6 +4,11 @@ This document provides insights and understanding about the `bd-resilient-kv` jo ## Core Architecture +The `bd-resilient-kv` library provides two storage models: + +1. **KVStore**: Standard double-buffered key-value store with automatic compaction +2. **VersionedKVStore**: Version-tracked store with point-in-time recovery and automatic rotation + ### KVJournal Trait The `KVJournal` trait is the foundation of the system, providing: - **Append-only semantics**: Journals accumulate entries over time without removing old data @@ -16,6 +21,9 @@ The `KVJournal` trait is the foundation of the system, providing: 1. **InMemoryKVJournal**: Core implementation backed by byte buffers 2. **MemMappedKVJournal**: File-backed implementation wrapping InMemoryKVJournal 3. **DoubleBufferedKVJournal**: High-level wrapper providing automatic compaction and retry logic +4. **VersionedKVJournal**: Versioned journal with entry-level version tracking +5. **MemMappedVersionedKVJournal**: Memory-mapped wrapper for versioned journals +6. 
**VersionedKVStore**: High-level API for versioned key-value storage with automatic rotation ### Bulk Operations Architecture @@ -31,14 +39,91 @@ The system provides efficient bulk operations through a consistent pattern: - Optimized for batch processing scenarios - Automatic timestamp synchronization for related entries +### Versioned Storage Architecture + +The `VersionedKVStore` provides a higher-level API built on top of `VersionedKVJournal`: + +**Key Components**: +- **VersionedKVJournal**: Low-level journal that tracks timestamps for each entry +- **MemMappedVersionedKVJournal**: Memory-mapped persistence layer +- **VersionedKVStore**: High-level HashMap-like API with automatic rotation and async write operations + +**Async API**: +- Write operations (`insert()`, `remove()`, `rotate_journal()`) are async and require a Tokio runtime +- Compression of archived journals is performed asynchronously using streaming I/O +- Read operations remain synchronous and operate on the in-memory cache +- The async API enables efficient background compression without blocking the main thread + +**Version Tracking**: +- Every write operation (`insert`, `remove`) returns a monotonically non-decreasing timestamp (microseconds since UNIX epoch) +- Timestamps serve as both version identifiers and logical clocks +- If the system clock goes backward, timestamps are clamped to the last timestamp to maintain monotonicity +- Entries with `Value::Null` are treated as deletions but still timestamped +- During rotation, snapshot entries preserve their original timestamps + +**Timestamp Tracking**: +- Each entry records a timestamp (microseconds since UNIX epoch) when the write occurred +- Timestamps are monotonically non-decreasing, not strictly increasing +- Multiple entries may share the same timestamp if the system clock doesn't advance between writes +- This is expected behavior, particularly during rapid writes or in test environments +- Recovery at timestamp T includes ALL entries with timestamp ≤ T, applied in version order +- Timestamps are preserved during rotation, maintaining temporal accuracy for audit purposes +- Test coverage: `test_timestamp_collision_across_rotation` in `versioned_recovery_test.rs` + +**Rotation Strategy**: +- Automatic rotation when journal size exceeds high water mark (triggered during async write operations) +- Current state is compacted into a new journal as versioned entries +- Old journal is archived with `.t{timestamp}.zz` suffix +- Archived journals are automatically compressed using zlib (RFC 1950, level 5) asynchronously + +**Compression**: +- All archived journals are automatically compressed during rotation using async I/O +- Active journals remain uncompressed for write performance +- Compression uses zlib format (RFC 1950) with level 5 for balanced speed/ratio +- Streaming compression avoids loading entire journals into memory +- Typical compression achieves >50% size reduction for text-based data +- File extension `.zz` indicates compressed archives +- Recovery transparently decompresses archived journals when needed + +**Point-in-Time Recovery**: +The `VersionedRecovery` utility provides point-in-time recovery by replaying journal entries up to a target timestamp. It works with raw uncompressed journal bytes and can reconstruct state at any historical timestamp across rotation boundaries. 
Recovery is optimized: `recover_current()` only reads the last journal (since rotation writes complete compacted state), while `recover_at_timestamp()` intelligently selects and replays only necessary journals. The `new()` constructor accepts a vector of `(&[u8], u64)` tuples (byte slice and snapshot timestamp). Callers must decompress archived journals before passing them to the constructor. + ## Critical Design Insights -### 1. Compaction Efficiency +### 1. Two Storage Models + +**KVStore (Double-Buffered)**: +- Best for: General-purpose key-value storage, configuration, caches +- Architecture: Two journals with automatic switching +- Compaction: Compresses entire state into inactive journal +- No version tracking +- Format: BONJSON-based entries + +**VersionedKVStore (Single Journal with Rotation)**: +- Best for: Audit logs, state history, remote backup +- Architecture: Single journal with archived versions +- Rotation: Creates new journal with compacted state +- Timestamp tracking: Every write returns a timestamp +- Format: Protobuf-based entries (VERSION 1) + +### 2. Compaction Efficiency **Key Insight**: Compaction via `reinit_from()` is already maximally efficient. It writes data in the most compact possible serialized form (hashmap → bytes). If even this compact representation exceeds high water marks, then the data volume itself is the limiting factor, not inefficient storage. **Implication**: Never assume compaction can always solve high water mark issues. Sometimes both buffers are legitimately full. -### 2. Bulk Operations and Retry Logic +### 3. Versioned Store Rotation vs KVStore Compaction + +**Key Differences**: +- **KVStore**: Switches between two buffers, old buffer is reset and reused +- **VersionedKVStore**: Archives old journal with `.t{timestamp}` suffix, creates new journal +- **Version Preservation**: Archived journals preserve complete history for recovery + +**When Rotation Occurs**: +- Triggered during `insert()` or `remove()` when journal size exceeds high water mark +- Can be manually triggered via `rotate()` +- Automatic and transparent to the caller + +### 4. Bulk Operations and Retry Logic The system includes sophisticated retry logic specifically for bulk operations: **`set_multiple` Intelligence**: The `set_multiple` method in `DoubleBufferedKVJournal` implements a two-phase approach: @@ -51,7 +136,7 @@ The system includes sophisticated retry logic specifically for bulk operations: - A retry immediately after might succeed on the now-compacted journal - High water mark flag accurately reflects whether retry is worthwhile -### 3. Simplified High Water Mark Detection +### 5. Simplified High Water Mark Detection The system uses a straightforward approach to high water mark detection: ```rust @@ -66,7 +151,7 @@ if journal.is_high_water_mark_triggered() { - No callback complexity or thread safety concerns - Direct control over when to check status -### 3. Double Buffered Journal Logic +### 6. Double Buffered Journal Logic The `DoubleBufferedKVJournal` implements automatic switching with sophisticated retry logic: 1. 
**Normal Operations**: Forward to active journal, switch if high water mark triggered @@ -144,6 +229,37 @@ fn set_multiple(&mut self, entries: &[(String, Value)]) -> anyhow::Result<()> { - **Retry Logic**: `set_multiple` should retry on failure if high water mark not triggered - **Error Scenarios**: Actual errors should be propagated, not masked +## Failure Modes: What to Handle vs What's Impossible + +### Failure Modes Applications Must Handle + +1. **Buffer Full During Normal Writes** + - **When**: Writing to journal when buffer is at capacity + - **Result**: Write operations return `SerializationError::BufferFull` + - **Action**: Check `is_high_water_mark_triggered()`, trigger compaction/rotation if needed + - **Note**: Even after compaction, if unique data exceeds buffer size, writes will still fail + +2. **High Water Mark Triggered After Compaction (KVStore)** + - **When**: Compacted state still exceeds high water mark threshold + - **Result**: `is_high_water_mark_triggered()` returns true after `switch_journals()` + - **Action**: Indicates buffer size is too small for the unique data volume, not a transient issue + +3. **I/O Errors During Persistence** + - **When**: File operations fail (disk full, permissions, etc.) + - **Result**: I/O errors propagated from memory-mapped operations + - **Action**: Handle as standard I/O errors + +4. **Compression/Archive Errors (VersionedKVStore)** + - **When**: Asynchronous compression of archived journal fails + - **Result**: Error during rotation's async compression phase + - **Action**: Retry compression, handle cleanup appropriately + +### Impossible Failure Modes (Architectural Guarantees) + +1. **Timestamp Overflow (VersionedKVStore)** + - **Why Practically Impossible**: Uses u64 for microsecond timestamps, would require 584,000+ years to overflow (u64::MAX microseconds ≈ year 586,524 CE) + - **Implication**: No overflow handling needed in practice + ## Common Pitfalls ### 1. Assuming Compaction Always Works @@ -158,6 +274,10 @@ fn set_multiple(&mut self, entries: &[(String, Value)]) -> anyhow::Result<()> { **Wrong**: Assuming high water mark flag means operation failed **Right**: High water mark flag indicates resource pressure, operations may still succeed +### 4. 
Over-Engineering for Impossible Scenarios +**Wrong**: Adding error handling for rotation buffer overflow +**Right**: Trust architectural guarantees, focus on actual failure modes + ## Key Methods and Their Purposes ### `set_multiple(entries: &[(String, Value)])` @@ -233,7 +353,7 @@ When modifying or refactoring code in the kv_journal system (or any Rust codebas - **Always update documentation and comments** to reflect current functionality - Pay special attention to trait documentation, method comments, and module-level explanations - Update CLAUDE.md or similar architectural documentation when making significant changes -- Ensure code comments explain the "why" behind complex logic, especially around callback mechanisms and compaction strategies +- Ensure code comments explain the "why" behind complex logic, especially around compaction strategies and retry mechanisms ### Code Quality Checks After making changes, run these commands in order: @@ -250,7 +370,7 @@ After making changes, run these commands in order: ### Testing - Run the full test suite: `cargo test -p bd-resilient-kv --lib` -- Pay special attention to tests that verify callback behavior and automatic switching +- Pay special attention to tests that verify automatic switching and retry logic - When adding new functionality, include comprehensive tests covering edge cases ### Git Workflow @@ -269,4 +389,4 @@ The kv_journal system is built around efficient append-only storage with intelli - **Cross-Layer Integration**: Consistent bulk operation patterns from FeatureFlags → KVStore → KVJournal - **Optimized for Real-World Use**: Handles edge cases like partial buffer fills and concurrent compaction -**Breaking Changes**: The callback system (`set_high_water_mark_callback`, `HighWaterMarkCallback`) has been completely removed. Code relying on callbacks will no longer compile and must be updated to check the `is_high_water_mark_triggered()` flag instead. \ No newline at end of file +**Breaking Changes**: The callback system (`set_high_water_mark_callback`, `HighWaterMarkCallback`) has been completely removed. Code relying on callbacks will no longer compile and must be updated to check the `is_high_water_mark_triggered()` flag instead. 
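## Usage Sketch

A minimal sketch of the async `VersionedKVStore` API, mirroring the patterns in `versioned_kv_store_test.rs`. The constructor arguments (directory, store name, buffer size, an optional parameter left as `None` in the tests, and a time provider) and the fixed `TestTimeProvider` clock come straight from those tests; a production caller would supply a real clock provider instead.

```rust
use bd_proto::protos::state::payload::{StateValue, state_value};
use bd_resilient_kv::VersionedKVStore;
use bd_time::TestTimeProvider;
use std::sync::Arc;
use time::macros::datetime;

fn string_value(s: &str) -> StateValue {
  StateValue {
    value_type: Some(state_value::Value_type::StringValue(s.to_string())),
    ..Default::default()
  }
}

async fn example(dir: &std::path::Path) -> anyhow::Result<()> {
  // Arguments mirror the tests; the TestTimeProvider clock is illustrative only.
  let time = Arc::new(TestTimeProvider::new(datetime!(2024-01-01 00:00:00 UTC)));
  let (mut store, _) = VersionedKVStore::new(dir, "my_store", 4096, None, time).await?;

  // Writes are async and return the entry timestamp (microseconds since epoch).
  let ts1 = store.insert("key1".to_string(), string_value("value1")).await?;
  let ts2 = store.insert("key2".to_string(), string_value("value2")).await?;
  assert!(ts2 >= ts1); // Timestamps are monotonically non-decreasing.

  // Reads are synchronous and served from the in-memory cache.
  assert_eq!(store.get("key1"), Some(&string_value("value1")));

  // Removal returns the timestamp of the delete entry, if the key existed.
  let _removed_at = store.remove("key1").await?;

  // Rotation archives the old journal as a compressed `.zz` file and starts a
  // new generation containing the compacted current state.
  store.rotate_journal().await?;
  Ok(())
}
```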
diff --git a/bd-resilient-kv/Cargo.toml b/bd-resilient-kv/Cargo.toml index fdb83557..3f3bd4af 100644 --- a/bd-resilient-kv/Cargo.toml +++ b/bd-resilient-kv/Cargo.toml @@ -10,12 +10,22 @@ doctest = false [dev-dependencies] tempfile.workspace = true +ctor.workspace = true +time.workspace = true [dependencies] -ahash.workspace = true -anyhow.workspace = true -bd-bonjson = { path = "../bd-bonjson" } -bd-client-common = { path = "../bd-client-common" } +ahash.workspace = true +log.workspace = true +anyhow.workspace = true +bd-bonjson = { path = "../bd-bonjson" } +bd-proto = { path = "../bd-proto" } +bd-time = { path = "../bd-time" } +bd-log = { path = "../bd-log" } +bd-client-common = { path = "../bd-client-common" } bd-workspace-hack.workspace = true -bytes.workspace = true -memmap2.workspace = true +bytes.workspace = true +crc32fast.workspace = true +flate2 = { workspace = true, features = ["zlib"] } +memmap2.workspace = true +tokio.workspace = true +protobuf.workspace = true diff --git a/bd-resilient-kv/VERSIONED_FORMAT.md b/bd-resilient-kv/VERSIONED_FORMAT.md new file mode 100644 index 00000000..3bfc2e99 --- /dev/null +++ b/bd-resilient-kv/VERSIONED_FORMAT.md @@ -0,0 +1,308 @@ +# Versioned KV Journal Design + +## Overview + +This document describes the versioned k-v store, which enables point-in-time state recovery by using timestamps as version identifiers. Each write operation is tagged with a timestamp, allowing the system to reconstruct the key-value store state at any historical moment. + +## Goals + +1. Enable recovery of key-value store state at any historical point in time +2. Preserve accurate write timestamps for audit and historical analysis +3. Support (near) indefinite retention of historical data without unbounded growth of active storage + +## Design Overview + +The versioned journal format uses timestamps as version identifiers for each write operation. Each entry in the journal records the timestamp, key, and value (or deletion marker) for every operation. This allows the store to reconstruct state at any point in time by replaying entries up to a target timestamp. + +To prevent unbounded growth, the system uses journal rotation: when the active journal reaches a size threshold, it is rotated out and replaced with a new journal containing only the current compacted state. The old journal is archived and compressed. Each archived journal preserves the original write timestamps of all entries, enabling point-in-time recovery across rotation boundaries. + +The underlying journal uses Protobuf to serialize the payloads that are used to implement the key-value semantics. + +## File Types + +### 1. Active Journal (`my_store.jrn.0`) +The current active journal receiving new writes. Active journals are **not compressed** for performance reasons. + +The number at the end of the active journal reflects the generation of the active journal, which allows us to safely rotate the journal while gracefully handling I/O errors. More on this below in the rotation section. + +### 2. Archived Journals (`my_store.jrn.t1699564900000000.zz`, etc.) +Previous journals, archived during rotation. Each contains complete state at its creation time plus subsequent incremental writes. The timestamp in the filename indicates the rotation/snapshot timestamp. + +**Archived journals are automatically compressed using zlib** (indicated by the `.zz` extension) to reduce storage space and bandwidth requirements for remote backup. Compression is mandatory and occurs automatically during rotation. 
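Consumers that need to read an archived journal offline must inflate it before parsing. A minimal sketch using `flate2` (the same approach as the crate's `decompress_zlib` test helper); the archive file name is illustrative:

```rust
use flate2::read::ZlibDecoder;
use std::io::Read;

/// Read a compressed archive (e.g. `my_store.jrn.t1699564900000000.zz`) and
/// return the raw journal bytes it contains.
fn read_archived_journal(path: &std::path::Path) -> anyhow::Result<Vec<u8>> {
  let compressed = std::fs::read(path)?;
  let mut decoder = ZlibDecoder::new(compressed.as_slice());
  let mut journal_bytes = Vec::new();
  decoder.read_to_end(&mut journal_bytes)?;
  Ok(journal_bytes)
}
```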
+ +## Format Specification + +### Binary Structure + +The byte-level layout of a VERSION 1 journal file: + +``` +┌─────────────────────────────────────────────────────────────────────────┐ +│ JOURNAL FILE HEADER │ +├──────────────────┬──────────────────┬───────────────────────────────────┤ +│ Format Version │ Position │ Reserved │ +│ (u64) │ (u64) │ (u8) │ +│ 8 bytes │ 8 bytes │ 1 byte │ +└──────────────────┴──────────────────┴───────────────────────────────────┘ + +┌─────────────────────────────────────────────────────────────────────────┐ +│ VERSIONED JOURNAL ENTRY │ +│ (Protobuf-encoded StateKeyValuePair) │ +├─────────────────────────────────────────────────────────────────────────┤ +│ Frame Length (varint) │ Variable length (1-10 bytes) │ +│ Timestamp (varint) │ Variable length (microseconds) │ +│ Protobuf Payload │ Variable length │ +│ CRC32 │ 4 bytes │ +│ │ +│ Payload contains: │ +│ StateKeyValuePair { │ +│ key: String, // The key being modified │ +│ value: StateValue // Value for SET, null for DELETE │ +│ } │ +└─────────────────────────────────────────────────────────────────────────┘ +``` + + +### Header Structure (17 bytes total) + +| Field | Offset | Size | Type | Value | Purpose | +|-------|--------|------|------|-------|---------| +| Format Version | 0 | 8 bytes | u64 (little-endian) | `1` | Allows future format evolution | +| Position | 8 | 8 bytes | u64 (little-endian) | Current write position | Tracks where next entry will be written | +| Reserved | 16 | 1 byte | u8 | `0` | Reserved for future use | + +### Entry Framing Format + +Each entry in the journal uses a length-prefixed framing format with CRC32 integrity checking: + +| Component | Size | Type | Description | +|-----------|------|------|-------------| +| Frame Length | Variable | varint | Total size of timestamp + protobuf payload + CRC32 (1-10 bytes) | +| Timestamp | Variable | varint | Entry timestamp in microseconds (serves as version) | +| Protobuf Payload | Variable | bytes | Serialized StateKeyValuePair message | +| CRC32 | 4 bytes | u32 (little-endian) | Checksum of timestamp + payload | + +### Versioned Journal Entry Schema + +Each entry in the journal is a `StateKeyValuePair` protobuf message: + +```protobuf +message StateKeyValuePair { + string key = 1; // The key being modified + StateValue value = 2; // Value for SET, null/empty for DELETE +} + +message StateValue { + oneof value { + string string_value = 1; + } +} +``` + +Fields: +- `key`: The key being written (string) +- `value`: The value being set (StateValue) or null/empty for DELETE operations + +**Timestamp Semantics:** +- Timestamps are stored as varints in microseconds since UNIX epoch +- Timestamps are monotonically non-decreasing, not strictly increasing +- If the system clock doesn't advance between writes, multiple entries may share the same timestamp +- This is expected behavior and ensures proper ordering without clock skew + +**Type Flexibility**: The `StateValue` message supports multiple value types: +- Primitives: strings, integers, doubles, booleans +- Complex types: lists, maps +- Binary data: bytes +- null value (indicates DELETE operation) + +**Size Considerations:** +- **Header**: Fixed 17 bytes +- **Per Entry**: Varies based on key and value size + - Frame length: 1-10 bytes (varint-encoded) + - Timestamp: 1-10 bytes (varint-encoded) + - Protobuf payload: varies by content + - CRC: Fixed 4 bytes + - Typical small entries: 20-50 bytes total + - Typical medium entries: 50-200 bytes total + +## Journal Structure + +### Initial 
Journal +When first created, the journal contains versioned entries: +``` +Entry 0: {"key": "key1", "value": "value1"} @ t=1699564801000000 +Entry 1: {"key": "key2", "value": "value2"} @ t=1699564802000000 +... +``` + +### Rotated Journal +After rotation at timestamp 1699564900000000, the new journal contains: +``` +Entry 0: {"key": "key1", "value": "value1"} @ t=1699564800123456 // Compacted state (original timestamp preserved) +Entry 1: {"key": "key2", "value": "value2"} @ t=1699564850987654 // Compacted state (original timestamp preserved) +Entry 2: {"key": "key3", "value": "value3"} @ t=1699564875111222 // Compacted state (original timestamp preserved) +Entry 3: {"key": "key4", "value": "value4"} @ t=1699564901000000 // New write after rotation +Entry 4: {"key": "key1", "value": "updated1"} @ t=1699564902000000 // New write after rotation +... +``` + +Key observations: +- **Timestamps are preserved**: Each compacted entry retains its original write timestamp (not the rotation time) + - This ensures that not only is the state at any given time recoverable from a given snapshot, we'll also be able to recover how long the current state values have been active for without looking at the previous snapshot. +- All entries use the same protobuf framing format +- New writes continue with later timestamps +- Each rotated journal is self-contained and can be read independently + +## Rotation Process + +When high water mark is reached: + +1. **Determine Rotation Timestamp**: Calculate max timestamp T from the most recent entry +2. **Increment Generation**: Calculate next generation number (e.g., 0 → 1) +3. **Create New Journal**: Initialize fresh journal file with next generation (e.g., `my_store.jrn.1`) +4. **Write Compacted State**: Write all current key-value pairs as versioned entries using their original update timestamp +5. **Activate New Journal**: Switch to new journal in-memory, unmap old journal +6. **Compress Archive** (async, best-effort): Compress old generation → `my_store.jrn.t{T}.zz` using zlib +7. 
**Delete Original** (best-effort): Remove uncompressed old generation file + +Example: +``` +Before rotation (generation 0): + my_store.jrn.0 # Active journal (generation 0) + +After rotation (generation 1): + my_store.jrn.1 # Active, contains compacted state (generation 1) + my_store.jrn.t1699564900000000.zz # Compressed archive of generation 0 +``` + +### Rotation Timeline Visualization + +``` +TIME + │ + ├─ t0: Normal Operation (Generation 0) + │ ┌────────────────────────────────────┐ + │ │ my_store.jrn.0 │ + │ │ ├─ Entry @ t=1699564795000000 │ + │ │ ├─ Entry @ t=1699564796000000 │ + │ │ ├─ Entry @ t=1699564797000000 │ + │ │ ├─ Entry @ t=1699564798000000 │ + │ │ └─ Entry @ t=1699564799000000 │ + │ └────────────────────────────────────┘ + │ + ├─ t1: High Water Mark Reached + │ ┌────────────────────────────────────┐ + │ │ my_store.jrn.0 │ + │ │ └─ Entry @ t=1699564800000000 │ ← TRIGGER + │ └────────────────────────────────────┘ + │ max_timestamp = 1699564800000000 + │ + ├─ t2: Create New Journal (Step 1) + │ ┌────────────────────────────────────┐ + │ │ my_store.jrn.0 │ (old, still active - generation 0) + │ └────────────────────────────────────┘ + │ ┌────────────────────────────────────┐ + │ │ my_store.jrn.1 │ (new, being written - generation 1) + │ │ └─ [header] │ + │ └────────────────────────────────────┘ + │ + ├─ t3: Write Compacted State (Step 2) + │ ┌────────────────────────────────────┐ + │ │ my_store.jrn.0 │ (old, still active) + │ └────────────────────────────────────┘ + │ ┌────────────────────────────────────┐ + │ │ my_store.jrn.1 │ (new, being written) + │ │ ├─ Entry {"key1", ...} @ t=1699564750000000│ ← Original timestamps + │ │ ├─ Entry {"key2", ...} @ t=1699564780000000│ ← Original timestamps + │ │ └─ Entry {"key3", ...} @ t=1699564799000000│ ← Original timestamps + │ └────────────────────────────────────┘ + │ + ├─ t4: Activate New Journal (Step 3) + │ ┌────────────────────────────────────┐ + │ │ my_store.jrn.0 │ (old, unmapped - ready for archive) + │ └────────────────────────────────────┘ + │ ┌────────────────────────────────────┐ + │ │ my_store.jrn.1 │ ← NOW ACTIVE! (generation 1) + │ │ (contains compacted state) │ + │ └────────────────────────────────────┘ + │ + ├─ t5: Compress Archive (Step 4 - Async) + │ ┌────────────────────────────────────┐ + │ │ my_store.jrn.1 │ (active, accepting writes) + │ │ └─ Entry @ t=1699564801000000 │ ← New writes + │ └────────────────────────────────────┘ + │ ┌────────────────────────────────────┐ + │ │ my_store.jrn.0 │ (being compressed...) 
+ │ └────────────────────────────────────┘ + │ ┌────────────────────────────────────┐ + │ │ my_store.jrn.t1699564800000000.zz │ (compressed output) + │ └────────────────────────────────────┘ + │ + ├─ t6: Delete Original (Step 5) + │ ┌────────────────────────────────────┐ + │ │ my_store.jrn.1 │ (active - generation 1) + │ └────────────────────────────────────┘ + │ ┌────────────────────────────────────┐ + │ │ my_store.jrn.t1699564800000000.zz │ (compressed archive of gen 0) + │ └────────────────────────────────────┘ + │ + └─ t7: Continue Normal Operation + ┌────────────────────────────────────┐ + │ my_store.jrn.1 │ + │ ├─ Entry @ t=1699564801000000 │ + │ ├─ Entry @ t=1699564802000000 │ + │ └─ Entry @ t=1699564803000000 │ + └────────────────────────────────────┘ + ┌────────────────────────────────────┐ + │ my_store.jrn.t1699564800000000.zz │ (ready for upload) + └────────────────────────────────────┘ +``` + +### Compression + +Archived journals are automatically compressed using zlib (compression level 5) during rotation: +- **Format**: Standard zlib format (RFC 1950) +- **Extension**: `.zz` indicates zlib compression +- **Benefits**: Reduced storage space and bandwidth for remote backups + +### Rotation Failure Modes and Recovery + +The generation-based rotation process is designed to be resilient: + +| Failure Point | State | Recovery | +|---------------|-------|----------| +| Before Step 3 | Old generation active, new generation partially written | Delete incomplete new generation, retry | +| During/After Step 5 | New generation active | Continue normally, old generation remains until compressed | +| During Step 6-7 | Compression fails | Uncompressed old generation may remain, but new journal is valid | + + +**What Can Fail:** +- I/O errors (disk full, permissions, etc.) +- Compression errors during async compression phase + +**Key Design Feature**: The rotation switches journals in-memory without file renames, making the critical transition atomic from the application's perspective. Old generation files remain at their original paths until successfully archived. + +## Recovery and Audit + +### Current State Recovery +The active journal is identified by finding the highest generation number (e.g., `my_store.jrn.0`, `my_store.jrn.1`, etc.). Simply read the active journal and replay all entries to reconstruct the current state. + +### Audit and Analysis +While `VersionedKVStore` does not support point-in-time recovery through its API, archived journals contain complete historical data. + +The timestamps in each entry allow you to understand the exact sequence of operations and build custom tooling for analyzing historical data. + +**Timestamp Accuracy**: All entries preserve their original write timestamps, even after rotation. This means you can accurately track when each write originally occurred. + +### Point-in-Time Recovery with VersionedRecovery + +While `VersionedKVStore` is designed for active operation and does not support point-in-time recovery through its API, the `VersionedRecovery` utility provides a way to reconstruct state at arbitrary historical timestamps from raw journal bytes. 
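A minimal recovery sketch from already-decompressed journal bytes, following the constructor shape exercised in `versioned_recovery_error_test.rs` (a vector of `(journal bytes, snapshot timestamp)` tuples); `recover_at_timestamp` is only referenced in a comment because its exact signature is not reproduced here:

```rust
use bd_resilient_kv::VersionedRecovery;

/// Rebuild the latest state from one or more journals. Each tuple holds the
/// raw journal bytes and the journal's snapshot timestamp; archived `.zz`
/// journals must be decompressed before being passed in.
fn recover_latest(journals: Vec<(&[u8], u64)>) -> anyhow::Result<()> {
  let recovery = VersionedRecovery::new(journals)?;

  // Current state only needs the last journal, because rotation writes the
  // complete compacted state into each new generation.
  let _current = recovery.recover_current()?;

  // For historical state, `recover_at_timestamp` replays entries with
  // timestamp <= the target across the supplied journals (call shape assumed):
  // let _at_t = recovery.recover_at_timestamp(target_micros)?;

  Ok(())
}
```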
+ +#### Overview + +`VersionedRecovery` is a separate utility that: +- Loads journals from file paths and automatically handles decompression of `.zz` archives +- Uses async I/O for efficient file loading +- Can process multiple journals for cross-rotation recovery +- Designed for offline analysis, server-side tooling, and audit systems +- Completely independent from `VersionedKVStore` diff --git a/bd-resilient-kv/src/lib.rs b/bd-resilient-kv/src/lib.rs index dc911eec..bd0bca6c 100644 --- a/bd-resilient-kv/src/lib.rs +++ b/bd-resilient-kv/src/lib.rs @@ -14,11 +14,22 @@ clippy::unwrap_used )] +#[cfg(test)] +#[ctor::ctor] +fn test_global_init() { + // TODO(snowp): Ideally we'll depend on bd-test-helpers here, but that would create a cyclic + // dependency. + bd_log::SwapLogger::initialize(); +} + #[cfg(test)] mod tests; pub mod kv_journal; pub mod kv_store; +mod versioned_kv_journal; pub use kv_journal::{DoubleBufferedKVJournal, InMemoryKVJournal, KVJournal, MemMappedKVJournal}; pub use kv_store::KVStore; +pub use versioned_kv_journal::recovery::VersionedRecovery; +pub use versioned_kv_journal::store::VersionedKVStore; diff --git a/bd-resilient-kv/src/tests/mod.rs b/bd-resilient-kv/src/tests/mod.rs index 58b0b4fd..38fa9f7a 100644 --- a/bd-resilient-kv/src/tests/mod.rs +++ b/bd-resilient-kv/src/tests/mod.rs @@ -22,6 +22,8 @@ clippy::items_after_statements )] +use bd_proto::protos::state; + pub mod boundary_test; pub mod concurrency_test; pub mod double_buffered_automatic_switching_test; @@ -33,3 +35,26 @@ pub mod error_handling_test; pub mod kv_store_test; pub mod kv_test; pub mod memmapped_test; +pub mod versioned_kv_store_test; +pub mod versioned_recovery_error_test; +pub mod versioned_recovery_test; + +/// Helper function to decompress zlib-compressed data. +pub fn decompress_zlib(data: &[u8]) -> anyhow::Result> { + use flate2::read::ZlibDecoder; + use std::io::Read; + + let mut decoder = ZlibDecoder::new(data); + let mut decompressed = Vec::new(); + decoder.read_to_end(&mut decompressed)?; + Ok(decompressed) +} + +pub fn make_string_value(s: &str) -> state::payload::StateValue { + state::payload::StateValue { + value_type: Some(state::payload::state_value::Value_type::StringValue( + s.to_string(), + )), + ..Default::default() + } +} diff --git a/bd-resilient-kv/src/tests/versioned_kv_store_test.rs b/bd-resilient-kv/src/tests/versioned_kv_store_test.rs new file mode 100644 index 00000000..558e7a8c --- /dev/null +++ b/bd-resilient-kv/src/tests/versioned_kv_store_test.rs @@ -0,0 +1,394 @@ +// shared-core - bitdrift's common client/server libraries +// Copyright Bitdrift, Inc. All rights reserved. 
+// +// Use of this source code is governed by a source available license that can be found in the +// LICENSE file or at: +// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt + +#![allow(clippy::unwrap_used)] + +use crate::VersionedKVStore; +use crate::tests::decompress_zlib; +use crate::versioned_kv_journal::{TimestampedValue, make_string_value}; +use bd_proto::protos::state::payload::StateValue; +use bd_time::TestTimeProvider; +use std::sync::Arc; +use tempfile::TempDir; +use time::ext::NumericalDuration; +use time::macros::datetime; + +struct Setup { + temp_dir: TempDir, + store: VersionedKVStore, + time_provider: Arc, +} + +impl Setup { + async fn new() -> anyhow::Result { + let temp_dir = TempDir::new()?; + let time_provider = Arc::new(TestTimeProvider::new(datetime!(2024-01-01 00:00:00 UTC))); + + let (store, _) = + VersionedKVStore::new(temp_dir.path(), "test", 4096, None, time_provider.clone()).await?; + + Ok(Self { + temp_dir, + store, + time_provider, + }) + } + + async fn make_store_from_snapshot_file( + &self, + snapshot_path: &std::path::Path, + ) -> anyhow::Result { + // Decompress the snapshot and journal files into the temp directory + // so we can open them as a store. + let data = std::fs::read(snapshot_path)?; + let decompressed_snapshot = decompress_zlib(&data)?; + std::fs::write( + self.temp_dir.path().join("snapshot.jrn.0"), + decompressed_snapshot, + )?; + + let (store, _) = VersionedKVStore::open_existing( + self.temp_dir.path(), + "snapshot", + 4096, + None, + self.time_provider.clone(), + ) + .await?; + + Ok(store) + } +} + +#[tokio::test] +async fn empty_store() -> anyhow::Result<()> { + let setup = Setup::new().await?; + + // Should start empty + assert!(setup.store.is_empty()); + assert_eq!(setup.store.len(), 0); + + assert!(setup.temp_dir.path().join("test.jrn.0").exists()); + + Ok(()) +} + +#[tokio::test] +async fn basic_crud() -> anyhow::Result<()> { + let temp_dir = TempDir::new()?; + let time_provider = Arc::new(TestTimeProvider::new(datetime!(2024-01-01 00:00:00 UTC))); + + + let (mut store, _) = + VersionedKVStore::new(temp_dir.path(), "test", 4096, None, time_provider).await?; + + // Insert some values + let ts1 = store + .insert("key1".to_string(), make_string_value("value1")) + .await?; + let ts2 = store + .insert("key2".to_string(), make_string_value("value2")) + .await?; + + assert_eq!(store.len(), 2); + assert!(ts2 >= ts1); + + // Remove a key + let ts3 = store.remove("key1").await?; + assert!(ts3.is_some()); + assert!(ts3.unwrap() >= ts2); + + assert_eq!(store.len(), 1); + assert!(!store.contains_key("key1")); + assert!(store.contains_key("key2")); + + // Remove non-existent key + let removed = store.remove("nonexistent").await?; + assert!(removed.is_none()); + + // Read back existing key + let val = store.get("key2"); + assert_eq!(val, Some(&make_string_value("value2"))); + + // Read non-existent key + let val = store.get("key1"); + assert_eq!(val, None); + + Ok(()) +} + +#[tokio::test] +async fn test_persistence_and_reload() -> anyhow::Result<()> { + let temp_dir = TempDir::new()?; + let time_provider = Arc::new(TestTimeProvider::new(datetime!(2024-01-01 00:00:00 UTC))); + + + // Create store and write some data + let (ts1, ts2) = { + let (mut store, _) = + VersionedKVStore::new(temp_dir.path(), "test", 4096, None, time_provider.clone()).await?; + let ts1 = store + .insert("key1".to_string(), make_string_value("value1")) + .await?; + let ts2 = store + .insert("key2".to_string(), make_string_value("foo")) + 
.await?; + store.sync()?; + + (ts1, ts2) + }; + + // Reopen and verify data persisted + { + let (store, _) = + VersionedKVStore::open_existing(temp_dir.path(), "test", 4096, None, time_provider).await?; + assert_eq!(store.len(), 2); + assert_eq!( + store.get_with_timestamp("key1"), + Some(&TimestampedValue { + value: make_string_value("value1"), + timestamp: ts1, + }) + ); + assert_eq!( + store.get_with_timestamp("key2"), + Some(&TimestampedValue { + value: make_string_value("foo"), + timestamp: ts2, + }) + ); + } + + Ok(()) +} + +#[tokio::test] +async fn test_null_value_is_deletion() -> anyhow::Result<()> { + let mut setup = Setup::new().await?; + + // Insert a value + setup + .store + .insert("key1".to_string(), make_string_value("value1")) + .await?; + assert!(setup.store.contains_key("key1")); + + // Insert empty state to delete + setup + .store + .insert("key1".to_string(), StateValue::default()) + .await?; + assert!(!setup.store.contains_key("key1")); + assert_eq!(setup.store.len(), 0); + + Ok(()) +} + +#[tokio::test] +async fn test_manual_rotation() -> anyhow::Result<()> { + let mut setup = Setup::new().await?; + + // Insert some data + let _ts1 = setup + .store + .insert("key1".to_string(), make_string_value("value1")) + .await?; + let ts2 = setup + .store + .insert("key2".to_string(), make_string_value("value2")) + .await?; + + // Get max timestamp before rotation (this will be used in the archive name) + let rotation_timestamp = setup + .store + .get_with_timestamp("key2") + .map(|tv| tv.timestamp) + .unwrap(); + + // Manually trigger rotation + setup.store.rotate_journal().await?; + + // Verify archived file exists (compressed) + let archived_path = setup + .temp_dir + .path() + .join(format!("test.jrn.t{}.zz", rotation_timestamp)); + assert!(archived_path.exists()); + + // Verify active journal still works + let ts3 = setup + .store + .insert("key3".to_string(), make_string_value("value3")) + .await?; + assert!(ts3 >= ts2); + assert_eq!(setup.store.len(), 3); + + // Verify data is intact + assert_eq!(setup.store.get("key1"), Some(&make_string_value("value1"))); + assert_eq!(setup.store.get("key2"), Some(&make_string_value("value2"))); + assert_eq!(setup.store.get("key3"), Some(&make_string_value("value3"))); + + // Decompress the archive and load it as a Store to verify that it contains the old state. 
+ let snapshot_store = setup.make_store_from_snapshot_file(&archived_path).await?; + assert_eq!( + snapshot_store.get("key1"), + Some(&make_string_value("value1")) + ); + assert_eq!( + snapshot_store.get("key2"), + Some(&make_string_value("value2")) + ); + assert_eq!(snapshot_store.len(), 2); + + Ok(()) +} + +#[tokio::test] +async fn test_rotation_preserves_state() -> anyhow::Result<()> { + let mut setup = Setup::new().await?; + + setup + .store + .insert("key1".to_string(), make_string_value("value1")) + .await?; + + let pre_rotation_state = setup.store.as_hashmap().clone(); + let pre_rotation_ts = setup + .store + .get_with_timestamp("key1") + .map(|tv| tv.timestamp) + .unwrap(); + + // Rotate + setup.store.rotate_journal().await?; + + // Verify state is preserved exactly + let post_rotation_state = setup.store.as_hashmap(); + assert_eq!(pre_rotation_state, *post_rotation_state); + assert_eq!(setup.store.len(), 1); + + // Verify we can continue writing + let ts_new = setup + .store + .insert("key2".to_string(), make_string_value("value2")) + .await?; + assert!(ts_new >= pre_rotation_ts); + assert_eq!(setup.store.len(), 2); + + Ok(()) +} + +#[tokio::test] +async fn test_empty_store_operations() -> anyhow::Result<()> { + let mut setup = Setup::new().await?; + + // Operations on empty store + assert_eq!(setup.store.get("nonexistent"), None); + assert!(!setup.store.contains_key("nonexistent")); + assert_eq!(setup.store.remove("nonexistent").await?, None); + assert!(setup.store.is_empty()); + assert_eq!(setup.store.len(), 0); + + Ok(()) +} + +#[tokio::test] +async fn test_timestamp_preservation_during_rotation() -> anyhow::Result<()> { + let mut setup = Setup::new().await?; + + // Insert some keys and capture their timestamps + let ts1 = setup + .store + .insert("key1".to_string(), make_string_value("value1")) + .await?; + + // Advance time to ensure different timestamps. + setup.time_provider.advance(10.milliseconds()); + + let ts2 = setup + .store + .insert("key2".to_string(), make_string_value("value2")) + .await?; + + // Verify timestamps are different + assert_ne!(ts1, ts2, "Timestamps should be different"); + assert!(ts2 > ts1, "Later writes should have later timestamps"); + + // Write enough data to trigger rotation + for i in 0 .. 50 { + setup + .store + .insert(format!("fill{i}"), make_string_value("foo")) + .await?; + } + + // Verify that after rotation, the original timestamps are preserved + let ts1_after = setup + .store + .get_with_timestamp("key1") + .map(|tv| tv.timestamp) + .unwrap(); + + let ts2_after = setup + .store + .get_with_timestamp("key2") + .map(|tv| tv.timestamp) + .unwrap(); + + assert_eq!( + ts1, ts1_after, + "key1 timestamp should be preserved during rotation" + ); + assert_eq!( + ts2, ts2_after, + "key2 timestamp should be preserved during rotation" + ); + + // Verify ordering is still correct + assert!( + ts2_after > ts1_after, + "Timestamp ordering should be preserved" + ); + + Ok(()) +} + +#[tokio::test] +async fn test_multiple_rotations() -> anyhow::Result<()> { + let mut setup = Setup::new().await?; + + let mut rotation_timestamps = Vec::new(); + + // Perform multiple rotations + for i in 0 .. 
3 { + let key = format!("key{}", i); + let value = make_string_value(&format!("value{}", i)); + setup.store.insert(key.clone(), value).await?; + let timestamp = setup + .store + .get_with_timestamp(&key) + .map(|tv| tv.timestamp) + .unwrap(); + rotation_timestamps.push(timestamp); + setup.store.rotate_journal().await?; + } + + // Verify all compressed archives exist + for timestamp in rotation_timestamps { + let archived_path = setup + .temp_dir + .path() + .join(format!("test.jrn.t{}.zz", timestamp)); + assert!( + archived_path.exists(), + "Compressed archive for timestamp {} should exist", + timestamp + ); + } + + Ok(()) +} diff --git a/bd-resilient-kv/src/tests/versioned_recovery_error_test.rs b/bd-resilient-kv/src/tests/versioned_recovery_error_test.rs new file mode 100644 index 00000000..a9550c04 --- /dev/null +++ b/bd-resilient-kv/src/tests/versioned_recovery_error_test.rs @@ -0,0 +1,176 @@ +// shared-core - bitdrift's common client/server libraries +// Copyright Bitdrift, Inc. All rights reserved. +// +// Use of this source code is governed by a source available license that can be found in the +// LICENSE file or at: +// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt + +#![allow(clippy::unwrap_used)] + +use crate::VersionedKVStore; +use crate::tests::decompress_zlib; +use crate::versioned_kv_journal::make_string_value; +use crate::versioned_kv_journal::recovery::VersionedRecovery; +use bd_time::TestTimeProvider; +use std::sync::Arc; +use tempfile::TempDir; +use time::ext::NumericalDuration; +use time::macros::datetime; + + +#[test] +fn test_recovery_buffer_too_small() { + // Create a buffer that's smaller than the header size (17 bytes) + let buffer = vec![0u8; 10]; + + let recovery = VersionedRecovery::new(vec![(&buffer, 1000)]).unwrap(); + let result = recovery.recover_current(); + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!(err.to_string().contains("Buffer too small")); +} + +#[test] +fn test_recovery_invalid_position_less_than_header() { + // Create a buffer with a position field that's less than HEADER_SIZE (17) + let mut buffer = vec![0u8; 100]; + + // Write version (1 byte) + buffer[0] = 1; + + // Write position at bytes 8-15 (u64, little-endian) + // Set position to 10, which is less than HEADER_SIZE (17) + let invalid_position: u64 = 10; + buffer[8 .. 16].copy_from_slice(&invalid_position.to_le_bytes()); + + let recovery = VersionedRecovery::new(vec![(&buffer, 1000)]).unwrap(); + let result = recovery.recover_current(); + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + err.to_string().contains("Invalid position"), + "Expected 'Invalid position' error, got: {}", + err + ); +} + +#[test] +fn test_recovery_position_exceeds_buffer_length() { + // Create a buffer where position > buffer.len() + let mut buffer = vec![0u8; 50]; + + // Write version (1 byte) + buffer[0] = 1; + + // Write position at bytes 8-15 (u64, little-endian) + // Set position to 100, which exceeds buffer length of 50 + let invalid_position: u64 = 100; + buffer[8 .. 
16].copy_from_slice(&invalid_position.to_le_bytes()); + + let recovery = VersionedRecovery::new(vec![(&buffer, 1000)]).unwrap(); + let result = recovery.recover_current(); + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + err.to_string().contains("Invalid position"), + "Expected 'Invalid position' error, got: {}", + err + ); +} + +#[tokio::test] +async fn test_recovery_with_deletions() -> anyhow::Result<()> { + let temp_dir = TempDir::new()?; + let time_provider = Arc::new(TestTimeProvider::new(datetime!(2024-01-01 00:00:00 UTC))); + + let (mut store, _) = + VersionedKVStore::new(temp_dir.path(), "test", 4096, None, time_provider.clone()).await?; + + let ts1 = store + .insert("key1".to_string(), make_string_value("value1")) + .await?; + + time_provider.advance(10.milliseconds()); + + let ts2 = store + .insert("key2".to_string(), make_string_value("value2")) + .await?; + + time_provider.advance(10.milliseconds()); + + // Delete key1 + let ts3 = store.remove("key1").await?.unwrap(); + + store.sync()?; + + // Rotate to create snapshot + let rotation = store.rotate_journal().await?; + + // Read the snapshot + let compressed_data = std::fs::read(&rotation.snapshot_path)?; + let decompressed_data = decompress_zlib(&compressed_data)?; + + // Use u64::MAX as snapshot timestamp since we're only checking the latest state + let recovery = VersionedRecovery::new(vec![(&decompressed_data, u64::MAX)])?; + + // At ts1, only key1 should exist + let state_ts1 = recovery.recover_at_timestamp(ts1)?; + assert_eq!(state_ts1.len(), 1); + assert!(state_ts1.contains_key("key1")); + + // At ts2, both keys should exist + let state_ts2 = recovery.recover_at_timestamp(ts2)?; + assert_eq!(state_ts2.len(), 2); + assert!(state_ts2.contains_key("key1")); + assert!(state_ts2.contains_key("key2")); + + // At ts3 (after deletion), only key2 should exist + let state_ts3 = recovery.recover_at_timestamp(ts3)?; + assert_eq!(state_ts3.len(), 1); + assert!(!state_ts3.contains_key("key1"), "key1 should be deleted"); + assert!(state_ts3.contains_key("key2")); + + Ok(()) +} + +#[test] +fn test_recovery_with_corrupted_frame() { + // Create a valid header followed by corrupted frame data + let mut buffer = vec![0u8; 100]; + + // Write version (1 byte) + buffer[0] = 1; + + // Write valid position at bytes 8-15 (u64, little-endian) + let position: u64 = 50; + buffer[8 .. 16].copy_from_slice(&position.to_le_bytes()); + + // Fill data area with corrupted/invalid frame data + // (random bytes that won't decode as a valid frame) + buffer[17 .. 
50].fill(0xFF); + + // This should not panic, but should handle the corrupted frame gracefully + let result = VersionedRecovery::new(vec![(&buffer, 1000)]); + // The recovery should succeed even with corrupted frames (it will just stop reading) + assert!(result.is_ok()); + + let recovery = result.unwrap(); + let state = recovery.recover_current(); + // Should return empty state since frames are corrupted + assert!(state.is_ok()); +} + +#[tokio::test] +async fn test_recovery_current_with_empty_snapshots() -> anyhow::Result<()> { + // Test recover_current when there are no snapshots at all + let recovery = VersionedRecovery::new(vec![])?; + + let state = recovery.recover_current()?; + assert_eq!( + state.len(), + 0, + "Should return empty state with no snapshots" + ); + + Ok(()) +} diff --git a/bd-resilient-kv/src/tests/versioned_recovery_test.rs b/bd-resilient-kv/src/tests/versioned_recovery_test.rs new file mode 100644 index 00000000..6805c992 --- /dev/null +++ b/bd-resilient-kv/src/tests/versioned_recovery_test.rs @@ -0,0 +1,435 @@ +// shared-core - bitdrift's common client/server libraries +// Copyright Bitdrift, Inc. All rights reserved. +// +// Use of this source code is governed by a source available license that can be found in the +// LICENSE file or at: +// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt + +#![allow(clippy::unwrap_used)] +#![allow(clippy::case_sensitive_file_extension_comparisons)] + +use crate::VersionedKVStore; +use crate::tests::decompress_zlib; +use crate::versioned_kv_journal::make_string_value; +use crate::versioned_kv_journal::recovery::VersionedRecovery; +use std::sync::Arc; +use tempfile::TempDir; +use time::ext::NumericalDuration; +use time::macros::datetime; + +/// Helper function to find archived journal files in a directory. +/// Returns sorted paths to all `.zz` compressed journal archives. +fn find_archived_journals(dir: &std::path::Path) -> anyhow::Result> { + let mut archived_files = std::fs::read_dir(dir)? + .filter_map(|entry| { + let entry = entry.ok()?; + let path = entry.path(); + let filename = path.file_name()?.to_str()?; + if filename.ends_with(".zz") { + Some(path) + } else { + None + } + }) + .collect::>(); + archived_files.sort(); + Ok(archived_files) +} + +/// Helper function to extract rotation timestamp from an archived journal filename. +/// Archived journals have the format: `{name}.jrn.t{timestamp}.zz` +fn extract_rotation_timestamp(path: &std::path::Path) -> anyhow::Result { + let filename = path + .file_name() + .and_then(|f| f.to_str()) + .ok_or_else(|| anyhow::anyhow!("Invalid filename"))?; + + let timestamp = filename + .split('.') + .find(|part| { + part.starts_with('t') && part.len() > 1 && part[1 ..].chars().all(|c| c.is_ascii_digit()) + }) + .and_then(|part| part.strip_prefix('t')) + .ok_or_else(|| anyhow::anyhow!("No timestamp found in filename: {}", filename))? 
+ .parse::()?; + + Ok(timestamp) +} + +#[tokio::test] +async fn test_recovery_multiple_journals_with_rotation() -> anyhow::Result<()> { + let temp_dir = TempDir::new()?; + + + let time_provider = Arc::new(bd_time::TestTimeProvider::new(datetime!( + 2024-01-01 00:00:00 UTC + ))); + + // Create a store with larger buffer to avoid BufferFull errors during test + let (mut store, _) = + VersionedKVStore::new(temp_dir.path(), "test", 2048, None, time_provider.clone()).await?; + + store + .insert("key1".to_string(), make_string_value("value1")) + .await?; + let ts1 = store + .get_with_timestamp("key1") + .map(|tv| tv.timestamp) + .unwrap(); + + time_provider.advance(10.milliseconds()); + + store + .insert("key2".to_string(), make_string_value("value2")) + .await?; + + // Write more data to trigger rotation + for i in 0 .. 20 { + store + .insert(format!("key{i}"), make_string_value("foo")) + .await?; + } + + let ts_middle = store + .get_with_timestamp("key19") + .map(|tv| tv.timestamp) + .unwrap(); + + time_provider.advance(10.milliseconds()); + + // Write more after rotation + store + .insert("final".to_string(), make_string_value("final_value")) + .await?; + let ts_final = store + .get_with_timestamp("final") + .map(|tv| tv.timestamp) + .unwrap(); + store.sync()?; + + // Rotate to create final snapshot + store.rotate_journal().await?; + + // Read all snapshots (archived journals) + let mut all_journals = Vec::new(); + + for archived_path in find_archived_journals(temp_dir.path())? { + let rotation_ts = extract_rotation_timestamp(&archived_path)?; + let compressed_data = std::fs::read(&archived_path)?; + let decompressed_data = decompress_zlib(&compressed_data)?; + all_journals.push((decompressed_data, rotation_ts)); + } + + // Create recovery utility with all journals + let journal_refs: Vec<(&[u8], u64)> = all_journals + .iter() + .map(|(data, ts)| (data.as_slice(), *ts)) + .collect(); + let recovery = VersionedRecovery::new(journal_refs)?; + + // Verify we can recover at early timestamp + let state_ts1 = recovery.recover_at_timestamp(ts1)?; + assert_eq!(state_ts1.len(), 1); + assert!(state_ts1.contains_key("key1")); + + // Verify we can recover at middle timestamp (after rotation) + let state_middle = recovery.recover_at_timestamp(ts_middle)?; + assert!(state_middle.len() > 2); + assert!(state_middle.contains_key("key1")); + assert!(state_middle.contains_key("key2")); + + // Verify we can recover at final timestamp + let state_final = recovery.recover_at_timestamp(ts_final)?; + assert!(state_final.contains_key("final")); + assert_eq!( + state_final.get("final").map(|tv| &tv.value), + Some(&make_string_value("final_value")) + ); + + Ok(()) +} + +#[tokio::test] +async fn test_recovery_empty_journal() -> anyhow::Result<()> { + let temp_dir = TempDir::new()?; + let time_provider = Arc::new(bd_time::TestTimeProvider::new(datetime!( + 2024-01-01 00:00:00 UTC + ))); + + + // Create an empty store + let (mut store, _) = + VersionedKVStore::new(temp_dir.path(), "test", 4096, None, time_provider).await?; + store.sync()?; + + // Rotate to create snapshot + store.rotate_journal().await?; + + // Read the snapshot + let archived_files = find_archived_journals(temp_dir.path())?; + assert_eq!(archived_files.len(), 1); + let compressed_data = std::fs::read(&archived_files[0])?; + let decompressed_data = decompress_zlib(&compressed_data)?; + let snapshot_ts = extract_rotation_timestamp(&archived_files[0])?; + + let recovery = VersionedRecovery::new(vec![(&decompressed_data, snapshot_ts)])?; + + // Recovering 
current should return empty map + let state = recovery.recover_current()?; + assert_eq!(state.len(), 0); + + Ok(()) +} + +#[tokio::test] +async fn test_recovery_with_overwrites() -> anyhow::Result<()> { + let temp_dir = TempDir::new()?; + let time_provider = Arc::new(bd_time::TestTimeProvider::new(datetime!( + 2024-01-01 00:00:00 UTC + ))); + + + let (mut store, _) = + VersionedKVStore::new(temp_dir.path(), "test", 4096, None, time_provider.clone()).await?; + store + .insert("key".to_string(), make_string_value("1")) + .await?; + let ts1 = store + .get_with_timestamp("key") + .map(|tv| tv.timestamp) + .unwrap(); + + time_provider.advance(10.milliseconds()); + + store + .insert("key".to_string(), make_string_value("2")) + .await?; + let ts2 = store + .get_with_timestamp("key") + .map(|tv| tv.timestamp) + .unwrap(); + + time_provider.advance(10.milliseconds()); + + store + .insert("key".to_string(), make_string_value("3")) + .await?; + let ts3 = store + .get_with_timestamp("key") + .map(|tv| tv.timestamp) + .unwrap(); + + store.sync()?; + + // Rotate to create snapshot + store.rotate_journal().await?; + + // Read the snapshot + let archived_files = find_archived_journals(temp_dir.path())?; + assert_eq!(archived_files.len(), 1); + let compressed_data = std::fs::read(&archived_files[0])?; + let decompressed_data = decompress_zlib(&compressed_data)?; + let snapshot_ts = extract_rotation_timestamp(&archived_files[0])?; + + let recovery = VersionedRecovery::new(vec![(&decompressed_data, snapshot_ts)])?; + + // Each timestamp should show the value at that time + let state_ts1 = recovery.recover_at_timestamp(ts1)?; + assert_eq!( + state_ts1.get("key").map(|tv| &tv.value), + Some(&make_string_value("1")) + ); + + let state_ts2 = recovery.recover_at_timestamp(ts2)?; + assert_eq!( + state_ts2.get("key").map(|tv| &tv.value), + Some(&make_string_value("2")) + ); + + let state_ts3 = recovery.recover_at_timestamp(ts3)?; + assert_eq!( + state_ts3.get("key").map(|tv| &tv.value), + Some(&make_string_value("3")) + ); + + Ok(()) +} + +#[tokio::test] +async fn test_recovery_at_timestamp() -> anyhow::Result<()> { + let temp_dir = TempDir::new()?; + let time_provider = Arc::new(bd_time::TestTimeProvider::new(datetime!( + 2024-01-01 00:00:00 UTC + ))); + + // Create a store and write some timestamped data + let (mut store, _) = + VersionedKVStore::new(temp_dir.path(), "test", 4096, None, time_provider.clone()).await?; + + store + .insert("key1".to_string(), make_string_value("value1")) + .await?; + let ts1 = store + .get_with_timestamp("key1") + .map(|tv| tv.timestamp) + .unwrap(); + + // Advance time to ensure different timestamps + time_provider.advance(10.milliseconds()); + + store + .insert("key2".to_string(), make_string_value("value2")) + .await?; + let ts2 = store + .get_with_timestamp("key2") + .map(|tv| tv.timestamp) + .unwrap(); + + // Advance time again + time_provider.advance(10.milliseconds()); + + store + .insert("key1".to_string(), make_string_value("updated1")) + .await?; + let ts3 = store + .get_with_timestamp("key1") + .map(|tv| tv.timestamp) + .unwrap(); + + store.sync()?; + + // Rotate to create snapshot + store.rotate_journal().await?; + + // Read the snapshot + let archived_files = find_archived_journals(temp_dir.path())?; + assert_eq!(archived_files.len(), 1); + let compressed_data = std::fs::read(&archived_files[0])?; + let decompressed_data = decompress_zlib(&compressed_data)?; + let snapshot_ts = extract_rotation_timestamp(&archived_files[0])?; + + // Create recovery utility + let 
recovery = VersionedRecovery::new(vec![(&decompressed_data, snapshot_ts)])?; + + // Recover at ts1: should have only key1=value1 + let state_ts1 = recovery.recover_at_timestamp(ts1)?; + assert_eq!(state_ts1.len(), 1); + assert_eq!( + state_ts1.get("key1").map(|tv| &tv.value), + Some(&make_string_value("value1")) + ); + + // Recover at ts2: should have key1=value1, key2=value2 + let state_ts2 = recovery.recover_at_timestamp(ts2)?; + assert_eq!(state_ts2.len(), 2); + assert_eq!( + state_ts2.get("key1").map(|tv| &tv.value), + Some(&make_string_value("value1")) + ); + assert_eq!( + state_ts2.get("key2").map(|tv| &tv.value), + Some(&make_string_value("value2")) + ); + + // Recover at ts3: should have key1=updated1, key2=value2 + let state_ts3 = recovery.recover_at_timestamp(ts3)?; + assert_eq!(state_ts3.len(), 2); + assert_eq!( + state_ts3.get("key1").map(|tv| &tv.value), + Some(&make_string_value("updated1")) + ); + assert_eq!( + state_ts3.get("key2").map(|tv| &tv.value), + Some(&make_string_value("value2")) + ); + + Ok(()) +} + +#[tokio::test] +async fn test_recovery_at_timestamp_with_rotation() -> anyhow::Result<()> { + let temp_dir = TempDir::new()?; + let time_provider = Arc::new(bd_time::TestTimeProvider::new(datetime!( + 2024-01-01 00:00:00 UTC + ))); + + let (mut store, _) = + VersionedKVStore::new(temp_dir.path(), "test", 4096, None, time_provider.clone()).await?; + + // Write some data before rotation + store + .insert("key1".to_string(), make_string_value("value1")) + .await?; + let ts1 = store + .get_with_timestamp("key1") + .map(|tv| tv.timestamp) + .unwrap(); + + time_provider.advance(10.milliseconds()); + + store + .insert("key2".to_string(), make_string_value("value2")) + .await?; + let ts2 = store + .get_with_timestamp("key2") + .map(|tv| tv.timestamp) + .unwrap(); + + // Rotate journal + store.rotate_journal().await?; + + time_provider.advance(10.milliseconds()); + + // Write data after rotation + store + .insert("key3".to_string(), make_string_value("value3")) + .await?; + let ts3 = store + .get_with_timestamp("key3") + .map(|tv| tv.timestamp) + .unwrap(); + + store.sync()?; + + // Rotate again to create final snapshot + store.rotate_journal().await?; + + // Read all snapshots + let archived_files = find_archived_journals(temp_dir.path())?; + assert_eq!(archived_files.len(), 2, "Should have two snapshots"); + + let mut all_snapshots = Vec::new(); + for archived_path in &archived_files { + let compressed_data = std::fs::read(archived_path)?; + let decompressed_data = decompress_zlib(&compressed_data)?; + let rotation_ts = extract_rotation_timestamp(archived_path)?; + all_snapshots.push((decompressed_data, rotation_ts)); + } + + // Create recovery from all snapshots + let snapshot_refs: Vec<(&[u8], u64)> = all_snapshots + .iter() + .map(|(data, ts)| (data.as_slice(), *ts)) + .collect(); + let recovery = VersionedRecovery::new(snapshot_refs)?; + + // Verify we can recover at any timestamp across all snapshots + let state_ts1 = recovery.recover_at_timestamp(ts1)?; + assert_eq!(state_ts1.len(), 1); + assert!(state_ts1.contains_key("key1")); + + // Recover at ts2 (should be in first snapshot) + let state_ts2 = recovery.recover_at_timestamp(ts2)?; + assert_eq!(state_ts2.len(), 2); + assert!(state_ts2.contains_key("key1")); + assert!(state_ts2.contains_key("key2")); + + // Recover at ts3 (should include all data from both snapshots) + let state_ts3 = recovery.recover_at_timestamp(ts3)?; + assert_eq!(state_ts3.len(), 3); + assert!(state_ts3.contains_key("key1")); + 
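  // Rotation rewrites the full compacted state into the new journal, so the second snapshot
  // carries key1 and key2 forward alongside key3 when recovering at ts3.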
assert!(state_ts3.contains_key("key2")); + assert!(state_ts3.contains_key("key3")); + + Ok(()) +} diff --git a/bd-resilient-kv/src/versioned_kv_journal/file_manager.rs b/bd-resilient-kv/src/versioned_kv_journal/file_manager.rs new file mode 100644 index 00000000..337382ef --- /dev/null +++ b/bd-resilient-kv/src/versioned_kv_journal/file_manager.rs @@ -0,0 +1,60 @@ +// shared-core - bitdrift's common client/server libraries +// Copyright Bitdrift, Inc. All rights reserved. +// +// Use of this source code is governed by a source available license that can be found in the +// LICENSE file or at: +// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt + +use std::path::{Path, PathBuf}; + +/// Find the active journal file by searching for the highest generation number. If we failed +/// to read the directory or there are no journal files, we return generation 0. +pub async fn find_active_journal(dir: &Path, name: &str) -> (PathBuf, u64) { + // Search for generation-based journals + let pattern = format!("{name}.jrn."); + + let mut max_gen = 0u64; + let Ok(mut entries) = tokio::fs::read_dir(dir).await else { + return (dir.join(format!("{name}.jrn.{max_gen}")), max_gen); + }; + + while let Ok(Some(entry)) = entries.next_entry().await { + let filename = entry.file_name(); + let filename_str = filename.to_string_lossy(); + + if let Some(suffix) = filename_str.strip_prefix(&pattern) { + // Parse generation number (before any .zz or other extensions) + if let Some(gen_str) = suffix.split('.').next() + && let Ok(generation) = gen_str.parse::() + { + max_gen = max_gen.max(generation); + } + } + } + + let path = dir.join(format!("{name}.jrn.{max_gen}")); + (path, max_gen) +} + +/// Compress an archived journal using zlib. +/// +/// This function compresses the source file to the destination using zlib compression. +/// The compression is performed in a blocking task to avoid holding up the async runtime. +pub async fn compress_archived_journal(source: &Path, dest: &Path) -> anyhow::Result<()> { + let source = source.to_owned(); + let dest = dest.to_owned(); + + tokio::task::spawn_blocking(move || { + use flate2::Compression; + use flate2::write::ZlibEncoder; + use std::io::{BufReader, copy}; + + let source_file = std::fs::File::open(&source)?; + let dest_file = std::fs::File::create(&dest)?; + let mut encoder = ZlibEncoder::new(dest_file, Compression::new(5)); + copy(&mut BufReader::new(source_file), &mut encoder)?; + encoder.finish()?; + Ok::<_, anyhow::Error>(()) + }) + .await? +} diff --git a/bd-resilient-kv/src/versioned_kv_journal/framing.rs b/bd-resilient-kv/src/versioned_kv_journal/framing.rs new file mode 100644 index 00000000..ee7875d6 --- /dev/null +++ b/bd-resilient-kv/src/versioned_kv_journal/framing.rs @@ -0,0 +1,159 @@ +// shared-core - bitdrift's common client/server libraries +// Copyright Bitdrift, Inc. All rights reserved. +// +// Use of this source code is governed by a source available license that can be found in the +// LICENSE file or at: +// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt + +//! Wire format framing for journal entries. +//! +//! Per-entry format: +//! ```text +//! [length: varint][timestamp_micros: varint][payload: bytes][crc32: u32] +//! ``` +//! +//! - `length`: Total length of the frame (timestamp + payload + crc), varint encoded +//! - `timestamp_micros`: Microseconds since UNIX epoch (varint encoded) +//! - `payload`: Opaque binary data (format determined by caller) +//! 
- `crc32`: CRC32 checksum of (`timestamp_bytes` + payload) + +#[cfg(test)] +#[path = "./framing_test.rs"] +mod tests; + +use bytes::BufMut; +use crc32fast::Hasher; + +mod varint; + +const CRC_LEN: usize = 4; + +/// Frame structure for a journal entry. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Frame { + /// Timestamp in microseconds since UNIX epoch. + pub timestamp_micros: u64, + /// Opaque payload data. + pub payload: M, +} + +impl Frame { + /// Create a new frame. + #[must_use] + pub fn new(timestamp_micros: u64, payload: M) -> Self { + Self { + timestamp_micros, + payload, + } + } + + /// Calculate the encoded size of this frame. + #[must_use] + pub fn encoded_size(&self) -> usize { + let timestamp_varint_size = varint::compute_size(self.timestamp_micros); + let payload_size: usize = self.payload.compute_size().try_into().unwrap_or(0); + + let frame_content_len = timestamp_varint_size + payload_size + CRC_LEN; + let length_varint_size = varint::compute_size(frame_content_len as u64); + + length_varint_size + frame_content_len + } + + /// Encode this frame into a buffer. + /// + /// # Errors + /// Returns an error if the buffer is too small. + pub fn encode(&self, buf: &mut [u8]) -> anyhow::Result { + let required_size = self.encoded_size(); + if buf.len() < required_size { + anyhow::bail!( + "Buffer too small: need {} bytes, have {} bytes", + required_size, + buf.len() + ); + } + + let mut cursor = buf; + + // Encode timestamp to calculate frame length + let mut timestamp_buf = [0u8; varint::MAX_SIZE]; + let timestamp_len = varint::encode(self.timestamp_micros, &mut timestamp_buf); + + let payload_bytes = self + .payload + .write_to_bytes() + .map_err(|e| anyhow::anyhow!("Failed to serialize payload: {e}"))?; + + // Frame length = timestamp + payload + crc + let frame_len = timestamp_len + payload_bytes.len() + CRC_LEN; + + // Encode frame length as varint + let mut length_buf = [0u8; varint::MAX_SIZE]; + let length_len = varint::encode(frame_len as u64, &mut length_buf); + cursor.put_slice(&length_buf[.. length_len]); + + cursor.put_slice(×tamp_buf[.. timestamp_len]); + cursor.put_slice(&payload_bytes); + + let mut hasher = Hasher::new(); + hasher.update(×tamp_buf[.. timestamp_len]); + hasher.update(payload_bytes.as_slice()); + let crc = hasher.finalize(); + + cursor.put_u32_le(crc); + + Ok(required_size) + } + + /// Decode a frame from a buffer. + /// + /// Returns (Frame, `bytes_consumed`) or error if invalid/incomplete. + pub fn decode(buf: &[u8]) -> anyhow::Result<(Self, usize)> { + // Decode frame length varint + let (frame_len_u64, length_len) = + varint::decode(buf).ok_or_else(|| anyhow::anyhow!("Invalid length varint"))?; + + let frame_len = usize::try_from(frame_len_u64) + .map_err(|_| anyhow::anyhow!("Frame length too large: {frame_len_u64}"))?; + + // Check if we have the complete frame + let total_len = length_len + frame_len; // length varint + frame content + if buf.len() < total_len { + anyhow::bail!( + "Incomplete frame: need {} bytes, have {} bytes", + total_len, + buf.len() + ); + } + + let frame_data = &buf[length_len .. total_len]; + + // Decode timestamp varint + let (timestamp_micros, timestamp_len) = + varint::decode(frame_data).ok_or_else(|| anyhow::anyhow!("Invalid timestamp varint"))?; + + // Extract payload and CRC + if frame_data.len() < timestamp_len + CRC_LEN { + anyhow::bail!("Frame too small for CRC"); + } + + let payload_end = frame_data.len() - CRC_LEN; + let payload = frame_data[timestamp_len .. 
payload_end].to_vec(); + let stored_crc = u32::from_le_bytes(frame_data[payload_end ..].try_into()?); + + // Verify CRC + let mut hasher = Hasher::new(); + hasher.update(&frame_data[.. timestamp_len]); // timestamp bytes + hasher.update(&payload); // payload + let computed_crc = hasher.finalize(); + + if stored_crc != computed_crc { + anyhow::bail!("CRC mismatch: expected 0x{stored_crc:08x}, got 0x{computed_crc:08x}"); + } + + let payload = + M::parse_from_bytes(&payload).map_err(|e| anyhow::anyhow!("Failed to parse payload: {e}"))?; + + Ok((Self::new(timestamp_micros, payload), total_len)) + } +} diff --git a/bd-resilient-kv/src/versioned_kv_journal/framing/varint.rs b/bd-resilient-kv/src/versioned_kv_journal/framing/varint.rs new file mode 100644 index 00000000..26decd90 --- /dev/null +++ b/bd-resilient-kv/src/versioned_kv_journal/framing/varint.rs @@ -0,0 +1,45 @@ +// shared-core - bitdrift's common client/server libraries +// Copyright Bitdrift, Inc. All rights reserved. +// +// Use of this source code is governed by a source available license that can be found in the +// LICENSE file or at: +// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt + +/// Maximum varint size for u64 (10 bytes) +pub const MAX_SIZE: usize = 10; + +/// Calculate the size of a u64 when encoded as a varint. +#[allow(clippy::cast_possible_truncation)] +pub fn compute_size(value: u64) -> usize { + // Safe cast: varint encoding of u64 is at most 10 bytes, which fits in usize on all platforms + ::protobuf::rt::compute_raw_varint64_size(value) as usize +} + +/// Encode a u64 as a varint into the buffer. +/// Returns the number of bytes written. +pub fn encode(value: u64, mut buf: &mut [u8]) -> usize { + let size = compute_size(value); + debug_assert!(buf.len() >= size, "Buffer too small for varint encoding"); + + if protobuf::CodedOutputStream::new(&mut buf) + .write_raw_varint64(value) + .is_err() + { + // Should never happen as we ensure that there is enough space elsewhere. + return 0; + } + + size +} + +/// Decode a varint from the buffer. +/// Returns (value, `bytes_read`) or None if buffer is incomplete/invalid. +#[must_use] +pub fn decode(buf: &[u8]) -> Option<(u64, usize)> { + let value = protobuf::CodedInputStream::from_bytes(buf) + .read_raw_varint64() + .ok()?; + + let bytes_read = compute_size(value); + Some((value, bytes_read)) +} diff --git a/bd-resilient-kv/src/versioned_kv_journal/framing_test.rs b/bd-resilient-kv/src/versioned_kv_journal/framing_test.rs new file mode 100644 index 00000000..a5b43596 --- /dev/null +++ b/bd-resilient-kv/src/versioned_kv_journal/framing_test.rs @@ -0,0 +1,237 @@ +// shared-core - bitdrift's common client/server libraries +// Copyright Bitdrift, Inc. All rights reserved. 
+// +// Use of this source code is governed by a source available license that can be found in the +// LICENSE file or at: +// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt + +#![allow(clippy::unwrap_used)] + +use super::*; +use crate::tests::make_string_value; +use bd_proto::protos::state::payload::StateValue; + +#[test] +fn varint_encoding() { + let test_cases = vec![ + (0u64, vec![0x00]), + (1u64, vec![0x01]), + (127u64, vec![0x7F]), + (128u64, vec![0x80, 0x01]), + (300u64, vec![0xAC, 0x02]), + (16_384u64, vec![0x80, 0x80, 0x01]), + ( + u64::MAX, + vec![0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01], + ), + ]; + + for (value, expected) in test_cases { + let mut buf = [0u8; varint::MAX_SIZE]; + let len = varint::encode(value, &mut buf); + assert_eq!(&buf[.. len], &expected[..], "Failed for value {value}"); + } +} + +#[test] +fn varint_decoding() { + let test_cases = vec![ + (vec![0x00], 0u64, 1), + (vec![0x01], 1u64, 1), + (vec![0x7F], 127u64, 1), + (vec![0x80, 0x01], 128u64, 2), + (vec![0xAC, 0x02], 300u64, 2), + (vec![0x80, 0x80, 0x01], 16_384u64, 3), + ( + vec![0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01], + u64::MAX, + 10, + ), + ]; + + for (buf, expected_value, expected_len) in test_cases { + let (value, len) = varint::decode(&buf).unwrap(); + assert_eq!(value, expected_value, "Failed for buffer {buf:?}"); + assert_eq!(len, expected_len, "Wrong length for buffer {buf:?}"); + } +} + +#[test] +fn varint_roundtrip() { + let values = vec![0, 1, 127, 128, 255, 256, 65535, 65536, 1_000_000, u64::MAX]; + + for value in values { + let mut buf = [0u8; varint::MAX_SIZE]; + let encoded_len = varint::encode(value, &mut buf); + let (decoded_value, decoded_len) = varint::decode(&buf).unwrap(); + + assert_eq!(decoded_value, value, "Roundtrip failed for {value}"); + assert_eq!(decoded_len, encoded_len, "Length mismatch for {value}"); + } +} + +#[test] +fn varint_incomplete() { + // Incomplete varint (has continuation bit but no next byte) + let buf = vec![0x80]; + assert!(varint::decode(&buf).is_none()); +} + +#[test] +fn varint_too_long() { + // 11 bytes (exceeds MAX_VARINT_SIZE) + let buf = vec![ + 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01, + ]; + assert!(varint::decode(&buf).is_none()); +} + +#[test] +fn frame_encode_decode() { + let frame = Frame::new(1_700_000_000_000_000, make_string_value("value")); + + let mut buf = vec![0u8; 1024]; + let encoded_len = frame.encode(&mut buf).unwrap(); + + let (decoded_frame, decoded_len) = Frame::::decode(&buf).unwrap(); + + assert_eq!(decoded_frame, frame); + assert_eq!(decoded_len, encoded_len); +} + +#[test] +fn frame_with_delete() { + let frame = Frame::new(1_700_000_000_000_000, make_string_value("")); + + let mut buf = vec![0u8; 1024]; + let encoded_len = frame.encode(&mut buf).unwrap(); + + let (decoded_frame, decoded_len) = Frame::::decode(&buf).unwrap(); + + assert_eq!(decoded_frame, frame); + assert_eq!(decoded_len, encoded_len); +} + +#[test] +fn frame_empty_payload() { + let frame = Frame::new(1_700_000_000_000_000, StateValue::default()); + + let mut buf = vec![0u8; 1024]; + let encoded_len = frame.encode(&mut buf).unwrap(); + + let (decoded_frame, decoded_len) = Frame::::decode(&buf).unwrap(); + + assert_eq!(decoded_frame, frame); + assert_eq!(decoded_len, encoded_len); +} + +#[test] +fn frame_various_timestamps() { + let timestamps = vec![0, 1, 127, 128, 1_000_000, 1_700_000_000_000_000, u64::MAX]; + + for timestamp in timestamps { + let frame = 
Frame::new(timestamp, make_string_value("test")); + let mut buf = vec![0u8; 1024]; + let encoded_len = frame.encode(&mut buf).unwrap(); + let (decoded_frame, decoded_len) = Frame::::decode(&buf).unwrap(); + + assert_eq!(decoded_frame.timestamp_micros, timestamp); + assert_eq!(decoded_frame.payload, make_string_value("test")); + assert_eq!(decoded_len, encoded_len); + } +} + +#[test] +fn frame_buffer_too_small() { + let frame = Frame::new(1_700_000_000_000_000, make_string_value("key:value")); + let mut buf = vec![0u8; 5]; // Too small + + let result = frame.encode(&mut buf); + assert!(result.is_err()); +} + +#[test] +fn frame_incomplete_length() { + let buf = vec![0x80]; // Incomplete varint (has continuation bit but no next byte) + + let result = Frame::::decode(&buf); + assert!(result.is_err()); +} + +#[test] +fn frame_incomplete_data() { + // Frame says it needs 100 bytes but we only provide partial data + let mut buf = vec![0u8; 20]; + // Encode length varint for 100 bytes + let length_len = varint::encode(100, &mut buf); + // Truncate to simulate incomplete frame + buf.truncate(length_len + 10); + + let result = Frame::::decode(&buf); + assert!(result.is_err()); +} + +#[test] +fn frame_crc_mismatch() { + let frame = Frame::new(1_700_000_000_000_000, make_string_value("key:value")); + + let mut buf = vec![0u8; 1024]; + let encoded_len = frame.encode(&mut buf).unwrap(); + + // Corrupt the CRC + buf[encoded_len - 1] ^= 0xFF; + + let result = Frame::::decode(&buf); + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("CRC mismatch")); +} + +#[test] +fn frame_multiple_frames() { + let frame1 = Frame::new(1000, make_string_value("first")); + let frame2 = Frame::new(2000, make_string_value("second")); + let frame3 = Frame::new(3000, make_string_value("third")); + + let mut buf = vec![0u8; 1024]; + let len1 = frame1.encode(&mut buf).unwrap(); + let len2 = frame2.encode(&mut buf[len1 ..]).unwrap(); + let len3 = frame3.encode(&mut buf[len1 + len2 ..]).unwrap(); + + // Decode all three + let (decoded1, consumed1) = Frame::::decode(&buf).unwrap(); + let (decoded2, consumed2) = Frame::::decode(&buf[consumed1 ..]).unwrap(); + let (decoded3, consumed3) = Frame::::decode(&buf[consumed1 + consumed2 ..]).unwrap(); + + assert_eq!(decoded1, frame1); + assert_eq!(decoded2, frame2); + assert_eq!(decoded3, frame3); + assert_eq!(consumed1, len1); + assert_eq!(consumed2, len2); + assert_eq!(consumed3, len3); +} + +#[test] +fn frame_length_varint_encoding() { + // Test that frame length is properly varint-encoded + // Small frames should use 1 byte for length, larger frames may use more + + // Very small payload (length should fit in 1 byte varint) + let small_frame = Frame::new(0, make_string_value("x")); + let mut buf = vec![0u8; 1024]; + let encoded_len = small_frame.encode(&mut buf).unwrap(); + + // First byte should be the length varint + let (frame_len, length_varint_len) = varint::decode(&buf).unwrap(); + assert_eq!( + length_varint_len, 1, + "Small frame should use 1-byte varint for length" + ); + + // Verify total encoded size matches + assert_eq!(encoded_len as u64, length_varint_len as u64 + frame_len); + + // Verify decoding works + let (decoded, consumed) = Frame::::decode(&buf).unwrap(); + assert_eq!(decoded, small_frame); + assert_eq!(consumed, encoded_len); +} diff --git a/bd-resilient-kv/src/versioned_kv_journal/journal.rs b/bd-resilient-kv/src/versioned_kv_journal/journal.rs new file mode 100644 index 00000000..e4f92793 --- /dev/null +++ 
b/bd-resilient-kv/src/versioned_kv_journal/journal.rs @@ -0,0 +1,324 @@ +// shared-core - bitdrift's common client/server libraries +// Copyright Bitdrift, Inc. All rights reserved. +// +// Use of this source code is governed by a source available license that can be found in the +// LICENSE file or at: +// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt + +use super::framing::Frame; +use bd_client_common::error::InvariantError; +use bd_time::TimeProvider; +use std::sync::Arc; + +/// Indicates whether partial data loss has occurred. Partial data loss is detected when the +/// journal would be parsed from disk, but we were not able to find valid records up to `position` +/// as stored in the header. +pub enum PartialDataLoss { + Yes, + None, +} + +/// Timestamped implementation of a journaling system that uses timestamps +/// as the version identifier for point-in-time recovery. +/// +/// Each write operation is assigned a monotonically non-decreasing timestamp (in microseconds +/// since UNIX epoch), enabling exact state reconstruction at any historical timestamp. +/// The monotonicity is enforced by clamping: if the system clock goes backwards, we reuse +/// the same timestamp value to maintain ordering guarantees. When timestamps collide, +/// journal ordering determines precedence. +pub struct VersionedJournal<'a, M> { + position: usize, + buffer: &'a mut [u8], + high_water_mark: usize, + high_water_mark_triggered: bool, + last_timestamp: u64, // Most recent timestamp written (for monotonic enforcement) + pub(crate) time_provider: Arc, + _payload_marker: std::marker::PhantomData, +} + +// Versioned KV files have the following structure: +// | Position | Data | Type | +// |----------|--------------------------|----------------| +// | 0 | Format Version | u64 | +// | 8 | Position | u64 | +// | 16 | Reserved | u8 | +// | 17 | Frame 1 | Framed Entry | +// | ... | Frame 2 | Framed Entry | +// | ... | Frame N | Framed Entry | +// +// Frame format: [length: u32][timestamp_micros: varint][protobuf_payload: bytes][crc32: u32] +// +// # Timestamp Semantics +// +// Timestamps serve as both version identifiers and logical clocks with monotonic guarantees: +// - Each write gets a timestamp that is guaranteed to be >= previous writes (non-decreasing) +// - If system clock goes backward, timestamps are clamped to last_timestamp (reuse same value) +// - When timestamps collide, journal ordering determines precedence +// - This ensures total ordering while allowing correlation with external timestamped systems + +// The journal format version, incremented on incompatible changes. +const VERSION: u64 = 1; + +const HEADER_SIZE: usize = 17; + +// Minimum buffer size for a valid journal +const MIN_BUFFER_SIZE: usize = HEADER_SIZE + 4; + +/// Returns by +struct BufferState { + highest_timestamp: u64, + partial_data_loss: PartialDataLoss, +} + +/// Write to the version field of a journal buffer. +fn write_version_field(buffer: &mut [u8], version: u64) { + let version_bytes = version.to_le_bytes(); + buffer[0 .. 8].copy_from_slice(&version_bytes); +} + +/// Write the version to a journal buffer. +fn write_version(buffer: &mut [u8]) { + write_version_field(buffer, VERSION); +} + +fn read_position(buffer: &[u8]) -> anyhow::Result { + let position_bytes: [u8; 8] = buffer[8 .. 
16].try_into()?; + let position_u64 = u64::from_le_bytes(position_bytes); + let position = usize::try_from(position_u64) + .map_err(|_| anyhow::anyhow!("Position {position_u64} too large for usize"))?; + let buffer_len = buffer.len(); + if position >= buffer_len { + anyhow::bail!("Invalid position: {position}, buffer size: {buffer_len}",); + } + Ok(position) +} + +/// Write the position to a journal buffer. +fn write_position(buffer: &mut [u8], position: usize) { + let position_bytes = (position as u64).to_le_bytes(); + buffer[8 .. 16].copy_from_slice(&position_bytes); +} + +fn validate_buffer_len(buffer: &[u8]) -> anyhow::Result { + let buffer_len = buffer.len(); + if buffer_len < MIN_BUFFER_SIZE { + anyhow::bail!( + "Buffer too small: {buffer_len} bytes, but need at least {MIN_BUFFER_SIZE} bytes" + ); + } + Ok(buffer_len) +} + +/// Validate high water mark ratio and calculate the position from buffer length. +fn calculate_high_water_mark( + buffer_len: usize, + high_water_mark_ratio: Option, +) -> anyhow::Result { + let ratio = high_water_mark_ratio.unwrap_or(0.8); + if !(0.0 ..= 1.0).contains(&ratio) { + anyhow::bail!("High water mark ratio must be between 0.0 and 1.0, got: {ratio}"); + } + + #[allow( + clippy::cast_precision_loss, + clippy::cast_possible_truncation, + clippy::cast_sign_loss + )] + let high_water_mark = (buffer_len as f32 * ratio) as usize; + Ok(high_water_mark) +} + +impl<'a, M: protobuf::Message> VersionedJournal<'a, M> { + /// Create a new versioned journal using the provided buffer as storage space. + /// + /// # Arguments + /// * `buffer` - The storage buffer + /// * `high_water_mark_ratio` - Optional ratio (0.0 to 1.0) for high water mark. Default: 0.8 + /// * `entries` - Iterator of entries to be inserted into the newly created buffer. + /// + /// # Errors + /// Returns an error if the buffer is too small or if `high_water_mark_ratio` is invalid. + pub fn new( + buffer: &'a mut [u8], + high_water_mark_ratio: Option, + time_provider: Arc, + entries: impl IntoIterator, + ) -> anyhow::Result { + let buffer_len = validate_buffer_len(buffer)?; + let high_water_mark = calculate_high_water_mark(buffer_len, high_water_mark_ratio)?; + + // Write header + let mut position = HEADER_SIZE; + + let mut max_state_timestamp = None; + + // Write all current state with their original timestamps + for (entry, timestamp) in entries { + max_state_timestamp = Some(timestamp); + + let frame = Frame::new(timestamp, entry); + + // Encode frame + let available_space = &mut buffer[position ..]; + let encoded_len = frame.encode(available_space)?; + + position += encoded_len; + } + + write_position(buffer, position); + write_version(buffer); + buffer[16] = 0; // Reserved byte + + let now = Self::unix_timestamp_micros(time_provider.as_ref())?; + + Ok(Self { + position, + buffer, + high_water_mark, + high_water_mark_triggered: false, + last_timestamp: max_state_timestamp.unwrap_or(now), + time_provider, + _payload_marker: std::marker::PhantomData, + }) + } + + /// Create a new versioned journal with state loaded from the provided buffer. + /// + /// # Arguments + /// * `buffer` - The storage buffer containing existing versioned KV data + /// * `high_water_mark_ratio` - Optional ratio (0.0 to 1.0) for high water mark. Default: 0.8 + /// + /// # Errors + /// Returns an error if the buffer is invalid, corrupted, or if `high_water_mark_ratio` is + /// invalid. 
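  ///
  /// The `f` callback is invoked once for every decoded entry, receiving the payload and its
  /// timestamp, which lets the caller rebuild its in-memory state while the buffer is scanned.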
+ pub fn from_buffer( + buffer: &'a mut [u8], + high_water_mark_ratio: Option, + time_provider: Arc, + f: impl FnMut(&M, u64), + ) -> anyhow::Result<(Self, PartialDataLoss)> { + let buffer_len = validate_buffer_len(buffer)?; + let position = read_position(buffer)?; + let high_water_mark = calculate_high_water_mark(buffer_len, high_water_mark_ratio)?; + + // Read version + let version_bytes: [u8; 8] = buffer[0 .. 8].try_into()?; + let version = u64::from_le_bytes(version_bytes); + + if version != VERSION { + anyhow::bail!("Unsupported version: {version}, expected {VERSION}"); + } + + // Find initialization timestamp and highest timestamp in the journal + let buffer_state = Self::iterate_buffer(buffer, position, f); + + Ok(( + Self { + position, + buffer, + high_water_mark, + high_water_mark_triggered: position >= high_water_mark, + last_timestamp: buffer_state.highest_timestamp, + time_provider, + _payload_marker: std::marker::PhantomData, + }, + buffer_state.partial_data_loss, + )) + } + + /// Scan the journal to find the highest timestamp. + fn iterate_buffer(buffer: &[u8], position: usize, mut f: impl FnMut(&M, u64)) -> BufferState { + let mut cursor = HEADER_SIZE; + let mut state = BufferState { + highest_timestamp: 0, + partial_data_loss: PartialDataLoss::None, + }; + + while cursor < position { + let remaining = &buffer[cursor .. position]; + + match Frame::::decode(remaining) { + Ok((frame, consumed)) => { + f(&frame.payload, frame.timestamp_micros); + state.highest_timestamp = frame.timestamp_micros; + cursor += consumed; + }, + Err(_) => { + // Stop on first decode error (partial frame or corruption) + break; + }, + } + } + + state + } + + /// Get the next monotonically increasing timestamp. + /// + /// This ensures that even if the system clock goes backwards, timestamps remain + /// monotonically increasing by clamping to `last_timestamp` (reusing the same value). + /// This prevents artificial clock skew while maintaining ordering guarantees. + fn next_monotonic_timestamp(&mut self) -> anyhow::Result { + let current = self.current_timestamp()?; + let monotonic = std::cmp::max(current, self.last_timestamp); + self.last_timestamp = monotonic; + Ok(monotonic) + } + + fn set_position(&mut self, position: usize) { + self.position = position; + write_position(self.buffer, position); + self.check_high_water_mark(); + } + + fn check_high_water_mark(&mut self) { + if self.position >= self.high_water_mark { + self.trigger_high_water(); + } + } + + fn trigger_high_water(&mut self) { + self.high_water_mark_triggered = true; + } + + /// Insert a new entry into the journal with the given payload. + /// Returns the timestamp of the operation. + /// + /// The timestamp is monotonically non-decreasing and serves as the version identifier. + /// If the system clock goes backwards, timestamps are clamped to maintain monotonicity. + pub fn insert_entry(&mut self, message: impl protobuf::MessageFull) -> anyhow::Result { + let timestamp = self.next_monotonic_timestamp()?; + + // Create payload + let frame = Frame::new(timestamp, message); + + // Encode frame + let available_space = &mut self.buffer[self.position ..]; + let encoded_len = frame.encode(available_space)?; + + self.set_position(self.position + encoded_len); + Ok(timestamp) + } + + /// Check if the high water mark has been triggered. + #[must_use] + pub fn is_high_water_mark_triggered(&self) -> bool { + self.high_water_mark_triggered + } + + /// Get current timestamp in microseconds since UNIX epoch. 
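  ///
  /// The value is read from the injected `TimeProvider`, so tests can drive it deterministically
  /// (e.g. with `TestTimeProvider`).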
+ fn current_timestamp(&self) -> std::result::Result { + Self::unix_timestamp_micros(self.time_provider.as_ref()) + } + + fn unix_timestamp_micros( + time_provider: &dyn TimeProvider, + ) -> std::result::Result { + time_provider + .now() + .unix_timestamp_nanos() + .checked_div(1_000) + .and_then(|micros| micros.try_into().ok()) + .ok_or(InvariantError::Invariant) + } +} diff --git a/bd-resilient-kv/src/versioned_kv_journal/memmapped_journal.rs b/bd-resilient-kv/src/versioned_kv_journal/memmapped_journal.rs new file mode 100644 index 00000000..fa459e3f --- /dev/null +++ b/bd-resilient-kv/src/versioned_kv_journal/memmapped_journal.rs @@ -0,0 +1,165 @@ +// shared-core - bitdrift's common client/server libraries +// Copyright Bitdrift, Inc. All rights reserved. +// +// Use of this source code is governed by a source available license that can be found in the +// LICENSE file or at: +// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt + +use super::journal::VersionedJournal; +use crate::versioned_kv_journal::journal::PartialDataLoss; +use bd_time::TimeProvider; +use memmap2::{MmapMut, MmapOptions}; +use std::fs::OpenOptions; +use std::path::Path; +use std::sync::Arc; + +/// Memory-mapped implementation of a timestamped journal. +/// +/// This implementation uses memory-mapped files to provide persistence while maintaining +/// the efficiency of in-memory operations. All changes are automatically synced to disk. +/// Each write operation receives a timestamp for point-in-time recovery. +/// +/// # Safety +/// During construction, we unsafely declare mmap's internal buffer as having a static +/// lifetime, but it's actually tied to the lifetime of `inner`. This works because +/// nothing external holds a reference to the buffer. +pub struct MemMappedVersionedJournal { + // Note: mmap MUST de-init AFTER versioned_kv because mmap uses it. + mmap: MmapMut, + inner: VersionedJournal<'static, M>, +} + +impl std::ops::Deref for MemMappedVersionedJournal { + type Target = VersionedJournal<'static, M>; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl std::ops::DerefMut for MemMappedVersionedJournal { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl MemMappedVersionedJournal { + /// Create a memory-mapped buffer from a file and convert it to a static lifetime slice. + /// + /// # Safety + /// The returned slice has a static lifetime, but it's actually tied to the lifetime of the + /// `MmapMut`. This is safe as long as the `MmapMut` is kept alive for the entire lifetime of + /// the slice usage. + #[allow(clippy::needless_pass_by_value)] + unsafe fn create_mmap_buffer( + file: std::fs::File, + ) -> anyhow::Result<(MmapMut, &'static mut [u8])> { + let mut mmap = unsafe { MmapOptions::new().map_mut(&file)? }; + + // Convert the mmap slice to a static lifetime slice + // This is safe because we keep the mmap alive for the lifetime of the struct + let buffer: &'static mut [u8] = + unsafe { std::slice::from_raw_parts_mut(mmap.as_mut_ptr(), mmap.len()) }; + + Ok((mmap, buffer)) + } + + /// Create a new memory-mapped versioned KV journal using the provided file path. + /// + /// The file will be created if it doesn't exist, or opened if it does. + /// The file will be resized to the specified size if it's different. + /// + /// # Arguments + /// * `file_path` - Path to the file to use for storage + /// * `size` - Minimum size of the file in bytes + /// * `high_water_mark_ratio` - Optional ratio (0.0 to 1.0) for high water mark. 
Default: 0.8 + /// * `entries` - Iterator of entries to be inserted into the newly created buffer. + /// + /// # Errors + /// Returns an error if the file cannot be created/opened or memory-mapped. + pub fn new>( + file_path: P, + size: usize, + high_water_mark_ratio: Option, + time_provider: Arc, + entries: impl IntoIterator, + ) -> anyhow::Result { + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(false) + .open(file_path)?; + + let file_len = file.metadata()?.len(); + if file_len != size as u64 { + file.set_len(size as u64)?; + } + + let (mmap, buffer) = unsafe { Self::create_mmap_buffer(file)? }; + + let versioned_kv = + VersionedJournal::new(buffer, high_water_mark_ratio, time_provider, entries)?; + + Ok(Self { + mmap, + inner: versioned_kv, + }) + } + + /// Create a new memory-mapped versioned KV journal from an existing file. + /// + /// The file must already exist and contain a properly formatted versioned KV journal. + /// The file will be resized to the specified size if it's different. + /// + /// # Arguments + /// * `file_path` - Path to the existing file + /// * `size` - Size to resize the file to in bytes + /// * `high_water_mark_ratio` - Optional ratio (0.0 to 1.0) for high water mark. Default: 0.8 + /// + /// # Errors + /// Returns an error if the file cannot be opened, memory-mapped, or contains invalid data. + /// Note: If the new size is smaller than the current file size, data may be truncated. + pub fn from_file>( + file_path: P, + size: usize, + high_water_mark_ratio: Option, + time_provider: Arc, + f: impl FnMut(&M, u64), + ) -> anyhow::Result<(Self, PartialDataLoss)> { + let file = OpenOptions::new().read(true).write(true).open(file_path)?; + + let file_len = file.metadata()?.len(); + if file_len != size as u64 { + file.set_len(size as u64)?; + } + + let (mmap, buffer) = unsafe { Self::create_mmap_buffer(file)? }; + + let (versioned_kv, partial_data_loss) = + VersionedJournal::from_buffer(buffer, high_water_mark_ratio, time_provider, f)?; + + Ok(( + Self { + mmap, + inner: versioned_kv, + }, + partial_data_loss, + )) + } + + /// Synchronize changes to disk. + /// + /// This method explicitly flushes any pending changes to the underlying file. + /// Note that changes are typically synced automatically by the OS, but this provides + /// explicit control when needed. + /// + /// This is a blocking operation that performs synchronous I/O (`msync()` system call). + /// In async contexts, consider wrapping this call with `tokio::task::spawn_blocking`. + /// + /// # Errors + /// Returns an error if the sync operation fails. + pub fn sync(journal: &Self) -> anyhow::Result<()> { + journal.mmap.flush().map_err(Into::into) + } +} diff --git a/bd-resilient-kv/src/versioned_kv_journal/mod.rs b/bd-resilient-kv/src/versioned_kv_journal/mod.rs new file mode 100644 index 00000000..486d57d0 --- /dev/null +++ b/bd-resilient-kv/src/versioned_kv_journal/mod.rs @@ -0,0 +1,35 @@ +// shared-core - bitdrift's common client/server libraries +// Copyright Bitdrift, Inc. All rights reserved. +// +// Use of this source code is governed by a source available license that can be found in the +// LICENSE file or at: +// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt + +use bd_proto::protos::state; + +mod file_manager; +mod framing; +mod journal; +mod memmapped_journal; +pub mod recovery; +pub mod store; + +/// Represents a value with its associated timestamp. 
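/// Returned by lookups such as `get_with_timestamp`, so callers can see both the stored value
/// and when it was last written.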
+#[derive(Debug, Clone, PartialEq)] +pub struct TimestampedValue { + /// The value stored in the key-value store. + pub value: state::payload::StateValue, + + /// The timestamp (in microseconds since UNIX epoch) when this value was last written. + pub timestamp: u64, +} + +#[cfg(test)] +pub fn make_string_value(s: &str) -> state::payload::StateValue { + state::payload::StateValue { + value_type: Some(state::payload::state_value::Value_type::StringValue( + s.to_string(), + )), + ..Default::default() + } +} diff --git a/bd-resilient-kv/src/versioned_kv_journal/recovery.rs b/bd-resilient-kv/src/versioned_kv_journal/recovery.rs new file mode 100644 index 00000000..edf39166 --- /dev/null +++ b/bd-resilient-kv/src/versioned_kv_journal/recovery.rs @@ -0,0 +1,217 @@ +// shared-core - bitdrift's common client/server libraries +// Copyright Bitdrift, Inc. All rights reserved. +// +// Use of this source code is governed by a source available license that can be found in the +// LICENSE file or at: +// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt + +use crate::versioned_kv_journal::TimestampedValue; +use crate::versioned_kv_journal::framing::Frame; +use ahash::AHashMap; +use bd_proto::protos::state::payload::StateKeyValuePair; + +/// A utility for recovering state at arbitrary timestamps from journal snapshots. +/// +/// This utility operates on raw uncompressed byte slices from archived journal snapshots +/// (created during rotation) and can reconstruct the key-value state at any historical +/// timestamp by replaying journal entries. +/// +/// # Recovery Model +/// +/// Recovery works exclusively with journal snapshots - complete archived journals created +/// during rotation. Each snapshot contains the full compacted state at the time of rotation, +/// with all entries preserving their original timestamps. +#[derive(Debug)] +pub struct VersionedRecovery { + snapshots: Vec, +} + +#[derive(Debug)] +struct SnapshotInfo { + data: Vec, + snapshot_timestamp: u64, +} + +impl VersionedRecovery { + /// Create a new recovery utility from a list of uncompressed snapshot byte slices. + /// + /// The snapshots should be provided in chronological order (oldest to newest). + /// Each snapshot must be a valid uncompressed versioned journal (VERSION 1 format). + /// + /// # Arguments + /// + /// * `snapshots` - A vector of tuples containing (`snapshot_data`, `snapshot_timestamp`). The + /// `snapshot_timestamp` represents when this snapshot was created (archived during rotation). + /// + /// # Errors + /// + /// Returns an error if any snapshot is invalid or cannot be parsed. + /// + /// # Note + /// + /// Callers must decompress snapshot data before passing it to this method if the data + /// is compressed (e.g., with zlib). + pub fn new(snapshots: Vec<(&[u8], u64)>) -> anyhow::Result { + let snapshot_infos = snapshots + .into_iter() + .map(|(data, snapshot_timestamp)| SnapshotInfo { + data: data.to_vec(), + snapshot_timestamp, + }) + .collect(); + + Ok(Self { + snapshots: snapshot_infos, + }) + } + + /// Recover the key-value state at a specific timestamp. + /// + /// This method replays all snapshot entries from all provided snapshots up to and including + /// the target timestamp, reconstructing the exact state at that point in time. + /// + /// ## Important: "Up to and including" semantics + /// + /// When recovering at timestamp T, **ALL entries with timestamp ≤ T are included**. 
+  /// This is critical because timestamps are monotonically non-decreasing (not strictly
+  /// increasing): if the system clock doesn't advance between writes, multiple entries
+  /// will share the same timestamp value. These entries must all be included to ensure
+  /// a consistent view of the state.
+  ///
+  /// Entries with the same timestamp are applied in version order (which reflects write
+  /// order), so later writes correctly overwrite earlier ones ("last write wins").
+  ///
+  /// # Arguments
+  ///
+  /// * `target_timestamp` - The timestamp (in microseconds since UNIX epoch) to recover state at
+  ///
+  /// # Returns
+  ///
+  /// A hashmap containing all key-value pairs with their timestamps as they existed at the
+  /// target timestamp.
+  ///
+  /// # Errors
+  ///
+  /// Returns an error if:
+  /// - The target timestamp is not found in any snapshot
+  /// - Snapshot data is corrupted or invalid
+  pub fn recover_at_timestamp(
+    &self,
+    target_timestamp: u64,
+  ) -> anyhow::Result<AHashMap<String, TimestampedValue>> {
+    let mut map = AHashMap::new();
+
+    // Replay snapshots up to and including the snapshot that was created at or after
+    // target_timestamp. A snapshot with snapshot_timestamp T contains all state up to time T.
+    for snapshot in &self.snapshots {
+      // Replay entries from this snapshot up to target_timestamp
+      replay_journal_to_timestamp(&snapshot.data, target_timestamp, &mut map)?;
+
+      // If this snapshot was created at or after our target timestamp, we're done.
+      // This snapshot contains all state up to target_timestamp.
+      if snapshot.snapshot_timestamp >= target_timestamp {
+        break;
+      }
+    }
+
+    Ok(map)
+  }
+
+  /// Get the current state from the latest snapshot.
+  ///
+  /// Since each snapshot contains the complete compacted state at rotation time,
+  /// only the last snapshot needs to be read to get the current state.
+  ///
+  /// # Errors
+  ///
+  /// Returns an error if snapshot data is corrupted or invalid.
+  pub fn recover_current(&self) -> anyhow::Result<AHashMap<String, TimestampedValue>> {
+    let mut map = AHashMap::new();
+
+    // Optimization: Only read the last snapshot since rotation writes the complete
+    // compacted state, so the last snapshot contains all current state.
+    if let Some(last_snapshot) = self.snapshots.last() {
+      replay_journal_to_timestamp(&last_snapshot.data, u64::MAX, &mut map)?;
+    }
+
+    Ok(map)
+  }
+}
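To make the replay semantics concrete, here is a small sketch (not part of the diff) that recovers state from two snapshots the caller has already read and zlib-inflated; the buffers, timestamps, and key name are illustrative.

```rust
// Illustrative only: `snap_a`/`snap_b` are assumed to be uncompressed snapshot
// bytes from `<name>.jrn.t<ts>.zz` archives (inflated by the caller), with
// `snap_a` archived at t=1_000 and `snap_b` at t=2_000 (microseconds).
fn recovery_example(snap_a: &[u8], snap_b: &[u8]) -> anyhow::Result<()> {
  let recovery = VersionedRecovery::new(vec![(snap_a, 1_000), (snap_b, 2_000)])?;

  // State as of t=1_500: every entry with timestamp <= 1_500 is applied, with
  // last-write-wins for entries that share a timestamp.
  let at_1500 = recovery.recover_at_timestamp(1_500)?;
  if let Some(tv) = at_1500.get("user.id") {
    println!("user.id written at {} micros", tv.timestamp);
  }

  // The latest state only needs the newest snapshot.
  let current = recovery.recover_current()?;
  println!("{} keys in current state", current.len());
  Ok(())
}
```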
+
+/// Replay snapshot entries up to and including the target timestamp.
+///
+/// This function processes all entries with timestamp ≤ `target_timestamp`.
+/// The "up to and including" behavior is essential because timestamps are monotonically
+/// non-decreasing (not strictly increasing): if the system clock doesn't advance between
+/// writes, multiple entries may share the same timestamp. All such entries must be
+/// applied to ensure state consistency.
+///
+/// Entries are processed in version order, ensuring "last write wins" semantics when
+/// multiple operations affect the same key at the same timestamp.
+fn replay_journal_to_timestamp(
+  buffer: &[u8],
+  target_timestamp: u64,
+  map: &mut AHashMap<String, TimestampedValue>,
+) -> anyhow::Result<()> {
+  // Skip the header (17 bytes: version + position + reserved)
+  const HEADER_SIZE: usize = 17;
+
+  if buffer.len() < HEADER_SIZE {
+    anyhow::bail!("Buffer too small: {}", buffer.len());
+  }
+
+  // Read position from header (bytes 8-15)
+  let position_bytes: [u8; 8] = buffer[8 .. 16]
+    .try_into()
+    .map_err(|_| anyhow::anyhow!("Failed to read position"))?;
+  #[allow(clippy::cast_possible_truncation)]
+  let position = u64::from_le_bytes(position_bytes) as usize;
+
+  if position < HEADER_SIZE {
+    anyhow::bail!("Invalid position: {position}, must be at least {HEADER_SIZE}");
+  }
+
+  if position > buffer.len() {
+    anyhow::bail!(
+      "Invalid position: {position}, buffer size: {}",
+      buffer.len()
+    );
+  }
+
+  // Decode frames from the journal data
+  let mut offset = 0;
+  let data = &buffer[HEADER_SIZE .. position];
+
+  while offset < data.len() {
+    match Frame::<StateKeyValuePair>::decode(&data[offset ..]) {
+      Ok((frame, bytes_read)) => {
+        // Only apply entries up to target timestamp
+        if frame.timestamp_micros > target_timestamp {
+          break;
+        }
+
+        if let Some(value) = frame.payload.value.into_option() {
+          // Insertion - store the protobuf StateValue
+          map.insert(
+            frame.payload.key,
+            TimestampedValue {
+              value,
+              timestamp: frame.timestamp_micros,
+            },
+          );
+        } else {
+          // Deletion
+          map.remove(&frame.payload.key);
+        }
+
+        offset += bytes_read;
+      },
+      Err(_) => {
+        // End of valid data or corrupted frame
+        break;
+      },
+    }
+  }
+
+  Ok(())
+}
diff --git a/bd-resilient-kv/src/versioned_kv_journal/store.rs b/bd-resilient-kv/src/versioned_kv_journal/store.rs
new file mode 100644
index 00000000..72402277
--- /dev/null
+++ b/bd-resilient-kv/src/versioned_kv_journal/store.rs
@@ -0,0 +1,467 @@
+// shared-core - bitdrift's common client/server libraries
+// Copyright Bitdrift, Inc. All rights reserved.
+//
+// Use of this source code is governed by a source available license that can be found in the
+// LICENSE file or at:
+// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt
+
+use crate::versioned_kv_journal::TimestampedValue;
+use crate::versioned_kv_journal::file_manager::{self, compress_archived_journal};
+use crate::versioned_kv_journal::journal::PartialDataLoss;
+use crate::versioned_kv_journal::memmapped_journal::MemMappedVersionedJournal;
+use ahash::AHashMap;
+use bd_proto::protos::state::payload::{StateKeyValuePair, StateValue};
+use bd_time::TimeProvider;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+
+#[derive(Debug, PartialEq, Eq)]
+pub enum DataLoss {
+  Total,
+  Partial,
+  None,
+}
+
+impl From<PartialDataLoss> for DataLoss {
+  fn from(value: PartialDataLoss) -> Self {
+    match value {
+      PartialDataLoss::Yes => Self::Partial,
+      PartialDataLoss::None => Self::None,
+    }
+  }
+}
+
+/// A persistent key-value store with timestamp tracking.
+///
+/// `VersionedKVStore` provides HashMap-like semantics backed by a timestamped journal that
+/// records every insert and removal, together with the timestamp of the write, in a
+/// memory-mapped file.
+///
+/// # Rotation Strategy
+///
+/// When the journal reaches its high water mark, the store automatically rotates to a new journal.
+/// The rotation process creates a snapshot of the current state while preserving timestamp
+/// semantics for accurate point-in-time recovery.
+///
+/// For detailed information about timestamp semantics, recovery bucketing, and invariants,
+/// see the `VERSIONED_FORMAT.md` documentation.
+pub struct VersionedKVStore {
+  journal: MemMappedVersionedJournal<StateKeyValuePair>,
+  cached_map: AHashMap<String, TimestampedValue>,
+  dir_path: PathBuf,
+  journal_name: String,
+  buffer_size: usize,
+  high_water_mark_ratio: Option<f64>,
+  current_generation: u64,
+}
+
+impl VersionedKVStore {
+  /// Create a new `VersionedKVStore` with the specified directory, name, and buffer size.
+  ///
+  /// The journal file will be named `<name>.jrn.N` where N is the generation number.
+  /// If a journal already exists, it will be loaded with its existing contents.
+  /// If the specified size is larger than an existing file, it will be resized while preserving
+  /// data. If the specified size is smaller and the existing data doesn't fit, a fresh journal
+  /// will be created.
+  ///
+  /// # Errors
+  /// Returns an error if we failed to create or open the journal file.
+  pub async fn new<P: AsRef<Path>>(
+    dir_path: P,
+    name: &str,
+    buffer_size: usize,
+    high_water_mark_ratio: Option<f64>,
+    time_provider: Arc<dyn TimeProvider>,
+  ) -> anyhow::Result<(Self, DataLoss)> {
+    let dir = dir_path.as_ref();
+
+    let (journal_path, generation) = file_manager::find_active_journal(dir, name).await;
+
+    // TODO(snowp): It would be ideal to be able to start with a small buffer and grow it as needed
+    // depending on the particular device's needs. We can embed size information in the journal
+    // header or in the filename itself to facilitate this.
+
+    log::debug!(
+      "Opening VersionedKVStore journal at {} (generation {generation})",
+      journal_path.display()
+    );
+
+    let (journal, initial_state, data_loss) = if journal_path.exists() {
+      // Try to open existing journal
+      Self::open(
+        &journal_path,
+        buffer_size,
+        high_water_mark_ratio,
+        time_provider.clone(),
+      )
+      .map(|(j, initial_state, data_loss)| (j, initial_state, data_loss.into()))
+      .or_else(|_| {
+        // Data is corrupt or unreadable, create fresh journal
+        Ok::<_, anyhow::Error>((
+          MemMappedVersionedJournal::new(
+            &journal_path,
+            buffer_size,
+            high_water_mark_ratio,
+            time_provider,
+            std::iter::empty(),
+          )?,
+          AHashMap::default(),
+          DataLoss::Total,
+        ))
+      })?
+    } else {
+      // Create new journal
+      (
+        MemMappedVersionedJournal::new(
+          &journal_path,
+          buffer_size,
+          high_water_mark_ratio,
+          time_provider,
+          std::iter::empty(),
+        )?,
+        AHashMap::default(),
+        DataLoss::None,
+      )
+    };
+
+    Ok((
+      Self {
+        journal,
+        cached_map: initial_state,
+        dir_path: dir.to_path_buf(),
+        journal_name: name.to_string(),
+        buffer_size,
+        high_water_mark_ratio,
+        current_generation: generation,
+      },
+      data_loss,
+    ))
+  }
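Because `new()` silently falls back to a fresh journal when the existing one is corrupt, callers usually want to branch on the returned `DataLoss`. A minimal sketch (not from the diff), assuming the caller already holds an `Arc<dyn TimeProvider>` named `clock`; the directory, name, and buffer size are illustrative:

```rust
// Sketch only: opens (or creates) the store and reports any detected data loss.
async fn open_store(clock: Arc<dyn TimeProvider>) -> anyhow::Result<VersionedKVStore> {
  let (store, data_loss) =
    VersionedKVStore::new("/data/bitdrift/state", "store", 64 * 1024, None, clock).await?;

  match data_loss {
    DataLoss::None => {},
    DataLoss::Partial => log::warn!("journal tail was corrupt; some recent writes were dropped"),
    DataLoss::Total => log::warn!("journal was unreadable; starting from an empty state"),
  }

  Ok(store)
}
```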
+
+  /// Open an existing `VersionedKVStore` from a pre-existing journal file.
+  ///
+  /// Unlike `new()`, this method requires the journal file to exist and will fail if it's
+  /// missing.
+  ///
+  /// # Arguments
+  /// * `dir_path` - Directory path where the journal is stored
+  /// * `name` - Base name of the journal (e.g., "store" for "store.jrn.N")
+  /// * `buffer_size` - Size in bytes for the journal buffer
+  /// * `high_water_mark_ratio` - Optional ratio (0.0 to 1.0) for high water mark. Default: 0.8
+  ///
+  /// # Errors
+  /// Returns an error if:
+  /// - The journal file does not exist
+  /// - The journal file cannot be opened
+  /// - The journal file contains invalid data
+  /// - Initialization fails
+  pub async fn open_existing<P: AsRef<Path>>(
+    dir_path: P,
+    name: &str,
+    buffer_size: usize,
+    high_water_mark_ratio: Option<f64>,
+    time_provider: Arc<dyn TimeProvider>,
+  ) -> anyhow::Result<(Self, DataLoss)> {
+    let dir = dir_path.as_ref();
+
+    let (journal_path, generation) = file_manager::find_active_journal(dir, name).await;
+
+    let (journal, initial_state, data_loss) = Self::open(
+      &journal_path,
+      buffer_size,
+      high_water_mark_ratio,
+      time_provider,
+    )?;
+
+    Ok((
+      Self {
+        journal,
+        cached_map: initial_state,
+        dir_path: dir.to_path_buf(),
+        journal_name: name.to_string(),
+        buffer_size,
+        high_water_mark_ratio,
+        current_generation: generation,
+      },
+      if matches!(data_loss, PartialDataLoss::Yes) {
+        DataLoss::Partial
+      } else {
+        DataLoss::None
+      },
+    ))
+  }
+
+  fn open(
+    journal_path: &Path,
+    buffer_size: usize,
+    high_water_mark_ratio: Option<f64>,
+    time_provider: Arc<dyn TimeProvider>,
+  ) -> anyhow::Result<(
+    MemMappedVersionedJournal<StateKeyValuePair>,
+    AHashMap<String, TimestampedValue>,
+    PartialDataLoss,
+  )> {
+    let mut initial_state = AHashMap::default();
+    let (journal, data_loss) = MemMappedVersionedJournal::<StateKeyValuePair>::from_file(
+      journal_path,
+      buffer_size,
+      high_water_mark_ratio,
+      time_provider,
+      |entry, timestamp| {
+        if let Some(value) = entry.value.as_ref() {
+          initial_state.insert(
+            entry.key.clone(),
+            TimestampedValue {
+              value: value.clone(),
+              timestamp,
+            },
+          );
+        } else {
+          initial_state.remove(&entry.key);
+        }
+      },
+    )?;
+
+    Ok((journal, initial_state, data_loss))
+  }
+
+  /// Get a value by key.
+  ///
+  /// This operation is O(1) as it reads from the in-memory cache.
+  #[must_use]
+  pub fn get(&self, key: &str) -> Option<&StateValue> {
+    self.cached_map.get(key).map(|tv| &tv.value)
+  }
+
+  /// Get a value with its timestamp by key.
+  ///
+  /// This operation is O(1) as it reads from the in-memory cache.
+  #[must_use]
+  pub fn get_with_timestamp(&self, key: &str) -> Option<&TimestampedValue> {
+    self.cached_map.get(key)
+  }
+
+  /// Insert a value for a key, returning the timestamp assigned to this write.
+  ///
+  /// Note: Inserting a `StateValue` with no `value_type` set is equivalent to removing the key.
+  ///
+  /// # Errors
+  /// Returns an error if the value cannot be written to the journal.
+  pub async fn insert(&mut self, key: String, value: StateValue) -> anyhow::Result<u64> {
+    let timestamp = if value.value_type.is_none() {
+      // Inserting null is equivalent to deletion
+      let timestamp = self.journal.insert_entry(StateKeyValuePair {
+        key: key.clone(),
+        ..Default::default()
+      })?;
+      self.cached_map.remove(&key);
+      timestamp
+    } else {
+      let timestamp = self.journal.insert_entry(StateKeyValuePair {
+        key: key.clone(),
+        value: Some(value.clone()).into(),
+        ..Default::default()
+      })?;
+      self
+        .cached_map
+        .insert(key, TimestampedValue { value, timestamp });
+      timestamp
+    };
+
+    // Check if rotation is needed
+    if self.journal.is_high_water_mark_triggered() {
+      // TODO(snowp): Consider doing this out of band to split error handling for the insert and
+      // rotation.
+      self.rotate_journal().await?;
+    }
+
+    Ok(timestamp)
+  }
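A short usage sketch of the read/write API (not from the diff); `remove()` and `sync()` are defined just below, the value is built the same way as the `make_string_value` test helper in `mod.rs`, and the key is illustrative:

```rust
use bd_proto::protos::state::payload::{StateValue, state_value};

// Sketch: write a key, read it back, then delete it. `store` is a
// VersionedKVStore opened as shown earlier; errors bubble up via `?`.
async fn roundtrip(store: &mut VersionedKVStore) -> anyhow::Result<()> {
  let value = StateValue {
    value_type: Some(state_value::Value_type::StringValue("abc123".to_string())),
    ..Default::default()
  };

  let write_ts = store.insert("user.id".to_string(), value).await?;
  assert_eq!(
    store.get_with_timestamp("user.id").map(|tv| tv.timestamp),
    Some(write_ts)
  );

  // Removal is journaled too, so it also gets a timestamp.
  let remove_ts = store.remove("user.id").await?;
  assert!(remove_ts.is_some());

  // Flush the memory map; consider tokio::task::spawn_blocking for this
  // blocking msync in production async code.
  store.sync()?;
  Ok(())
}
```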
+
+  /// Remove a key and return the timestamp assigned to this deletion.
+  ///
+  /// Returns `None` if the key didn't exist, otherwise returns the timestamp.
+  ///
+  /// # Errors
+  /// Returns an error if the deletion cannot be written to the journal.
+  pub async fn remove(&mut self, key: &str) -> anyhow::Result<Option<u64>> {
+    if !self.cached_map.contains_key(key) {
+      return Ok(None);
+    }
+
+    let timestamp = self.journal.insert_entry(StateKeyValuePair {
+      key: key.to_string(),
+      ..Default::default()
+    })?;
+    self.cached_map.remove(key);
+
+    // Check if rotation is needed
+    if self.journal.is_high_water_mark_triggered() {
+      self.rotate_journal().await?;
+    }
+
+    Ok(Some(timestamp))
+  }
+
+  /// Check if the store contains a key.
+  ///
+  /// This operation is O(1) as it reads from the in-memory cache.
+  #[must_use]
+  pub fn contains_key(&self, key: &str) -> bool {
+    self.cached_map.contains_key(key)
+  }
+
+  /// Get the number of key-value pairs in the store.
+  ///
+  /// This operation is O(1) as it reads from the in-memory cache.
+  #[must_use]
+  pub fn len(&self) -> usize {
+    self.cached_map.len()
+  }
+
+  /// Check if the store is empty.
+  ///
+  /// This operation is O(1) as it reads from the in-memory cache.
+  #[must_use]
+  pub fn is_empty(&self) -> bool {
+    self.len() == 0
+  }
+
+  /// Get a reference to the current hash map with timestamps.
+  ///
+  /// This operation is O(1) as it reads from the in-memory cache.
+  #[must_use]
+  pub fn as_hashmap(&self) -> &AHashMap<String, TimestampedValue> {
+    &self.cached_map
+  }
+
+  /// Synchronize changes to disk.
+  ///
+  /// This is a blocking operation that performs synchronous I/O. In async contexts,
+  /// consider wrapping this call with `tokio::task::spawn_blocking`.
+  pub fn sync(&self) -> anyhow::Result<()> {
+    MemMappedVersionedJournal::sync(&self.journal)
+  }
+}
+
+/// Information about a journal rotation. This is used by test code to verify rotation results.
+pub struct Rotation {
+  pub new_journal_path: PathBuf,
+  pub old_journal_path: PathBuf,
+  pub snapshot_path: PathBuf,
+}
+
+impl VersionedKVStore {
+  /// Manually trigger journal rotation, returning information about the rotation (see
+  /// `Rotation`), including the path to the new journal file.
+  ///
+  /// This will create a new journal with the current state compacted and archive the old journal.
+  /// The archived journal will be compressed using zlib to reduce storage size.
+  /// Rotation typically happens automatically when the high water mark is reached, but this
+  /// method allows manual control when needed.
+  pub async fn rotate_journal(&mut self) -> anyhow::Result<Rotation> {
+    let next_generation = self.current_generation + 1;
+    let old_generation = self.current_generation;
+    self.current_generation = next_generation;
+
+    // TODO(snowp): This part needs fuzzing and more safeguards.
+    // TODO(snowp): Consider doing this out of band to split error handling for the insert and
+    // rotation.
+
+    // Create new journal with compacted state. This doesn't touch the file containing the old
+    // journal.
+    let new_journal_path = self
+      .dir_path
+      .join(format!("{}.jrn.{next_generation}", self.journal_name));
+
+    MemMappedVersionedJournal::sync(&self.journal)?;
+    let new_journal = self.create_rotated_journal(&new_journal_path)?;
+    self.journal = new_journal;
+
+    // Best-effort cleanup: compress and archive the old journal
+    let old_journal_path = self
+      .dir_path
+      .join(format!("{}.jrn.{old_generation}", self.journal_name));
+    let snapshot_path = self.archive_journal(&old_journal_path).await;
+
+    Ok(Rotation {
+      new_journal_path,
+      old_journal_path,
+      snapshot_path,
+    })
+  }
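Since `Rotation` exists mainly so tests can verify results, a hypothetical test fragment might use it roughly like this (sketch only; the store setup is assumed and the assertions mirror the archiving behavior implemented in `archive_journal` below):

```rust
// Hypothetical test fragment: force a rotation and check the resulting files.
let rotation = store.rotate_journal().await?;

// The active journal moved to the next generation...
assert!(rotation.new_journal_path.exists());
// ...the old uncompressed journal is removed after archiving...
assert!(!rotation.old_journal_path.exists());
// ...and the archive is the zlib-compressed snapshot (`<name>.jrn.t<ts>.zz`).
assert!(rotation.snapshot_path.to_string_lossy().ends_with(".zz"));
```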
+
+  /// Archives the old journal by compressing it and removing the original.
+  ///
+  /// This is a best-effort operation; failures to compress or delete the old journal
+  /// are logged but do not cause the rotation to fail.
+  async fn archive_journal(&self, old_journal_path: &Path) -> PathBuf {
+    // Generate archived path with timestamp
+    let rotation_timestamp = self
+      .cached_map
+      .values()
+      .map(|tv| tv.timestamp)
+      .max()
+      .unwrap_or(0);
+
+    let archived_path = self.dir_path.join(format!(
+      "{}.jrn.t{}.zz",
+      self.journal_name, rotation_timestamp
+    ));
+
+    log::debug!(
+      "Archiving journal {} to {}",
+      old_journal_path.display(),
+      archived_path.display()
+    );
+
+    // Try to compress the old journal for longer-term storage.
+    if let Err(e) = compress_archived_journal(old_journal_path, &archived_path).await {
+      log::warn!(
+        "Failed to compress archived journal {}: {}",
+        old_journal_path.display(),
+        e
+      );
+    }
+
+    // Remove the uncompressed journal regardless of compression success. If we succeeded we no
+    // longer need it, while if we failed we consider the snapshot lost.
+    let _ignored = tokio::fs::remove_file(old_journal_path)
+      .await
+      .inspect_err(|e| {
+        log::warn!(
+          "Failed to remove old journal {}: {}",
+          old_journal_path.display(),
+          e
+        );
+      });
+
+    archived_path
+  }
+
+  /// Create a new rotated journal with compacted state.
+  ///
+  /// Note: Rotation cannot fail due to insufficient buffer space. Since rotation creates a new
+  /// journal with the same buffer size and compaction only removes redundant updates (old
+  /// versions of keys), the compacted state is always ≤ the current journal size. If data fits
+  /// during normal operation, it will always fit during rotation.
+  fn create_rotated_journal(
+    &self,
+    journal_path: &Path,
+  ) -> anyhow::Result<MemMappedVersionedJournal<StateKeyValuePair>> {
+    MemMappedVersionedJournal::new(
+      journal_path,
+      self.buffer_size,
+      self.high_water_mark_ratio,
+      self.journal.time_provider.clone(),
+      self.cached_map.iter().map(|kv| {
+        (
+          StateKeyValuePair {
+            key: kv.0.clone(),
+            value: Some(kv.1.value.clone()).into(),
+            ..Default::default()
+          },
+          kv.1.timestamp,
+        )
+      }),
+    )
+  }
+}
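Tying rotation and recovery together, here is a sketch of the full round trip (not part of the diff): rotate, inflate the archived `.zz` snapshot with `flate2` (a dependency added by this change), and replay it with `VersionedRecovery`. The `snapshot_timestamp` argument is assumed to be parsed from the `t<ts>` segment of the archive file name.

```rust
use std::io::Read;
use flate2::read::ZlibDecoder;

// Hypothetical end-to-end flow: rotate, then recover current state from the
// archived snapshot that rotation produced.
async fn rotate_and_recover(
  store: &mut VersionedKVStore,
  snapshot_timestamp: u64, // assumed to come from the `t<ts>` file-name segment
) -> anyhow::Result<()> {
  let rotation = store.rotate_journal().await?;

  // Inflate the zlib-compressed snapshot back into raw journal bytes.
  let compressed = tokio::fs::read(&rotation.snapshot_path).await?;
  let mut snapshot = Vec::new();
  ZlibDecoder::new(compressed.as_slice()).read_to_end(&mut snapshot)?;

  // Recovery operates on uncompressed snapshot bytes plus their archive timestamps.
  let recovery = VersionedRecovery::new(vec![(snapshot.as_slice(), snapshot_timestamp)])?;
  let state = recovery.recover_current()?;
  log::debug!(
    "recovered {} keys from {}",
    state.len(),
    rotation.snapshot_path.display()
  );
  Ok(())
}
```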