Skip to content

Commit

Permalink
fix: Catch and log errors if conflict messages are not found while me…
Browse files Browse the repository at this point in the history
…rging (#1801)

## Motivation

Catch errors if there is an inconsistency in the DB and log it so we can
get to the bottom of it.

## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [X] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [X] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [X] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.
- [X] All [commits have been
signed](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#22-signing-commits)



<!-- start pr-codex -->

---

## PR-Codex overview
This PR fixes the issue of inconsistent database entries by handling
exceptions and logging warnings for missing messages in the store.

### Detailed summary
- Renamed `existing_remove` to `maybe_existing_remove` and
`existing_add` to `maybe_existing_add`
- Added handling for missing messages in the store with logging warnings
- Updated error handling for `existingRemove` and `existingAdd` in
TypeScript code

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
  • Loading branch information
adityapk00 committed Mar 7, 2024
1 parent b51c15b commit 6cb4c99
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 28 deletions.
5 changes: 5 additions & 0 deletions .changeset/nine-pugs-sparkle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

fix: Catch exception if inconsistency in DB
36 changes: 18 additions & 18 deletions apps/hubble/src/addon/src/store/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,20 +157,19 @@ pub trait StoreDef: Send + Sync {

// If the existing remove has a lower order than the new message, retrieve the full
// Remove message and delete it as part of the RocksDB transaction
let existing_remove = get_message(
let maybe_existing_remove = get_message(
&db,
message.data.as_ref().unwrap().fid as u32,
self.postfix(),
&utils::vec_to_u8_24(&remove_ts_hash)?,
)?
.ok_or_else(|| HubError {
code: "bad_request.internal_error".to_string(),
message: format!(
"The message for the {:x?} not found",
remove_ts_hash.unwrap()
),
})?;
conflicts.push(existing_remove);
)?;

if maybe_existing_remove.is_some() {
conflicts.push(maybe_existing_remove.unwrap());
} else {
warn!(LOGGER, "Message's ts_hash exists but message not found in store";
o!("remove_ts_hash" => format!("{:x?}", remove_ts_hash.unwrap())));
}
}
}

Expand Down Expand Up @@ -202,18 +201,19 @@ pub trait StoreDef: Send + Sync {

// If the existing add has a lower order than the new message, retrieve the full
// Add message and delete it as part of the RocksDB transaction
let existing_add = get_message(
let maybe_existing_add = get_message(
&db,
message.data.as_ref().unwrap().fid as u32,
self.postfix(),
&utils::vec_to_u8_24(&add_ts_hash)?,
)?
.ok_or_else(|| HubError {
code: "bad_request.internal_error".to_string(),
message: format!("The message for the {:x?} not found", add_ts_hash.unwrap()),
})?;
// println!("existing_add: {:?}", existing_add);
conflicts.push(existing_add);
)?;

if maybe_existing_add.is_none() {
warn!(LOGGER, "Message's ts_hash exists but message not found in store";
o!("add_ts_hash" => format!("{:x?}", add_ts_hash.unwrap())));
} else {
conflicts.push(maybe_existing_add.unwrap());
}
}

// println!("conflicts: {:?}", conflicts);
Expand Down
40 changes: 30 additions & 10 deletions apps/hubble/src/storage/stores/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -484,14 +484,24 @@ export abstract class Store<TAdd extends Message, TRemove extends Message> {
} else {
// If the existing remove has a lower order than the new message, retrieve the full
// TRemove message and delete it as part of the RocksDB transaction
const existingRemove = await getMessage<TRemove>(
this._db,
// biome-ignore lint/style/noNonNullAssertion: legacy code, avoid using ignore for new code
message.data!.fid,
this._postfix,
removeTsHash.value,
const existingRemove = await ResultAsync.fromPromise(
getMessage<TRemove>(
this._db,
// biome-ignore lint/style/noNonNullAssertion: legacy code, avoid using ignore for new code
message.data!.fid,
this._postfix,
removeTsHash.value,
),
(e) => e,
);
conflicts.push(existingRemove);
if (existingRemove.isOk()) {
conflicts.push(existingRemove.value);
} else {
logger.warn(
{ message, tsHash, removeTsHash, error: existingRemove.error },
"Message's ts_hash exists but message not found in store",
);
}
}
}
}
Expand All @@ -515,9 +525,19 @@ export abstract class Store<TAdd extends Message, TRemove extends Message> {
} else {
// If the existing add has a lower order than the new message, retrieve the full
// TAdd message and delete it as part of the RocksDB transaction
// biome-ignore lint/style/noNonNullAssertion: legacy code, avoid using ignore for new code
const existingAdd = await getMessage<TAdd>(this._db, message.data!.fid, this._postfix, addTsHash.value);
conflicts.push(existingAdd);
const existingAdd = await ResultAsync.fromPromise(
// biome-ignore lint/style/noNonNullAssertion: legacy code, avoid using ignore for new code
getMessage<TAdd>(this._db, message.data!.fid, this._postfix, addTsHash.value),
(e) => e,
);
if (existingAdd.isOk()) {
conflicts.push(existingAdd.value);
} else {
logger.warn(
{ message, tsHash, addTsHash, error: existingAdd.error },
"Message's ts_hash exists but message not found in store",
);
}
}
}

Expand Down

0 comments on commit 6cb4c99

Please sign in to comment.