Skip to content

Commit

Permalink
[fix] Fix race condition with earliest ts hash calculation (#2204)
Browse files Browse the repository at this point in the history
## Why is this change needed?

It was possible to trigger race conditions when calculating
earliest_ts_hash, this led to hubs have different values which meant
that stores would become out of sync. Migrate the cache to rust and
prevent race conditions.


## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.

<!-- start pr-codex -->

---

## PR-Codex overview
This PR migrates the storage cache to Rust and addresses a race
condition.

### Detailed summary
- Migrated storage cache to Rust for efficiency
- Fixed a race condition in the storage cache
- Added `PRIMARY_KEY_LENGTH` constant in `message.rs`
- Updated `storage_cache` exports in `store/mod.rs`
- Added functions to interact with storage cache in `utils.rs`
- Updated `lib.rs` to include `StorageCache`
- Updated `storeEventHandler.ts` to handle `StorageCache`
- Updated tests to use `StorageCache` in `storeEventHandler.test.ts`
- Implemented `StorageCache` in `store_event_handler.rs`
- Added Rust bindings for `StorageCache` in `rustfunctions.ts`

> The following files were skipped due to too many changes:
`apps/hubble/src/storage/stores/storageCache.ts`,
`apps/hubble/src/storage/stores/storageCache.test.ts`,
`apps/hubble/src/addon/src/store/storage_cache.rs`

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
  • Loading branch information
sanjayprabhu committed Jul 24, 2024
1 parent be54d20 commit 554d82a
Show file tree
Hide file tree
Showing 13 changed files with 400 additions and 89 deletions.
5 changes: 5 additions & 0 deletions .changeset/violet-poets-camp.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

fix: Migrate storage cache to rust and fix race condition
10 changes: 9 additions & 1 deletion apps/hubble/src/addon/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
store::{CastStore, StoreEventHandler, UsernameProofStore, VerificationStore},
store::{CastStore, StorageCache, StoreEventHandler, UsernameProofStore, VerificationStore},
trie::merkle_trie::MerkleTrie,
};
use db::RocksDB;
Expand Down Expand Up @@ -100,6 +100,14 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> {
)?;
cx.export_function("getNextEventId", StoreEventHandler::js_get_next_event_id)?;

cx.export_function("createStorageCache", StorageCache::js_create_storage_cache)?;
cx.export_function("getEarliestTsHash", StorageCache::js_get_earliest_ts_hash)?;
cx.export_function(
"clearEarliestTsHash",
StorageCache::js_clear_earliest_ts_hash,
)?;
cx.export_function("clearEarliestTsHashCache", StorageCache::js_clear_cache)?;

cx.export_function("createDb", RocksDB::js_create_db)?;
cx.export_function("dbOpen", RocksDB::js_open)?;
cx.export_function("dbApproximateSize", RocksDB::js_approximate_size)?;
Expand Down
7 changes: 6 additions & 1 deletion apps/hubble/src/addon/src/store/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub const FID_BYTES: usize = 4;

pub const TS_HASH_LENGTH: usize = 24;
pub const HASH_LENGTH: usize = 20;
pub const PRIMARY_KEY_LENGTH: usize = 1 + FID_BYTES + 1 + TS_HASH_LENGTH; // Root prefix + fid + set + ts_hash

pub const TRUE_VALUE: u8 = 1;

Expand Down Expand Up @@ -181,6 +182,10 @@ pub fn type_to_set_postfix(message_type: MessageType) -> UserPostfix {
return UserPostfix::LinkMessage;
}

if message_type == MessageType::LinkCompactState {
return UserPostfix::LinkCompactStateMessage;
}

