Skip to content

Commit

Permalink
fix(listener): listener fixes (#2249)
Browse files Browse the repository at this point in the history
  • Loading branch information
justprosh committed Jun 5, 2024
1 parent 6eb2d57 commit b69f1cb
Showing 1 changed file with 62 additions and 29 deletions.
91 changes: 62 additions & 29 deletions crates/chain-listener/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,13 @@ impl ChainListener {
}
tracing::info!(target: "chain-listener", "Subscribed successfully");

// Proof id should be loaded once on start, there is no reason to update it on refresh
// TODO: associate proof id with nonce, not current epoch
if let Err(err) = self.load_proof_id().await {
tracing::error!(target: "chain-listener", "Failed to load persisted proof id: {err}; Stopping...");
exit(1);
}

if let Err(err) = self.refresh_state().await {
tracing::error!(target: "chain-listener", "Failed to refresh state: {err}; Stopping...");
exit(1);
Expand Down Expand Up @@ -308,12 +315,13 @@ impl ChainListener {

self.difficulty = init_params.difficulty;
self.init_timestamp = init_params.init_timestamp;
self.global_nonce = init_params.global_nonce;

self.epoch_duration = init_params.epoch_duration;
self.min_proofs_per_epoch = init_params.min_proofs_per_epoch;
self.max_proofs_per_epoch = init_params.max_proofs_per_epoch;

self.set_current_epoch(init_params.current_epoch);
self.set_global_nonce(init_params.global_nonce).await?;

Ok(())
}
Expand Down Expand Up @@ -360,12 +368,10 @@ impl ChainListener {
_ => {}
}
}

self.load_proof_id().await?;
};

