diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index dfc2bb0385..ea21e9c891 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -24,7 +24,7 @@ use tokio::time::interval; use tokio_stream::wrappers::IntervalStream; use tokio_stream::StreamExt; -use chain_connector::{CCInitParams, ChainConnector, ConnectorError}; +use chain_connector::{ChainConnector, ConnectorError}; use chain_data::{parse_log, peer_id_to_hex, ChainData, Log}; use chain_types::{ CommitmentId, CommitmentStatus, ComputeUnit, DealStatus, PendingUnit, COMMITMENT_IS_NOT_ACTIVE, @@ -43,7 +43,7 @@ use crate::event::{ }; use crate::persistence; -const PROOF_POLL_LIMIT: usize = 50; +const PROOF_POLL_LIMIT: usize = 10; pub struct ChainListener { config: ChainConfig, @@ -98,7 +98,6 @@ impl ChainListener { host_id: PeerId, chain_connector: Arc, core_manager: Arc, - init_params: CCInitParams, ws_client: WsClient, ccp_client: Option, persisted_proof_id_dir: PathBuf, @@ -112,11 +111,11 @@ impl ChainListener { ws_client, config: chain_config, host_id, - difficulty: init_params.difficulty, - init_timestamp: init_params.init_timestamp, - global_nonce: init_params.global_nonce, - current_epoch: init_params.current_epoch, - epoch_duration: init_params.epoch_duration, + difficulty: Difficulty::default(), + init_timestamp: U256::zero(), + global_nonce: GlobalNonce::new([0; 32]).into(), + current_epoch: U256::zero(), + epoch_duration: U256::zero(), current_commitment: None, active_compute_units: BTreeSet::new(), pending_compute_units: BTreeSet::new(), @@ -134,14 +133,17 @@ impl ChainListener { } } - async fn get_current_commitment_id(&self) -> eyre::Result> { + async fn refresh_current_commitment_id(&mut self) -> eyre::Result<()> { match self.chain_connector.get_current_commitment_id().await { - Ok(id) => Ok(id), + Ok(id) => { + self.current_commitment = id; + Ok(()) + } Err(err) => match err { ConnectorError::RpcCallError { ref data, .. } => { if data.contains(PEER_NOT_EXISTS) { tracing::info!("Peer doesn't exist on chain. Waiting for market offer"); - Ok(None) + Ok(()) } else { tracing::error!(target: "chain-listener", "Failed to get current commitment id: {err}"); Err(err.into()) @@ -155,11 +157,32 @@ impl ChainListener { } } - async fn refresh_compute_units(&mut self) -> eyre::Result<()> { + async fn refresh_commitment_params(&mut self) -> eyre::Result<()> { + let init_params = + self.chain_connector + .get_cc_init_params() + .await + .map_err(|err| { + tracing::info!(target: "chain-listener", "Error getting Commitment initial params: {err}"); + err + })?; + + self.difficulty = init_params.difficulty; + self.init_timestamp = init_params.init_timestamp; + self.global_nonce = init_params.global_nonce; + self.epoch_duration = init_params.epoch_duration; + self.current_epoch = init_params.current_epoch; + + tracing::info!(target: "chain-listener","Commitment initial params: difficulty {}, global nonce {}, init_timestamp {}, epoch_duration {}, current_epoch {}", init_params.difficulty, init_params.global_nonce, init_params.init_timestamp, init_params.epoch_duration, init_params.current_epoch); + Ok(()) + } + + async fn refresh_state(&mut self) -> eyre::Result<()> { loop { let result: eyre::Result<()> = try { - let (active, pending) = self.get_compute_units().await?; - self.current_commitment = self.get_current_commitment_id().await?; + self.refresh_commitment_params().await?; + self.refresh_compute_units().await?; + self.refresh_current_commitment_id().await?; if let Some(ref c) = self.current_commitment { tracing::info!(target: "chain-listener", "Current commitment id: {}", c); @@ -177,8 +200,6 @@ impl ChainListener { match status { CommitmentStatus::Active => { - self.active_compute_units.extend(active); - self.pending_compute_units.extend(pending); self.refresh_commitment().await?; } @@ -194,6 +215,7 @@ impl ChainListener { } } + self.load_proof_id().await?; () }; @@ -311,8 +333,7 @@ impl ChainListener { tracing::info!(target: "chain-listener", "Subscribed successfully"); let setup: eyre::Result<()> = try { - self.load_proof_id().await?; - self.refresh_compute_units().await?; + self.refresh_state().await?; }; if let Err(err) = setup { tracing::error!(target: "chain-listener", "ChainListener: compute units refresh error: {err}"); @@ -324,80 +345,75 @@ impl ChainListener { loop { tokio::select! { event = poll_subscription(&mut self.heads) => { - if let Some(header) = event { - if let Err(err) = self.process_new_header(header).await { - tracing::error!(target: "chain-listener", "newHeads event processing error: {err}"); - if let Err(err) = self.subscribe_new_heads().await { - tracing::error!(target: "chain-listener", "Failed to resubscribe to newHeads: {err}; Stopping..."); - exit(1); - } - } - } else { - if let Err(err) = self.subscribe_new_heads().await { - tracing::error!(target: "chain-listener", "Failed to resubscribe to newHeads: {err}; Stopping..."); - exit(1); + if let Err(err) = self.process_new_header(event).await { + tracing::error!(target: "chain-listener", "newHeads event processing error: {err}"); + + let result: eyre::Result<()> = try { + self.refresh_state().await?; + self.subscribe_new_heads().await?; + }; + + if let Err(err) = result { + tracing::error!(target: "chain-listener", "Failed to resubscribe to newHeads: {err}; Stopping..."); + exit(1); } } }, event = poll_subscription(&mut self.commitment_activated) => { - if let Some(cc) = event { - if let Err(err) = self.process_commitment_activated(cc).await { - tracing::error!(target: "chain-listener", "CommitmentActivated event processing error: {err}"); - if let Err(err) = self.subscribe_cc_activated().await { - tracing::error!(target: "chain-listener", "Failed to resubscribe to CommitmentActivated event: {err}; Stopping..."); - exit(1); - } - } - } else { - if let Err(err) = self.subscribe_cc_activated().await { + if let Err(err) = self.process_commitment_activated(event).await { + tracing::error!(target: "chain-listener", "CommitmentActivated event processing error: {err}"); + + let result: eyre::Result<()> = try { + self.refresh_state().await?; + self.subscribe_cc_activated().await?; + }; + if let Err(err) = result { tracing::error!(target: "chain-listener", "Failed to resubscribe to CommitmentActivated event: {err}; Stopping..."); exit(1); } } }, event = poll_subscription(&mut self.unit_activated) => { - if let Some(event) = event { + if self.unit_activated.is_some() { if let Err(err) = self.process_unit_activated(event).await { - tracing::error!(target: "chain-listener", "UnitActivated event processing error: {err}"); - if let Err(err) = self.subscribe_unit_activated().await { - tracing::error!(target: "chain-listener", "Failed to resubscribe to UnitActivated: {err}; Stopping..."); + tracing::error!(target: "chain-listener", "UnitActivated event processing error: {err}"); + + let result: eyre::Result<()> = try { + self.refresh_state().await?; + self.subscribe_unit_activated().await?; + }; + if let Err(err) = result { + tracing::error!(target: "chain-listener", "Failed to resubscribe to UnitActivated event: {err}; Stopping..."); exit(1); } } - } else { - if let Err(err) = self.subscribe_unit_activated().await { - tracing::error!(target: "chain-listener", "Failed to resubscribe to UnitActivated: {err}; Stopping..."); - exit(1); - } } }, event = poll_subscription(&mut self.unit_deactivated) => { - if let Some(event) = event { - if let Err(err) = self.process_unit_deactivated(event).await { + if self.unit_deactivated.is_some() { + if let Err(err) = self.process_unit_deactivated(event).await { tracing::error!(target: "chain-listener", "UnitDeactivated event processing error: {err}"); - if let Err(err) = self.subscribe_unit_deactivated().await { - tracing::error!(target: "chain-listener", "Failed to resubscribe to UnitDeactivated: {err}; Stopping..."); + + let result: eyre::Result<()> = try { + self.refresh_state().await?; + self.subscribe_unit_deactivated().await?; + }; + if let Err(err) = result { + tracing::error!(target: "chain-listener", "Failed to resubscribe to UnitDeactivated event: {err}; Stopping..."); exit(1); } } - } else { - if let Err(err) = self.subscribe_unit_deactivated().await { - tracing::error!(target: "chain-listener", "Failed to resubscribe to UnitDeactivated: {err}; Stopping..."); - exit(1); - } } }, event = poll_subscription(&mut self.unit_matched) => { - if let Some(event) = event { - if let Err(err) = self.process_deal_matched(event) { - tracing::error!(target: "chain-listener", "DealMatched event processing error: {err}"); - if let Err(err) = self.subscribe_deal_matched().await { - tracing::error!(target: "chain-listener", "Failed to resubscribe to DealMatched: {err}; Stopping..."); - exit(1); - } - } - } else { - if let Err(err) = self.subscribe_deal_matched().await { + if let Err(err) = self.process_deal_matched(event) { + tracing::error!(target: "chain-listener", "DealMatched event processing error: {err}"); + + let result: eyre::Result<()> = try { + self.refresh_state().await?; + self.subscribe_deal_matched().await?; + }; + if let Err(err) = result { tracing::error!(target: "chain-listener", "Failed to resubscribe to DealMatched: {err}; Stopping..."); exit(1); } @@ -439,7 +455,7 @@ impl ChainListener { } /// Returns active and pending compute units - async fn get_compute_units(&mut self) -> eyre::Result<(Vec, Vec)> { + async fn refresh_compute_units(&mut self) -> eyre::Result<()> { let mut units = self.chain_connector.get_compute_units().await?; let in_deal: Vec<_> = units.extract_if(|cu| cu.deal.is_some()).collect(); @@ -448,43 +464,44 @@ impl ChainListener { .into_iter() .partition(|unit| unit.start_epoch <= self.current_epoch); - let active: Vec<_> = active.into_iter().map(|unit| unit.id).collect(); - let pending: Vec = pending.into_iter().map(PendingUnit::from).collect(); + self.active_compute_units + .extend(active.into_iter().map(|unit| unit.id)); + self.pending_compute_units + .extend(pending.into_iter().map(PendingUnit::from)); + + for cu in in_deal { + if let Some(deal) = cu.deal { + self.active_deals.insert(deal, cu.id); + } + } tracing::info!(target: "chain-listener", "Compute units mapping: active {}, pending {}, in deal {}", - active.len(), - pending.len(), - in_deal.len() + self.active_compute_units.len(), + self.pending_compute_units.len(), + self.active_deals.len() ); // TODO: log compute units pretty tracing::info!(target: "chain-listener", "Active compute units: {:?}", - active.iter().map(CUID::to_string).collect::>() + self.active_compute_units.iter().map(CUID::to_string).collect::>() ); tracing::info!(target: "chain-listener", "Pending compute units: {:?}", - pending + self.pending_compute_units .iter() .map(|cu| cu.id.to_string()) .collect::>() ); tracing::info!(target: "chain-listener", "In deal compute units: {:?}", - in_deal - .iter() - .map(|cu| cu.id.to_string()) + self.active_deals.values() + .map(CUID::to_string) .collect::>() ); - for cu in in_deal { - if let Some(deal) = cu.deal { - self.active_deals.insert(deal, cu.id); - } - } - - Ok((active, pending)) + Ok(()) } async fn subscribe_new_heads(&mut self) -> eyre::Result<()> { @@ -602,9 +619,14 @@ impl ChainListener { Ok(()) } - async fn process_new_header(&mut self, event: Result) -> eyre::Result<()> { + async fn process_new_header( + &mut self, + event: Option>, + ) -> eyre::Result<()> { + let header = event.ok_or(eyre!("Failed to process newHeads event: got None"))?; + // TODO: add block_number to metrics - let (block_timestamp, _block_number) = Self::parse_block_header(event?)?; + let (block_timestamp, _block_number) = Self::parse_block_header(header?)?; // `epoch_number = 1 + (block_timestamp - init_timestamp) / epoch_duration` let epoch_number = @@ -650,8 +672,11 @@ impl ChainListener { async fn process_commitment_activated( &mut self, - event: Result, + event: Option>, ) -> eyre::Result<()> { + let event = event.ok_or(eyre!( + "Failed to process CommitmentActivated event: got None" + ))?; let cc_event = parse_log::(event?)?; let unit_ids = cc_event.info.unit_ids; tracing::info!(target: "chain-listener", @@ -685,7 +710,11 @@ impl ChainListener { Ok(()) } - async fn process_unit_activated(&mut self, event: Result) -> eyre::Result<()> { + async fn process_unit_activated( + &mut self, + event: Option>, + ) -> eyre::Result<()> { + let event = event.ok_or(eyre!("Failed to process UnitActivated event: got None"))?; let unit_event = parse_log::(event?)?; tracing::info!(target: "chain-listener", "Received UnitActivated event for unit: {}, startEpoch: {}", @@ -706,8 +735,9 @@ impl ChainListener { /// Unit goes to Deal async fn process_unit_deactivated( &mut self, - event: Result, + event: Option>, ) -> eyre::Result<()> { + let event = event.ok_or(eyre!("Failed to process UnitDeactivated event: got None"))?; let unit_event = parse_log::(event?)?; tracing::info!(target: "chain-listener", @@ -722,7 +752,11 @@ impl ChainListener { Ok(()) } - pub fn process_deal_matched(&mut self, event: Result) -> eyre::Result<()> { + pub fn process_deal_matched( + &mut self, + event: Option>, + ) -> eyre::Result<()> { + let event = event.ok_or(eyre!("Failed to process DealMatched event: got None"))?; let deal_event = parse_log::(event?)?; tracing::info!(target: "chain-listener", "Received DealMatched event for deal: {}", diff --git a/crates/server-config/src/defaults.rs b/crates/server-config/src/defaults.rs index 7914e2c7dc..77d1ed8cf8 100644 --- a/crates/server-config/src/defaults.rs +++ b/crates/server-config/src/defaults.rs @@ -65,8 +65,8 @@ pub fn default_bootstrap_nodes() -> Vec { pub fn default_system_cpu_count() -> usize { let total = num_cpus::get_physical(); match total { - x if x > 32 => 4, - x if x > 2 => 2, + x if x > 32 => 3, + x if x > 7 => 2, _ => 1, } } diff --git a/nox/src/node.rs b/nox/src/node.rs index 93c49db975..5ee9a9772a 100644 --- a/nox/src/node.rs +++ b/nox/src/node.rs @@ -135,13 +135,6 @@ async fn setup_listener( let cc_events_dir = config.dir_config.cc_events_dir.clone(); let host_id = config.root_key_pair.get_peer_id(); - - let init_params = connector.get_cc_init_params().await.map_err(|err| { - log::error!("Error getting Commitment initial params: {err}"); - err - })?; - log::info!("Commitment initial params: difficulty {}, global nonce {}, init_timestamp {}, epoch_duration {}, current_epoch {}", init_params.difficulty, init_params.global_nonce, init_params.init_timestamp, init_params.epoch_duration, init_params.current_epoch); - let ws_client = WsClientBuilder::default() .build(&listener_config.ws_endpoint) .await @@ -164,7 +157,6 @@ async fn setup_listener( host_id, connector, core_manager, - init_params, ws_client, ccp_client, cc_events_dir,