Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: don't block unfinalised dot source when can't find events #4301

Merged
merged 4 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions engine/src/dot/retry_rpc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
common::option_inner,
retrier::Attempt,
retrier::{Attempt, RetryLimitReturn},
settings::{NodeContainer, WsHttpEndpoints},
witness::common::chain_source::{ChainClient, Header},
};
Expand Down Expand Up @@ -94,7 +94,11 @@ pub trait DotRetryRpcApi: Clone {

async fn extrinsics(&self, block_hash: PolkadotHash) -> Vec<ChainBlockExtrinsic>;

async fn events(&self, block_hash: PolkadotHash) -> Option<Events<PolkadotConfig>>;
async fn events<R: RetryLimitReturn>(
&self,
block_hash: PolkadotHash,
retry_limit: R,
) -> R::ReturnType<Option<Events<PolkadotConfig>>>;

async fn runtime_version(&self, block_hash: Option<H256>) -> RuntimeVersion;

Expand Down Expand Up @@ -134,14 +138,19 @@ impl DotRetryRpcApi for DotRetryRpcClient {
.await
}

async fn events(&self, block_hash: PolkadotHash) -> Option<Events<PolkadotConfig>> {
async fn events<R: RetryLimitReturn>(
&self,
block_hash: PolkadotHash,
retry_limit: R,
) -> R::ReturnType<Option<Events<PolkadotConfig>>> {
self.rpc_retry_client
.request(
Box::pin(move |client| {
.request_with_limit(
Box::pin(move |client: DotHttpRpcClient| {
#[allow(clippy::redundant_async_block)]
Box::pin(async move { client.events(block_hash).await })
}),
RequestLog::new("events".to_string(), Some(format!("{block_hash:?}"))),
retry_limit,
)
.await
}
Expand Down Expand Up @@ -290,7 +299,7 @@ pub mod mocks {

async fn extrinsics(&self, block_hash: PolkadotHash) -> Vec<ChainBlockExtrinsic>;

async fn events(&self, block_hash: PolkadotHash) -> Option<Events<PolkadotConfig>>;
async fn events<R: RetryLimitReturn>(&self, block_hash: PolkadotHash, retry_limit: R) -> R::ReturnType<Option<Events<PolkadotConfig>>>;

async fn runtime_version(&self, block_hash: Option<H256>) -> RuntimeVersion;

Expand All @@ -309,6 +318,8 @@ mod tests {

use utilities::task_scope::task_scope;

use crate::retrier::NoRetryLimit;

use super::*;

#[tokio::test]
Expand All @@ -335,7 +346,7 @@ mod tests {
let extrinsics = dot_retry_rpc_client.extrinsics(hash).await;
println!("extrinsics: {:?}", extrinsics);

let events = dot_retry_rpc_client.events(hash).await;
let events = dot_retry_rpc_client.events(hash, NoRetryLimit).await;
println!("Events: {:?}", events);

let runtime_version = dot_retry_rpc_client.runtime_version(None).await;
Expand Down
78 changes: 61 additions & 17 deletions engine/src/retrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use utilities::{
UnendingStream,
};

#[derive(Debug, Clone)]
enum RetryLimit {
#[derive(Debug, Clone, Copy)]
pub enum RetryLimit {
// For requests that should never fail. Failure in these cases is directly or indirectly the
// fault of the operator. e.g. a faulty Ethereum node.
NoLimit,
Expand Down Expand Up @@ -305,6 +305,54 @@ impl<Client: Send + Sync + Clone + 'static> ClientSelector<Client> {
}
}

/// Ties a retry-limit argument type to the shape of the value a request hands back.
///
/// Each implementor chooses, through [`RetryLimitReturn::ReturnType`], what a caller receives
/// for a request that produces a `T`, and supplies the conversion into the [`RetryLimit`] used
/// when the request is submitted.
///
/// The trait has no async methods, so it does not need `#[async_trait]`.
pub trait RetryLimitReturn: Send + 'static {
    /// What the caller receives for a request whose underlying value is a `T`.
    type ReturnType<T>;

    /// Converts the caller-supplied limit argument into a [`RetryLimit`].
    fn into_retry_limit(param_type: Self) -> RetryLimit;

    /// Converts the outcome of awaiting the response receiver into [`Self::ReturnType`].
    ///
    /// * `inner` - the type-erased response, or the receive error if no response arrived.
    /// * `log_message` - text an implementor may use to describe the failure case.
    fn inner_to_return_type<T: Send + 'static>(
        inner: Result<BoxAny, tokio::sync::oneshot::error::RecvError>,
        log_message: String,
    ) -> Self::ReturnType<T>;
}

/// Marker argument for requests that should be submitted with `RetryLimit::NoLimit`.
/// Its `RetryLimitReturn` impl hands the value back directly (`ReturnType<T> = T`).
pub struct NoRetryLimit;

/// Unlimited-retry behaviour: the caller gets the bare `T` back, with no `Result` wrapper.
impl RetryLimitReturn for NoRetryLimit {
    type ReturnType<T> = T;

    /// Always selects [`RetryLimit::NoLimit`]; the marker value carries no data.
    fn into_retry_limit(_param_type: Self) -> RetryLimit {
        RetryLimit::NoLimit
    }

    /// Unboxes the received response as a `T`.
    ///
    /// # Panics
    /// Panics if `inner` is an `Err`, or if the boxed value is not a `T`.
    fn inner_to_return_type<T: Send + 'static>(
        inner: Result<BoxAny, tokio::sync::oneshot::error::RecvError>,
        _log_message: String,
    ) -> Self::ReturnType<T> {
        let typed = inner
            .unwrap()
            .downcast::<T>()
            .expect("We know we cast the T into an any, and it is a T that we are receiving. Hitting this is a programmer error.");
        *typed
    }
}

// NOTE(review): no use of this type is visible in this change; the limited-retry behaviour is
// implemented directly on `u32`. Confirm whether this struct is still needed.
pub struct SetRetryLimit {}

/// Limited-retry behaviour: a plain `u32` is the attempt limit, and the caller gets a `Result`.
impl RetryLimitReturn for u32 {
    type ReturnType<T> = Result<T>;

    /// Wraps the number in [`RetryLimit::Limit`].
    fn into_retry_limit(param_type: Self) -> RetryLimit {
        RetryLimit::Limit(param_type)
    }

    /// Unboxes the received response as a `T`, or reports `log_message` as the error when
    /// nothing was received.
    ///
    /// # Panics
    /// Panics if a response was received but the boxed value is not a `T`.
    fn inner_to_return_type<T: Send + 'static>(
        inner: Result<BoxAny, tokio::sync::oneshot::error::RecvError>,
        log_message: String,
    ) -> Self::ReturnType<T> {
        match inner {
            Ok(boxed) => Ok(*boxed.downcast::<T>().expect("We know we cast the T into an any, and it is a T that we are receiving. Hitting this is a programmer error.")),
            // The receive error itself carries no detail; surface the caller's message instead.
            Err(_) => Err(anyhow::anyhow!("{log_message}")),
        }
    }
}

/// Requests submitted to this client will be retried until success.
/// When a request fails it will be retried after a delay that exponentially increases on each retry
/// attempt.
Expand Down Expand Up @@ -428,28 +476,24 @@ where
specific_closure: TypedFutureGenerator<T, Client>,
request_log: RequestLog,
) -> T {
let rx = self.send_request(specific_closure, request_log, RetryLimit::NoLimit).await;
let result: BoxAny = rx.await.unwrap();
*result.downcast::<T>().expect("We know we cast the T into an any, and it is a T that we are receiving. Hitting this is a programmer error.")
self.request_with_limit::<T, NoRetryLimit>(specific_closure, request_log, NoRetryLimit)
.await
}

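/// Requests something to be retried by the retry client, with an explicit retry limit.
/// The return type is chosen by `R`: `NoRetryLimit` yields the value directly, while a `u32`
/// limit yields a `Result` that is an error if no response is received.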
pub async fn request_with_limit<T: Send + 'static>(
pub async fn request_with_limit<T: Send + 'static, R: RetryLimitReturn>(
&self,
specific_closure: TypedFutureGenerator<T, Client>,
request_log: RequestLog,
retry_limit: Attempt,
) -> Result<T> {
let rx = self
.send_request(specific_closure, request_log.clone(), RetryLimit::Limit(retry_limit))
.await;
let result: BoxAny = rx.await.map_err(|_| {
anyhow::anyhow!(
"Maximum attempt of `{retry_limit}` reached for request `{request_log}`."
)
})?;
Ok(*result.downcast::<T>().expect("We know we cast the T into an any, and it is a T that we are receiving. Hitting this is a programmer error."))
retry_limit: R,
) -> R::ReturnType<T> {
let retry_limit = R::into_retry_limit(retry_limit);
let rx = self.send_request(specific_closure, request_log.clone(), retry_limit).await;
R::inner_to_return_type(
rx.await,
format!("Maximum attempt of `{retry_limit:?}` reached for request `{request_log}`."),
)
}
}

Expand Down
21 changes: 16 additions & 5 deletions engine/src/witness/dot/dot_source.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{pin::Pin, time::Duration};

use crate::retrier::NoRetryLimit;
use cf_chains::dot::PolkadotHash;
use cf_primitives::PolkadotBlockNumber;
use futures_util::stream;
Expand All @@ -21,23 +22,26 @@ use anyhow::Result;
use subxt::{self, config::Header as SubxtHeader};

macro_rules! polkadot_source {
($self:expr, $func:ident) => {{
($self:expr, $func:ident, $retry_limit:expr, $unwrap_events:expr) => {{
struct State<C> {
client: C,
stream: Pin<Box<dyn Stream<Item = Result<PolkadotHeader>> + Send>>,
}

let client = $self.client.clone();
let stream = client.$func().await;
let unwrap_events = $unwrap_events;

(
Box::pin(stream::unfold(State { client, stream }, |mut state| async move {
Box::pin(stream::unfold(State { client, stream }, move |mut state| async move {
loop {
while let Ok(Some(header)) =
tokio::time::timeout(TIMEOUT, state.stream.next()).await
{
if let Ok(header) = header {
let Some(events) = state.client.events(header.hash()).await else {
let Some(events) = unwrap_events(
state.client.events(header.hash(), $retry_limit).await,
) else {
continue
};

Expand Down Expand Up @@ -99,7 +103,12 @@ where
async fn stream_and_client(
&self,
) -> (BoxChainStream<'_, Self::Index, Self::Hash, Self::Data>, Self::Client) {
polkadot_source!(self, subscribe_best_heads)
// For the unfinalised source we limit to two retries, so we try the primary and backup. We
// stop here because for unfinalised it's possible the block simply doesn't exist, due to a
// reorg.
polkadot_source!(self, subscribe_best_heads, 2, |raw_events: Result<
Option<Events<PolkadotConfig>>,
>| raw_events.ok().flatten())
}
}

Expand Down Expand Up @@ -147,7 +156,9 @@ impl<
async fn stream_and_client(
&self,
) -> (BoxChainStream<'_, Self::Index, Self::Hash, Self::Data>, Self::Client) {
polkadot_source!(self, subscribe_finalized_heads)
polkadot_source!(self, subscribe_finalized_heads, NoRetryLimit, |raw_events: Option<
Events<PolkadotConfig>,
>| raw_events)
}
}

Expand Down