Skip to content

Commit

Permalink
fix(listener): save pending units on start; refresh state on subs drop (
Browse files Browse the repository at this point in the history
  • Loading branch information
justprosh committed Mar 10, 2024
1 parent 4820daa commit 86da267
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 102 deletions.
218 changes: 126 additions & 92 deletions crates/chain-listener/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;
use tokio_stream::StreamExt;

use chain_connector::{CCInitParams, ChainConnector, ConnectorError};
use chain_connector::{ChainConnector, ConnectorError};
use chain_data::{parse_log, peer_id_to_hex, ChainData, Log};
use chain_types::{
CommitmentId, CommitmentStatus, ComputeUnit, DealStatus, PendingUnit, COMMITMENT_IS_NOT_ACTIVE,
Expand All @@ -43,7 +43,7 @@ use crate::event::{
};
use crate::persistence;

const PROOF_POLL_LIMIT: usize = 50;
const PROOF_POLL_LIMIT: usize = 10;

pub struct ChainListener {
config: ChainConfig,
Expand Down Expand Up @@ -98,7 +98,6 @@ impl ChainListener {
host_id: PeerId,
chain_connector: Arc<ChainConnector>,
core_manager: Arc<CoreManager>,
init_params: CCInitParams,
ws_client: WsClient,
ccp_client: Option<CCPRpcHttpClient>,
persisted_proof_id_dir: PathBuf,
Expand All @@ -112,11 +111,11 @@ impl ChainListener {
ws_client,
config: chain_config,
host_id,
difficulty: init_params.difficulty,
init_timestamp: init_params.init_timestamp,
global_nonce: init_params.global_nonce,
current_epoch: init_params.current_epoch,
epoch_duration: init_params.epoch_duration,
difficulty: Difficulty::default(),
init_timestamp: U256::zero(),
global_nonce: GlobalNonce::new([0; 32]).into(),
current_epoch: U256::zero(),
epoch_duration: U256::zero(),
current_commitment: None,
active_compute_units: BTreeSet::new(),
pending_compute_units: BTreeSet::new(),
Expand All @@ -134,14 +133,17 @@ impl ChainListener {
}
}

async fn get_current_commitment_id(&self) -> eyre::Result<Option<CommitmentId>> {
async fn refresh_current_commitment_id(&mut self) -> eyre::Result<()> {
match self.chain_connector.get_current_commitment_id().await {
Ok(id) => Ok(id),
Ok(id) => {
self.current_commitment = id;
Ok(())
}
Err(err) => match err {
ConnectorError::RpcCallError { ref data, .. } => {
if data.contains(PEER_NOT_EXISTS) {
tracing::info!("Peer doesn't exist on chain. Waiting for market offer");
Ok(None)
Ok(())
} else {
tracing::error!(target: "chain-listener", "Failed to get current commitment id: {err}");
Err(err.into())
Expand All @@ -155,11 +157,32 @@ impl ChainListener {
}
}

async fn refresh_compute_units(&mut self) -> eyre::Result<()> {
async fn refresh_commitment_params(&mut self) -> eyre::Result<()> {
let init_params =
self.chain_connector
.get_cc_init_params()
.await
.map_err(|err| {
tracing::info!(target: "chain-listener", "Error getting Commitment initial params: {err}");
err
})?;

self.difficulty = init_params.difficulty;
self.init_timestamp = init_params.init_timestamp;
self.global_nonce = init_params.global_nonce;
self.epoch_duration = init_params.epoch_duration;
self.current_epoch = init_params.current_epoch;

tracing::info!(target: "chain-listener","Commitment initial params: difficulty {}, global nonce {}, init_timestamp {}, epoch_duration {}, current_epoch {}", init_params.difficulty, init_params.global_nonce, init_params.init_timestamp, init_params.epoch_duration, init_params.current_epoch);
Ok(())
}

async fn refresh_state(&mut self) -> eyre::Result<()> {
loop {
let result: eyre::Result<()> = try {
let (active, pending) = self.get_compute_units().await?;
self.current_commitment = self.get_current_commitment_id().await?;
self.refresh_commitment_params().await?;
self.refresh_compute_units().await?;
self.refresh_current_commitment_id().await?;

if let Some(ref c) = self.current_commitment {
tracing::info!(target: "chain-listener", "Current commitment id: {}", c);
Expand All @@ -177,8 +200,6 @@ impl ChainListener {

match status {
CommitmentStatus::Active => {
self.active_compute_units.extend(active);
self.pending_compute_units.extend(pending);
self.refresh_commitment().await?;
}

Expand All @@ -194,6 +215,7 @@ impl ChainListener {
}
}

self.load_proof_id().await?;
()
};

Expand Down Expand Up @@ -311,8 +333,7 @@ impl ChainListener {
tracing::info!(target: "chain-listener", "Subscribed successfully");

let setup: eyre::Result<()> = try {
self.load_proof_id().await?;
self.refresh_compute_units().await?;
self.refresh_state().await?;
};
if let Err(err) = setup {
tracing::error!(target: "chain-listener", "ChainListener: compute units refresh error: {err}");
Expand All @@ -324,80 +345,75 @@ impl ChainListener {
loop {
tokio::select! {
event = poll_subscription(&mut self.heads) => {
if let Some(header) = event {
if let Err(err) = self.process_new_header(header).await {
tracing::error!(target: "chain-listener", "newHeads event processing error: {err}");
if let Err(err) = self.subscribe_new_heads().await {
tracing::error!(target: "chain-listener", "Failed to resubscribe to newHeads: {err}; Stopping...");
exit(1);
}
}
} else {
if let Err(err) = self.subscribe_new_heads().await {
tracing::error!(target: "chain-listener", "Failed to resubscribe to newHeads: {err}; Stopping...");
exit(1);
if let Err(err) = self.process_new_header(event).await {
tracing::error!(target: "chain-listener", "newHeads event processing error: {err}");

let result: eyre::Result<()> = try {
self.refresh_state().await?;
self.subscribe_new_heads().await?;
};

if let Err(err) = result {
tracing::error!(target: "chain-listener", "Failed to resubscribe to newHeads: {err}; Stopping...");
exit(1);
}
}
},
event = poll_subscription(&mut self.commitment_activated) => {
if let Some(cc) = event {
if let Err(err) = self.process_commitment_activated(cc).await {
tracing::error!(target: "chain-listener", "CommitmentActivated event processing error: {err}");
if let Err(err) = self.subscribe_cc_activated().await {
tracing::error!(target: "chain-listener", "Failed to resubscribe to CommitmentActivated event: {err}; Stopping...");
exit(1);
}
}
} else {
if let Err(err) = self.subscribe_cc_activated().await {
if let Err(err) = self.process_commitment_activated(event).await {
tracing::error!(target: "chain-listener", "CommitmentActivated event processing error: {err}");

let result: eyre::Result<()> = try {
self.refresh_state().await?;
self.subscribe_cc_activated().await?;
};
if let Err(err) = result {
tracing::error!(target: "chain-listener", "Failed to resubscribe to CommitmentActivated event: {err}; Stopping...");
exit(1);
}
}
},
event = poll_subscription(&mut self.unit_activated) => {
if let Some(event) = event {
if self.unit_activated.is_some() {
if let Err(err) = self.process_unit_activated(event).await {
tracing::error!(target: "chain-listener", "UnitActivated event processing error: {err}");
if let Err(err) = self.subscribe_unit_activated().await {
tracing::error!(target: "chain-listener", "Failed to resubscribe to UnitActivated: {err}; Stopping...");
tracing::error!(target: "chain-listener", "UnitActivated event processing error: {err}");

let result: eyre::Result<()> = try {
self.refresh_state().await?;
self.subscribe_unit_activated().await?;
};
if let Err(err) = result {
tracing::error!(target: "chain-listener", "Failed to resubscribe to UnitActivated event: {err}; Stopping...");
exit(1);
}
}
} else {
if let Err(err) = self.subscribe_unit_activated().await {
tracing::error!(target: "chain-listener", "Failed to resubscribe to UnitActivated: {err}; Stopping...");
exit(1);
}
}
},
event = poll_subscription(&mut self.unit_deactivated) => {
if let Some(event) = event {
if let Err(err) = self.process_unit_deactivated(event).await {
if self.unit_deactivated.is_some() {
if let Err(err) = self.process_unit_deactivated(event).await {
tracing::error!(target: "chain-listener", "UnitDeactivated event processing error: {err}");
if let Err(err) = self.subscribe_unit_deactivated().await {
tracing::error!(target: "chain-listener", "Failed to resubscribe to UnitDeactivated: {err}; Stopping...");

let result: eyre::Result<()> = try {
self.refresh_state().await?;
self.subscribe_unit_deactivated().await?;
};
if let Err(err) = result {
tracing::error!(target: "chain-listener", "Failed to resubscribe to UnitDeactivated event: {err}; Stopping...");
exit(1);
}
}
} else {
if let Err(err) = self.subscribe_unit_deactivated().await {
tracing::error!(target: "chain-listener", "Failed to resubscribe to UnitDeactivated: {err}; Stopping...");
exit(1);
}
}
},
event = poll_subscription(&mut self.unit_matched) => {
if let Some(event) = event {
if let Err(err) = self.process_deal_matched(event) {
tracing::error!(target: "chain-listener", "DealMatched event processing error: {err}");
if let Err(err) = self.subscribe_deal_matched().await {
tracing::error!(target: "chain-listener", "Failed to resubscribe to DealMatched: {err}; Stopping...");
exit(1);
}
}
} else {
if let Err(err) = self.subscribe_deal_matched().await {
if let Err(err) = self.process_deal_matched(event) {
tracing::error!(target: "chain-listener", "DealMatched event processing error: {err}");

let result: eyre::Result<()> = try {
self.refresh_state().await?;
self.subscribe_deal_matched().await?;
};
if let Err(err) = result {
tracing::error!(target: "chain-listener", "Failed to resubscribe to DealMatched: {err}; Stopping...");
exit(1);
}
Expand Down Expand Up @@ -439,7 +455,7 @@ impl ChainListener {
}

/// Returns active and pending compute units
async fn get_compute_units(&mut self) -> eyre::Result<(Vec<CUID>, Vec<PendingUnit>)> {
async fn refresh_compute_units(&mut self) -> eyre::Result<()> {
let mut units = self.chain_connector.get_compute_units().await?;

let in_deal: Vec<_> = units.extract_if(|cu| cu.deal.is_some()).collect();
Expand All @@ -448,43 +464,44 @@ impl ChainListener {
.into_iter()
.partition(|unit| unit.start_epoch <= self.current_epoch);

let active: Vec<_> = active.into_iter().map(|unit| unit.id).collect();
let pending: Vec<PendingUnit> = pending.into_iter().map(PendingUnit::from).collect();
self.active_compute_units
.extend(active.into_iter().map(|unit| unit.id));
self.pending_compute_units
.extend(pending.into_iter().map(PendingUnit::from));

for cu in in_deal {
if let Some(deal) = cu.deal {
self.active_deals.insert(deal, cu.id);
}
}

tracing::info!(target: "chain-listener",
"Compute units mapping: active {}, pending {}, in deal {}",
active.len(),
pending.len(),
in_deal.len()
self.active_compute_units.len(),
self.pending_compute_units.len(),
self.active_deals.len()
);

// TODO: log compute units pretty
tracing::info!(target: "chain-listener",
"Active compute units: {:?}",
active.iter().map(CUID::to_string).collect::<Vec<_>>()
self.active_compute_units.iter().map(CUID::to_string).collect::<Vec<_>>()
);
tracing::info!(target: "chain-listener",
"Pending compute units: {:?}",
pending
self.pending_compute_units
.iter()
.map(|cu| cu.id.to_string())
.collect::<Vec<_>>()
);
tracing::info!(target: "chain-listener",
"In deal compute units: {:?}",
in_deal
.iter()
.map(|cu| cu.id.to_string())
self.active_deals.values()
.map(CUID::to_string)
.collect::<Vec<_>>()
);

for cu in in_deal {
if let Some(deal) = cu.deal {
self.active_deals.insert(deal, cu.id);
}
}

Ok((active, pending))
Ok(())
}

async fn subscribe_new_heads(&mut self) -> eyre::Result<()> {
Expand Down Expand Up @@ -602,9 +619,14 @@ impl ChainListener {
Ok(())
}

async fn process_new_header(&mut self, event: Result<Value, Error>) -> eyre::Result<()> {
async fn process_new_header(
&mut self,
event: Option<Result<Value, Error>>,
) -> eyre::Result<()> {
let header = event.ok_or(eyre!("Failed to process newHeads event: got None"))?;

// TODO: add block_number to metrics
let (block_timestamp, _block_number) = Self::parse_block_header(event?)?;
let (block_timestamp, _block_number) = Self::parse_block_header(header?)?;

// `epoch_number = 1 + (block_timestamp - init_timestamp) / epoch_duration`
let epoch_number =
Expand Down Expand Up @@ -650,8 +672,11 @@ impl ChainListener {

async fn process_commitment_activated(
&mut self,
event: Result<Log, Error>,
event: Option<Result<Log, Error>>,
) -> eyre::Result<()> {
let event = event.ok_or(eyre!(
"Failed to process CommitmentActivated event: got None"
))?;
let cc_event = parse_log::<CommitmentActivatedData, CommitmentActivated>(event?)?;
let unit_ids = cc_event.info.unit_ids;
tracing::info!(target: "chain-listener",
Expand Down Expand Up @@ -685,7 +710,11 @@ impl ChainListener {
Ok(())
}

async fn process_unit_activated(&mut self, event: Result<Log, Error>) -> eyre::Result<()> {
async fn process_unit_activated(
&mut self,
event: Option<Result<Log, Error>>,
) -> eyre::Result<()> {
let event = event.ok_or(eyre!("Failed to process UnitActivated event: got None"))?;
let unit_event = parse_log::<UnitActivatedData, UnitActivated>(event?)?;
tracing::info!(target: "chain-listener",
"Received UnitActivated event for unit: {}, startEpoch: {}",
Expand All @@ -706,8 +735,9 @@ impl ChainListener {
/// Unit goes to Deal
async fn process_unit_deactivated(
&mut self,
event: Result<Log, client::Error>,
event: Option<Result<Log, client::Error>>,
) -> eyre::Result<()> {
let event = event.ok_or(eyre!("Failed to process UnitDeactivated event: got None"))?;
let unit_event = parse_log::<UnitDeactivatedData, UnitDeactivated>(event?)?;

tracing::info!(target: "chain-listener",
Expand All @@ -722,7 +752,11 @@ impl ChainListener {
Ok(())
}

pub fn process_deal_matched(&mut self, event: Result<Log, client::Error>) -> eyre::Result<()> {
pub fn process_deal_matched(
&mut self,
event: Option<Result<Log, client::Error>>,
) -> eyre::Result<()> {
let event = event.ok_or(eyre!("Failed to process DealMatched event: got None"))?;
let deal_event = parse_log::<DealMatchedData, DealMatched>(event?)?;
tracing::info!(target: "chain-listener",
"Received DealMatched event for deal: {}",
Expand Down
Loading

0 comments on commit 86da267

Please sign in to comment.