Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 136 additions & 4 deletions src/payment/quote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ use crate::error::{Error, Result};
use crate::logging::debug;
use crate::payment::metrics::QuotingMetricsTracker;
use crate::payment::pricing::calculate_price;
use crate::storage::lmdb::LmdbStorage;
use evmlib::merkle_payments::MerklePaymentCandidateNode;
use evmlib::PaymentQuote;
use evmlib::RewardsAddress;
use parking_lot::RwLock;
use saorsa_core::MlDsa65;
use saorsa_pqc::pqc::types::MlDsaSecretKey;
use saorsa_pqc::pqc::MlDsaOperations;
use std::sync::Arc;
use std::time::SystemTime;

/// Content address type (32-byte `XorName`).
Expand All @@ -32,8 +35,24 @@ pub type SignFn = Box<dyn Fn(&[u8]) -> Vec<u8> + Send + Sync>;
pub struct QuoteGenerator {
/// The rewards address for receiving payments.
rewards_address: RewardsAddress,
/// Metrics tracker for quoting.
/// Fallback in-memory record counter for pricing.
///
/// Only consulted when no [`LmdbStorage`] is attached (unit tests, or a
/// mis-configured startup). In production the price is derived from the
/// attached store's `current_chunks()` instead — see [`Self::storage`].
metrics_tracker: QuotingMetricsTracker,
/// Authoritative on-disk record-count source for pricing.
///
/// When attached, quote prices are computed from
/// [`LmdbStorage::current_chunks()`] — the **same** count the
/// [`PaymentVerifier`](crate::payment::PaymentVerifier) freshness gate
/// compares the quote against. Keeping pricing and freshness on one source
/// means a quote priced at record count `N` is later checked against a
/// current count that differs only by genuine in-flight growth, instead of
/// by the standing client-PUT-vs-replication gap that rejected every
/// payment when pricing read the side counter and freshness read the store.
/// `None` until [`Self::attach_storage`] is called.
storage: RwLock<Option<Arc<LmdbStorage>>>,
/// Signing function provided by the node.
/// Takes bytes and returns a signature.
sign_fn: Option<SignFn>,
Expand All @@ -55,11 +74,47 @@ impl QuoteGenerator {
Self {
rewards_address,
metrics_tracker,
storage: RwLock::new(None),
sign_fn: None,
pub_key: Vec::new(),
}
}

/// Attach the node's [`LmdbStorage`] so quote prices reflect the
/// authoritative on-disk record count.
///
/// This MUST be wired to the same `LmdbStorage` the
/// [`PaymentVerifier`](crate::payment::PaymentVerifier) freshness gate reads
/// via `current_chunks()`; otherwise pricing and freshness diverge and the
/// gate rejects healthy payments. Idempotent: calling twice replaces the
/// handle. Uses interior mutability so it can be called on an `Arc`.
pub fn attach_storage(&self, storage: Arc<LmdbStorage>) {
*self.storage.write() = Some(storage);
debug!("QuoteGenerator: LmdbStorage attached for current-records pricing");
}

/// Record count used to price quotes.
///
/// Prefers the attached `LmdbStorage` count (authoritative — counts client
/// PUTs, replication stores, and repair fetches alike, exactly matching the
/// verifier's freshness source). Falls back to the in-memory
/// `metrics_tracker` when no storage is attached or the read fails, so
/// pricing never panics or stalls.
fn pricing_records_stored(&self) -> usize {
if let Some(storage) = self.storage.read().as_ref() {
match storage.current_chunks() {
Ok(n) => return usize::try_from(n).unwrap_or(usize::MAX),
Err(e) => {
debug!(
"QuoteGenerator: current_chunks() failed ({e}); \
falling back to metrics_tracker for pricing"
);
}
}
}
self.metrics_tracker.records_stored()
}

/// Set the signing function for quote generation.
///
/// # Arguments
Expand Down Expand Up @@ -128,8 +183,10 @@ impl QuoteGenerator {

let timestamp = SystemTime::now();

// Calculate price from current record count
let price = calculate_price(self.metrics_tracker.records_stored());
// Calculate price from the authoritative current record count (the same
// count the verifier's freshness gate reads), falling back to the
// in-memory counter only when no storage is attached.
let price = calculate_price(self.pricing_records_stored());

// Convert XorName to xor_name::XorName
let xor_name = xor_name::XorName(content);
Expand Down Expand Up @@ -205,7 +262,7 @@ impl QuoteGenerator {
.as_ref()
.ok_or_else(|| Error::Payment("Quote signing not configured".to_string()))?;

let price = calculate_price(self.metrics_tracker.records_stored());
let price = calculate_price(self.pricing_records_stored());

// Compute the same bytes_to_sign used by the upstream library
let msg = MerklePaymentCandidateNode::bytes_to_sign(
Expand Down Expand Up @@ -313,6 +370,81 @@ mod tests {
generator
}

/// Regression test for the STG-01 quote-freshness rejection: pricing must
/// read the attached store's `current_chunks()`, NOT the side counter.
///
/// Before the fix, the price came from `metrics_tracker` (client-PUT count
/// only) while the verifier's freshness gate read `current_chunks()` (all
/// records, including replicated ones). On a replicating network the store
/// count ran far ahead of the side counter, so every quote looked "stale".
/// Here we attach a store, write records WITHOUT touching the side counter
/// (mimicking replication stores), and assert the quote prices off the
/// store count — i.e. the two sources now agree.
#[tokio::test]
async fn test_pricing_tracks_attached_storage_not_side_counter() {
use crate::payment::pricing::derive_records_stored_from_price;
use crate::storage::{LmdbStorage, LmdbStorageConfig};
use tempfile::TempDir;

let temp_dir = TempDir::new().expect("temp dir");
let storage = Arc::new(
LmdbStorage::new(LmdbStorageConfig {
root_dir: temp_dir.path().to_path_buf(),
..LmdbStorageConfig::test_default()
})
.await
.expect("create storage"),
);

// Side counter deliberately starts well BELOW the store count to model
// a node whose records arrived mostly via replication (which never
// increments the side counter).
let metrics_tracker = QuotingMetricsTracker::new(3);
let mut generator = QuoteGenerator::new(RewardsAddress::new([1u8; 20]), metrics_tracker);
generator.set_signer(vec![0u8; 64], |bytes| {
let mut sig = vec![0u8; 64];
for (i, b) in bytes.iter().take(64).enumerate() {
sig[i] = *b;
}
sig
});
generator.attach_storage(Arc::clone(&storage));

// Write 25 distinct records straight to the store, as a replication
// store would — the side counter stays at 3.
for i in 0..25u32 {
let content = format!("replicated-record-{i}");
let address = LmdbStorage::compute_address(content.as_bytes());
storage
.put(&address, content.as_bytes())
.await
.expect("put");
}
assert_eq!(
generator.records_stored(),
3,
"side counter must be untouched"
);
assert_eq!(storage.current_chunks().expect("count"), 25);

let quote = generator
.create_quote([42u8; 32], 1024, 0)
.expect("create quote");

// Price must encode 25 (the store count), not 3 (the side counter).
assert_eq!(
quote.price,
calculate_price(25),
"price must be derived from current_chunks(), not metrics_tracker"
);
assert_eq!(
derive_records_stored_from_price(quote.price),
25,
"verifier's price-inverse must recover the store count, keeping the \
freshness delta at ~0 for a freshly issued quote"
);
}

#[test]
fn test_create_quote() {
let generator = create_test_generator();
Expand Down
18 changes: 12 additions & 6 deletions src/storage/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,16 @@ impl AntProtocol {
payment_verifier: Arc<PaymentVerifier>,
quote_generator: Arc<QuoteGenerator>,
) -> Self {
// Keep the PaymentVerifier's freshness gate wired to the same
// authoritative store used by this protocol handler. Attaching here
// Keep the PaymentVerifier's freshness gate AND the QuoteGenerator's
// pricing wired to the same authoritative store used by this protocol
// handler. Pricing and the freshness gate MUST read the same record
// count: the generator prices a quote from current_chunks() and the
// verifier later checks the quote against current_chunks(), so the only
// difference they see is genuine in-flight growth. Attaching both here
// makes the invariant automatic for every AntProtocol construction
// path, including tests and future startup variants.
payment_verifier.attach_storage(Arc::clone(&storage));
quote_generator.attach_storage(Arc::clone(&storage));

Self {
storage,
Expand Down Expand Up @@ -263,10 +268,11 @@ impl AntProtocol {
Ok(_) => {
let content_len = request.content.len();
info!("Stored chunk {addr_hex} ({content_len} bytes)");
// Increment the close-records counter consumed by calculate_price.
// The PaymentVerifier reads its current record count directly
// from LmdbStorage::current_chunks(), so we no longer need to
// push the value through a side counter here.
// Bump the in-memory fallback counter. Both pricing and the
// freshness gate now read LmdbStorage::current_chunks() directly,
// so this counter only matters when no storage is attached
// (unit tests / mis-configured startup). Kept warm so that
// fallback path stays roughly accurate.
self.quote_generator.record_store();

// 6. Notify replication engine for fresh fan-out.
Expand Down
Loading