bd-resilient-kv: add versioned store #307
Adds a new variant of the resilient-kv that supports automatically rotating versioned entries
Add SnapshotCleanup utility to manage disk space by removing old archived journal snapshots based on version thresholds or retention counts. Supports two cleanup strategies: version-based (keep snapshots >= min version) and count-based (keep N most recent). Includes comprehensive test coverage with 15 tests covering various scenarios including empty directories, edge cases, and multi-journal isolation.
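As a rough illustration of the two cleanup strategies described above, the selection logic can be sketched as pure functions over a list of snapshot versions. The function names here are hypothetical and not taken from `SnapshotCleanup`; the actual utility works on archived files on disk.

```rust
/// Hypothetical helper: versions to delete when keeping snapshots with
/// version >= `min_version` (version-based cleanup).
fn to_remove_by_version(versions: &[u64], min_version: u64) -> Vec<u64> {
    versions
        .iter()
        .copied()
        .filter(|v| *v < min_version)
        .collect()
}

/// Hypothetical helper: versions to delete when keeping only the `keep`
/// most recent snapshots (count-based cleanup). Returned in ascending order.
fn to_remove_by_count(versions: &[u64], keep: usize) -> Vec<u64> {
    let mut sorted = versions.to_vec();
    sorted.sort_unstable();
    // Everything before the last `keep` entries is eligible for removal.
    let cut = sorted.len().saturating_sub(keep);
    sorted.truncate(cut);
    sorted
}

fn main() {
    let versions = [10, 40, 20, 30];
    println!("{:?}", to_remove_by_version(&versions, 25));
    println!("{:?}", to_remove_by_count(&versions, 2));
}
```

Both functions return an empty list for an empty directory listing, which matches the empty-directory case mentioned in the test coverage.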
Add test that documents and verifies the impossibility of buffer overflow during rotation. The test demonstrates that rotation always succeeds because compacted state fits in the same-sized buffer used during normal operation.
…unique keys" This reverts commit 96394a2.
Add explicit documentation about failure modes applications must handle versus those that are architecturally impossible. Clarifies that rotation cannot fail due to buffer overflow since compacted state always fits in same-sized buffer.
| arc-swap = "1.7.1" | ||
| assert_matches = "1.5.0" | ||
| assert_no_alloc = "1.1.2" | ||
| async-compression = { version = "0.4.20", features = ["tokio", "zlib"] } |
We should make sure this does the right thing with flate2 in terms of using the system zlib; otherwise it may be better to do it manually.
bd-resilient-kv/AGENTS.md
Outdated
| - Automatic rotation when journal size exceeds high water mark (triggered during async write operations) | ||
| - Current state is compacted into a new journal as versioned entries | ||
| - Old journal is archived with `.v{version}.zz` suffix | ||
| - Archived journals are automatically compressed using zlib (RFC 1950, level 3) asynchronously |
FYI we use level 5 in the rest of the mobile code.
Makes sense to keep it consistent; I'll update.
| // Write all current state as versioned entries at the snapshot version | ||
| // Use the original timestamp from each entry to preserve historical accuracy |
I don't think this is correct. I think we need the entries to keep their original versions, and each snapshot to have increasing versions. A reset would only happen on a full clear, I think. This allows us to replay the full sequence against the ring buffer and get accurate state for every log. As long as the version field is u64, I don't think rollover can happen in practice.
Or maybe I'm misunderstanding how this would be used? Basically my thinking was we would just keep making files and then clean them up as it rolls over. Compression of the previous file is orthogonal.
Per offline convo it would be nice to have the encoder use varints for int fields. I don't think that should be all that difficult.
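For reference, a minimal sketch of varint encoding for integer fields, assuming the usual unsigned LEB128 scheme (7 payload bits per byte, as used by protobuf). The function names are hypothetical and this is not the encoder from the PR.

```rust
/// Encode `value` as an unsigned LEB128 varint (7 payload bits per byte,
/// high bit set on every byte except the last).
fn encode_varint(mut value: u64, out: &mut Vec<u8>) {
    while value >= 0x80 {
        out.push((value as u8 & 0x7f) | 0x80);
        value >>= 7;
    }
    out.push(value as u8);
}

/// Decode a varint from the front of `buf`.
/// Returns (value, bytes consumed), or None if the input is truncated
/// or longer than the 10 bytes a u64 can need.
/// Note: this sketch does not reject excess bits in a 10th byte.
fn decode_varint(buf: &[u8]) -> Option<(u64, usize)> {
    let mut value = 0u64;
    for (i, byte) in buf.iter().enumerate().take(10) {
        value |= u64::from(byte & 0x7f) << (7 * i);
        if byte & 0x80 == 0 {
            return Some((value, i + 1));
        }
    }
    None
}

fn main() {
    for v in [1u64, 300, u64::MAX] {
        let mut buf = Vec::new();
        encode_varint(v, &mut buf);
        println!("{v} -> {} byte(s)", buf.len());
        assert_eq!(decode_varint(&buf), Some((v, buf.len())));
    }
}
```

Small values such as low version numbers take one or two bytes instead of a fixed eight, while the worst case for a u64 grows to ten bytes.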
| /// # Rotation Strategy | ||
| /// When the journal reaches its high water mark, the store automatically: | ||
| /// 1. Creates a new journal file with a rotated name (e.g., `store.jrn.v12345`) | ||
| /// 2. Writes the current state as versioned entries at the rotation version |
This is related to my other comment, but I'm not sure this is the way it should work. I think in the new world we basically never compact and just delete old journals, with optional compression. If we pass through the hashmap cache, it stays consistent. Then on startup, you reconstruct state by reading all the files and processing them sequentially. I'm not sure what the benefit is of compacting the state into the new journal.
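The replay approach described in this comment can be sketched roughly as follows, with journals modeled as in-memory entry lists. The `Entry` type, the `entry` helper, and `replay` are hypothetical; the sketch assumes journals are supplied oldest-first and that versions increase monotonically across them.

```rust
use std::collections::HashMap;

/// Hypothetical journal entry; `value == None` marks a deletion.
struct Entry {
    version: u64,
    key: String,
    value: Option<String>,
}

/// Convenience constructor for the example below.
fn entry(version: u64, key: &str, value: Option<&str>) -> Entry {
    Entry {
        version,
        key: key.to_string(),
        value: value.map(str::to_string),
    }
}

/// Replay journals oldest-first, applying every entry with
/// version <= `target_version`. Assumes versions never decrease.
fn replay(journals: &[Vec<Entry>], target_version: u64) -> HashMap<String, String> {
    let mut state = HashMap::new();
    for e in journals.iter().flatten() {
        if e.version > target_version {
            break;
        }
        match &e.value {
            Some(v) => {
                state.insert(e.key.clone(), v.clone());
            }
            None => {
                state.remove(&e.key);
            }
        }
    }
    state
}

fn main() {
    let journals = vec![
        vec![entry(1, "flag", Some("on")), entry(2, "user", Some("a"))],
        vec![entry(3, "flag", None), entry(4, "user", Some("b"))],
    ];
    let at_2 = replay(&journals, 2);
    let at_4 = replay(&journals, 4);
    println!("v2: flag={:?} user={:?}", at_2.get("flag"), at_2.get("user"));
    println!("v4: flag={:?} user={:?}", at_4.get("flag"), at_4.get("user"));
}
```

Without compaction, recovering state at an old version requires every journal back to the first one, so deleting old journals also bounds how far back recovery can reach.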
| max_version: u64, | ||
| } | ||
|
|
||
| impl VersionedRecovery { |
It seems like this code was generated to do basically what I described, reading all files on restart, so I'm wondering what the compaction is needed for.
mattklein123
left a comment
LGTM at a high level. We should add a fuzzer per discussion but we would do this in a different PR if you want.
bd-resilient-kv/VERSIONED_FORMAT.md
Outdated
| │ VERSIONED JOURNAL ENTRY │ | ||
| │ (Protobuf-encoded StateKeyValuePair) │ | ||
| ├─────────────────────────────────────────────────────────────────────────┤ | ||
| │ Frame Length (u32) │ 4 bytes │ |
I should have done this with the ring buffer but since you are already in here I would do varint encoding for this as well since it will generally be pretty small.
Updated this to varint
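To illustrate the effect of a varint frame length, here is a small sketch of length-prefixed framing, assuming an unsigned LEB128 prefix followed directly by the payload. `write_frame` and `read_frame` are hypothetical names, and the layout is an assumption rather than the format specified in VERSIONED_FORMAT.md.

```rust
/// Append one frame: varint-encoded payload length, then the payload bytes.
fn write_frame(payload: &[u8], out: &mut Vec<u8>) {
    let mut len = payload.len() as u64;
    while len >= 0x80 {
        out.push((len as u8 & 0x7f) | 0x80);
        len >>= 7;
    }
    out.push(len as u8);
    out.extend_from_slice(payload);
}

/// Read one frame from the front of `buf`.
/// Returns (payload, total bytes consumed), or None if the length prefix
/// or the payload is truncated.
fn read_frame(buf: &[u8]) -> Option<(&[u8], usize)> {
    let mut len = 0u64;
    for (i, byte) in buf.iter().enumerate().take(10) {
        len |= u64::from(byte & 0x7f) << (7 * i);
        if byte & 0x80 == 0 {
            let start = i + 1;
            // Reject frames that claim more bytes than are available.
            if len > (buf.len() - start) as u64 {
                return None;
            }
            let end = start + len as usize;
            return Some((&buf[start..end], end));
        }
    }
    None
}

fn main() {
    let mut journal = Vec::new();
    write_frame(b"hello", &mut journal);
    println!("frame size: {} bytes", journal.len());
    println!("decoded: {:?}", read_frame(&journal));
    println!("truncated: {:?}", read_frame(&journal[..3]));
}
```

For payloads under 128 bytes the prefix is a single byte rather than the four bytes of a fixed u32, and a truncated tail is detected instead of being read past the end of the buffer.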
| pub fn make_string_value(s: &str) -> state::payload::StateValue { | ||
| state::payload::StateValue { | ||
| value_type: Some(state::payload::state_value::Value_type::StringValue( | ||
| s.to_string(), | ||
| )), | ||
| ..Default::default() | ||
| } | ||
| } |
We probably don't want to go down the path of custom encoder for all the things but this could be trivially zero copy also. Maybe next week we could try hacking on the proc macro. (Definitely a TODO)
| let pattern = format!("{name}.jrn."); | ||
|
|
||
| let mut max_gen = 0u64; | ||
| let Ok(entries) = std::fs::read_dir(dir) else { |
async?
| #[cfg(test)] | ||
| #[path = "./framing_test.rs"] | ||
| mod tests; |
nit: move to top of file / update agents.md about this.
| /// Insert a value for a key, returning the timestamp assigned to this write. | ||
| /// | ||
| /// Note: Inserting `Value::Null` is equivalent to removing the key. | ||
| /// | ||
| /// # Errors | ||
| /// Returns an error if the value cannot be written to the journal. | ||
| pub async fn insert(&mut self, key: String, value: StateValue) -> anyhow::Result<u64> { |
In practice I wonder if we are going to need to pass in the timestamp so that, for example, it can line up with logs. We can always change this later.
Yeah we can see how it goes when we integrate it with the rest of the system
This adds a versioned key-value store that supports point-in-time recovery with disk persistence. It will be used to power a new SDK primitive known as "state", which will back feature flags, "global" fields, and other state tracked by the system.
Refer to VERSIONED_FORMAT.md for details about the implementation.
Fixes BIT-6799