Skip to content

Commit

Permalink
feat(listener): use available cores to help other CUs (#2163)
Browse files Browse the repository at this point in the history
  • Loading branch information
justprosh committed Mar 13, 2024
1 parent 0ea65ef commit fe423ff
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 42 deletions.
15 changes: 9 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ enum_dispatch = "0.3.12"
serde_with = "3.6.0"
mockito = "1.2.0"
clarity = "1.3.0"
cpu-utils = { git = "https://github.com/fluencelabs/capacity-commitment-prover/", branch = "main" }
ccp-shared = { git = "https://github.com/fluencelabs/capacity-commitment-prover/", branch = "main" }
ccp-rpc-client = { git = "https://github.com/fluencelabs/capacity-commitment-prover.git", branch = "main" }
cpu-utils = "0.7.0"
ccp-shared = "0.7.0"
ccp-rpc-client = "0.7.0"


[profile.dev]
Expand Down
124 changes: 91 additions & 33 deletions crates/chain-listener/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ use std::time::Duration;

use backoff::future::retry;
use backoff::ExponentialBackoff;
use ccp_rpc_client::{CCPRpcHttpClient, OrHex};
use ccp_rpc_client::CCPRpcHttpClient;
use ccp_shared::proof::{CCProof, CCProofId, ProofIdx};
use ccp_shared::types::{Difficulty, GlobalNonce, LocalNonce, ResultHash};
use cpu_utils::PhysicalCoreId;
use ethabi::ethereum_types::U256;
use eyre::eyre;
use jsonrpsee::core::client::{Client as WsClient, Error, Subscription, SubscriptionClientT};
use jsonrpsee::core::client::{Client as WsClient, Subscription, SubscriptionClientT};
use jsonrpsee::core::{client, JsonValue};
use jsonrpsee::rpc_params;
use libp2p_identity::PeerId;
Expand Down Expand Up @@ -66,19 +66,21 @@ pub struct ChainListener {
current_commitment: Option<CommitmentId>,

active_compute_units: BTreeSet<CUID>,
// Cores of pending units also run CCs but for other CUs
pending_compute_units: BTreeSet<PendingUnit>,

active_deals: BTreeMap<DealId, CUID>,

/// Resets every epoch
last_submitted_proof_id: ProofIdx,

persisted_proof_id_dir: PathBuf,

unit_activated: Option<Subscription<Log>>,
unit_deactivated: Option<Subscription<Log>>,
unit_activated: Option<Subscription<JsonValue>>,
unit_deactivated: Option<Subscription<JsonValue>>,
heads: Option<Subscription<JsonValue>>,
commitment_activated: Option<Subscription<Log>>,
unit_matched: Option<Subscription<Log>>,
commitment_activated: Option<Subscription<JsonValue>>,
unit_matched: Option<Subscription<JsonValue>>,
}

async fn poll_subscription<T>(s: &mut Option<Subscription<T>>) -> Option<Result<T, client::Error>>
Expand Down Expand Up @@ -621,7 +623,7 @@ impl ChainListener {

async fn process_new_header(
&mut self,
event: Option<Result<Value, Error>>,
event: Option<Result<Value, client::Error>>,
) -> eyre::Result<()> {
let header = event.ok_or(eyre!("Failed to process newHeads event: got None"))?;

Expand Down Expand Up @@ -672,12 +674,17 @@ impl ChainListener {

async fn process_commitment_activated(
&mut self,
event: Option<Result<Log, Error>>,
event: Option<Result<JsonValue, client::Error>>,
) -> eyre::Result<()> {
let event = event.ok_or(eyre!(
"Failed to process CommitmentActivated event: got None"
))?;
let cc_event = parse_log::<CommitmentActivatedData, CommitmentActivated>(event?)?;
))??;
let log = serde_json::from_value::<Log>(event.clone()).map_err(|err| {
tracing::error!(target: "chain-listener", "Failed to parse CommitmentActivated event: {err}, data: {event}");
err
})?;

let cc_event = parse_log::<CommitmentActivatedData, CommitmentActivated>(log)?;
let unit_ids = cc_event.info.unit_ids;
tracing::info!(target: "chain-listener",
"Received CommitmentActivated event for commitment: {}, startEpoch: {}, unitIds: {:?}",
Expand Down Expand Up @@ -712,10 +719,16 @@ impl ChainListener {

async fn process_unit_activated(
&mut self,
event: Option<Result<Log, Error>>,
event: Option<Result<JsonValue, client::Error>>,
) -> eyre::Result<()> {
let event = event.ok_or(eyre!("Failed to process UnitActivated event: got None"))?;
let unit_event = parse_log::<UnitActivatedData, UnitActivated>(event?)?;
let event = event.ok_or(eyre!("Failed to process UnitActivated event: got None"))??;

let log = serde_json::from_value::<Log>(event.clone()).map_err(|err| {
tracing::error!(target: "chain-listener", "Failed to parse UnitActivated event: {err}, data: {event}");
err
})?;

let unit_event = parse_log::<UnitActivatedData, UnitActivated>(log)?;
tracing::info!(target: "chain-listener",
"Received UnitActivated event for unit: {}, startEpoch: {}",
unit_event.info.unit_id,
Expand All @@ -724,21 +737,26 @@ impl ChainListener {

if self.current_epoch >= unit_event.info.start_epoch {
self.active_compute_units.insert(unit_event.info.unit_id);
self.refresh_commitment().await?;
} else {
// Will be activated on the `start_epoch`
self.pending_compute_units.insert(unit_event.info.into());
}

self.refresh_commitment().await?;
Ok(())
}

/// Unit goes to Deal
async fn process_unit_deactivated(
&mut self,
event: Option<Result<Log, client::Error>>,
event: Option<Result<JsonValue, client::Error>>,
) -> eyre::Result<()> {
let event = event.ok_or(eyre!("Failed to process UnitDeactivated event: got None"))?;
let unit_event = parse_log::<UnitDeactivatedData, UnitDeactivated>(event?)?;
let event = event.ok_or(eyre!("Failed to process UnitDeactivated event: got None"))??;
let log = serde_json::from_value::<Log>(event.clone()).map_err(|err| {
tracing::error!(target: "chain-listener", "Failed to parse UnitDeactivated event: {err}, data: {event}");
err
})?;
let unit_event = parse_log::<UnitDeactivatedData, UnitDeactivated>(log)?;

tracing::info!(target: "chain-listener",
"Received UnitDeactivated event for unit: {}",
Expand All @@ -754,10 +772,14 @@ impl ChainListener {

pub fn process_deal_matched(
&mut self,
event: Option<Result<Log, client::Error>>,
event: Option<Result<JsonValue, client::Error>>,
) -> eyre::Result<()> {
let event = event.ok_or(eyre!("Failed to process DealMatched event: got None"))?;
let deal_event = parse_log::<DealMatchedData, DealMatched>(event?)?;
let event = event.ok_or(eyre!("Failed to process DealMatched event: got None"))??;
let log = serde_json::from_value::<Log>(event.clone()).map_err(|err| {
tracing::error!(target: "chain-listener", "Failed to parse DealMatched event: {err}, data: {event}");
err
})?;
let deal_event = parse_log::<DealMatchedData, DealMatched>(log)?;
tracing::info!(target: "chain-listener",
"Received DealMatched event for deal: {}",
deal_event.info.deal_id
Expand Down Expand Up @@ -786,11 +808,43 @@ impl ChainListener {
tracing::info!(target: "chain-listener", "Global nonce: {}", self.global_nonce);
tracing::info!(target: "chain-listener", "Difficulty: {}", self.difficulty);
if let Some(ref ccp_client) = self.ccp_client {
let cores = self.acquire_active_units()?;
let mut cores = self.acquire_active_units()?;

// All pending units not involved in deals will help to solve CCs for other units
tracing::info!(target: "chain-listener",
"Pending compute units: {:?}",
self.pending_compute_units
.iter()
.map(|cu| cu.id.to_string())
.collect::<Vec<_>>()
);
let mut available_cores = self.get_available_cores()?;
tracing::info!(
target: "chain-listener",
"{} cores of pending units will be allocated to other units",
available_cores.len()
);

for unit in self.active_compute_units.iter().cycle() {
if let Some(core) = available_cores.pop_first() {
cores.insert(core, *unit);
} else {
break;
}
}

tracing::info!(target: "chain-listener",
"Sending commitment to CCP: global_nonce: {}, difficulty: {}, cores: {:?}",
self.global_nonce,
self.difficulty,
cores.iter().map(|(core, unit)| format!("{}: {}", core, unit.to_string()))
.collect::<Vec<_>>()
);

ccp_client
.on_active_commitment(
OrHex::from(self.global_nonce),
OrHex::from(self.difficulty),
self.global_nonce,
self.difficulty,
cores,
)
.await
Expand All @@ -802,7 +856,7 @@ impl ChainListener {
Ok(())
}

fn acquire_active_units(&self) -> eyre::Result<HashMap<PhysicalCoreId, OrHex<CUID>>> {
fn acquire_active_units(&self) -> eyre::Result<HashMap<PhysicalCoreId, CUID>> {
let cores = self
.core_manager
.acquire_worker_core(AcquireRequest::new(
Expand All @@ -817,12 +871,7 @@ impl ChainListener {
Ok(cores
.physical_core_ids
.into_iter()
.zip(
self.active_compute_units
.clone()
.into_iter()
.map(OrHex::Data),
)
.zip(self.active_compute_units.clone().into_iter())
.collect())
}

Expand All @@ -832,9 +881,11 @@ impl ChainListener {
Ok(())
}

/// Should be called only if Commitment is Inactive, Failed, Removed or not exists
async fn reset_commitment(&mut self) -> eyre::Result<()> {
self.active_compute_units.clear();
self.pending_compute_units.clear();
self.active_deals.clear();
self.current_commitment = None;
self.stop_commitment().await?;
Ok(())
Expand Down Expand Up @@ -893,7 +944,7 @@ impl ChainListener {
}

async fn poll_proofs(&mut self) -> eyre::Result<()> {
if self.current_commitment.is_none() {
if self.current_commitment.is_none() || self.active_compute_units.is_empty() {
return Ok(());
}

Expand Down Expand Up @@ -949,8 +1000,6 @@ impl ChainListener {
if data.contains(TOO_MANY_PROOFS) {
tracing::info!(target: "chain-listener", "Too many proofs found for compute unit {}, stopping until next epoch", proof.cu_id);

// TODO: acquire core for other units to help with proofs calculation

self.active_compute_units.remove(&proof.cu_id);
self.pending_compute_units
.insert(PendingUnit::new(proof.cu_id, self.current_epoch + 1));
Expand Down Expand Up @@ -1069,4 +1118,13 @@ impl ChainListener {
self.active_deals.remove(deal_id);
Ok(())
}
fn get_available_cores(&self) -> eyre::Result<BTreeSet<PhysicalCoreId>> {
let available_units = self.pending_compute_units.iter().map(|cu| cu.id).collect();
self.core_manager.acquire_worker_core(AcquireRequest::new(available_units, WorkType::CapacityCommitment))
.map(|acquired| acquired.physical_core_ids)
.map_err(|err| {
tracing::error!(target: "chain-listener", "Failed to acquire cores for active units: {err}");
eyre::eyre!("Failed to acquire cores for active units: {err}")
})
}
}

0 comments on commit fe423ff

Please sign in to comment.