Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(chain-listener): exit expired deals #2143

Merged
merged 13 commits into from
Mar 9, 2024
Merged
16 changes: 10 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ members = [
"crates/chain-types",
"crates/types",
"crates/core-manager",
]
]
exclude = [
"nox/tests/tetraplets",
]
Expand Down
1 change: 1 addition & 0 deletions crates/chain-connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ futures = { workspace = true }
ccp-shared = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
types = { workspace = true }

[dev-dependencies]
mockito = { workspace = true }
59 changes: 53 additions & 6 deletions crates/chain-connector/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use ccp_shared::proof::CCProof;
use ccp_shared::types::{Difficulty, GlobalNonce};
use ccp_shared::types::{Difficulty, GlobalNonce, CUID};
use clarity::Transaction;
use ethabi::ethereum_types::U256;
use ethabi::Token;
Expand All @@ -18,19 +18,23 @@ use tokio::sync::Mutex;

use chain_data::ChainDataError::InvalidTokenSize;
use chain_data::{next_opt, parse_chain_data, peer_id_to_bytes, ChainFunction};
use chain_types::{Commitment, CommitmentId, CommitmentStatus, ComputePeer, ComputeUnit};
use chain_types::{
Commitment, CommitmentId, CommitmentStatus, ComputePeer, ComputeUnit, DealStatus,
};
use fluence_libp2p::PeerId;
use particle_args::{Args, JError};
use particle_builtins::{wrap, CustomService};
use particle_execution::{ParticleParams, ServiceFunction};
use server_config::ChainConfig;
use types::DealId;

use crate::error::{process_response, ConnectorError};
use crate::function::{GetCommitmentFunction, GetStatusFunction, SubmitProofFunction};
use crate::function::{GetCommitmentFunction, GetCommitmentStatusFunction, SubmitProofFunction};
use crate::ConnectorError::InvalidBaseFeePerGas;
use crate::{
CurrentEpochFunction, DifficultyFunction, EpochDurationFunction, GetComputePeerFunction,
GetComputeUnitsFunction, GetGlobalNonceFunction, InitTimestampFunction,
GetComputeUnitsFunction, GetDealStatusFunction, GetGlobalNonceFunction, InitTimestampFunction,
ReturnComputeUnitFromDeal,
};

