Skip to content

Commit

Permalink
feat(tap-agent): use limit to truncate receipts (#194)
Browse files Browse the repository at this point in the history
Signed-off-by: Gustavo Inacio <gustavo@semiotic.ai>
  • Loading branch information
gusinacio authored May 27, 2024
1 parent 3a70fa2 commit cd0c1cd
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 37 deletions.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

91 changes: 83 additions & 8 deletions tap-agent/src/tap/context/receipt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use alloy_primitives::hex::ToHex;
use bigdecimal::{num_bigint::ToBigInt, ToPrimitive};
use sqlx::{postgres::types::PgRange, types::BigDecimal};
use tap_core::{
manager::adapters::{ReceiptDelete, ReceiptRead},
manager::adapters::{safe_truncate_receipts, ReceiptDelete, ReceiptRead},
receipt::{Checking, Receipt, ReceiptWithState, SignedReceipt},
};
use thegraph::types::Address;
Expand Down Expand Up @@ -79,29 +79,33 @@ impl ReceiptRead for TapAgentContext {
async fn retrieve_receipts_in_timestamp_range<R: RangeBounds<u64> + Send>(
&self,
timestamp_range_ns: R,
// TODO: Make use of this limit in this function
_receipts_limit: Option<u64>,
receipts_limit: Option<u64>,
) -> Result<Vec<ReceiptWithState<Checking>>, Self::AdapterError> {
let signers = signers_trimmed(&self.escrow_accounts, self.sender)
.await
.map_err(|e| AdapterError::ReceiptRead {
error: format!("{:?}.", e),
})?;

let receipts_limit = receipts_limit.map_or(1000, |limit| limit);

let records = sqlx::query!(
r#"
SELECT id, signature, allocation_id, timestamp_ns, nonce, value
FROM scalar_tap_receipts
WHERE allocation_id = $1 AND signer_address IN (SELECT unnest($2::text[]))
AND $3::numrange @> timestamp_ns
AND $3::numrange @> timestamp_ns
ORDER BY timestamp_ns ASC
LIMIT $4
"#,
self.allocation_id.encode_hex::<String>(),
&signers,
rangebounds_to_pgrange(timestamp_range_ns)
rangebounds_to_pgrange(timestamp_range_ns),
(receipts_limit + 1) as i64,
)
.fetch_all(&self.pgpool)
.await?;
records
let mut receipts = records
.into_iter()
.map(|record| {
let signature = record.signature.as_slice().try_into()
Expand Down Expand Up @@ -148,7 +152,11 @@ impl ReceiptRead for TapAgentContext {
Ok(ReceiptWithState::new(signed_receipt))

})
.collect()
.collect::<Result<Vec<ReceiptWithState<Checking>>, AdapterError>>()?;

safe_truncate_receipts(&mut receipts, receipts_limit);

Ok(receipts)
}
}

Expand Down Expand Up @@ -275,7 +283,6 @@ mod test {

// Retrieving receipts in timestamp range from the database, convert to json Value
let recovered_received_receipt_vec = storage_adapter
// TODO: Make use of the receipt limit if it makes sense here
.retrieve_receipts_in_timestamp_range(range, None)
.await?
.into_iter()
Expand Down Expand Up @@ -424,6 +431,74 @@ mod test {
Ok(())
}

#[sqlx::test(migrations = "../migrations")]
async fn retrieve_receipts_with_limit(pgpool: PgPool) {
let escrow_accounts = Eventual::from_value(EscrowAccounts::new(
HashMap::from([(SENDER.1, 1000.into())]),
HashMap::from([(SENDER.1, vec![SIGNER.1])]),
));

let storage_adapter = TapAgentContext::new(
pgpool.clone(),
*ALLOCATION_ID_0,
SENDER.1,
escrow_accounts.clone(),
EscrowAdapter::mock(),
);

// Creating 100 receipts with timestamps 42 to 141
for i in 0..100 {
let receipt = create_received_receipt(
&ALLOCATION_ID_0,
&SIGNER.0,
i + 684,
i + 42,
(i + 124).into(),
);
store_receipt(&pgpool, receipt.signed_receipt())
.await
.unwrap();
}

let recovered_received_receipt_vec = storage_adapter
.retrieve_receipts_in_timestamp_range(0..141, Some(10))
.await
.unwrap();
assert_eq!(recovered_received_receipt_vec.len(), 10);

let recovered_received_receipt_vec = storage_adapter
.retrieve_receipts_in_timestamp_range(0..141, Some(50))
.await
.unwrap();
assert_eq!(recovered_received_receipt_vec.len(), 50);

// add a copy in the same timestamp
for i in 0..100 {
let receipt = create_received_receipt(
&ALLOCATION_ID_0,
&SIGNER.0,
i + 684,
i + 43,
(i + 124).into(),
);
store_receipt(&pgpool, receipt.signed_receipt())
.await
.unwrap();
}

let recovered_received_receipt_vec = storage_adapter
.retrieve_receipts_in_timestamp_range(0..141, Some(10))
.await
.unwrap();
assert_eq!(recovered_received_receipt_vec.len(), 9);

let recovered_received_receipt_vec = storage_adapter
.retrieve_receipts_in_timestamp_range(0..141, Some(50))
.await
.unwrap();
assert_eq!(recovered_received_receipt_vec.len(), 49);
}

#[sqlx::test(migrations = "../migrations")]
async fn retrieve_receipts_in_timestamp_range(pgpool: PgPool) {
let escrow_accounts = Eventual::from_value(EscrowAccounts::new(
Expand Down

0 comments on commit cd0c1cd

Please sign in to comment.