pherry: restart on bad signature #571

Merged: 2 commits, Nov 18, 2021
1 change: 1 addition & 0 deletions standalone/pherry/Cargo.toml
@@ -19,6 +19,7 @@ serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 rand = "0.8.4"
 structopt = { version = "0.3" }
+jsonrpsee-types = "0.4.1"
 
 async-trait = "0.1.49"
 system = { path = "../../substrate/frame/system", package = "frame-system" }
70 changes: 59 additions & 11 deletions standalone/pherry/src/lib.rs
@@ -31,6 +31,7 @@ use phactory_api::blocks::{
 use phactory_api::prpc::{self, InitRuntimeResponse};
 use phactory_api::pruntime_client;
 
+use msg_sync::{Error as MsgSyncError, Receiver, Sender};
 use notify_client::NotifyClient;
 
 #[derive(Debug, StructOpt)]
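The import above pulls the error-reporting plumbing out of the msg_sync module, which this diff does not show. A minimal sketch of what those items presumably look like, assuming tokio mpsc channels (the real definitions live in standalone/pherry/src/msg_sync.rs and may differ):

    use tokio::sync::mpsc;

    #[derive(Debug)]
    pub enum Error {
        // Fatal: a submitted tx was rejected for a bad signature; the worker
        // is likely out of sync, so pherry should restart.
        BadSignature,
        // Transient: any other RPC failure while submitting a tx.
        OtherRpcError,
    }

    // Aliases matching the Receiver/Sender names imported above; the sender
    // must be Clone because bridge() hands a copy to every sync round, and
    // tokio's unbounded mpsc sender satisfies that.
    pub type Sender<T> = mpsc::UnboundedSender<T>;
    pub type Receiver<T> = mpsc::UnboundedReceiver<T>;

    pub fn create_report_channel() -> (Sender<Error>, Receiver<Error>) {
        mpsc::unbounded_channel()
    }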
@@ -177,6 +178,9 @@ struct Args {
         help = "Max auto restart retries if it continuously fails. Only used with --auto-restart"
     )]
     max_restart_retries: u32,
+
+    #[structopt(long, help = "Restart if the number of RPC errors reaches the threshold")]
+    restart_on_rpc_error_threshold: Option<u64>,
 }
 
 struct RunningFlags {
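Since structopt derives kebab-case long flags from field names, the new option is passed as --restart-on-rpc-error-threshold <N>; a hypothetical invocation combining it with the existing restart behaviour (assuming the other flags follow the same naming) would be: pherry --auto-restart --restart-on-rpc-error-threshold 5.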
@@ -800,7 +804,11 @@ pub async fn subxt_connect<T: subxt::Config>(uri: &str) -> Result<subxt::Client<
         .context("Connect to substrate")
 }
 
-async fn bridge(args: &Args, flags: &mut RunningFlags) -> Result<()> {
+async fn bridge(
+    args: &Args,
+    flags: &mut RunningFlags,
+    err_report: Sender<MsgSyncError>,
+) -> Result<()> {
     // Connect to substrate
 
     let api: RelaychainApi = subxt_connect(&args.substrate_ws_endpoint).await?.into();
@@ -1027,15 +1035,16 @@ async fn bridge(args: &Args, flags: &mut RunningFlags) -> Result<()> {
 
         // Now we are idle. Let's try to sync the egress messages.
         if !args.no_msg_submit {
-            let mut msg_sync = msg_sync::MsgSync::new(
+            msg_sync::maybe_sync_mq_egress(
                 &para_api,
                 &pr,
                 &mut signer,
                 args.tip,
                 args.longevity,
                 args.max_sync_msgs_per_round,
-            );
-            msg_sync.maybe_sync_mq_egress().await?;
+                err_report.clone(),
+            )
+            .await?;
         }
         flags.restart_failure_count = 0;
         info!("Waiting for new blocks");
@@ -1061,6 +1070,36 @@ fn preprocess_args(args: &mut Args) {
     }
 }
 
+async fn collect_async_errors(
+    mut threshold: Option<u64>,
+    mut err_receiver: Receiver<MsgSyncError>,
+) {
+    let threshold_bak = threshold.unwrap_or_default();
+    loop {
+        match err_receiver.recv().await {
+            Some(error) => match error {
+                MsgSyncError::BadSignature => {
+                    warn!("tx received bad signature, restarting...");
+                    return;
+                }
+                MsgSyncError::OtherRpcError => {
+                    if let Some(threshold) = &mut threshold {
+                        if *threshold == 0 {
+                            warn!("{} tx errors reported, restarting...", threshold_bak);
+                            return;
+                        }
+                        *threshold -= 1;
+                    }
+                }
+            },
+            None => {
+                warn!("All senders gone, this should never happen!");
+                return;
+            }
+        }
+    }
+}
+
 pub async fn pherry_main() {
     env_logger::builder()
         .filter_level(log::LevelFilter::Info)
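Note the countdown semantics of collect_async_errors: with a threshold of N, each OtherRpcError decrements the counter, and the restart fires on the error that finds it already at zero, i.e. on the (N+1)-th error; with no threshold set, RPC errors are ignored and only BadSignature forces a restart. A hypothetical test of that behaviour, using the sketched channel types above:

    #[tokio::test]
    async fn threshold_n_restarts_on_the_n_plus_1th_error() {
        let (tx, rx) = create_report_channel();
        let collector = tokio::spawn(collect_async_errors(Some(2), rx));
        // Errors 1 and 2 decrement the counter to zero; error 3 finds it
        // at zero and makes the collector return, signalling a restart.
        for _ in 0..3 {
            tx.send(Error::OtherRpcError).unwrap();
        }
        collector.await.unwrap();
    }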
@@ -1076,14 +1115,23 @@ pub async fn pherry_main() {
     };
 
     loop {
-        if let Err(err) = bridge(&args, &mut flags).await {
-            info!("bridge() exited with error: {:?}", err);
-            if !args.auto_restart || flags.restart_failure_count > args.max_restart_retries {
-                std::process::exit(if flags.worker_registered { 1 } else { 2 });
+        let (sender, receiver) = msg_sync::create_report_channel();
+        let threshold = args.restart_on_rpc_error_threshold;
+        tokio::select! {
+            res = bridge(&args, &mut flags, sender) => {
+                if let Err(err) = res {
+                    info!("bridge() exited with error: {:?}", err);
+                } else {
+                    break;
+                }
             }
-            flags.restart_failure_count += 1;
-            sleep(Duration::from_secs(2)).await;
-            info!("Restarting...");
+            () = collect_async_errors(threshold, receiver) => ()
+        };
+        if !args.auto_restart || flags.restart_failure_count > args.max_restart_retries {
+            std::process::exit(if flags.worker_registered { 1 } else { 2 });
         }
+        flags.restart_failure_count += 1;
+        sleep(Duration::from_secs(2)).await;
+        info!("Restarting...");
     }
 }
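With this final hunk, a bad signature no longer has to bubble up as a fatal error from deep inside message submission: msg_sync reports it over the channel, collect_async_errors wins the tokio::select!, and the loop falls through to the same retry/exit bookkeeping used for ordinary bridge() failures, so --auto-restart and --max-restart-retries govern both restart paths.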