if let Err(e) = result {
tracing::warn!(target: "chain-listener", "Failed to refresh compute units: {e}");
tracing::warn!(target: "chain-listener", "Failed to refresh state: {e}");
tracing::info!(target: "chain-listener", "Retrying in 5 seconds");
tokio::time::sleep(Duration::from_secs(5)).await;
} else {
Expand All @@ -390,7 +396,7 @@ impl ChainListener {
let write = retry(backoff, || async {
persistence::persist_proof_id(
&self.persisted_proof_id_dir,
self.last_submitted_proof_id,
proof_id,
self.current_epoch,
).await.map_err(|err|{
tracing::warn!(target: "chain-listener", "Failed to persist proof id: {err}; Retrying...");
Expand All @@ -399,12 +405,13 @@ impl ChainListener {
Ok(())
}).await;

self.last_submitted_proof_id = proof_id;

if let Err(err) = write {
tracing::warn!(target: "chain-listener", "Failed to persist proof id: {err}; Ignoring..");
}

self.last_submitted_proof_id = proof_id;
tracing::info!(target: "chain-listener", "Persisted proof id {proof_id} on epoch {}", self.current_epoch);
tracing::info!(target: "chain-listener", "Persisted proof id {} on epoch {}", self.last_submitted_proof_id, self.current_epoch);
Ok(())
}

Expand All @@ -415,13 +422,9 @@ impl ChainListener {
if let Some(persisted_proof_id) = persisted_proof_id {
self.last_submitted_proof_id = persisted_proof_id.proof_id;
tracing::info!(target: "chain-listener", "Loaded persisted proof id {} saved on epoch {}", persisted_proof_id.proof_id, persisted_proof_id.epoch);
if persisted_proof_id.epoch != self.current_epoch {
tracing::info!(target: "chain-listener","Persisted proof id epoch is different from current epoch {}, resetting proof id", self.current_epoch);
self.reset_proof_id().await?;
}
} else {
tracing::info!(target: "chain-listener","No persisted proof id found, starting from zero");
self.reset_proof_id().await?;
tracing::info!(target: "chain-listener", "No persisted proof id found, starting from zero");
self.last_submitted_proof_id = ProofIdx::zero();
}

Ok(())
Expand Down Expand Up @@ -559,27 +562,33 @@ impl ChainListener {

let in_deal: Vec<_> = units.extract_if(|cu| !cu.deal.is_zero()).collect();

self.cc_compute_units
.extend(units.into_iter().map(|unit| (CUID::new(unit.id.0), unit)));

for cu in in_deal {
self.active_deals
.insert(cu.deal.to_string().into(), CUID::new(cu.id.0));
}
self.cc_compute_units = units
.into_iter()
.map(|unit| (CUID::new(unit.id.0), unit))
.collect();

let active = self
.cc_compute_units
.values()
.filter(|unit| unit.startEpoch <= self.current_epoch);

let pending = self
.cc_compute_units
.values()
.filter(|unit| unit.startEpoch > self.current_epoch);

for cu in &in_deal {
let cu_id = CUID::new(cu.id.0);
// TODO: in the future it should be BTreeMap<DealId, Vec<CUID>>, because deal will be able
// to use multiple CUs from one peer
self.active_deals.insert(cu.deal.to_string().into(), cu_id);
}

tracing::info!(target: "chain-listener",
"Compute units mapping: in cc {}/[{} pending], in deal {}",
self.cc_compute_units.len(),
pending.clone().count(),
self.active_deals.len()
in_deal.len()
);

tracing::info!(target: "chain-listener",
Expand All @@ -599,6 +608,12 @@ impl ChainListener {
.collect::<Vec<_>>()
);

// NOTE: cores are released after all the logs to simplify debugging on failure
for cu_id in self.active_deals.values() {
self.core_distributor.release_worker_cores(&[*cu_id]);
self.acquire_core_for_deal(*cu_id)?;
}

Ok(())
}

Expand Down Expand Up @@ -677,15 +692,10 @@ impl ChainListener {
if epoch_changed {
// TODO: add epoch_number to metrics

// nonce changes every epoch
self.global_nonce = self.chain_connector.get_global_nonce().await?;
tracing::info!(target: "chain-listener",
"New global nonce: {}",
self.global_nonce
);

self.set_current_epoch(epoch_number);
self.reset_proof_id().await?;
self.set_global_nonce(self.chain_connector.get_global_nonce().await?)
.await?;
tracing::info!(target: "chain-listener", "Global nonce: {}", self.global_nonce);

if let Some(status) = self.get_commitment_status().await? {
tracing::info!(target: "chain-listener", "Current commitment status: {status:?}");
Expand Down Expand Up @@ -864,6 +874,12 @@ impl ChainListener {
return Ok(());
}

if self.active_units_count() == 0 {
tracing::info!(target: "chain-listener", "No active units found in this epoch {}", self.current_epoch);
self.stop_commitment().await?;
return Ok(());
}

tracing::info!(target: "chain-listener",
"Refreshing commitment, active compute units: {}",
self.cc_compute_units
Expand Down Expand Up @@ -1351,6 +1367,16 @@ impl ChainListener {
}
}

/// Stores a new global nonce and resets the proof id when the nonce has changed.
///
/// If `global_nonce` equals the currently stored value, nothing happens: the
/// stored nonce and the proof id are left untouched. Otherwise the change is
/// logged, the new nonce is stored, and `reset_proof_id` is invoked.
///
/// # Errors
///
/// Propagates any error returned by `reset_proof_id`.
async fn set_global_nonce(&mut self, global_nonce: GlobalNonce) -> eyre::Result<()> {
    if self.global_nonce != global_nonce {
        tracing::info!(target: "chain-listener", "Global nonce changed, was {}, new global nonce is {global_nonce}", self.global_nonce);
        // The nonce is stored before the reset, matching the original statement order.
        self.global_nonce = global_nonce;
        self.reset_proof_id().await?;
    }

    Ok(())
}

fn observe<F>(&self, f: F)
where
F: FnOnce(&ChainListenerMetrics),
Expand All @@ -1359,6 +1385,13 @@ impl ChainListener {
f(metrics);
}
}

/// Returns how many compute units in `cc_compute_units` have a `startEpoch`
/// that is less than or equal to the current epoch.
fn active_units_count(&self) -> usize {
    let epoch = &self.current_epoch;
    self.cc_compute_units
        .values()
        .filter(|unit| unit.startEpoch <= *epoch)
        .count()
}
}

struct CUGroups {
Expand Down

0 comments on commit b69f1cb

Please sign in to comment.