const BASE_FEE_MULTIPLIER: f64 = 0.125;
Expand Down Expand Up @@ -166,6 +170,7 @@ impl ChainConnector {

pub async fn send_tx(&self, data: Vec<u8>, to: &str) -> Result<String, ConnectorError> {
let base_fee_per_gas = self.get_base_fee_per_gas().await?;
tracing::info!(target: "chain-connector", "Estimating gas for tx from {} to {} data {}", self.config.wallet_key.to_address(), to, hex::encode(&data));
let gas_limit = self.estimate_gas_limit(&data, to).await?;
let max_priority_fee_per_gas = self.max_priority_fee_per_gas().await?;

Expand Down Expand Up @@ -236,7 +241,7 @@ impl ChainConnector {
&self,
commitment_id: CommitmentId,
) -> Result<CommitmentStatus, ConnectorError> {
let data = GetStatusFunction::data(&[Token::FixedBytes(commitment_id.0)])?;
let data = GetCommitmentStatusFunction::data(&[Token::FixedBytes(commitment_id.0)])?;
let resp: String = process_response(
self.client
.request(
Expand Down Expand Up @@ -327,7 +332,7 @@ impl ChainConnector {
.await,
)?;
let mut tokens =
parse_chain_data(&resp, &GetComputeUnitsFunction::signature())?.into_iter();
parse_chain_data(&resp, &GetComputeUnitsFunction::result_signature())?.into_iter();
let units = next_opt(&mut tokens, "units", Token::into_array)?.into_iter();
let compute_units = units
.map(ComputeUnit::from_token)
Expand Down Expand Up @@ -391,6 +396,48 @@ impl ChainConnector {
})
}

pub async fn get_deal_statuses<'a, I>(
&self,
deal_ids: I,
) -> Result<Vec<Result<DealStatus, ConnectorError>>, ConnectorError>
where
I: Iterator<Item = &'a DealId>,
{
let mut batch = BatchRequestBuilder::new();
for deal_id in deal_ids {
let data = GetDealStatusFunction::data(&[])?;
batch.insert(
"eth_call",
rpc_params![
json!({
"data": data,
"to": deal_id.to_address(),
}),
"latest"
],
)?;
}

let resp: BatchResponse<String> = self.client.batch_request(batch).await?;
let mut statuses = vec![];

for status in resp.into_iter() {
let status = status
.map(|r| DealStatus::from(&r).map_err(ConnectorError::ParseChainDataFailed))
.map_err(|e| ConnectorError::RpcError(e.to_owned().into()))?;
statuses.push(status);
}

Ok(statuses)
}

pub async fn exit_deal(&self, cu_id: &CUID) -> Result<String, ConnectorError> {
let data =
ReturnComputeUnitFromDeal::data_bytes(&[Token::FixedBytes(cu_id.as_ref().to_vec())])?;
self.send_tx(data, &self.config.market_contract_address)
.await
}

fn difficulty_params(&self) -> eyre::Result<ArrayParams> {
let data = DifficultyFunction::data(&[])?;
Ok(rpc_params![
Expand Down
2 changes: 1 addition & 1 deletion crates/chain-connector/src/function/current_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl ChainFunction for CurrentEpochFunction {
}
}

fn signature() -> Vec<ParamType> {
fn result_signature() -> Vec<ParamType> {
vec![ParamType::Uint(256)]
}
}
2 changes: 1 addition & 1 deletion crates/chain-connector/src/function/difficulty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ impl ChainFunction for DifficultyFunction {
}
}

fn signature() -> Vec<ParamType> {
fn result_signature() -> Vec<ParamType> {
vec![ParamType::FixedBytes(32)]
}
}
2 changes: 1 addition & 1 deletion crates/chain-connector/src/function/epoch_duration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl ChainFunction for EpochDurationFunction {
}
}

fn signature() -> Vec<ParamType> {
fn result_signature() -> Vec<ParamType> {
vec![ParamType::Uint(256)]
}
}
2 changes: 1 addition & 1 deletion crates/chain-connector/src/function/get_commitment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl ChainFunction for GetCommitmentFunction {
state_mutability: ethabi::StateMutability::View,
}
}
fn signature() -> Vec<ethabi::ParamType> {
fn result_signature() -> Vec<ethabi::ParamType> {
Commitment::signature()
}
}
6 changes: 3 additions & 3 deletions crates/chain-connector/src/function/get_commitment_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use chain_data::ChainFunction;
/// @param commitmentId Commitment id
/// @return status commitment status
/// function getStatus(bytes32 commitmentId) external view returns (CCStatus);
pub struct GetStatusFunction;
pub struct GetCommitmentStatusFunction;

impl ChainFunction for GetStatusFunction {
impl ChainFunction for GetCommitmentStatusFunction {
fn function() -> ethabi::Function {
#[allow(deprecated)]
let function = ethabi::Function {
Expand All @@ -23,7 +23,7 @@ impl ChainFunction for GetStatusFunction {
function
}

fn signature() -> Vec<ethabi::ParamType> {
fn result_signature() -> Vec<ethabi::ParamType> {
vec![ethabi::ParamType::FixedBytes(32)]
}
}
2 changes: 1 addition & 1 deletion crates/chain-connector/src/function/get_compute_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl ChainFunction for GetComputePeerFunction {
}
}

fn signature() -> Vec<ParamType> {
fn result_signature() -> Vec<ParamType> {
ComputePeer::signature()
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/chain-connector/src/function/get_compute_units.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl ChainFunction for GetComputeUnitsFunction {
state_mutability: ethabi::StateMutability::View,
}
}
fn signature() -> Vec<ParamType> {
fn result_signature() -> Vec<ParamType> {
vec![ParamType::Array(Box::new(ParamType::Tuple(
ComputeUnit::signature(),
)))]
Expand Down
23 changes: 23 additions & 0 deletions crates/chain-connector/src/function/get_deal_status.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use chain_data::ChainFunction;
use ethabi::ParamType;

/// function getStatus() public view returns (Status)
pub struct GetDealStatusFunction;

impl ChainFunction for GetDealStatusFunction {
fn function() -> ethabi::Function {
#[allow(deprecated)]
let function = ethabi::Function {
name: "getStatus".to_string(),
inputs: vec![],
outputs: vec![],
constant: None,
state_mutability: ethabi::StateMutability::View,
};
function
}

fn result_signature() -> Vec<ParamType> {
vec![ParamType::FixedBytes(32)]
}
}
2 changes: 1 addition & 1 deletion crates/chain-connector/src/function/global_nonce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl ChainFunction for GetGlobalNonceFunction {
}
}

fn signature() -> Vec<ParamType> {
fn result_signature() -> Vec<ParamType> {
vec![ParamType::FixedBytes(32)]
}
}
2 changes: 1 addition & 1 deletion crates/chain-connector/src/function/init_timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl ChainFunction for InitTimestampFunction {
}
}

fn signature() -> Vec<ParamType> {
fn result_signature() -> Vec<ParamType> {
vec![ParamType::Uint(256)]
}
}
6 changes: 5 additions & 1 deletion crates/chain-connector/src/function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,21 @@ mod get_commitment;
mod get_commitment_status;
mod get_compute_peer;
mod get_compute_units;
mod get_deal_status;
mod global_nonce;
mod init_timestamp;
mod remove_cu_from_deal;
mod submit_proof;

pub use current_epoch::CurrentEpochFunction;
pub use difficulty::DifficultyFunction;
pub use epoch_duration::EpochDurationFunction;
pub use get_commitment::GetCommitmentFunction;
pub use get_commitment_status::GetStatusFunction;
pub use get_commitment_status::GetCommitmentStatusFunction;
pub use get_compute_peer::GetComputePeerFunction;
pub use get_compute_units::GetComputeUnitsFunction;
pub use get_deal_status::GetDealStatusFunction;
pub use global_nonce::GetGlobalNonceFunction;
pub use init_timestamp::InitTimestampFunction;
pub use remove_cu_from_deal::ReturnComputeUnitFromDeal;
pub use submit_proof::SubmitProofFunction;
27 changes: 27 additions & 0 deletions crates/chain-connector/src/function/remove_cu_from_deal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use chain_data::ChainFunction;
use ethabi::{Function, Param, ParamType, StateMutability};

/// @dev Return the compute unit from a deal
/// function returnComputeUnitFromDeal(bytes32 unitId) external;
pub struct ReturnComputeUnitFromDeal;

impl ChainFunction for ReturnComputeUnitFromDeal {
fn function() -> Function {
#[allow(deprecated)]
Function {
name: "returnComputeUnitFromDeal".to_string(),
inputs: vec![Param {
name: "unitId".to_string(),
kind: ParamType::FixedBytes(32),
internal_type: None,
}],
outputs: vec![],
constant: None,
state_mutability: StateMutability::NonPayable,
}
}

fn result_signature() -> Vec<ParamType> {
vec![]
}
}
2 changes: 1 addition & 1 deletion crates/chain-connector/src/function/submit_proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl ChainFunction for SubmitProofFunction {
function
}

fn signature() -> Vec<ethabi::ParamType> {
fn result_signature() -> Vec<ethabi::ParamType> {
vec![
ethabi::ParamType::FixedBytes(32),
ethabi::ParamType::FixedBytes(32),
Expand Down
8 changes: 4 additions & 4 deletions crates/chain-data/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use ethabi::Token;

pub trait ChainFunction {
fn function() -> ethabi::Function;
fn signature() -> Vec<ethabi::ParamType>;
fn result_signature() -> Vec<ethabi::ParamType>;

fn data(inputs: &[Token]) -> Result<String, ChainDataError> {
let function = Self::function();
Expand All @@ -18,16 +18,16 @@ pub trait ChainFunction {
}

fn decode_uint(data: &str) -> Result<U256, ChainDataError> {
let mut tokens = crate::parse_chain_data(data, &Self::signature())?.into_iter();
let mut tokens = crate::parse_chain_data(data, &Self::result_signature())?.into_iter();
next_opt(&mut tokens, "uint", Token::into_uint)
}

fn decode_fixed_bytes(data: &str) -> Result<Vec<u8>, ChainDataError> {
let mut tokens = crate::parse_chain_data(data, &Self::signature())?.into_iter();
let mut tokens = crate::parse_chain_data(data, &Self::result_signature())?.into_iter();
next_opt(&mut tokens, "bytes", Token::into_fixed_bytes)
}

fn decode_tuple(data: &str) -> Result<Vec<Token>, ChainDataError> {
crate::parse_chain_data(data, &Self::signature())
crate::parse_chain_data(data, &Self::result_signature())
}
}
Loading
Loading