[Optimize] Add guards for processing unconfirmed solutions #3225

Merged 8 commits on Apr 24, 2024
10 changes: 10 additions & 0 deletions node/rest/src/lib.rs
@@ -224,3 +224,13 @@ async fn log_middleware(

Ok(next.run(request).await)
}

/// Formats an ID into a truncated identifier (for logging purposes).
pub fn fmt_id(id: impl ToString) -> String {
let id = id.to_string();
let mut formatted_id = id.chars().take(16).collect::<String>();
if id.chars().count() > 16 {
formatted_id.push_str("..");
}
formatted_id
}
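
To make the truncation rule concrete, here is a standalone copy of `fmt_id` with its behavior spelled out in comments. The sample identifiers are made up for illustration; real transaction and solution IDs are much longer.

```rust
/// Truncates an identifier to its first 16 characters for logging and
/// appends ".." only when something was actually cut off.
/// (Same logic as the `fmt_id` added in this diff.)
pub fn fmt_id(id: impl ToString) -> String {
    let id = id.to_string();
    // Work on characters, not bytes, so multi-byte characters are never split.
    let mut formatted_id = id.chars().take(16).collect::<String>();
    if id.chars().count() > 16 {
        formatted_id.push_str("..");
    }
    formatted_id
}

// Illustrative inputs (hypothetical IDs):
//   fmt_id("short")               returns "short"
//   fmt_id("0123456789abcdef")    returns "0123456789abcdef"   (exactly 16, no suffix)
//   fmt_id("0123456789abcdefXYZ") returns "0123456789abcdef.."
```

Because the argument is `impl ToString`, any ID type that implements `Display` can be passed directly, which is how the routes below call it with `tx.id()` and `solution.id()`.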
37 changes: 35 additions & 2 deletions node/rest/src/routes.rs
@@ -313,6 +313,11 @@ impl<N: Network, C: ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
State(rest): State<Self>,
Json(tx): Json<Transaction<N>>,
) -> Result<ErasedJson, RestError> {
// Do not process the transaction if the node is syncing.
if !rest.routing.is_block_synced() {
return Err(RestError(format!("Unable to broadcast transaction '{}' (node is syncing)", fmt_id(tx.id()))));
}

// If the consensus module is enabled, add the unconfirmed transaction to the memory pool.
if let Some(consensus) = rest.consensus {
// Add the unconfirmed transaction to the memory pool.
@@ -337,10 +342,38 @@ impl<N: Network, C: ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
State(rest): State<Self>,
Json(solution): Json<Solution<N>>,
) -> Result<ErasedJson, RestError> {
// Do not process the solution if the node is syncing.
if !rest.routing.is_block_synced() {
return Err(RestError(format!(
"Unable to broadcast solution '{}' (node is syncing)",
fmt_id(solution.id())
)));
}

// If the consensus module is enabled, add the unconfirmed solution to the memory pool.
if let Some(consensus) = rest.consensus {
// Otherwise, verify it prior to broadcasting.
Review comment from @vicsn (Contributor), Apr 19, 2024:

I can imagine that spamming a publicly accessible REST endpoint with solutions could halt a client because of the number of verifications required, but we should test this (cc @iamalwaysuncomfortable).

If it turns out to be an issue, we can:

  • accept the risk and tell everyone not to expose their REST endpoint;
  • add a separate transmission rate limit using a second .layer(GovernorLayer { });
  • queue transmissions for verification, which takes more work but seems more robust. See my comment below for an example implementation.
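
As a rough illustration of the queueing option, the sketch below puts a bounded queue in front of a verifier using only the standard library. It is not the implementation referred to in the comment, and every name in it (`Transmission`, `VerificationQueue`, `submit`) is hypothetical rather than part of snarkOS.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical stand-in for an unverified solution or transaction.
#[derive(Debug, Clone, PartialEq)]
pub struct Transmission {
    pub id: String,
}

/// Hypothetical bounded queue that sits in front of the verifier.
pub struct VerificationQueue {
    sender: SyncSender<Transmission>,
}

impl VerificationQueue {
    /// Creates a queue holding at most `capacity` pending items, and returns
    /// the receiving end that a verifier worker would drain.
    pub fn new(capacity: usize) -> (Self, Receiver<Transmission>) {
        let (sender, receiver) = sync_channel(capacity);
        (Self { sender }, receiver)
    }

    /// Enqueues without blocking. When the queue is full the item is rejected,
    /// so a burst of requests cannot pile up unbounded verification work.
    pub fn submit(&self, item: Transmission) -> Result<(), String> {
        match self.sender.try_send(item) {
            Ok(()) => Ok(()),
            Err(TrySendError::Full(item)) => Err(format!("Queue full, dropped '{}'", item.id)),
            Err(TrySendError::Disconnected(item)) => Err(format!("Verifier stopped, dropped '{}'", item.id)),
        }
    }
}
```

In this sketch a REST handler would call `submit` and return an error to the caller when the queue is full, while a single worker calls `recv()` on the receiver and verifies items one at a time. The capacity then bounds the verification backlog regardless of request rate.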

match rest.consensus {
// Add the unconfirmed solution to the memory pool.
consensus.add_unconfirmed_solution(solution).await?;
Some(consensus) => consensus.add_unconfirmed_solution(solution).await?,
// Verify the solution.
None => {
Review comment (Contributor):

If clients receive transactions/solutions via P2P, they are already verified before propagation in fn unconfirmed_{transaction, solution}. So I suggest we just call that function from here. This is what the validator also does.

For consistency and clarity, we should also do this for transactions. A solution takes longer to verify than an execution, but less than a deployment, so there's no difference justifying a different verification approach.

Recall that this PR was triggered in part by a large invalid-solution spam we ran recently, but @iamalwaysuncomfortable and I just realized we never tried spamming very large numbers of invalid transactions.

Review comment (Contributor):

This PR also remains relevant in the face of solution or deployment spam: https://github.com/AleoHQ/snarkOS/pull/2970

// Compute the current epoch hash.
let epoch_hash = rest.ledger.latest_epoch_hash()?;
// Retrieve the current proof target.
let proof_target = rest.ledger.latest_proof_target();
// Ensure that the solution is valid for the given epoch.
let puzzle = rest.ledger.puzzle().clone();
// Verify the solution in a blocking task.
match tokio::task::spawn_blocking(move || puzzle.check_solution(&solution, epoch_hash, proof_target))
.await
{
Ok(Ok(())) => {}
Ok(Err(err)) => {
return Err(RestError(format!("Invalid solution '{}' - {err}", fmt_id(solution.id()))));
}
Err(err) => return Err(RestError(format!("Invalid solution '{}' - {err}", fmt_id(solution.id())))),
}
}
}

let solution_id = solution.id();
18 changes: 14 additions & 4 deletions node/router/src/inbound.rs
@@ -207,15 +207,20 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {
}
}
Message::UnconfirmedSolution(message) => {
// Clone the serialized message.
let serialized = message.clone();
// Do not process unconfirmed solutions if the node is syncing.
if !self.is_block_synced() {
trace!("Skipped processing unconfirmed solution '{}' (node is syncing)", message.solution_id);
return Ok(());
}
// Update the timestamp for the unconfirmed solution.
let seen_before = self.router().cache.insert_inbound_solution(peer_ip, message.solution_id).is_some();
// Determine whether to propagate the solution.
if seen_before {
trace!("Skipping 'UnconfirmedSolution' from '{peer_ip}'");
return Ok(());
}
// Clone the serialized message.
let serialized = message.clone();
// Perform the deferred non-blocking deserialization of the solution.
let solution = match message.solution.deserialize().await {
Ok(solution) => solution,
@@ -232,8 +237,11 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {
}
}
Message::UnconfirmedTransaction(message) => {
// Clone the serialized message.
let serialized = message.clone();
// Do not process unconfirmed transactions if the node is syncing.
if !self.is_block_synced() {
trace!("Skipped processing unconfirmed transaction '{}' (node is syncing)", message.transaction_id);
return Ok(());
}
// Update the timestamp for the unconfirmed transaction.
let seen_before =
self.router().cache.insert_inbound_transaction(peer_ip, message.transaction_id).is_some();
@@ -242,6 +250,8 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {
trace!("Skipping 'UnconfirmedTransaction' from '{peer_ip}'");
return Ok(());
}
// Clone the serialized message.
let serialized = message.clone();
// Perform the deferred non-blocking deserialization of the transaction.
let transaction = match message.transaction.deserialize().await {
Ok(transaction) => transaction,
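
The reordering in this file (sync check first, then the seen-before cache, and only then the clone of the serialized message) can be modeled in isolation. The sketch below is a simplified, hypothetical model: `Node`, `Outcome`, and `handle` are illustrative names, and a `HashSet` stands in for the router cache.

```rust
use std::collections::HashSet;

/// Outcome of handling one inbound message in this simplified model.
#[derive(Debug, PartialEq)]
pub enum Outcome {
    SkippedSyncing,
    SkippedDuplicate,
    Processed,
}

/// Hypothetical, simplified stand-in for the node and its router cache.
pub struct Node {
    pub synced: bool,
    pub seen: HashSet<u64>,
    pub clones_made: usize,
}

impl Node {
    pub fn new(synced: bool) -> Self {
        Self { synced, seen: HashSet::new(), clones_made: 0 }
    }

    /// Handles one unconfirmed message, mirroring the order of checks in the diff.
    pub fn handle(&mut self, id: u64, payload: &[u8]) -> Outcome {
        // 1. While syncing, return before touching the cache or the payload.
        if !self.synced {
            return Outcome::SkippedSyncing;
        }
        // 2. Record the ID; `insert` returns false if it was already present.
        if !self.seen.insert(id) {
            return Outcome::SkippedDuplicate;
        }
        // 3. Clone the payload only for messages that will actually be processed.
        let _serialized = payload.to_vec();
        self.clones_made += 1;
        Outcome::Processed
    }
}
```

In this model a message dropped during sync leaves the cache untouched, so the same ID is still treated as new once the node has caught up, and duplicates never pay for a clone.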
3 changes: 3 additions & 0 deletions node/router/src/outbound.rs
@@ -28,6 +28,9 @@ pub trait Outbound<N: Network>: Writing<Message = Message<N>> {
/// Returns a reference to the router.
fn router(&self) -> &Router<N>;

/// Returns `true` if the node is synced up to the latest block (within the given tolerance).
fn is_block_synced(&self) -> bool;

/// Sends a "Ping" message to the given peer.
fn send_ping(&self, peer_ip: SocketAddr, block_locators: Option<BlockLocators<N>>) {
self.send(peer_ip, Message::Ping(Ping::new(self.router().node_type(), block_locators)));
5 changes: 5 additions & 0 deletions node/router/tests/common/router.rs
@@ -149,6 +149,11 @@ impl<N: Network> Outbound<N> for TestRouter<N> {
fn router(&self) -> &Router<N> {
&self.0
}

/// Returns `true` if the node is synced up to the latest block (within the given tolerance).
fn is_block_synced(&self) -> bool {
true
}
}

#[async_trait]
5 changes: 5 additions & 0 deletions node/src/client/router.rs
@@ -163,6 +163,11 @@ impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Client<N, C> {
fn router(&self) -> &Router<N> {
&self.router
}

/// Returns `true` if the node is synced up to the latest block (within the given tolerance).
fn is_block_synced(&self) -> bool {
self.sync.is_block_synced()
}
}

#[async_trait]
5 changes: 5 additions & 0 deletions node/src/prover/router.rs
@@ -135,6 +135,11 @@ impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Prover<N, C> {
fn router(&self) -> &Router<N> {
&self.router
}

/// Returns `true` if the node is synced up to the latest block (within the given tolerance).
fn is_block_synced(&self) -> bool {
true
}
}

#[async_trait]
5 changes: 5 additions & 0 deletions node/src/validator/router.rs
@@ -137,6 +137,11 @@ impl<N: Network, C: ConsensusStorage<N>> Outbound<N> for Validator<N, C> {
fn router(&self) -> &Router<N> {
&self.router
}

/// Returns `true` if the node is synced up to the latest block (within the given tolerance).
fn is_block_synced(&self) -> bool {
self.sync.is_block_synced()
}
}

#[async_trait]
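
Taken together, the per-node implementations follow one pattern: the trait requires `is_block_synced`, the client and validator delegate to their sync module, and the prover and test router always report `true`. A stripped-down sketch of that pattern follows. `BlockSync`, the non-generic `Client` and `Prover`, and the `should_process_unconfirmed` helper are hypothetical simplifications, not the snarkOS types.

```rust
/// Hypothetical stand-in for a node's sync module.
pub struct BlockSync {
    pub synced: bool,
}

impl BlockSync {
    pub fn is_block_synced(&self) -> bool {
        self.synced
    }
}

/// Simplified version of the trait method added in `outbound.rs`.
pub trait Outbound {
    /// Returns `true` if the node is synced up to the latest block.
    fn is_block_synced(&self) -> bool;

    /// Hypothetical helper: the guard that the inbound handlers apply.
    fn should_process_unconfirmed(&self) -> bool {
        self.is_block_synced()
    }
}

/// A client tracks the chain, so it defers to its sync module.
pub struct Client {
    pub sync: BlockSync,
}

impl Outbound for Client {
    fn is_block_synced(&self) -> bool {
        self.sync.is_block_synced()
    }
}

/// A prover always reports itself as synced in this PR.
pub struct Prover;

impl Outbound for Prover {
    fn is_block_synced(&self) -> bool {
        true
    }
}
```

Putting the method on the trait lets the shared inbound handlers apply the guard without knowing which node type they are running in.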