if message_type == MessageType::UsernameProof {
return UserPostfix::UsernameProofMessage;
}
Expand Down Expand Up @@ -243,7 +248,7 @@ pub fn make_message_primary_key(
set: u8,
ts_hash: Option<&[u8; TS_HASH_LENGTH]>,
) -> Vec<u8> {
let mut key = Vec::with_capacity(1 + 4 + 1 + TS_HASH_LENGTH);
let mut key = Vec::with_capacity(1 + FID_BYTES + 1 + TS_HASH_LENGTH);
key.extend_from_slice(&make_user_key(fid));
key.push(set);
if ts_hash.is_some() {
Expand Down
2 changes: 2 additions & 0 deletions apps/hubble/src/addon/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub use self::cast_store::*;
pub use self::link_store::*;
pub use self::message::*;
pub use self::reaction_store::*;
pub use self::storage_cache::*;
pub use self::store::*;
pub use self::store_event_handler::*;
pub use self::user_data_store::*;
Expand All @@ -14,6 +15,7 @@ mod link_store;
mod message;
mod name_registry_events;
mod reaction_store;
mod storage_cache;
mod store;
mod store_event_handler;
mod user_data_store;
Expand Down
255 changes: 255 additions & 0 deletions apps/hubble/src/addon/src/store/storage_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
use crate::db::RocksDB;
use crate::THREAD_POOL;
use neon::{
context::{Context, FunctionContext},
result::JsResult,
types::{buffer::TypedArray, JsBox, JsNumber, JsPromise},
};

use crate::protos::{hub_event, HubEvent, Message, MessageType};
use crate::store::{
bytes_compare, get_storage_cache, hub_error_to_js_throw, make_message_primary_key,
make_ts_hash, type_to_set_postfix, HubError, PageOptions, FID_BYTES, PRIMARY_KEY_LENGTH,
};
use neon::prelude::Finalize;
use neon::types::JsBoolean;
use std::clone::Clone;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::{Arc, Mutex, RwLock};

pub const PENDING_SCANS_LOCKS_COUNT: usize = 5;

pub struct StorageCache {
db: Arc<RocksDB>,
earliest_ts_hashes: Arc<RwLock<HashMap<Vec<u8>, Option<Vec<u8>>>>>, // We're not using a RwLock here because this is very read heavy
fid_locks: Arc<[Mutex<()>; PENDING_SCANS_LOCKS_COUNT]>,
}

// Needed to let the StorageCache be owned by the JS runtime
impl Finalize for StorageCache {}

impl StorageCache {
pub fn new(db: Arc<RocksDB>) -> Self {
Self {
db,
earliest_ts_hashes: Arc::new(RwLock::new(HashMap::new())),
fid_locks: Arc::new([
Mutex::new(()),
Mutex::new(()),
Mutex::new(()),
Mutex::new(()),
Mutex::new(()),
]),
}
}

pub fn get_earliest_ts_hash(&self, fid: u32, set: u8) -> Result<Option<Vec<u8>>, HubError> {
let prefix = make_message_primary_key(fid, set, None);

let existing_value = self.get_earliest_ts_hash_from_cache(&prefix);
return if let Some(value) = existing_value {
// We have a cached value
if let Some(value) = value {
Ok(Some(value.clone()))
} else {
Ok(None)
}
} else {
// Nothing in the cache, read from db, locking on the fid to prevent too many concurrent db scans
let _lock = self.fid_locks[fid as usize % PENDING_SCANS_LOCKS_COUNT]
.lock()
.unwrap();
let ts_hash_result = self.get_earliest_ts_hash_from_db(&prefix);
if ts_hash_result.is_ok() {
let earliest_ts_hash = ts_hash_result.unwrap();
self.set_earliest_ts_hash(fid, set, &earliest_ts_hash);
Ok(earliest_ts_hash)
} else {
Err(ts_hash_result.unwrap_err())
}
};
}

pub fn process_event(&self, event: &mut HubEvent) {
// Based on the contents of event body, add or remove messages
match &event.body {
Some(body) => match body {
hub_event::Body::MergeMessageBody(merge_message_body) => {
if merge_message_body.message.is_some() {
self.add_message(merge_message_body.message.as_ref().unwrap());
}
for deleted_message in merge_message_body.deleted_messages.iter() {
self.remove_message(deleted_message);
}
}
hub_event::Body::RevokeMessageBody(delete_message_body) => {
if delete_message_body.message.is_some() {
self.remove_message(delete_message_body.message.as_ref().unwrap());
}
}
hub_event::Body::PruneMessageBody(prune_message_body) => {
if prune_message_body.message.is_some() {
self.remove_message(prune_message_body.message.as_ref().unwrap());
}
}
_ => {}
},
None => {}
}
}

fn add_message(&self, message: &Message) {
let fid = message.data.as_ref().unwrap().fid as u32;
let set = type_to_set_postfix(
MessageType::try_from(message.data.as_ref().unwrap().r#type).unwrap(),
) as u8;

let current_earliest_result = self.get_earliest_ts_hash(fid, set);
match current_earliest_result {
Ok(result) => match result {
Some(earliest_ts_hash) => {
let message_ts_hash =
make_ts_hash(message.data.as_ref().unwrap().timestamp, &message.hash);
match message_ts_hash {
Ok(ts_hash) => {
if bytes_compare(earliest_ts_hash.deref(), &ts_hash) > 0 {
self.set_earliest_ts_hash(fid, set, &Some(ts_hash.to_vec()));
}
}
Err(_) => {}
}
}
None => {
let message_ts_hash =
make_ts_hash(message.data.as_ref().unwrap().timestamp, &message.hash);
if message_ts_hash.is_ok() {
let ts_hash = message_ts_hash.unwrap();
self.set_earliest_ts_hash(fid, set, &Some(ts_hash.to_vec()));
}
}
},
_ => {}
}
}

fn remove_message(&self, message: &Message) {
let fid = message.data.as_ref().unwrap().fid as u32;
let set = type_to_set_postfix(
MessageType::try_from(message.data.as_ref().unwrap().r#type).unwrap(),
) as u8;

let current_earliest_result = self.get_earliest_ts_hash(fid, set);
match current_earliest_result {
Ok(result) => match result {
Some(earliest_ts_hash) => {
let message_ts_hash =
make_ts_hash(message.data.as_ref().unwrap().timestamp, &message.hash);
match message_ts_hash {
Ok(ts_hash) => {
if bytes_compare(earliest_ts_hash.deref(), &ts_hash) < 1 {
self.clear_earliest_ts_hash(fid, set);
}
}
Err(_) => {}
}
}
None => {}
},
_ => {}
}
}

fn clear_earliest_ts_hash(&self, fid: u32, set: u8) {
let prefix = make_message_primary_key(fid, set as u8, None);
self.earliest_ts_hashes.write().unwrap().remove(&prefix);
}

fn set_earliest_ts_hash(&self, fid: u32, set: u8, value: &Option<Vec<u8>>) {
let prefix = make_message_primary_key(fid, set as u8, None);
self.earliest_ts_hashes
.write()
.unwrap()
.insert(prefix, value.clone());
}

fn clear_cache(&self) {
self.earliest_ts_hashes.write().unwrap().clear();
}

fn get_earliest_ts_hash_from_db(&self, prefix: &[u8]) -> Result<Option<Vec<u8>>, HubError> {
let mut earliest_ts_hash: Option<Vec<u8>> = None;
let res =
self.db
.for_each_iterator_by_prefix(prefix, &PageOptions::default(), |key, _value| {
if key.len() == PRIMARY_KEY_LENGTH {
let ts_hash = key.to_vec()[1 + FID_BYTES + 1..].to_vec();
earliest_ts_hash = Some(ts_hash);
}
// Finish the iteration, we only care about the first message
return Ok(true);
});

if res.is_err() {
return Err(res.unwrap_err());
}
return Ok(earliest_ts_hash);
}

fn get_earliest_ts_hash_from_cache(&self, prefix: &Vec<u8>) -> Option<Option<Vec<u8>>> {
let lock = self.earliest_ts_hashes.read();
let guard = lock.unwrap();
return guard.get(prefix).cloned();
}
}

impl StorageCache {
pub fn js_create_storage_cache(mut cx: FunctionContext) -> JsResult<JsBox<Arc<StorageCache>>> {
let db = cx.argument::<JsBox<Arc<RocksDB>>>(0)?;
return Ok(cx.boxed(Arc::new(StorageCache::new((**db).clone()))));
}

pub fn js_get_earliest_ts_hash(mut cx: FunctionContext) -> JsResult<JsPromise> {
let storage_cache = get_storage_cache(&mut cx)?;

let fid = cx.argument::<JsNumber>(0).unwrap().value(&mut cx) as u32;
let set = cx.argument::<JsNumber>(1).unwrap().value(&mut cx) as u8;

let channel = cx.channel();
let (deferred, promise) = cx.promise();

THREAD_POOL.lock().unwrap().execute(move || {
let result = storage_cache.get_earliest_ts_hash(fid, set);

deferred.settle_with(&channel, move |mut cx| match result {
Ok(Some(r)) => {
let mut js_buffer = cx.buffer(r.len())?;
js_buffer.as_mut_slice(&mut cx).copy_from_slice(&r);
Ok(js_buffer)
}
Ok(None) => Ok(cx.buffer(0)?.into()),
Err(e) => hub_error_to_js_throw(&mut cx, e),
});
});
Ok(promise)
}

pub fn js_clear_earliest_ts_hash(mut cx: FunctionContext) -> JsResult<JsBoolean> {
let storage_cache = get_storage_cache(&mut cx)?;

let fid = cx.argument::<JsNumber>(0).unwrap().value(&mut cx) as u32;
let set = cx.argument::<JsNumber>(1).unwrap().value(&mut cx) as u8;

storage_cache.clear_earliest_ts_hash(fid, set);

Ok(cx.boolean(true))
}

pub fn js_clear_cache(mut cx: FunctionContext) -> JsResult<JsBoolean> {
let storage_cache = get_storage_cache(&mut cx)?;

storage_cache.clear_cache();

Ok(cx.boolean(true))
}
}
2 changes: 1 addition & 1 deletion apps/hubble/src/addon/src/store/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ pub trait StoreDef: Send + Sync {
pub struct Store {
store_def: Box<dyn StoreDef>,
store_event_handler: Arc<StoreEventHandler>,
fid_locks: Arc<[Mutex<()>; 4]>,
fid_locks: Arc<[Mutex<()>; FID_LOCKS_COUNT]>,
db: Arc<RocksDB>,
logger: slog::Logger,
}
Expand Down
Loading

0 comments on commit 554d82a

Please sign in to comment.