refactor DM routing and add live subscription updates#47
Conversation
Made-with: Cursor # Conflicts: # .gitignore # src/ui/key_handler/confirmation.rs # src/ui/key_handler/mod.rs # src/util/dm_utils/mod.rs
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughIntroduces a DM router and subscription command channel for real‑time trade DM handling, replaces periodic DM polling with an event-driven router, distinguishes maker/taker in order persistence, adds FormField-driven UI navigation and YES/NO button state, and wires dm_subscription channels through startup, key handlers, and async order flows. Changes
Sequence Diagram(s)sequenceDiagram
participant UI as "UI"
participant Send as "send_new_order / take_order"
participant Router as "DM Router (dm_subscription)"
participant Mostro as "Mostro Relay"
participant BG as "Background listener"
participant DB as "Database"
UI->>Send: user confirms (Enter YES)
Send->>Router: RegisterWaiter / TrackOrder (trade_keys / order_id)
Router-->>Send: ack register
Send->>Mostro: send DM to Mostro relay
Mostro-->>Router: GiftWrap event (relay)
Router->>BG: dispatch event to waiter(s) or tracked order handler
BG->>DB: upsert/update order status
DB-->>BG: persist result
BG->>UI: emit notification or stash pending_post_take_operation_result
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (9)
tests/db_tests.rs (1)
174-183: Tests updated correctly. Consider adding taker coverage.The signature updates are correct. All tests currently use
is_maker: true. Consider adding a test case withis_maker: falseto verify taker order storage behavior works as expected.,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/db_tests.rs` around lines 174 - 183, Add a complementary test that exercises the taker path by calling Order::new with is_maker: false (e.g., create a new variable like order_taker using the same small_order or a clone with modified fields), await and unwrap the result, then assert the expected taker-specific storage/behavior (matching IDs, amounts, and any taker-specific columns) just as the existing maker assertions do; ensure you use the same helper variables (small_order, trade_keys, pool) and change only the is_maker boolean to false so the test verifies taker order storage and retrieval.debug-notes.md (1)
1-62: Consider removing debug notes before merging to main.This file contains internal debugging handoff notes specific to a development investigation. While valuable during development, it should likely not be committed to the main branch as it:
- References a specific debug branch (
fix-windows-launch) that may not match the PR context- Contains ephemeral debugging instructions ("What to test next (first thing tomorrow)")
- Documents internal implementation investigation rather than user-facing or architectural documentation
Consider either removing this file before merge, or converting actionable insights into proper documentation (e.g., a troubleshooting section in docs, or comments in the relevant source files).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@debug-notes.md` around lines 1 - 62, This debug-only markdown (debug-notes.md) should not be merged as-is; either delete the file from the PR or move the useful, non-ephemeral parts into a proper place (e.g., an internal troubleshooting doc or an issue) and remove the step-by-step debug checklist and branch-specific references; specifically, remove the transient "What to test next" and branch name mentions or convert them into durable docs, then update the PR to remove debug-notes.md or replace it with a concise, sanitized troubleshooting entry.src/util/order_utils/take_order.rs (1)
143-183: Consider documenting the dual TrackOrder sends.The code sends
TrackOrdertwice: once early (lines 75-87) and once after DB save (lines 172-182). While this redundancy is likely intentional for robustness, a brief comment explaining why both are needed would help future maintainers understand this isn't accidental duplication.📝 Suggested comment
if let Some(tx) = dm_subscription_tx { + // Re-send TrackOrder after DB save to ensure subscription is active + // even if the early send (before Mostro message) arrived before + // router initialization completed. log::info!( "[take_order] Sending DM subscription command for order_id={}, trade_index={}",🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/util/order_utils/take_order.rs` around lines 143 - 183, The code currently sends OrderDmSubscriptionCmd::TrackOrder twice via dm_subscription_tx (once earlier and again after save_order) which looks like duplication; add a short clarifying comment next to the first dm_subscription_tx.send call and the second send (the block that logs "[take_order] Sending DM subscription command..." and the earlier send block) stating that the first send is an optimistic/early tracking signal and the second is a post-persistence confirmation (to ensure DM tracking even if save fails or to reconcile state after DB persistence), and reference the surrounding functions/values (TrackOrder, dm_subscription_tx, save_order, normalized_order, create_order_result_success) so future maintainers understand this intentional redundancy.src/util/order_utils/fetch_scheduler.rs (3)
69-90: Same mutexunwrap()concern applies here.Line 72 uses
.unwrap()on the mutex lock. Apply the same defensive handling as suggested forapply_live_order_update.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/util/order_utils/fetch_scheduler.rs` around lines 69 - 90, In apply_live_dispute_update, avoid directly calling .unwrap() on the Mutex lock for disputes; instead handle a poisoned lock like in apply_live_order_update by matching the result of disputes.lock() (or using .lock().map_err/unwrap_or_else) to either acquire the guard or recover/log and return early if poisoning occurs, then proceed to find/update/push the Dispute and sort; ensure you reference the disputes: Arc<Mutex<Vec<Dispute>>> parameter, the local disputes_lock guard, and keep the existing log::debug call after a successful update.
34-67: Consider handling poisoned mutex instead ofunwrap().Line 38 uses
.unwrap()on the mutex lock, which will panic if the mutex is poisoned (e.g., a previous holder panicked). While mutex poisoning is rare, a panic here would crash the entire order update loop.Consider using
.lock().ok()with an early return or logging, consistent with how lock failures are handled elsewhere in this file (e.g., lines 151-160).♻️ Suggested fix
fn apply_live_order_update(orders: &Arc<Mutex<Vec<SmallOrder>>>, order: SmallOrder) { let Some(order_id) = order.id else { return; }; - let mut orders_lock = orders.lock().unwrap(); + let mut orders_lock = match orders.lock() { + Ok(guard) => guard, + Err(e) => { + log::warn!("[orders_live] Failed to lock orders: {}", e); + return; + } + }; if order.status != Some(Status::Pending) {🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/util/order_utils/fetch_scheduler.rs` around lines 34 - 67, The mutex lock in apply_live_order_update currently uses orders.lock().unwrap(), which can panic on a poisoned mutex; change it to handle the PoisonError by replacing the unwrap with a non-panicking pattern (e.g., match or .lock().ok() / .map_err()) that logs the error and returns early on failure; locate apply_live_order_update and replace the unwrap call with the same poisoned-mutex handling pattern used elsewhere in this file (log the poisoning context and return) so the order-update loop does not crash on a poisoned mutex.
200-215: Reconciliation loops also useunwrap()on mutex locks.Lines 208 and 299 use
.unwrap()on mutex locks inside the reconciliation tick handlers. While the outer lock acquisition at lines 189-198 and 286-295 properly handles errors withcontinue, these inner locks could still panic.Consider applying consistent error handling for all mutex acquisitions.
Also applies to: 296-306
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/util/order_utils/fetch_scheduler.rs` around lines 200 - 215, The code currently calls orders_clone.lock().unwrap() inside the reconciliation handlers (e.g., where get_orders is awaited and the subsequent orders_lock.clear()/extend()/len() are used), which can panic on a poisoned mutex; replace those unwrap() calls with proper error handling (e.g., match or if let Ok(mut orders_lock) = orders_clone.lock() { ... } else { log::warn/error!("failed to acquire orders mutex: {:?}", err); continue; }) so the handler skips this tick on lock acquisition failure instead of panicking; apply the same change to the other reconciliation site that also uses orders_clone.lock().unwrap() (lines referenced in the review).src/util/dm_utils/mod.rs (3)
711-722: Stale waiters may linger until the next GiftWrap event.When a waiter's oneshot receiver times out (in
wait_for_dmat line 171-174), the correspondingPendingDmWaiterremains inpending_waitersuntil the next GiftWrap event triggers the drain logic. This isn't a memory leak since eventually all waiters will be checked and removed, but it could be cleaner.Consider periodically pruning waiters whose
response_tx.is_closed()returns true, or accepting this minor inefficiency given the expected low waiter count.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/util/dm_utils/mod.rs` around lines 711 - 722, The pending_waiters vector can retain entries whose oneshot receivers have timed out; update the GiftWrap handling in the block around pending_waiters (where PendingDmWaiter is used and nip59::extract_rumor(...) is called) to prune closed waiters: before attempting extract_rumor, filter or drain out any waiter where waiter.response_tx.is_closed() (removing them without processing), then proceed to try matching remaining waiters and rebuild pending_waiters from non-matching ones; this ensures waiters removed by wait_for_dm timeouts are dropped promptly rather than waiting for the next event.
142-179: Unused_clientparameter inwait_for_dm.The
_clientparameter (line 145) is now unused since the function delegates to the DM router instead of subscribing directly. Consider removing it if the public API can be changed, or document why it's retained for compatibility.
528-547: Fallback resolver iterates all active orders.
resolve_order_for_eventtries to decrypt the event with each active order's trade keys. This is O(n) decryption attempts per unmatched event. For a typical user with few concurrent orders, this is acceptable. If order counts grow significantly, consider caching pubkey-to-order mappings.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/util/dm_utils/mod.rs` around lines 528 - 547, resolve_order_for_event currently tries every active order (active_order_trade_indices) and calls user.derive_trade_keys + nip59::extract_rumor for each, causing O(n) decrypt attempts; replace this fallback with a cache mapping the trade pubkey to its order metadata so you can attempt decryption only for matching keys. Implement a shared cache (e.g., HashMap<PublicKey, (Uuid, i64, Keys)> or similar) that is updated when orders are added/removed, look up the event's sender/recipient pubkey first and only call nip59::extract_rumor for the matched entry, and fall back to the original loop only if the cache lookup misses; update resolve_order_for_event to consult that cache instead of blindly iterating active_order_trade_indices and keep existing variable names (resolve_order_for_event, active_order_trade_indices, user.derive_trade_keys, nip59::extract_rumor) to locate where to integrate the cache.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/ui/key_handler/form_input.rs`:
- Around line 32-42: The match on form.focused can panic when
FormField::FiatAmountMax is focused but form.use_range is false; change the
FiatAmountMax arm in the target-selection match in form_input.rs to handle both
cases (e.g. FormField::FiatAmountMax => if form.use_range { &mut
form.fiat_amount_max } else { &mut form.fiat_amount }) so it never falls through
to unreachable!, and apply the same conditional pattern to the corresponding
match in handle_backspace (use the same FormField::FiatAmountMax conditional
mapping to avoid the panic).
In `@src/ui/tabs/orders_tab.rs`:
- Around line 117-125: The code coerces missing timestamps via
order.created_at.unwrap_or(0) so DateTime::from_timestamp(0, 0) yields the Unix
epoch instead of showing the "Invalid date" fallback; update the chain to
propagate None by using and_then on order.created_at and only call
DateTime::from_timestamp when Some(value) (e.g., change the creation of date to
something like order.created_at.and_then(|ts| DateTime::from_timestamp_opt(ts,
0) or equivalent) and then map/with_timezone/format into the Cell::from closure
used for date_cell) so missing created_at results in the unwrap_or_else("Invalid
date") path.
---
Nitpick comments:
In `@debug-notes.md`:
- Around line 1-62: This debug-only markdown (debug-notes.md) should not be
merged as-is; either delete the file from the PR or move the useful,
non-ephemeral parts into a proper place (e.g., an internal troubleshooting doc
or an issue) and remove the step-by-step debug checklist and branch-specific
references; specifically, remove the transient "What to test next" and branch
name mentions or convert them into durable docs, then update the PR to remove
debug-notes.md or replace it with a concise, sanitized troubleshooting entry.
In `@src/util/dm_utils/mod.rs`:
- Around line 711-722: The pending_waiters vector can retain entries whose
oneshot receivers have timed out; update the GiftWrap handling in the block
around pending_waiters (where PendingDmWaiter is used and
nip59::extract_rumor(...) is called) to prune closed waiters: before attempting
extract_rumor, filter or drain out any waiter where
waiter.response_tx.is_closed() (removing them without processing), then proceed
to try matching remaining waiters and rebuild pending_waiters from non-matching
ones; this ensures waiters removed by wait_for_dm timeouts are dropped promptly
rather than waiting for the next event.
- Around line 528-547: resolve_order_for_event currently tries every active
order (active_order_trade_indices) and calls user.derive_trade_keys +
nip59::extract_rumor for each, causing O(n) decrypt attempts; replace this
fallback with a cache mapping the trade pubkey to its order metadata so you can
attempt decryption only for matching keys. Implement a shared cache (e.g.,
HashMap<PublicKey, (Uuid, i64, Keys)> or similar) that is updated when orders
are added/removed, look up the event's sender/recipient pubkey first and only
call nip59::extract_rumor for the matched entry, and fall back to the original
loop only if the cache lookup misses; update resolve_order_for_event to consult
that cache instead of blindly iterating active_order_trade_indices and keep
existing variable names (resolve_order_for_event, active_order_trade_indices,
user.derive_trade_keys, nip59::extract_rumor) to locate where to integrate the
cache.
In `@src/util/order_utils/fetch_scheduler.rs`:
- Around line 69-90: In apply_live_dispute_update, avoid directly calling
.unwrap() on the Mutex lock for disputes; instead handle a poisoned lock like in
apply_live_order_update by matching the result of disputes.lock() (or using
.lock().map_err/unwrap_or_else) to either acquire the guard or recover/log and
return early if poisoning occurs, then proceed to find/update/push the Dispute
and sort; ensure you reference the disputes: Arc<Mutex<Vec<Dispute>>> parameter,
the local disputes_lock guard, and keep the existing log::debug call after a
successful update.
- Around line 34-67: The mutex lock in apply_live_order_update currently uses
orders.lock().unwrap(), which can panic on a poisoned mutex; change it to handle
the PoisonError by replacing the unwrap with a non-panicking pattern (e.g.,
match or .lock().ok() / .map_err()) that logs the error and returns early on
failure; locate apply_live_order_update and replace the unwrap call with the
same poisoned-mutex handling pattern used elsewhere in this file (log the
poisoning context and return) so the order-update loop does not crash on a
poisoned mutex.
- Around line 200-215: The code currently calls orders_clone.lock().unwrap()
inside the reconciliation handlers (e.g., where get_orders is awaited and the
subsequent orders_lock.clear()/extend()/len() are used), which can panic on a
poisoned mutex; replace those unwrap() calls with proper error handling (e.g.,
match or if let Ok(mut orders_lock) = orders_clone.lock() { ... } else {
log::warn/error!("failed to acquire orders mutex: {:?}", err); continue; }) so
the handler skips this tick on lock acquisition failure instead of panicking;
apply the same change to the other reconciliation site that also uses
orders_clone.lock().unwrap() (lines referenced in the review).
In `@src/util/order_utils/take_order.rs`:
- Around line 143-183: The code currently sends
OrderDmSubscriptionCmd::TrackOrder twice via dm_subscription_tx (once earlier
and again after save_order) which looks like duplication; add a short clarifying
comment next to the first dm_subscription_tx.send call and the second send (the
block that logs "[take_order] Sending DM subscription command..." and the
earlier send block) stating that the first send is an optimistic/early tracking
signal and the second is a post-persistence confirmation (to ensure DM tracking
even if save fails or to reconcile state after DB persistence), and reference
the surrounding functions/values (TrackOrder, dm_subscription_tx, save_order,
normalized_order, create_order_result_success) so future maintainers understand
this intentional redundancy.
In `@tests/db_tests.rs`:
- Around line 174-183: Add a complementary test that exercises the taker path by
calling Order::new with is_maker: false (e.g., create a new variable like
order_taker using the same small_order or a clone with modified fields), await
and unwrap the result, then assert the expected taker-specific storage/behavior
(matching IDs, amounts, and any taker-specific columns) just as the existing
maker assertions do; ensure you use the same helper variables (small_order,
trade_keys, pool) and change only the is_maker boolean to false so the test
verifies taker order storage and retrieval.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: b91e051d-fbf9-4a01-8f5d-d35955bae7a8
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (36)
Cargo.tomldebug-notes.mddocs/MESSAGE_FLOW_AND_PROTOCOL.mdsrc/main.rssrc/models.rssrc/ui/app_state.rssrc/ui/draw.rssrc/ui/exit_confirm.rssrc/ui/helpers.rssrc/ui/key_handler/async_tasks.rssrc/ui/key_handler/confirmation.rssrc/ui/key_handler/enter_handlers.rssrc/ui/key_handler/esc_handlers.rssrc/ui/key_handler/form_input.rssrc/ui/key_handler/message_handlers.rssrc/ui/key_handler/mod.rssrc/ui/key_handler/navigation.rssrc/ui/key_handler/user_handlers.rssrc/ui/message_notification.rssrc/ui/operation_result.rssrc/ui/order_confirm.rssrc/ui/order_form.rssrc/ui/orders.rssrc/ui/tabs/orders_tab.rssrc/ui/user_state.rssrc/util/db_utils.rssrc/util/dm_utils/mod.rssrc/util/dm_utils/notifications_ch_mng.rssrc/util/dm_utils/order_ch_mng.rssrc/util/mod.rssrc/util/order_utils/fetch_scheduler.rssrc/util/order_utils/helper.rssrc/util/order_utils/mod.rssrc/util/order_utils/send_new_order.rssrc/util/order_utils/take_order.rstests/db_tests.rs
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (2)
src/util/dm_utils/mod.rs (2)
46-52: Consider returning a success indicator fromset_dm_router_cmd_tx.If the lock is poisoned, the function logs a warning but returns normally, leaving the caller unaware that the router sender wasn't set. This could cause silent failures downstream when
wait_for_dmattempts to register waiters.Proposed change
-pub fn set_dm_router_cmd_tx(tx: mpsc::UnboundedSender<DmRouterCmd>) { - if let Ok(mut guard) = DM_ROUTER_CMD_TX.lock() { - *guard = Some(tx); - } else { - log::warn!("[dm_listener] Failed to set DM router sender due to poisoned lock"); - } +pub fn set_dm_router_cmd_tx(tx: mpsc::UnboundedSender<DmRouterCmd>) -> bool { + match DM_ROUTER_CMD_TX.lock() { + Ok(mut guard) => { + *guard = Some(tx); + true + } + Err(_) => { + log::warn!("[dm_listener] Failed to set DM router sender due to poisoned lock"); + false + } + } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/util/dm_utils/mod.rs` around lines 46 - 52, The function set_dm_router_cmd_tx currently swallows a poisoned lock and only logs a warning; change its signature to return a Result<(), LockError> or a bool success indicator so callers can react when the sender wasn't set. Inside set_dm_router_cmd_tx, return Ok(()) (or true) when the lock is acquired and the sender is set, and return Err(...) (or false) when DM_ROUTER_CMD_TX.lock() fails/is poisoned after logging the warning; update call sites such as wait_for_dm to check the return value and handle the failure path (e.g., propagate the error or fail registration) so failures to set the router sender are not silent.
527-546: Fallback decryption path has O(n) complexity over active orders.This is acceptable as a fallback for unknown
subscription_ids, but if the number of concurrent active orders grows large, the linear decrypt-attempt loop could impact performance. Consider logging metrics here to monitor if this path is hit frequently in production.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/util/dm_utils/mod.rs` around lines 527 - 546, The fallback linear decrypt loop in resolve_order_for_event (iterating active_order_trade_indices and calling nip59::extract_rumor) can be expensive at scale — instrument it: add a metrics counter (e.g., fallback_decrypt_attempts_total) incremented each time this fallback path is entered, a gauge or histogram for the number of active orders scanned and the duration of the loop, and emit a low-volume trace/warn log with the scanned count and duration when the loop runs (and optionally when a match is found). Place the metric increments and timing around the section that locks and clones active_order_trade_indices and around the for-loop in resolve_order_for_event, using your project’s existing metrics/tracing utilities so you can monitor how often this O(n) path is exercised in production.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/util/dm_utils/dm_helpers.rs`:
- Around line 74-83: The function ensure_order_giftwrap_subscription currently
violates clippy::too_many_arguments (8 args); fix this by grouping related
parameters (e.g., subscribed_pubkeys and subscription_to_order into a
SubscriptionsContext struct, and pubkey/order_id/trade_index into an OrderKey or
OrderContext) and update the function signature to accept the new context
structs instead of the individual params, then adjust call sites accordingly; if
refactoring call sites is too disruptive, alternatively add
#[allow(clippy::too_many_arguments)] directly above the
ensure_order_giftwrap_subscription declaration to suppress the lint temporarily.
---
Nitpick comments:
In `@src/util/dm_utils/mod.rs`:
- Around line 46-52: The function set_dm_router_cmd_tx currently swallows a
poisoned lock and only logs a warning; change its signature to return a
Result<(), LockError> or a bool success indicator so callers can react when the
sender wasn't set. Inside set_dm_router_cmd_tx, return Ok(()) (or true) when the
lock is acquired and the sender is set, and return Err(...) (or false) when
DM_ROUTER_CMD_TX.lock() fails/is poisoned after logging the warning; update call
sites such as wait_for_dm to check the return value and handle the failure path
(e.g., propagate the error or fail registration) so failures to set the router
sender are not silent.
- Around line 527-546: The fallback linear decrypt loop in
resolve_order_for_event (iterating active_order_trade_indices and calling
nip59::extract_rumor) can be expensive at scale — instrument it: add a metrics
counter (e.g., fallback_decrypt_attempts_total) incremented each time this
fallback path is entered, a gauge or histogram for the number of active orders
scanned and the duration of the loop, and emit a low-volume trace/warn log with
the scanned count and duration when the loop runs (and optionally when a match
is found). Place the metric increments and timing around the section that locks
and clones active_order_trade_indices and around the for-loop in
resolve_order_for_event, using your project’s existing metrics/tracing utilities
so you can monitor how often this O(n) path is exercised in production.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 3cce05ea-5f04-4ec6-94ec-806bf0178234
📒 Files selected for processing (5)
docs/DATABASE.mddocs/MESSAGE_FLOW_AND_PROTOCOL.mddocs/STARTUP_AND_CONFIG.mdsrc/util/dm_utils/dm_helpers.rssrc/util/dm_utils/mod.rs
✅ Files skipped from review due to trivial changes (2)
- docs/DATABASE.md
- docs/STARTUP_AND_CONFIG.md
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (5)
src/util/order_utils/fetch_scheduler.rs (2)
228-235: Inconsistent mutex error handling:unwrap()vs graceful handling.The reconciliation branch uses
.unwrap()on the mutex lock (lines 228, 319), which will panic if the mutex is poisoned. However,apply_live_order_updateandapply_live_dispute_updategracefully handle poisoned mutexes by logging and returning early.Consider handling the poisoned mutex consistently to avoid potential panics:
Proposed fix
if let Ok(fetched_orders) = get_orders( &client_for_orders, mostro_pubkey_for_orders, Some(Status::Pending), Some(currencies), ) .await { - let mut orders_lock = orders_clone.lock().unwrap(); + let mut orders_lock = match orders_clone.lock() { + Ok(guard) => guard, + Err(e) => { + log::warn!("[orders_reconcile] Failed to lock orders: {}", e); + continue; + } + }; orders_lock.clear(); orders_lock.extend(fetched_orders);🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/util/order_utils/fetch_scheduler.rs` around lines 228 - 235, The reconciliation branch currently uses orders_clone.lock().unwrap() which will panic on a poisoned mutex; match the pattern used in apply_live_order_update and apply_live_dispute_update by handling the Result from orders_clone.lock() instead of unwrapping: attempt to lock, log an error if Err (including context like "orders_reconcile: failed to lock orders mutex") and return early, otherwise proceed to clear/extend the Vec and log the count; update both places using unwrap() (e.g., where orders_lock is acquired) to use this non-panicking, logged handling.
167-167: Misleading variable name:latest_settingsis actually a snapshot.The variable
latest_settingsis cloned once at task spawn time and never refreshed. Currency filter changes won't take effect until the scheduler restarts (e.g., after a key reload). Consider renaming toinitial_settingsorsettings_snapshotfor clarity.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/util/order_utils/fetch_scheduler.rs` at line 167, The variable latest_settings is misleading because it's cloned once at task spawn and never refreshed; rename it to settings_snapshot or initial_settings to reflect it's a single snapshot taken at spawn time and update all references (e.g., the cloned binding currently named latest_settings inside the fetch scheduler task/closure) accordingly so callers/readers understand currency filter changes won't be applied until restart; ensure variable rename is applied consistently where latest_settings is used within the scheduler logic (task spawn/closure) to avoid confusion.src/util/dm_utils/mod.rs (3)
881-897: Inconsistent indentation on dispatch_giftwrap_batch call.The call is indented one level deeper than the surrounding code. This appears to be a formatting inconsistency.
Suggested fix
if !parsed_messages.is_empty() { log::info!( "[dm_listener] Routed GiftWrap by active-order key for unknown subscription_id={} to order_id={}, trade_index={}", subscription_id, order_id, trade_index ); - dispatch_giftwrap_batch( + dispatch_giftwrap_batch( parsed_messages, ... - ) - .await; + ) + .await; }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/util/dm_utils/mod.rs` around lines 881 - 897, The call to dispatch_giftwrap_batch is indented one level deeper than surrounding code causing inconsistent formatting; adjust the indentation of the entire call (starting at dispatch_giftwrap_batch(...) through .await;) to align with the surrounding block indentation so it matches neighboring statements, referencing the dispatch_giftwrap_batch invocation and its trailing .await and the GiftWrapTerminalPolicy::UntrackedFallback argument to locate the exact call site.
539-557: Mutex.unwrap()may panic on poisoned lock.If another thread panics while holding
active_order_trade_indices, subsequent.unwrap()calls will propagate panics, potentially terminating the listener task. Consider using.lock().expect("context")for clearer panic messages, or handle the poisoned case gracefully if listener continuity is important.Example with expect
- { - let mut indices = active_order_trade_indices.lock().unwrap(); - indices.remove(&order_id); - } + { + let mut indices = active_order_trade_indices + .lock() + .expect("active_order_trade_indices mutex poisoned"); + indices.remove(&order_id); + }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/util/dm_utils/mod.rs` around lines 539 - 557, The calls to active_order_trade_indices.lock().unwrap() in the match arms (e.g., inside the GiftWrapTerminalPolicy::UntrackedFallback branch and the earlier branch) may panic on a poisoned mutex; replace these .unwrap()s with a safe handling strategy such as .lock().expect("failed to lock active_order_trade_indices in listener") or explicitly handle the PoisonError (e.g., .lock().unwrap_or_else(|poisoned| poisoned.into_inner())) so the listener task fails with a clear message or continues gracefully; update both occurrences where active_order_trade_indices.lock().unwrap() is used to the chosen approach.
296-310: Consider extracting a context struct for the 10 parameters.The function signature has many parameters. A
TradeMessageContextstruct holding the shared references (messages,pending_notifications,message_notification_tx,pool) could reduce parameter count and improve readability.Example context struct
struct DmHandlerContext<'a> { messages: &'a Arc<Mutex<Vec<OrderMessage>>>, pending_notifications: &'a Arc<Mutex<usize>>, message_notification_tx: &'a tokio::sync::mpsc::UnboundedSender<MessageNotification>, pool: &'a sqlx::SqlitePool, }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/util/dm_utils/mod.rs` around lines 296 - 310, The handle_trade_dm_for_order function has too many parameters; create a context struct (e.g., TradeMessageContext or DmHandlerContext) that bundles the shared references messages: Arc<Mutex<Vec<OrderMessage>>>, pending_notifications: Arc<Mutex<usize>>, message_notification_tx: tokio::sync::mpsc::UnboundedSender<MessageNotification>, and pool: sqlx::SqlitePool, then modify handle_trade_dm_for_order to take that single context struct plus the remaining specific args (order_id: Uuid, trade_index: i64, message: Message, timestamp: i64, sender: PublicKey, trade_keys: &Keys), and update all call sites to construct/pass the context instance instead of the four separate parameters to improve readability and reduce parameter count.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/util/dm_utils/mod.rs`:
- Around line 753-778: The code currently pushes a PendingDmWaiter to
pending_waiters even when client.subscribe(...) fails, causing a silent timeout;
fix by, inside the Err(e) branch after
subscribed_pubkeys.remove(&waiter_pubkey), immediately signal cancellation to
the caller using the waiter’s response_tx (send an error or cancellation result)
and do NOT push the PendingDmWaiter into pending_waiters; in other words, only
push PendingDmWaiter (trade_keys, response_tx) when client.subscribe(...)
returns Ok, and on Err call response_tx to deliver an immediate error before
continuing.
---
Nitpick comments:
In `@src/util/dm_utils/mod.rs`:
- Around line 881-897: The call to dispatch_giftwrap_batch is indented one level
deeper than surrounding code causing inconsistent formatting; adjust the
indentation of the entire call (starting at dispatch_giftwrap_batch(...) through
.await;) to align with the surrounding block indentation so it matches
neighboring statements, referencing the dispatch_giftwrap_batch invocation and
its trailing .await and the GiftWrapTerminalPolicy::UntrackedFallback argument
to locate the exact call site.
- Around line 539-557: The calls to active_order_trade_indices.lock().unwrap()
in the match arms (e.g., inside the GiftWrapTerminalPolicy::UntrackedFallback
branch and the earlier branch) may panic on a poisoned mutex; replace these
.unwrap()s with a safe handling strategy such as .lock().expect("failed to lock
active_order_trade_indices in listener") or explicitly handle the PoisonError
(e.g., .lock().unwrap_or_else(|poisoned| poisoned.into_inner())) so the listener
task fails with a clear message or continues gracefully; update both occurrences
where active_order_trade_indices.lock().unwrap() is used to the chosen approach.
- Around line 296-310: The handle_trade_dm_for_order function has too many
parameters; create a context struct (e.g., TradeMessageContext or
DmHandlerContext) that bundles the shared references messages:
Arc<Mutex<Vec<OrderMessage>>>, pending_notifications: Arc<Mutex<usize>>,
message_notification_tx:
tokio::sync::mpsc::UnboundedSender<MessageNotification>, and pool:
sqlx::SqlitePool, then modify handle_trade_dm_for_order to take that single
context struct plus the remaining specific args (order_id: Uuid, trade_index:
i64, message: Message, timestamp: i64, sender: PublicKey, trade_keys: &Keys),
and update all call sites to construct/pass the context instance instead of the
four separate parameters to improve readability and reduce parameter count.
In `@src/util/order_utils/fetch_scheduler.rs`:
- Around line 228-235: The reconciliation branch currently uses
orders_clone.lock().unwrap() which will panic on a poisoned mutex; match the
pattern used in apply_live_order_update and apply_live_dispute_update by
handling the Result from orders_clone.lock() instead of unwrapping: attempt to
lock, log an error if Err (including context like "orders_reconcile: failed to
lock orders mutex") and return early, otherwise proceed to clear/extend the Vec
and log the count; update both places using unwrap() (e.g., where orders_lock is
acquired) to use this non-panicking, logged handling.
- Line 167: The variable latest_settings is misleading because it's cloned once
at task spawn and never refreshed; rename it to settings_snapshot or
initial_settings to reflect it's a single snapshot taken at spawn time and
update all references (e.g., the cloned binding currently named latest_settings
inside the fetch scheduler task/closure) accordingly so callers/readers
understand currency filter changes won't be applied until restart; ensure
variable rename is applied consistently where latest_settings is used within the
scheduler logic (task spawn/closure) to avoid confusion.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e300e333-971e-43de-b172-6a1fb89b47f5
📒 Files selected for processing (13)
docs/STARTUP_AND_CONFIG.mdsrc/main.rssrc/ui/key_handler/async_tasks.rssrc/ui/key_handler/form_input.rssrc/ui/tabs/orders_tab.rssrc/util/dm_utils/dm_helpers.rssrc/util/dm_utils/mod.rssrc/util/order_utils/execute_add_invoice.rssrc/util/order_utils/execute_send_msg.rssrc/util/order_utils/execute_take_dispute.rssrc/util/order_utils/fetch_scheduler.rssrc/util/order_utils/send_new_order.rssrc/util/order_utils/take_order.rs
✅ Files skipped from review due to trivial changes (1)
- docs/STARTUP_AND_CONFIG.md
🚧 Files skipped from review as they are similar to previous changes (2)
- src/ui/key_handler/form_input.rs
- src/util/order_utils/send_new_order.rs
There was a problem hiding this comment.
🧹 Nitpick comments (1)
src/util/dm_utils/mod.rs (1)
547-548: Consider loggingunsubscribefailures.The
client.unsubscribe()call ignores any errors. While this is acceptable for best-effort cleanup, a debug/trace log on failure would aid troubleshooting relay connection issues.Suggested enhancement
- client.unsubscribe(subscription_id).await; + if let Err(e) = client.unsubscribe(subscription_id).await { + log::debug!( + "[dm_listener] Failed to unsubscribe {}: {}", + subscription_id, + e + ); + }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/util/dm_utils/mod.rs` around lines 547 - 548, The call to client.unsubscribe(subscription_id).await currently ignores any error; update the surrounding async block (where client.unsubscribe and subscription_id are used) to handle the Result by matching or using .await.map_err and log failures at debug/trace level (e.g., process_logger.debug/trace or the local logger) when unsubscribe returns Err, while preserving the current best-effort behavior on success; reference the client.unsubscribe(subscription_id).await invocation and the subscription_id variable to locate the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@src/util/dm_utils/mod.rs`:
- Around line 547-548: The call to client.unsubscribe(subscription_id).await
currently ignores any error; update the surrounding async block (where
client.unsubscribe and subscription_id are used) to handle the Result by
matching or using .await.map_err and log failures at debug/trace level (e.g.,
process_logger.debug/trace or the local logger) when unsubscribe returns Err,
while preserving the current best-effort behavior on success; reference the
client.unsubscribe(subscription_id).await invocation and the subscription_id
variable to locate the change.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 0d4c68cf-e3d8-4b6d-a09f-60d1237ab7e5
📒 Files selected for processing (1)
src/util/dm_utils/mod.rs
…improved messages fetch avoiding double nip59 extraction
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/util/dm_utils/mod.rs`:
- Around line 851-856: Remove the trailing whitespace on the blank line before
the `if can_decrypt` statement to satisfy CI formatting; locate the block that
computes `ok` using `nip59::extract_rumor(&waiter.trade_keys,
&event).await.is_ok()`, stores it with `rumor_cache.insert(key, ok)`, and then
checks `if can_decrypt`, and simply delete the trailing spaces on the empty line
so the file has no trailing whitespace there.
- Around line 894-899: Remove the trailing whitespace on the blank line between
the block that sets `ok` (which calls `nip59::extract_rumor(&trade_keys,
&event).await.is_ok()` and `rumor_cache.insert(key, ok)`) and the `if
!can_decrypt` check; ensure there are no spaces or tabs on that empty line so CI
no longer flags trailing whitespace (you can locate this around the use of
`can_decrypt`, `key`, `event`, and `trade_keys`).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 014d78bf-7e8d-47c1-87f0-d224b411affc
📒 Files selected for processing (3)
.gitignoresrc/util/dm_utils/dm_helpers.rssrc/util/dm_utils/mod.rs
✅ Files skipped from review due to trivial changes (1)
- .gitignore
🚧 Files skipped from review as they are similar to previous changes (1)
- src/util/dm_utils/dm_helpers.rs
There was a problem hiding this comment.
These .lock().unwrap() calls (lines 540, 552, 680, 732, 392, 458) would crash the DM router if a mutex gets poisoned. Should these follow the same match Ok/Err + log::warn! pattern used in apply_live_order_update / apply_live_dispute_update in fetch_scheduler.rs?
There was a problem hiding this comment.
yep! Got your point @AndreaDiazCorreia , from a pure logic i agree,but it's quite normal in rust world to just get a lock on mutex with an unwrap. If the lmutex is poisoned you are yet in baaad situation so panic is not a bad thing.
An example.of discussion about this in this example:
https://users.rust-lang.org/t/should-i-unwrap-a-mutex-lock/61519/2
I added the poison check for an AI automatic suggestion, but in the community is totally ok to unwrap for a mutex lock.
There was a problem hiding this comment.
The comment on line 205 says "Reload currency filters from settings on each fetch", but latest_settings is cloned once at task spawn (line 167). The previous code called load_settings_from_disk() on each tick. Is this intentional, or should the reconciliation branch still reload from disk to pick up runtime filter changes?
There was a problem hiding this comment.
Fixed, now the comment is clearer. When we launch apply_pending_key_reload settings are read back from file to update them, but the comment could be misleading. Changed var name to reloaded_settings.
// Read currency filters from the settings snapshot (`reloaded_settings`) each fetch.
// Note: this does not reload from disk; settings are refreshed when the
// scheduler tasks are respawned (e.g. via apply_pending_key_reload).
// An empty list means "no filter" (show all currencies).
let currencies = reloaded_settings.currencies_filter.clone();There was a problem hiding this comment.
Review: refactor DM routing + live subscription updates
Large and well-structured architectural change. The centralized GiftWrap router is the right direction. Two HIGH issues block merge.
🔴 HIGH — DM_ROUTER_CMD_TX race condition on startup
File: src/util/dm_utils/mod.rs
static DM_ROUTER_CMD_TX: Mutex<Option<mpsc::UnboundedSender<DmRouterCmd>>> = Mutex::new(None);set_dm_router_cmd_tx is called in main.rs after create_app_channels(), but wait_for_dm can be called from any async task before the router is initialized. If take_order or send_new_order fires before set_dm_router_cmd_tx completes, wait_for_dm fails immediately with "DM router is not ready".
The PR documents the condition but does not prevent it — there is a startup window where the app is ready for user interaction but the router is not.
Fix: Initialize the router before the Nostr client is operational, or add a synchronization barrier (e.g. tokio::sync::Notify) that wait_for_dm awaits instead of returning an error.
🔴 HIGH — Double TrackOrder in take_order may leave orphaned entry in active_order_trade_indices
File: src/util/order_utils/take_order.rs
// First send — BEFORE knowing the real order_id from Mostro
let _ = tx.send(OrderDmSubscriptionCmd::TrackOrder {
order_id, // ← book order_id, may differ from Mostro response
trade_index: next_idx,
});
// ... wait_for_dm, parse, save_order ...
// Second send — with corrected effective_order_id
let _ = tx.send(OrderDmSubscriptionCmd::TrackOrder {
order_id: effective_order_id,
trade_index: next_idx,
});The router processes the first TrackOrder and inserts order_id -> trade_index into active_order_trade_indices. If Mostro returns a different effective_order_id, the second TrackOrder inserts the correct one but the first entry stays in the map indefinitely — it will never be cleaned up because no terminal message will ever arrive for that phantom order_id.
The comment documents it as "intentionally redundant" but does not address cleanup of the first (potentially wrong) entry.
🟡 MEDIUM — Currency filter no longer reloads from disk
File: src/util/order_utils/fetch_scheduler.rs
// Before (reloaded from disk on every fetch):
let currencies = crate::settings::load_settings_from_disk()
.ok()
.map(|s| s.currencies_filter)
// Now (in-memory snapshot, only updated on task respawn):
let currencies = reloaded_settings.currencies_filter.clone();The comment documents this, but it is a UX regression: changing the currency filter in settings no longer takes effect until a key reload. If this is intentional, it should be explicitly noted in the PR description as a behavioral change.
🟡 MEDIUM — pending_waiters has no size cap or periodic cleanup
File: src/util/dm_utils/mod.rs
let mut pending_waiters: Vec<PendingDmWaiter> = Vec::new();Expired waiters are cleaned up via .is_closed() only when a GiftWrap event arrives. In the absence of events, the vector is never purged. Under a bug or unexpected concurrent usage, this grows unbounded.
Simple fix: add periodic cleanup of closed waiters in the select loop, or enforce a small cap (e.g. 10–20 concurrent waiters is already unrealistic in production).
🟡 MEDIUM — upsert_from_small_order_dm duplicates Order construction logic
File: src/models.rs
The method contains two nearly identical Order construction blocks: one for the happy path and one for the unique-violation retry. Adding a field to Order now requires updating three places: new(), upsert_from_small_order_dm() happy path, and upsert_from_small_order_dm() retry path.
Consider extracting a private build_order_from_small_order(...) helper.
🟢 LOW — OrderDmSubscriptionCmd is an unnecessary type alias
pub type OrderDmSubscriptionCmd = DmRouterCmd;This exports the same type under two names. External callers import OrderDmSubscriptionCmd while internal code uses DmRouterCmd. Pick one name and use it consistently.
✅ What is good
- Router architecture is correct: centralizing GiftWrap in a single listener + channel is the right fix for duplicate subscriptions
fatal.rs:OnceLock+AtomicBoolfor fatal errors is clean, thread-safe, and zero-overheadGiftWrapTerminalPolicyenum: well-designed, avoids ambiguous booleansis_terminal_order_status/trade_message_is_terminal: pure functions, easy to testpending_post_take_operation_result: correct fix for the invoice UI race condition- Structured logs (
[dm_listener],[take_order],[orders_live]): will be invaluable in production debugging ensure_order_giftwrap_subscription: well-abstracted helper, avoids duplication between startup and runtime paths- Subscription cleanup on terminal status: correct, prevents relay subscription leaks
- DB tests updated
Please address the two HIGH issues before merging. Happy to re-review once updated.
Ok Mostronator, HIGH priorities issue should be fixed with last commit, review! |
I ask you to think better to this, if user saves from settings tab a new currency filter it's applied immediately without restarting mostrix, in main.rs you can this behaviour in apply_pending_key_reload function: when user press enter this function is invocated and reload from disk the settings and propagates the new values. |
About this it can still be intentional and reasonable as an API/semantics alias: DmRouterCmd: “all commands the DM router understands” Keep just DmRouterCmd everywhere: remove the alias and update imports/usages (simplest). What do you think? |
|
Mostronator, all the point have been tackled, please review again... |
There was a problem hiding this comment.
Code Review: Single DM Router — PR #47
Reviewed all 46 changed files (2,629 additions, 663 deletions). Checking against previous review comments.
Previous HIGH Issues — All Fixed ✅
1. set_dm_router_cmd_tx unwrap → panic — FIXED
set_dm_router_cmd_tx now returns Result<(), &'static str> and the caller handles it gracefully. The error case logs and continues rather than panicking.
let router_reg = set_dm_router_cmd_tx(dm_subscription_tx.clone());
if let Err(msg) = &router_reg {
log::error!("[dm_listener] {}", msg);
}2. Double TrackOrder orphan entries — FIXED
The router's TrackOrder handler now cleans stale entries before inserting the new one:
let stale: Vec<Uuid> = indices
.iter()
.filter_map(|(oid, idx)| {
if *idx == trade_index && *oid != order_id {
Some(*oid)
} else {
None
}
})
.collect();
for oid in stale {
indices.remove(&oid); // Cleans phantom order_ids from optimistic send
}
indices.insert(order_id, trade_index);3. Mutex poison unwrap in fetch_scheduler — FIXED
Now handles poisoned mutex without panicking:
Ok(mut active) => active.clear(),
Err(e) => {
crate::util::request_fatal_restart(format!(
"Mostrix encountered an internal error (poisoned active order indices lock: {e}). Please restart the app."
));
}Acceptable: calling request_fatal_restart on mutex poison is severe but reasonable since a poisoned mutex indicates memory corruption.
Remaining Issues
MEDIUM: Race window in apply_pending_key_reload
File: src/ui/key_handler/async_tasks.rs — lines 147-156
let (new_dm_tx, new_dm_rx) = tokio::sync::mpsc::unbounded_channel::<OrderDmSubscriptionCmd>();
*dm_subscription_tx = new_dm_tx; // A: sender replaced
let router_reg = set_dm_router_cmd_tx(dm_subscription_tx.clone()); // B: registered
// ... then later:
*message_listener_handle = tokio::spawn(async move {
listen_for_order_messages(..., new_dm_rx, ...) // C: listener starts
});Between A and C, if a TrackOrder is sent via the new sender, it goes to new_dm_rx but no task is reading it yet → silent message loss.
The code comments acknowledge this but don't prevent it. The fix would be to spawn the listener before registering the sender, or to use a oneshot channel for the registration handshake.
Not blocking for merge since:
pending_key_reloadis set before entering this function — user cannot initiate new order takes during this window- Old listener is aborted before new channel creation
- Window is sub-millisecond in practice
MEDIUM: Stale comment about settings_snapshot
File: src/util/order_utils/fetch_scheduler.rs
The comment at line ~20 says:
// TODO: After verifying `latest_settings` works correctly,
// refactor to use `settings_snapshot`.This refactoring was never done. The variable is named latest_settings and the comment is now stale. Recommend removing the comment or doing the refactor.
LOW: unreachable!() in validation path
File: src/ui/key_handler/form_input.rs — lines 47 and 88
_ => unreachable!(),This replaces the previous panic when FiatAmountMax was validated without checking use_range. Now use_range is checked before accessing FiatAmountMax content, so unreachable!() is correct — but it's a silent failure if the code ever changes. Consider a debug_assert! instead for safety in debug builds.
CodeRabbit Items — All Addressed
- ✅
DM_ROUTER_CMD_TXnow returnsResult— handled gracefully - ✅ Panic in
fetch_scheduler— graceful shutdown on mutex poison - ✅
FiatAmountMax—use_rangecheck prevents invalid state - ✅
orders_tab.rscreated_at—and_then+unwrap_or_elsereplacesunwrap_or(0)
Verdict
Approve — all previously identified HIGH issues are resolved. The remaining MEDIUM items are acceptable for merge and documented above.
The DM router architecture is sound. The race window is negligible in practice given the pending_key_reload guard.
Summary
Summary by CodeRabbit
New Features
Bug Fixes
Documentation