From ea4fe98848b857b529d6bdafdf13c08ef2dfecf8 Mon Sep 17 00:00:00 2001
From: Alexey Proshutinskiy
Date: Fri, 14 Jun 2024 22:21:33 +0300
Subject: [PATCH 01/24] feat: implement batching

---
 Cargo.lock                                    |  12 +-
 Cargo.toml                                    |   6 +-
 crates/chain-connector/src/connector.rs       | 180 ++++++---
 .../chain-connector/src/function/capacity.rs  |  10 +
 crates/chain-data/Cargo.toml                  |   2 +
 crates/chain-data/src/block_header.rs         |  56 +++
 crates/chain-data/src/lib.rs                  |   2 +
 crates/chain-listener/Cargo.toml              |   1 +
 crates/chain-listener/src/lib.rs              |   1 +
 crates/chain-listener/src/listener.rs         | 369 ++++++++----------
 crates/chain-listener/src/persistence.rs      |  42 +-
 crates/chain-listener/src/proof_tracker.rs    | 167 ++++++++
 crates/server-config/src/node_config.rs       |   5 +
 13 files changed, 565 insertions(+), 288 deletions(-)
 create mode 100644 crates/chain-data/src/block_header.rs
 create mode 100644 crates/chain-listener/src/proof_tracker.rs

diff --git a/Cargo.lock b/Cargo.lock
index f82d141adf..dacb536d91 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1260,8 +1260,7 @@ dependencies = [
 [[package]]
 name = "ccp-rpc-client"
 version = "0.10.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9176fd2a8249730c6256bbbdb2f1f864ce1aa42caa80db9d137ce8dcce351ecd"
+source = "git+https://github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#8ac4d92d3f18f6e1de56eb4e6f2030bbcb339306"
 dependencies = [
  "ccp-shared",
  "hex",
@@ -1272,8 +1271,7 @@ dependencies = [
 [[package]]
 name = "ccp-shared"
 version = "0.10.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ce20b2393397b733ff6f9fd91909dc7c115df934a3be876351e861851e4c2038"
+source = "git+https://github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#8ac4d92d3f18f6e1de56eb4e6f2030bbcb339306"
 dependencies = [
  "hex",
  "newtype_derive",
@@ -1358,6 +1356,7 @@ dependencies = [
 name = "chain-data"
 version = "0.1.0"
 dependencies = [
+ "alloy-primitives",
  "alloy-sol-types",
  "const-hex",
  "ethabi",
  "hex",
  "hex-utils",
  "libp2p-identity",
  "log",
  "serde",
+ "serde_json",
  "thiserror",
 ]
@@ -1398,6 +1398,7 @@ dependencies = [
  "peer-metrics",
  "serde",
  "serde_json",
+ "serde_with 3.7.0",
  "server-config",
  "thiserror",
  "tokio",
@@ -1728,8 +1729,7 @@ dependencies = [
 [[package]]
 name = "cpu-utils"
 version = "0.10.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bf88054aeeac7a790f0c653e77a79bd0af113b13249a8f94ad4ed795488fff64"
+source = "git+https://github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#8ac4d92d3f18f6e1de56eb4e6f2030bbcb339306"
 dependencies = [
  "ccp-shared",
  "ccp_core_affinity",
diff --git a/Cargo.toml b/Cargo.toml
index bb3f4806b0..a61c697d1e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -173,9 +173,9 @@ enum_dispatch = "0.3.12"
 serde_with = "3.7.0"
 mockito = "1.2.0"
 clarity = "1.3.0"
-cpu-utils = "0.10.2"
-ccp-shared = "0.10.2"
-ccp-rpc-client = "0.10.2"
+cpu-utils = { git = "https://github.com/fluencelabs/capacity-commitment-prover.git", branch = "feat/VM-613-batching-api" }
+ccp-shared = { git = "https://github.com/fluencelabs/capacity-commitment-prover.git", branch = "feat/VM-613-batching-api" }
+ccp-rpc-client = { git = "https://github.com/fluencelabs/capacity-commitment-prover.git", branch = "feat/VM-613-batching-api" }
 alloy-sol-types = "0.6.4"
 alloy-primitives = "0.6.4"
 alloy_serde_macro = "0.1.2"
diff --git a/crates/chain-connector/src/connector.rs b/crates/chain-connector/src/connector.rs
index e6d2997b79..4f236731ad 100644 --- a/crates/chain-connector/src/connector.rs +++ b/crates/chain-connector/src/connector.rs @@ -22,8 +22,7 @@ use std::collections::{BTreeMap, HashMap}; use std::str::FromStr; use std::sync::Arc; -use ccp_shared::proof::CCProof; -use ccp_shared::types::{Difficulty, GlobalNonce, CUID}; +use ccp_shared::types::{Difficulty, GlobalNonce, LocalNonce, ResultHash, CUID}; use clarity::{Transaction, Uint256}; use eyre::eyre; use futures::FutureExt; @@ -39,7 +38,7 @@ use tokio::sync::Mutex; use crate::ConnectorError::{InvalidU256, ResponseParseError}; use crate::Deal::CIDV1; use crate::{CCStatus, Capacity, CommitmentId, Core, Deal, Offer}; -use chain_data::peer_id_to_bytes; +use chain_data::{peer_id_to_bytes, BlockHeader}; use fluence_libp2p::PeerId; use hex_utils::decode_hex; use particle_args::{Args, JError}; @@ -65,7 +64,12 @@ pub trait ChainConnector: Send + Sync { async fn get_global_nonce(&self) -> Result; - async fn submit_proof(&self, proof: CCProof) -> Result; + async fn submit_proofs( + &self, + unit_ids: Vec, + local_nonces: Vec, + result_hashes: Vec, + ) -> Result; async fn get_deal_statuses(&self, deal_ids: Vec) -> Result>>; @@ -86,6 +90,7 @@ pub struct HttpChainConnector { pub struct CCInitParams { pub difficulty: Difficulty, pub init_timestamp: U256, + pub current_timestamp: U256, pub global_nonce: GlobalNonce, pub current_epoch: U256, pub epoch_duration: U256, @@ -599,53 +604,40 @@ impl ChainConnector for HttpChainConnector { batch.insert("eth_call", self.epoch_duration_params())?; batch.insert("eth_call", self.min_proofs_per_epoch_params())?; batch.insert("eth_call", self.max_proofs_per_epoch_params())?; + batch.insert("eth_getBlockByNumber", rpc_params!["latest", false])?; - let resp: BatchResponse = self.client.batch_request(batch).await?; + let resp: BatchResponse = self.client.batch_request(batch).await?; let mut results = resp .into_ok() .map_err(|err| eyre!("Some request failed in a batch {err:?}"))?; let difficulty: FixedBytes<32> = - FixedBytes::from_str(&results.next().ok_or(eyre!("No response for difficulty"))?)?; - let init_timestamp = U256::from_str( - &results - .next() - .ok_or(eyre!("No response for init_timestamp"))?, - )?; + FixedBytes::from_str(&parse_str_field(results.next(), "difficulty")?)?; - let global_nonce = FixedBytes::from_str( - &results - .next() - .ok_or(eyre!("No response for global_nonce"))?, - )?; + let init_timestamp = U256::from_str(&parse_str_field(results.next(), "init_timestamp")?)?; - let current_epoch = U256::from_str( - &results - .next() - .ok_or(eyre!("No response for current_epoch"))?, - )?; + let global_nonce = FixedBytes::from_str(&parse_str_field(results.next(), "global_nonce")?)?; - let epoch_duration = U256::from_str( - &results - .next() - .ok_or(eyre!("No response for epoch_duration"))?, - )?; + let current_epoch = U256::from_str(&parse_str_field(results.next(), "current_epoch")?)?; - let min_proofs_per_epoch = U256::from_str( - &results - .next() - .ok_or(eyre!("No response for min_proofs_per_epoch"))?, - )?; + let epoch_duration = U256::from_str(&parse_str_field(results.next(), "epoch_duration")?)?; - let max_proofs_per_epoch = U256::from_str( - &results + let min_proofs_per_epoch = + U256::from_str(&parse_str_field(results.next(), "min_proofs_per_epoch")?)?; + + let max_proofs_per_epoch = + U256::from_str(&parse_str_field(results.next(), "max_proofs_per_epoch")?)?; + + let header = BlockHeader::from_json( + results .next() - .ok_or(eyre!("No response for max_proofs_per_epoch"))?, + 
.ok_or_else(|| eyre!("Block header not found in response"))?, )?; Ok(CCInitParams { difficulty: Difficulty::new(difficulty.0), init_timestamp, + current_timestamp: header.timestamp, global_nonce: GlobalNonce::new(global_nonce.0), current_epoch, epoch_duration, @@ -719,11 +711,23 @@ impl ChainConnector for HttpChainConnector { Ok(GlobalNonce::new(bytes.0)) } - async fn submit_proof(&self, proof: CCProof) -> Result { - let data = Capacity::submitProofCall { - unitId: proof.cu_id.as_ref().into(), - localUnitNonce: proof.local_nonce.as_ref().into(), - resultHash: proof.result_hash.as_ref().into(), + async fn submit_proofs( + &self, + unit_ids: Vec, + local_nonces: Vec, + result_hashes: Vec, + ) -> Result { + let data = Capacity::submitProofsCall { + unitIds: unit_ids.into_iter().map(|id| id.as_ref().into()).collect(), + localUnitNonces: local_nonces + .into_iter() + .map(|n| n.as_ref().into()) + .collect(), + + resultHashes: result_hashes + .into_iter() + .map(|hash| hash.as_ref().into()) + .collect(), } .abi_encode(); @@ -806,6 +810,14 @@ impl ChainConnector for HttpChainConnector { } } +fn parse_str_field(value: Option, field: &str) -> eyre::Result { + value + .ok_or(eyre!("Field {} not found in response", field))? + .as_str() + .ok_or(eyre!("Field {} is not a string", field)) + .map(|s| s.to_string()) +} + #[cfg(test)] mod tests { @@ -815,7 +827,6 @@ mod tests { use std::str::FromStr; use std::sync::Arc; - use ccp_shared::proof::{CCProof, CCProofId, ProofIdx}; use ccp_shared::types::{Difficulty, GlobalNonce, LocalNonce, ResultHash, CUID}; use clarity::PrivateKey; use hex::FromHex; @@ -1007,7 +1018,57 @@ mod tests { "jsonrpc": "2.0", "result": "0x8", "id": 6 - } + }, + { + "jsonrpc": "2.0", + "result": { + "hash": "0x402c63844e7797c56468e5c9ca241d7f99b102c6e683fd371c1558fc87ff0963", + "parentHash": "0x4904bfa81f0c577da1caa89826c3cf05a952e51dd39226709ed643c0f3847992", + "sha3Uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "miner": "0x5d159e79d541c35d68d1c8a5da02637fff779da0", + "stateRoot": "0x5f928ec52b4c7d259e0cc744f2be0e17952a94e721b732d03b91142ab23bd497", + "transactionsRoot": "0xc6d90adaa66f9e4e53d27c59c946f8f1b6aa9d1092a414c2290c05a4e081a8e5", + "receiptsRoot": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "number": "0x8b287", + "gasUsed": "0x8af3a5a", + "gasLimit": "0xadb3bb8", + "extraData": "0x", + "logsBloom": "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "timestamp": "0x666c79b1", + "difficulty": "0x0", + "totalDifficulty": "0x0", + "sealFields": [], + "uncles": [], + "transactions": [ + { + "accessList": [], + "blockHash": "0x402c63844e7797c56468e5c9ca241d7f99b102c6e683fd371c1558fc87ff0963", + "blockNumber": "0x8b287", + "chainId": "0x8613d62c79827", + "from": "0xb3b172cc702e3ae32ce0c73a050037a7750e41a6", + "gas": "0xadb3bb8", + "gasPrice": "0x64", + "hash": "0xdd5a4b397f222a9432b3d592ecca96171860953c5d2630f5f1f9f3614fdd8cd5", + "input": 
"0x4ece5685649cea1e34fec8ae2f5f8195124ec9c9e268b333f51d38dd12c9c89b026dd66ea6f4997b286ae1873a26c2f8cac3980af7ac51b185c9754d3a116d8a3c3a02fc0001286ad1ddc35083d1773e4f451e27047b3cdb5a1ebc592fd521de7780eb4c", + "maxFeePerGas": "0x64", + "maxPriorityFeePerGas": "0x0", + "nonce": "0x34d6", + "r": "0xfceb7aaceb1dc56f1e0b33584361c17ba1d8d79a5ec378c81d789c83e5eb7016", + "s": "0x13ba30819306a8cb71c60261824a77e29a0571514ffbb3f08f8503303caec56d", + "to": "0x066d0e888b62b7c2571cf867d2b26d6afefac720", + "transactionIndex": "0x0", + "type": "0x2", + "v": "0x1", + "value": "0x0" + } + ], + "size": "0xf7", + "mixHash": "0x0000000000000000000000000000000000000000000000000000000000000000", + "nonce": "0x0000000000000000", + "baseFeePerGas": "0x64" + }, + "id": 7 + } ]"#; let mut server = mockito::Server::new(); @@ -1089,17 +1150,13 @@ mod tests { }) .create(); - let proof = CCProof::new( - CCProofId::new( - GlobalNonce::new([0u8; 32]), - Difficulty::new([0u8; 32]), - ProofIdx::zero(), - ), - LocalNonce::new([0u8; 32]), - CUID::new([0u8; 32]), - ResultHash::from_slice([0u8; 32]), - ); - let result = get_connector(&url).submit_proof(proof).await; + let cu_ids = vec![CUID::new([0u8; 32])]; + let local_nonces = vec![LocalNonce::new([0u8; 32])]; + let result_hashes = vec![ResultHash::from_slice([0u8; 32])]; + + let result = get_connector(&url) + .submit_proofs(cu_ids, local_nonces, result_hashes) + .await; assert!(result.is_err()); @@ -1152,17 +1209,14 @@ mod tests { }) .create(); - let proof = CCProof::new( - CCProofId::new( - GlobalNonce::new([0u8; 32]), - Difficulty::new([0u8; 32]), - ProofIdx::zero(), - ), - LocalNonce::new([0u8; 32]), - CUID::new([0u8; 32]), - ResultHash::from_slice([0u8; 32]), - ); - let result = get_connector(&url).submit_proof(proof).await.unwrap(); + let cu_ids = vec![CUID::new([0u8; 32])]; + let local_nonces = vec![LocalNonce::new([0u8; 32])]; + let result_hashes = vec![ResultHash::from_slice([0u8; 32])]; + + let result = get_connector(&url) + .submit_proofs(cu_ids, local_nonces, result_hashes) + .await + .unwrap(); assert_eq!( result, diff --git a/crates/chain-connector/src/function/capacity.rs b/crates/chain-connector/src/function/capacity.rs index ce065c7e89..fba43e8855 100644 --- a/crates/chain-connector/src/function/capacity.rs +++ b/crates/chain-connector/src/function/capacity.rs @@ -53,6 +53,16 @@ sol! { /// @param localUnitNonce The local nonce of the unit for calculating the target hash. It's the proof /// @param resultHash The target hash of this proof function submitProof(bytes32 unitId, bytes32 localUnitNonce, bytes32 resultHash) external; + + /// @dev Submits proofs for the commitment + /// @param unitIds Compute unit ids which provide the proof + /// @param localUnitNonces Local nonces of the units for calculating the target hashes. 
It's the proof + /// @param resultHashes Target hashes of this proof + function submitProofs( + bytes32[] memory unitIds, + bytes32[] memory localUnitNonces, + bytes32[] memory resultHashes + ) external; } } diff --git a/crates/chain-data/Cargo.toml b/crates/chain-data/Cargo.toml index dba2d4446c..de49bebcec 100644 --- a/crates/chain-data/Cargo.toml +++ b/crates/chain-data/Cargo.toml @@ -16,3 +16,5 @@ thiserror = { workspace = true } eyre = { workspace = true } alloy-sol-types = { workspace = true } const-hex = { workspace = true } +alloy-primitives = { workspace = true } +serde_json = { workspace = true } diff --git a/crates/chain-data/src/block_header.rs b/crates/chain-data/src/block_header.rs new file mode 100644 index 0000000000..92715e4bcb --- /dev/null +++ b/crates/chain-data/src/block_header.rs @@ -0,0 +1,56 @@ +/* + * Copyright 2024 Fluence DAO + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use alloy_primitives::{BlockNumber, U256}; +use serde_json::Value; +use std::str::FromStr; + +#[derive(Debug)] +pub struct BlockHeader { + pub number: BlockNumber, + pub timestamp: U256, +} + +impl BlockHeader { + pub fn from_json(json: Value) -> eyre::Result { + let obj = json + .as_object() + .ok_or(eyre::eyre!("header is not an object; got {json}"))?; + + let timestamp = obj + .get("timestamp") + .and_then(Value::as_str) + .ok_or(eyre::eyre!(" timestamp field not found; got {json}"))? + .to_string(); + + let block_number = obj + .get("number") + .and_then(Value::as_str) + .ok_or(eyre::eyre!("number field not found; got {json}"))? 
+ .to_string(); + + Ok(Self { + number: BlockNumber::from_str_radix(&block_number.trim_start_matches("0x"), 16)?, + timestamp: U256::from_str(×tamp)?, + }) + } + + pub fn from_str(json: &str) -> eyre::Result { + let json: Value = serde_json::from_str(json) + .map_err(|err| eyre::eyre!("failed to parse header {err}; got {json}"))?; + Self::from_json(json) + } +} diff --git a/crates/chain-data/src/lib.rs b/crates/chain-data/src/lib.rs index d00492d163..698dd82238 100644 --- a/crates/chain-data/src/lib.rs +++ b/crates/chain-data/src/lib.rs @@ -17,12 +17,14 @@ #![feature(try_blocks)] #![feature(slice_as_chunks)] +mod block_header; mod chain_data; mod data_tokens; mod error; mod log; mod utils; +pub use block_header::BlockHeader; pub use chain_data::{parse_chain_data, ChainData, ChainEvent, EventField}; pub use data_tokens::{next, next_opt}; pub use error::ChainDataError; diff --git a/crates/chain-listener/Cargo.toml b/crates/chain-listener/Cargo.toml index ae8ef4f4f7..c931bd554c 100644 --- a/crates/chain-listener/Cargo.toml +++ b/crates/chain-listener/Cargo.toml @@ -40,6 +40,7 @@ tokio-stream = { workspace = true } toml_edit = { workspace = true } backoff = { version = "0.4.0", features = ["tokio", "futures"] } peer-metrics = { workspace = true } +serde_with = { workspace = true } [dev-dependencies] jsonrpsee = { workspace = true, features = ["server"] } diff --git a/crates/chain-listener/src/lib.rs b/crates/chain-listener/src/lib.rs index 6848ee86bd..7f7aae1f03 100644 --- a/crates/chain-listener/src/lib.rs +++ b/crates/chain-listener/src/lib.rs @@ -28,3 +28,4 @@ mod event; mod listener; mod persistence; +mod proof_tracker; diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index 92f9e41872..38b96677ef 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -14,22 +14,21 @@ * limitations under the License. 
*/ -use alloy_primitives::{Address, BlockNumber, FixedBytes, Uint, U256}; +use alloy_primitives::{Address, FixedBytes, Uint, U256, U64}; use alloy_sol_types::SolEvent; use backoff::Error::Permanent; +use std::cmp::min; use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::future::{pending, Future}; -use std::ops::Add; use std::path::PathBuf; use std::process::exit; -use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use backoff::future::retry; use backoff::ExponentialBackoff; use ccp_rpc_client::CCPRpcHttpClient; -use ccp_shared::proof::{CCProof, CCProofId, ProofIdx}; +use ccp_shared::proof::BatchRequest; use ccp_shared::types::{Difficulty, GlobalNonce, LocalNonce, ResultHash}; use cpu_utils::PhysicalCoreId; @@ -52,7 +51,7 @@ use chain_connector::{ is_commitment_not_active, is_too_many_proofs, CCStatus, ChainConnector, CommitmentId, ConnectorError, Deal, PEER_NOT_EXISTS, }; -use chain_data::{parse_log, peer_id_to_hex, Log}; +use chain_data::{parse_log, peer_id_to_hex, BlockHeader, Log}; use core_distributor::errors::AcquireError; use core_distributor::types::{AcquireRequest, Assignment, WorkType}; use core_distributor::{CoreDistributor, CUID}; @@ -62,7 +61,7 @@ use types::DealId; use crate::event::cc_activated::CommitmentActivated; use crate::event::{ComputeUnitMatched, UnitActivated, UnitDeactivated}; -use crate::persistence; +use crate::proof_tracker::ProofTracker; const PROOF_POLL_LIMIT: usize = 50; @@ -91,8 +90,8 @@ pub struct ChainListener { // These settings are changed each epoch global_nonce: GlobalNonce, current_epoch: U256, + last_observed_block_timestamp: u64, - proof_counter: BTreeMap, current_commitment: Option, // the compute units that are in the commitment and not in deals @@ -100,10 +99,8 @@ pub struct ChainListener { // the compute units that are in deals and not in commitment active_deals: BTreeMap, - /// Resets every epoch - last_submitted_proof_id: ProofIdx, - pending_proof_txs: Vec<(String, CUID)>, - persisted_proof_id_dir: PathBuf, + proof_tracker: ProofTracker, + pending_proof_txs: Vec<(String, Vec)>, // TODO: move out to a separate struct, get rid of Option // Subscriptions that are polled when we have commitment @@ -156,14 +153,11 @@ impl ChainListener { epoch_duration: U256::ZERO, min_proofs_per_epoch: U256::ZERO, max_proofs_per_epoch: U256::ZERO, - proof_counter: BTreeMap::new(), current_commitment: None, cc_compute_units: BTreeMap::new(), core_distributor, ccp_client, - last_submitted_proof_id: ProofIdx::zero(), pending_proof_txs: vec![], - persisted_proof_id_dir, unit_activated: None, unit_deactivated: None, heads: None, @@ -171,6 +165,8 @@ impl ChainListener { unit_matched: None, active_deals: BTreeMap::new(), metrics, + proof_tracker: ProofTracker::new(persisted_proof_id_dir), + last_observed_block_timestamp: 0, } } @@ -206,9 +202,9 @@ impl ChainListener { tracing::info!(target: "chain-listener", "Subscribed successfully"); // Proof id should be loaded once on start, there is no reason to update it on refresh - // TODO: associate proof id with nonce, not current epoch - if let Err(err) = self.load_proof_id().await { - tracing::error!(target: "chain-listener", "Failed to load persisted proof id: {err}; Stopping..."); + + if let Err(err) = self.proof_tracker.load_state().await { + tracing::error!(target: "chain-listener", "Failed to load persisted proof tracker state: {err}; Stopping..."); exit(1); } @@ -256,9 +252,10 @@ impl ChainListener { if let Err(err) = self.poll_proofs().await { tracing::warn!(target: "chain-listener", "Failed 
to poll/submit proofs: {err}"); } - } else if let Err(err) = self.submit_mocked_proofs().await { - tracing::warn!(target: "chain-listener", "Failed to submit mocked proofs: {err}"); } + // } else if let Err(err) = self.submit_mocked_proofs().await { + // tracing::warn!(target: "chain-listener", "Failed to submit mocked proofs: {err}"); + // } if let Err(err) = self.poll_deal_statuses().await { @@ -320,8 +317,8 @@ impl ChainListener { self.min_proofs_per_epoch = init_params.min_proofs_per_epoch; self.max_proofs_per_epoch = init_params.max_proofs_per_epoch; - self.set_current_epoch(init_params.current_epoch); - self.set_global_nonce(init_params.global_nonce).await?; + self.set_current_epoch(init_params.current_epoch).await; + self.set_global_nonce(init_params.global_nonce).await; Ok(()) } @@ -382,54 +379,6 @@ impl ChainListener { Ok(()) } - async fn reset_proof_id(&mut self) -> eyre::Result<()> { - tracing::info!(target: "chain-listener", "Resetting proof id counter"); - self.set_proof_id(ProofIdx::zero()).await - } - - async fn set_proof_id(&mut self, proof_id: ProofIdx) -> eyre::Result<()> { - let backoff = ExponentialBackoff { - max_elapsed_time: Some(Duration::from_secs(3)), - ..ExponentialBackoff::default() - }; - - let write = retry(backoff, || async { - persistence::persist_proof_id( - &self.persisted_proof_id_dir, - proof_id, - self.current_epoch, - ).await.map_err(|err|{ - tracing::warn!(target: "chain-listener", "Failed to persist proof id: {err}; Retrying..."); - eyre!(err) - })?; - Ok(()) - }).await; - - self.last_submitted_proof_id = proof_id; - - if let Err(err) = write { - tracing::warn!(target: "chain-listener", "Failed to persist proof id: {err}; Ignoring.."); - } - - tracing::info!(target: "chain-listener", "Persisted proof id {} on epoch {}", self.last_submitted_proof_id, self.current_epoch); - Ok(()) - } - - async fn load_proof_id(&mut self) -> eyre::Result<()> { - let persisted_proof_id = - persistence::load_persisted_proof_id(&self.persisted_proof_id_dir).await?; - - if let Some(persisted_proof_id) = persisted_proof_id { - self.last_submitted_proof_id = persisted_proof_id.proof_id; - tracing::info!(target: "chain-listener", "Loaded persisted proof id {} saved on epoch {}", persisted_proof_id.proof_id, persisted_proof_id.epoch); - } else { - tracing::info!(target: "chain-listener", "No persisted proof id found, starting from zero"); - self.last_submitted_proof_id = ProofIdx::zero(); - } - - Ok(()) - } - // Allocate one CPU core for utility use async fn set_utility_core(&mut self) -> eyre::Result<()> { if let Some(ccp_client) = self.ccp_client.as_ref() { @@ -681,20 +630,24 @@ impl ChainListener { ) -> eyre::Result<()> { let header = event.ok_or(eyre!("Failed to process newHeads event: got None"))?; - let (block_timestamp, block_number) = Self::parse_block_header(header?)?; + let header = BlockHeader::from_json(header?)?; + let block_number = header.number; + let block_timestamp = header.timestamp; + + self.last_observed_block_timestamp = block_timestamp; self.observe(|m| m.observe_new_block(block_number)); // `epoch_number = 1 + (block_timestamp - init_timestamp) / epoch_duration` - let epoch_number = - U256::from(1) + (block_timestamp - self.init_timestamp) / self.epoch_duration; + let epoch_number = U256::from(1) + + (Uint::from(block_timestamp) - self.init_timestamp) / self.epoch_duration; let epoch_changed = epoch_number > self.current_epoch; if epoch_changed { // TODO: add epoch_number to metrics - self.set_current_epoch(epoch_number); + 
self.set_current_epoch(epoch_number).await; self.set_global_nonce(self.chain_connector.get_global_nonce().await?) - .await?; + .await; tracing::info!(target: "chain-listener", "Global nonce: {}", self.global_nonce); if let Some(status) = self.get_commitment_status().await? { @@ -847,10 +800,10 @@ impl ChainListener { let mut finished_units: Vec = Vec::new(); for (cuid, cu) in &self.cc_compute_units { if cu.startEpoch <= self.current_epoch { - let count = self.proof_counter.get(cuid).unwrap_or(&U256::ZERO); - if count < &self.min_proofs_per_epoch { + let count = self.proof_tracker.get_proof_counter(&cuid); + if count < self.min_proofs_per_epoch { priority_units.push(*cuid) - } else if *count >= self.max_proofs_per_epoch { + } else if count >= self.max_proofs_per_epoch { finished_units.push(*cuid) } else { non_priority_units.push(*cuid) @@ -1075,25 +1028,31 @@ impl ChainListener { Ok(()) } - /// Submit Mocked Proofs for all active compute units. - /// Mocked Proof has result_hash == difficulty and random local_nonce - async fn submit_mocked_proofs(&mut self) -> eyre::Result<()> { - if self.current_commitment.is_none() { - return Ok(()); - } - - let result_hash = ResultHash::from_slice(*self.difficulty.as_ref()); - - // proof_id is used only by CCP and is not sent to chain - let proof_id = CCProofId::new(self.global_nonce, self.difficulty, ProofIdx::zero()); - let units = self.cc_compute_units.keys().cloned().collect::>(); - for unit in units { - let local_nonce = LocalNonce::random(); - self.submit_proof(CCProof::new(proof_id, local_nonce, unit, result_hash)) - .await?; - } - - Ok(()) + // /// Submit Mocked Proofs for all active compute units. + // /// Mocked Proof has result_hash == difficulty and random local_nonce + // async fn submit_mocked_proofs(&mut self) -> eyre::Result<()> { + // if self.current_commitment.is_none() { + // return Ok(()); + // } + // + // let result_hash = ResultHash::from_slice(*self.difficulty.as_ref()); + // + // // proof_id is used only by CCP and is not sent to chain + // let proof_id = CCProofId::new(self.global_nonce, self.difficulty, ProofIdx::zero()); + // let units = self.cc_compute_units.keys().cloned().collect::>(); + // for unit in units { + // let local_nonce = LocalNonce::random(); + // self.submit_proofs(CCProof::new(proof_id, local_nonce, unit, result_hash)) + // .await?; + // } + // + // Ok(()) + // } + + fn is_epoch_ending(&self) -> bool { + let window = self.listener_config.epoch_end_window.to_secs(); + let next_epoch_start = self.init_timestamp + self.epoch_duration * (self.current_epoch + 1); + next_epoch_start - self.last_observed_block_timestamp < window } async fn poll_proofs(&mut self) -> eyre::Result<()> { @@ -1102,45 +1061,83 @@ impl ChainListener { } if let Some(ref ccp_client) = self.ccp_client { - tracing::trace!(target: "chain-listener", "Polling proofs after: {}", self.last_submitted_proof_id); - - let proofs = measured_request(&self.metrics, async { - ccp_client - .get_proofs_after(self.last_submitted_proof_id, PROOF_POLL_LIMIT) - .await - }) - .await?; - - // TODO: send only in batches - - // Filter proofs related to current epoch only - let proofs: Vec<_> = proofs - .into_iter() - .filter(|p| p.id.global_nonce == self.global_nonce) - .collect(); + let batch_requests = self.get_batch_request(); + + let proof_batches = if self.is_epoch_ending() { + let last_known_proofs = batch_requests + .into_iter() + .map(|cu_id, req| (cu_id, req.last_seen_proof_idx)) + .collect(); + + measured_request( + &self.metrics, + async { 
ccp_client.get_proofs_after(last_known_proofs, PROOF_POLL_LIMIT) } + .await, + ) + .await? + } else { + measured_request( + &self.metrics, + async { + ccp_client.get_batch_proofs_after( + batch_requests, + self.listener_config.min_batch_count, + self.listener_config.max_batch_count, + ) + } + .await, + ) + .await? + }; - if !proofs.is_empty() { - tracing::info!(target: "chain-listener", "Found {} proofs from polling", proofs.len()); + // TODO: maybe filter out proofs that are not related to current epoch + // // Filter proofs related to current epoch only + // let proof_batches: Vec = proofs + // .into_iter() + // .for_each(move |p| { + // p.proof_batches + // .into_iter() + // .filter(|p| p.id.global_nonce == self.global_nonce) + // .collect() + // }) + // .collect(); + + if !proof_batches.is_empty() { + let total_proofs = proof_batches + .iter() + .map(|p| p.proof_batches.len()) + .sum::(); + tracing::info!(target: "chain-listener", "Found {} proofs in {} batches from polling", total_proofs, proof_batches.len()); } - for proof in proofs.into_iter() { - let id = proof.id.idx; - tracing::info!(target: "chain-listener", "Submitting proof: {id}"); - self.submit_proof(proof).await?; - self.set_proof_id(proof.id.idx).await?; + let mut unit_ids = Vec::new(); + let mut local_nonces = Vec::new(); + let mut result_hashes = Vec::new(); + for batch in proof_batches.into_iter() { + for proof in batch.proof_batches.into_iter() { + unit_ids.push(proof.cu_id); + local_nonces.push(proof.local_nonce); + result_hashes.push(proof.result_hash); + self.proof_tracker + .observe_proof(proof.cu_id, proof.id.idx) + .await; + } } + + self.submit_proofs(unit_ids, local_nonces, result_hashes) + .await?; } Ok(()) } - async fn submit_proof(&mut self, proof: CCProof) -> eyre::Result<()> { - // This happens if Unit moved to Deal and shortly after that (but before cc refresh) ccp found proof for it - if !self.cc_compute_units.contains_key(&proof.cu_id) { - return Ok(()); - } - + async fn submit_proofs( + &mut self, + unit_ids: Vec, + local_nonces: Vec, + result_hashes: Vec, + ) -> eyre::Result<()> { let submit = retry(ExponentialBackoff::default(), || async { - self.chain_connector.submit_proof(proof).await.map_err(|err| { + self.chain_connector.submit_proofs(unit_ids.clone(), local_nonces.clone(), result_hashes.clone()).await.map_err(|err| { match err { ConnectorError::RpcCallError { .. } => { Permanent(err) } _ => { @@ -1158,11 +1155,9 @@ impl ChainListener { ConnectorError::RpcCallError { ref data, .. 
} => { // TODO: track proofs count per epoch and stop at maxProofsPerEpoch if is_too_many_proofs(data) { - tracing::info!(target: "chain-listener", "Too many proofs found for compute unit {}", proof.cu_id); + tracing::info!(target: "chain-listener", "Too many proofs found for some compute unit" ); - self.proof_counter - .insert(proof.cu_id, self.max_proofs_per_epoch); - self.refresh_commitment().await?; + // NOTE: it should be removed from contracts Ok(()) } else if is_commitment_not_active(data) { @@ -1176,23 +1171,23 @@ impl ChainListener { Ok(()) } else { // TODO: catch more contract asserts like "Proof is not valid" and "Proof is bigger than difficulty" - tracing::error!(target: "chain-listener", "Failed to submit proof {err}"); - tracing::error!(target: "chain-listener", "Proof {:?} ", proof); + tracing::error!(target: "chain-listener", "Failed to submit proofs {err}"); + tracing::error!(target: "chain-listener", "Units {:?} nonces {:?} result hashes {:?}", unit_ids, local_nonces, result_hashes); // In case of contract errors we just skip these proofs and continue Ok(()) } } _ => { tracing::error!(target: "chain-listener", "Failed to submit proof: {err}"); - tracing::error!(target: "chain-listener", "Proof {:?} ", proof); + tracing::error!(target: "chain-listener", "Units {:?} nonces {:?} result hashes {:?}", unit_ids, local_nonces, result_hashes); self.observe(|m| m.observe_proof_failed()); Err(err.into()) } } } Ok(tx_id) => { - tracing::info!(target: "chain-listener", "Submitted proof {}, txHash: {tx_id}", proof.id.idx); - self.pending_proof_txs.push((tx_id, proof.cu_id)); + tracing::info!(target: "chain-listener", "Successfully submitted {} proofs, txHash: {tx_id}", unit_ids.len()); + self.pending_proof_txs.push((tx_id, unit_ids)); self.observe(|m| m.observe_proof_submitted()); Ok(()) @@ -1200,42 +1195,6 @@ impl ChainListener { } } - fn parse_block_number(block_number: &str) -> eyre::Result { - let block_number_ = block_number.strip_prefix("0x").ok_or(eyre::eyre!( - "newHeads: block number is not hex; got {block_number}" - ))?; - BlockNumber::from_str_radix(block_number_, 16).map_err(|err| { - eyre::eyre!("Failed to parse block number: {err}, block_number {block_number}") - }) - } - - fn parse_block_header(header: Value) -> eyre::Result<(U256, BlockNumber)> { - let obj = header.as_object().ok_or(eyre::eyre!( - "newHeads: header is not an object; got {header}" - ))?; - - let timestamp = obj - .get("timestamp") - .and_then(Value::as_str) - .ok_or(eyre::eyre!( - "newHeads: timestamp field not found; got {header}" - ))? - .to_string(); - - let block_number = header - .get("number") - .and_then(Value::as_str) - .ok_or(eyre::eyre!( - "newHeads: number field not found; got {header}" - ))? 
- .to_string(); - - Ok(( - U256::from_str(×tamp)?, - Self::parse_block_number(&block_number)?, - )) - } - async fn poll_deal_statuses(&mut self) -> eyre::Result<()> { if self.active_deals.is_empty() { return Ok(()); @@ -1310,7 +1269,7 @@ impl ChainListener { let mut refresh_neeeded = false; let mut stats_updated = false; - for (status, (tx_hash, cu_id)) in statuses + for (status, (tx_hash, cu_ids)) in statuses .into_iter() .zip(self.pending_proof_txs.clone().into_iter()) { @@ -1319,18 +1278,20 @@ impl ChainListener { if status { tracing::info!(target: "chain-listener", "Proof tx {tx_hash} confirmed"); stats_updated = true; - let counter = self - .proof_counter - .entry(cu_id) - .and_modify(|c| { - *c = c.add(Uint::from(1)); - }) - .or_insert(Uint::from(1)); - - if *counter >= self.min_proofs_per_epoch { - tracing::info!(target: "chain-listener", "Compute unit {cu_id} submitted enough proofs"); - // need to call refresh commitment to make some cores to help others - refresh_neeeded = true; + for cu_id in cu_ids { + let counter = self.proof_tracker.confirm_proof(cu_id).await; + + if counter == self.max_proofs_per_epoch { + tracing::info!(target: "chain-listener", "Compute unit {cu_id} submitted maximum proofs in the current epoch {}", self.current_epoch); + // need to call refresh commitment to make some cores to help others + refresh_neeeded = true; + } else { + if counter >= self.min_proofs_per_epoch { + tracing::info!(target: "chain-listener", "Compute unit {cu_id} submitted minimum proofs in the current epoch {}", self.current_epoch); + // need to call refresh commitment to make some cores to help others + refresh_neeeded = true; + } + } } self.observe(|m| m.observe_proof_tx_success()); } else { @@ -1354,28 +1315,20 @@ impl ChainListener { } if stats_updated { - tracing::info!(target: "chain-listener", "Confirmed proofs count: {:?}", self.proof_counter.iter().map(|(cu, count)| format!("{}: {}", cu, count)).collect::>()); + tracing::debug!(target: "chain-listener", "Confirmed proofs count: {:?}", self.proof_tracker.get_proof_counters().iter().map(|(cu, count)| format!("{}: {}", cu, count)).collect::>()); } Ok(()) } - fn set_current_epoch(&mut self, epoch_number: U256) { - if self.current_epoch != epoch_number { - tracing::info!(target: "chain-listener", "Epoch changed, was {}, new epoch number is {epoch_number}", self.current_epoch); - self.current_epoch = epoch_number; - self.proof_counter.clear(); - } + async fn set_current_epoch(&mut self, epoch_number: U256) { + self.current_epoch = epoch_number; + self.proof_tracker.set_current_epoch(epoch_number).await; } - async fn set_global_nonce(&mut self, global_nonce: GlobalNonce) -> eyre::Result<()> { - if self.global_nonce != global_nonce { - tracing::info!(target: "chain-listener", "Global changed, was {}, new global nonce is {global_nonce}", self.global_nonce); - self.global_nonce = global_nonce; - self.reset_proof_id().await?; - } - - Ok(()) + async fn set_global_nonce(&mut self, global_nonce: GlobalNonce) { + self.global_nonce = self.global_nonce; + self.proof_tracker.set_global_nonce(global_nonce).await; } fn observe(&self, f: F) @@ -1393,6 +1346,24 @@ impl ChainListener { .filter(|(_, cu)| cu.startEpoch <= self.current_epoch) .count() } + fn get_batch_request(&self) -> HashMap { + let mut batch_request = HashMap::new(); + for cu_id in self.cc_compute_units.keys() { + let sent_proofs_count = self.proof_tracker.get_proof_counter(cu_id); + let proofs_needed = + U64::from(self.max_proofs_per_epoch - sent_proofs_count).as_limbs()[0] as 
usize; + + if proofs_needed > 0 { + let request = BatchRequest { + last_seen_proof_idx: self.proof_tracker.get_last_submitted_proof_id(cu_id), + proof_batch_size: min(proofs_needed, self.listener_config.max_proof_batch_size), + }; + + batch_request.insert(*cu_id, request); + } + } + batch_request + } } struct CUGroups { diff --git a/crates/chain-listener/src/persistence.rs b/crates/chain-listener/src/persistence.rs index c1a77ddecb..f1b88407b6 100644 --- a/crates/chain-listener/src/persistence.rs +++ b/crates/chain-listener/src/persistence.rs @@ -16,14 +16,25 @@ use alloy_primitives::U256; use eyre::Context; +use hex_utils::serde_as::Hex; +use serde_with::serde_as; +use serde_with::DisplayFromStr; use std::path::Path; +use crate::proof_tracker::ProofTracker; use alloy_serde_macro::{U256_as_String, U256_from_String}; use ccp_shared::proof::ProofIdx; +use ccp_shared::types::{GlobalNonce, CUID}; use serde::{Deserialize, Serialize}; + +#[serde_as] #[derive(Serialize, Deserialize)] -pub struct PersistedProofId { - pub proof_id: ProofIdx, +pub struct PersistedProofTracker { + #[serde_as(as = "Vec<(Hex,_)>")] + pub proof_ids: Vec<(CUID, ProofIdx)>, + #[serde_as(as = "Vec<(Hex,DisplayFromStr)>")] + pub proof_counter: Vec<(CUID, U256)>, + pub global_nonce: GlobalNonce, #[serde( serialize_with = "U256_as_String", deserialize_with = "U256_from_String" @@ -32,35 +43,32 @@ pub struct PersistedProofId { } pub(crate) fn proof_id_filename() -> String { - "proof_id.toml".to_string() + "proof_tracker.toml".to_string() } -pub(crate) async fn persist_proof_id( +pub(crate) async fn persist_proof_tracker( proof_id_dir: &Path, - proof_id: ProofIdx, - current_epoch: U256, + proof_tracker: &ProofTracker, ) -> eyre::Result<()> { let path = proof_id_dir.join(proof_id_filename()); - let bytes = toml_edit::ser::to_vec(&PersistedProofId { - proof_id, - epoch: current_epoch, - }) - .map_err(|err| eyre::eyre!("Proof id serialization failed {err}"))?; + let bytes = toml_edit::ser::to_vec(&PersistedProofTracker::from(proof_tracker)) + .map_err(|err| eyre::eyre!("Proof id serialization failed {err}"))?; tokio::fs::write(&path, bytes) .await .context(format!("error writing proof id to {}", path.display())) } -pub(crate) async fn load_persisted_proof_id( +pub(crate) async fn load_persisted_proof_tracker( proof_id_dir: &Path, -) -> eyre::Result> { +) -> eyre::Result> { let path = proof_id_dir.join(proof_id_filename()); if path.exists() { - let bytes = tokio::fs::read(&path) - .await - .context(format!("error reading proof id from {}", path.display()))?; + let bytes = tokio::fs::read(&path).await.context(format!( + "error reading proof tracker state from {}", + path.display() + ))?; let persisted_proof = toml_edit::de::from_slice(&bytes).context(format!( - "error deserializing proof id from {}", + "error deserializing proof tracker state from {}", path.display() ))?; Ok(Some(persisted_proof)) diff --git a/crates/chain-listener/src/proof_tracker.rs b/crates/chain-listener/src/proof_tracker.rs new file mode 100644 index 0000000000..8671e846fa --- /dev/null +++ b/crates/chain-listener/src/proof_tracker.rs @@ -0,0 +1,167 @@ +/* + * Copyright 2024 Fluence DAO + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::persistence; +use crate::persistence::PersistedProofTracker; +use alloy_primitives::{Uint, U256}; +use backoff::future::retry; +use backoff::ExponentialBackoff; +use ccp_shared::proof::ProofIdx; +use ccp_shared::types::{GlobalNonce, CUID}; +use eyre::eyre; +use std::cmp::max; +use std::collections::{BTreeMap, HashMap}; +use std::ops::Add; +use std::path::PathBuf; +use std::time::Duration; + +pub struct ProofTracker { + persisted_proof_id_dir: PathBuf, + current_epoch: U256, + global_nonce: GlobalNonce, + proof_counter: BTreeMap, + last_submitted_proof_ids: HashMap, +} + +impl ProofTracker { + pub fn new(persisted_proof_id_dir: PathBuf) -> Self { + Self { + persisted_proof_id_dir, + current_epoch: U256::ZERO, + global_nonce: GlobalNonce::new([0; 32]), + proof_counter: BTreeMap::new(), + last_submitted_proof_ids: HashMap::new(), + } + } + + pub async fn load_state(&mut self) -> eyre::Result<()> { + let persisted_proof_tracker = + persistence::load_persisted_proof_tracker(&self.persisted_proof_id_dir).await?; + + if let Some(state) = persisted_proof_tracker { + tracing::info!(target: "chain-listener", "Loaded proof tracker state"); + self.proof_counter = state.proof_counter.into_iter().collect(); + self.last_submitted_proof_ids = state + .proof_ids + .into_iter() + .collect::>(); + self.global_nonce = state.global_nonce; + self.current_epoch = state.epoch; + } else { + tracing::info!(target: "chain-listener", "No persisted proof tracker state found") + } + + Ok(()) + } + + pub async fn observe_proof(&mut self, cu_id: CUID, proof_id: ProofIdx) { + self.last_submitted_proof_ids + .entry(cu_id) + .and_modify(|id| *id = max(*id, proof_id)) + .or_insert(proof_id); + + tracing::info!(target: "chain-listener", "Persisted proof id {} for {} on epoch {} nonce {}", proof_id, cu_id, self.current_epoch, self.global_nonce); + + self.persist().await; + } + + pub fn get_last_submitted_proof_id(&self, cu_id: &CUID) -> ProofIdx { + self.last_submitted_proof_ids + .get(cu_id) + .copied() + .unwrap_or(ProofIdx::zero()) + } + + pub async fn confirm_proof(&mut self, cu_id: CUID) -> U256 { + let proof_id = Uint::from( + *self + .proof_counter + .entry(cu_id) + .and_modify(|c| { + *c = c.add(Uint::from(1)); + }) + .or_insert(Uint::from(1)), + ); + self.persist().await; + + proof_id + } + + pub fn get_proof_counter(&self, cu_id: &CUID) -> U256 { + self.proof_counter.get(cu_id).copied().unwrap_or(Uint::ZERO) + } + + pub fn get_proof_counters(&self) -> BTreeMap { + self.proof_counter.clone() + } + pub async fn set_current_epoch(&mut self, epoch_number: U256) { + if self.current_epoch != epoch_number { + tracing::info!(target: "chain-listener", "Epoch changed, was {}, new epoch number is {epoch_number}", self.current_epoch); + self.current_epoch = epoch_number; + self.proof_counter.clear(); + self.persist().await; + } + } + + pub async fn set_global_nonce(&mut self, global_nonce: GlobalNonce) { + if self.global_nonce != global_nonce { + tracing::info!(target: "chain-listener", "Global changed, was {}, new global nonce is {global_nonce}", self.global_nonce); + self.global_nonce = 
global_nonce; + tracing::info!(target: "chain-listener", "Resetting proof id counter"); + self.last_submitted_proof_ids.clear(); + self.persist().await; + } + } + + async fn persist(&self) { + let backoff = ExponentialBackoff { + max_elapsed_time: Some(Duration::from_secs(3)), + ..ExponentialBackoff::default() + }; + + let write = retry(backoff, || async { + persistence::persist_proof_tracker( + &self.persisted_proof_id_dir, + &self + ).await.map_err(|err|{ + tracing::warn!(target: "chain-listener", "Failed to persist proof tracker state: {err}; Retrying..."); + eyre!(err) + })?; + Ok(()) + }).await; + + if let Err(err) = write { + tracing::warn!(target: "chain-listener", "Failed to persist proof tracker state: {err}; Ignoring.."); + } else { + tracing::debug!(target: "chain-listener", "Proof tracker state persisted successfully"); + } + } +} + +impl From<&ProofTracker> for PersistedProofTracker { + fn from(tracker: &ProofTracker) -> Self { + Self { + proof_ids: tracker + .last_submitted_proof_ids + .clone() + .into_iter() + .collect(), + proof_counter: tracker.proof_counter.clone().into_iter().collect(), + global_nonce: tracker.global_nonce.clone(), + epoch: tracker.current_epoch.clone(), + } + } +} diff --git a/crates/server-config/src/node_config.rs b/crates/server-config/src/node_config.rs index 1c2b1485d6..c80f539587 100644 --- a/crates/server-config/src/node_config.rs +++ b/crates/server-config/src/node_config.rs @@ -647,6 +647,11 @@ pub struct ChainListenerConfig { #[serde(default = "default_proof_poll_period")] #[serde(with = "humantime_serde")] pub proof_poll_period: Duration, + pub min_batch_count: usize, + pub max_batch_count: usize, + pub max_proof_batch_size: usize, + #[serde(with = "humantime_serde")] + pub epoch_end_window: Duration, } /// Name of the effector module From 3d889e886474534681b694b0d1931e14c89ee0d8 Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Wed, 19 Jun 2024 15:34:56 +0300 Subject: [PATCH 02/24] poll global nonce --- crates/chain-listener/src/listener.rs | 40 +++++++++++++++++----- crates/chain-listener/src/proof_tracker.rs | 6 +++- 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index 38b96677ef..810cc65a53 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -90,7 +90,7 @@ pub struct ChainListener { // These settings are changed each epoch global_nonce: GlobalNonce, current_epoch: U256, - last_observed_block_timestamp: u64, + last_observed_block_timestamp: U256, current_commitment: Option, @@ -166,7 +166,7 @@ impl ChainListener { active_deals: BTreeMap::new(), metrics, proof_tracker: ProofTracker::new(persisted_proof_id_dir), - last_observed_block_timestamp: 0, + last_observed_block_timestamp: U256::ZERO, } } @@ -265,6 +265,12 @@ impl ChainListener { if let Err(err) = self.poll_pending_proof_txs().await { tracing::warn!(target: "chain-listener", "Failed to poll pending proof txs: {err}"); } + + // NOTE: we need to update global nonce by timer because sometimes it's stale + // at the beginning of the epoch. 
It's caused by inconsistency between eth-rpc and subscription to newHeads + if let Err(err) = self.update_global_nonce().await { + tracing::warn!(target: "chain-listener", "Failed to update global nonce: {err}"); + } } } } @@ -646,8 +652,15 @@ impl ChainListener { // TODO: add epoch_number to metrics self.set_current_epoch(epoch_number).await; - self.set_global_nonce(self.chain_connector.get_global_nonce().await?) + + let nonce_updated = self + .set_global_nonce(self.chain_connector.get_global_nonce().await?) .await; + + if !nonce_updated { + tracing::warn!(target: "chain-listener", "Epoch changed but global nonce hasn't changed. Don't worry, it'll catch up soon"); + } + tracing::info!(target: "chain-listener", "Global nonce: {}", self.global_nonce); if let Some(status) = self.get_commitment_status().await? { @@ -1049,9 +1062,19 @@ impl ChainListener { // Ok(()) // } + async fn update_global_nonce(&mut self) -> eyre::Result<()> { + let nonce = self.chain_connector.get_global_nonce().await?; + let nonce_changed = self.set_global_nonce(nonce).await; + if nonce_changed { + self.refresh_commitment().await?; + } + Ok(()) + } + fn is_epoch_ending(&self) -> bool { - let window = self.listener_config.epoch_end_window.to_secs(); - let next_epoch_start = self.init_timestamp + self.epoch_duration * (self.current_epoch + 1); + let window = Uint::from(self.listener_config.epoch_end_window.as_secs()); + let next_epoch_start = + self.init_timestamp + self.epoch_duration * (self.current_epoch + Uint::from(1)); next_epoch_start - self.last_observed_block_timestamp < window } @@ -1066,7 +1089,7 @@ impl ChainListener { let proof_batches = if self.is_epoch_ending() { let last_known_proofs = batch_requests .into_iter() - .map(|cu_id, req| (cu_id, req.last_seen_proof_idx)) + .map(|(cu_id, req)| (cu_id, req.last_seen_proof_idx)) .collect(); measured_request( @@ -1326,9 +1349,10 @@ impl ChainListener { self.proof_tracker.set_current_epoch(epoch_number).await; } - async fn set_global_nonce(&mut self, global_nonce: GlobalNonce) { + /// Returns true if global nonce was updated + async fn set_global_nonce(&mut self, global_nonce: GlobalNonce) -> bool { self.global_nonce = self.global_nonce; - self.proof_tracker.set_global_nonce(global_nonce).await; + self.proof_tracker.set_global_nonce(global_nonce).await } fn observe(&self, f: F) diff --git a/crates/chain-listener/src/proof_tracker.rs b/crates/chain-listener/src/proof_tracker.rs index 8671e846fa..28d5571c64 100644 --- a/crates/chain-listener/src/proof_tracker.rs +++ b/crates/chain-listener/src/proof_tracker.rs @@ -116,13 +116,17 @@ impl ProofTracker { } } - pub async fn set_global_nonce(&mut self, global_nonce: GlobalNonce) { + /// Returns true if the global nonce has changed + pub async fn set_global_nonce(&mut self, global_nonce: GlobalNonce) -> bool { if self.global_nonce != global_nonce { tracing::info!(target: "chain-listener", "Global changed, was {}, new global nonce is {global_nonce}", self.global_nonce); self.global_nonce = global_nonce; tracing::info!(target: "chain-listener", "Resetting proof id counter"); self.last_submitted_proof_ids.clear(); self.persist().await; + true + } else { + false } } From b2076552cb4eea734ea1a5826479659acb53f3ea Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Thu, 20 Jun 2024 21:31:05 +0300 Subject: [PATCH 03/24] add logs --- crates/chain-data/src/block_header.rs | 4 ++-- crates/chain-listener/src/listener.rs | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/chain-data/src/block_header.rs 
b/crates/chain-data/src/block_header.rs index 92715e4bcb..7fe332df72 100644 --- a/crates/chain-data/src/block_header.rs +++ b/crates/chain-data/src/block_header.rs @@ -43,12 +43,12 @@ impl BlockHeader { .to_string(); Ok(Self { - number: BlockNumber::from_str_radix(&block_number.trim_start_matches("0x"), 16)?, + number: BlockNumber::from_str_radix(block_number.trim_start_matches("0x"), 16)?, timestamp: U256::from_str(×tamp)?, }) } - pub fn from_str(json: &str) -> eyre::Result { + pub fn from_json_str(json: &str) -> eyre::Result { let json: Value = serde_json::from_str(json) .map_err(|err| eyre::eyre!("failed to parse header {err}; got {json}"))?; Self::from_json(json) diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index 810cc65a53..05675f74ab 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -1092,6 +1092,7 @@ impl ChainListener { .map(|(cu_id, req)| (cu_id, req.last_seen_proof_idx)) .collect(); + tracing::debug!(target: "chain-listener", "Polling proofs after {:?}", last_known_proofs); measured_request( &self.metrics, async { ccp_client.get_proofs_after(last_known_proofs, PROOF_POLL_LIMIT) } @@ -1099,6 +1100,7 @@ impl ChainListener { ) .await? } else { + tracing::debug!(target: "chain-listener", "Polling proofs after {:?}, min batch count: {}, max batch count: {}", batch_requests, self.listener_config.min_batch_count, self.listener_config.max_batch_count); measured_request( &self.metrics, async { From 069db225175dbb337409113973f2a47d288f3615 Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Thu, 20 Jun 2024 22:18:42 +0300 Subject: [PATCH 04/24] update ccp --- Cargo.lock | 33 +++++++++++++++++---------------- Cargo.toml | 2 +- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dacb536d91..e902af8f1b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1259,8 +1259,8 @@ dependencies = [ [[package]] name = "ccp-rpc-client" -version = "0.10.2" -source = "git+https://github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#8ac4d92d3f18f6e1de56eb4e6f2030bbcb339306" +version = "0.11.0" +source = "git+https://github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#eb4648bb580e90687ff8c822442ba1de17cd35b3" dependencies = [ "ccp-shared", "hex", @@ -1270,13 +1270,14 @@ dependencies = [ [[package]] name = "ccp-shared" -version = "0.10.2" -source = "git+https://github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#8ac4d92d3f18f6e1de56eb4e6f2030bbcb339306" +version = "0.11.0" +source = "git+https://github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#eb4648bb580e90687ff8c822442ba1de17cd35b3" dependencies = [ "hex", "newtype_derive", "rand 0.8.5", "serde", + "serde_with 3.8.1", ] [[package]] @@ -1398,7 +1399,7 @@ dependencies = [ "peer-metrics", "serde", "serde_json", - "serde_with 3.7.0", + "serde_with 3.8.1", "server-config", "thiserror", "tokio", @@ -1683,7 +1684,7 @@ dependencies = [ "rand 0.8.5", "range-set-blaze", "serde", - "serde_with 3.7.0", + "serde_with 3.8.1", "tempfile", "thiserror", "tokio", @@ -1728,8 +1729,8 @@ dependencies = [ [[package]] name = "cpu-utils" -version = "0.10.2" -source = "git+https://github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#8ac4d92d3f18f6e1de56eb4e6f2030bbcb339306" +version = "0.11.0" +source = 
"git+https://github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#eb4648bb580e90687ff8c822442ba1de17cd35b3" dependencies = [ "ccp-shared", "ccp_core_affinity", @@ -3150,7 +3151,7 @@ name = "hex-utils" version = "0.1.0" dependencies = [ "hex", - "serde_with 3.7.0", + "serde_with 3.8.1", ] [[package]] @@ -7711,11 +7712,11 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.7.0" +version = "3.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee80b0e361bbf88fd2f6e242ccd19cfda072cb0faa6ae694ecee08199938569a" +checksum = "0ad483d2ab0149d5a5ebcd9972a3852711e0153d863bf5a5d0391d28883c4a20" dependencies = [ - "base64 0.21.7", + "base64 0.22.1", "chrono", "hex", "indexmap 1.9.3", @@ -7723,7 +7724,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", - "serde_with_macros 3.7.0", + "serde_with_macros 3.8.1", "time", ] @@ -7741,9 +7742,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.7.0" +version = "3.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6561dc161a9224638a31d876ccdfefbc1df91d3f3a8342eddb35f055d48c7655" +checksum = "65569b702f41443e8bc8bbb1c5779bd0450bbe723b56198980e80ec45780bce2" dependencies = [ "darling", "proc-macro2", @@ -7784,7 +7785,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "serde_with 3.7.0", + "serde_with 3.8.1", "temp-env", "tempfile", "toml 0.8.13", diff --git a/Cargo.toml b/Cargo.toml index a61c697d1e..e3101d0dcb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -170,7 +170,7 @@ rand = "0.8.5" futures-util = "0.3.30" num_cpus = "1.16.0" enum_dispatch = "0.3.12" -serde_with = "3.7.0" +serde_with = "3.8.1" mockito = "1.2.0" clarity = "1.3.0" cpu-utils = { git = "https://github.com/fluencelabs/capacity-commitment-prover.git", branch = "feat/VM-613-batching-api" } From 26e0dd128dec2e7acb8b7a761fc6f45c94b70779 Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Thu, 20 Jun 2024 23:17:47 +0300 Subject: [PATCH 05/24] add logs --- Cargo.lock | 1 + crates/chain-listener/Cargo.toml | 2 + crates/chain-listener/src/persistence.rs | 60 ++++++++++++++++++++---- 3 files changed, 55 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e902af8f1b..7a378c6576 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1401,6 +1401,7 @@ dependencies = [ "serde_json", "serde_with 3.8.1", "server-config", + "tempfile", "thiserror", "tokio", "tokio-stream", diff --git a/crates/chain-listener/Cargo.toml b/crates/chain-listener/Cargo.toml index c931bd554c..539a64a688 100644 --- a/crates/chain-listener/Cargo.toml +++ b/crates/chain-listener/Cargo.toml @@ -44,3 +44,5 @@ serde_with = { workspace = true } [dev-dependencies] jsonrpsee = { workspace = true, features = ["server"] } +tempfile = { workspace = true } + diff --git a/crates/chain-listener/src/persistence.rs b/crates/chain-listener/src/persistence.rs index f1b88407b6..980f4aab80 100644 --- a/crates/chain-listener/src/persistence.rs +++ b/crates/chain-listener/src/persistence.rs @@ -30,7 +30,7 @@ use serde::{Deserialize, Serialize}; #[serde_as] #[derive(Serialize, Deserialize)] pub struct PersistedProofTracker { - #[serde_as(as = "Vec<(Hex,_)>")] + #[serde_as(as = "Vec<(Hex,DisplayFromStr)>")] pub proof_ids: Vec<(CUID, ProofIdx)>, #[serde_as(as = "Vec<(Hex,DisplayFromStr)>")] pub proof_counter: Vec<(CUID, U256)>, @@ -42,7 +42,7 @@ pub struct PersistedProofTracker { pub epoch: U256, } -pub(crate) fn proof_id_filename() -> String { +pub(crate) fn proof_tracker_state_filename() -> 
String { "proof_tracker.toml".to_string() } @@ -50,29 +50,73 @@ pub(crate) async fn persist_proof_tracker( proof_id_dir: &Path, proof_tracker: &ProofTracker, ) -> eyre::Result<()> { - let path = proof_id_dir.join(proof_id_filename()); + let path = proof_id_dir.join(proof_tracker_state_filename()); let bytes = toml_edit::ser::to_vec(&PersistedProofTracker::from(proof_tracker)) - .map_err(|err| eyre::eyre!("Proof id serialization failed {err}"))?; + .map_err(|err| eyre::eyre!("Proof tracker serialization failed {err}"))?; tokio::fs::write(&path, bytes) .await .context(format!("error writing proof id to {}", path.display())) } pub(crate) async fn load_persisted_proof_tracker( - proof_id_dir: &Path, + proof_tracker_dir: &Path, ) -> eyre::Result> { - let path = proof_id_dir.join(proof_id_filename()); + let path = proof_tracker_dir.join(proof_tracker_state_filename()); if path.exists() { let bytes = tokio::fs::read(&path).await.context(format!( "error reading proof tracker state from {}", path.display() ))?; let persisted_proof = toml_edit::de::from_slice(&bytes).context(format!( - "error deserializing proof tracker state from {}", - path.display() + "error deserializing proof tracker state from {}, content {}", + path.display(), + String::from_utf8_lossy(&bytes) ))?; Ok(Some(persisted_proof)) } else { Ok(None) } } + +#[cfg(test)] +mod tests { + use super::*; + use std::str::FromStr; + use tempfile::tempdir; + + #[tokio::test] + async fn proof_tracker_test() { + let dir = tempdir().expect("Could not create temp dir"); + let mut proof_tracker = ProofTracker::new(dir.path().to_path_buf()); + let cuid = + CUID::from_str("7dcee6bb1c39396de3b19424154ad3996cbdef5f3950022325bb4f651e48fbe0") + .unwrap(); + + let global_nonce = GlobalNonce::new([1; 32]); + let epoch = U256::from(100); + let proof_id = ProofIdx::from_str("1").unwrap(); + proof_tracker.set_current_epoch(epoch).await; + proof_tracker.set_global_nonce(global_nonce).await; + + proof_tracker.observe_proof(cuid, proof_id).await; + + proof_tracker.confirm_proof(cuid).await; + proof_tracker.confirm_proof(cuid).await; + + let persisted_proof_tracker = load_persisted_proof_tracker(dir.path()) + .await + .unwrap() + .unwrap(); + + assert_eq!(persisted_proof_tracker.proof_ids.len(), 1); + assert_eq!(persisted_proof_tracker.proof_ids[0].0, cuid); + assert_eq!(persisted_proof_tracker.proof_ids[0].1, proof_id); + + assert_eq!(persisted_proof_tracker.proof_counter.len(), 1); + assert_eq!(persisted_proof_tracker.proof_counter[0].0, cuid); + assert_eq!(persisted_proof_tracker.proof_counter[0].1, U256::from(2)); + + assert_eq!(persisted_proof_tracker.epoch, epoch); + assert_eq!(persisted_proof_tracker.global_nonce, global_nonce); + } +} From 6f577a8d311e60d34ae643c01981d8559a6c3011 Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Thu, 20 Jun 2024 23:21:07 +0300 Subject: [PATCH 06/24] add logs --- crates/chain-connector/src/connector.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/chain-connector/src/connector.rs b/crates/chain-connector/src/connector.rs index 4f236731ad..d3425815f5 100644 --- a/crates/chain-connector/src/connector.rs +++ b/crates/chain-connector/src/connector.rs @@ -607,10 +607,13 @@ impl ChainConnector for HttpChainConnector { batch.insert("eth_getBlockByNumber", rpc_params!["latest", false])?; let resp: BatchResponse = self.client.batch_request(batch).await?; + tracing::debug!(target: "chain-connector", "Got cc init params response: {resp:?}"); + let mut results = resp .into_ok() .map_err(|err| eyre!("Some request 
failed in a batch {err:?}"))?; + // TODO: check with 0x and write test let difficulty: FixedBytes<32> = FixedBytes::from_str(&parse_str_field(results.next(), "difficulty")?)?; From 8fb0aa8b5f5c36b1d23f69ae5776664395ee83ca Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Thu, 20 Jun 2024 23:45:52 +0300 Subject: [PATCH 07/24] serde nonce as hex --- crates/chain-listener/src/persistence.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/chain-listener/src/persistence.rs b/crates/chain-listener/src/persistence.rs index 980f4aab80..409e0ae6fe 100644 --- a/crates/chain-listener/src/persistence.rs +++ b/crates/chain-listener/src/persistence.rs @@ -34,6 +34,7 @@ pub struct PersistedProofTracker { pub proof_ids: Vec<(CUID, ProofIdx)>, #[serde_as(as = "Vec<(Hex,DisplayFromStr)>")] pub proof_counter: Vec<(CUID, U256)>, + #[serde_as(as = "Hex")] pub global_nonce: GlobalNonce, #[serde( serialize_with = "U256_as_String", From b5d6b802decd0eb2e97b2ad186535049fe49dc58 Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Fri, 21 Jun 2024 00:28:59 +0300 Subject: [PATCH 08/24] send proof only if not empty --- crates/chain-listener/src/listener.rs | 30 +++++++++++++-------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index 05675f74ab..01d27908a4 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -1133,24 +1133,24 @@ impl ChainListener { .map(|p| p.proof_batches.len()) .sum::(); tracing::info!(target: "chain-listener", "Found {} proofs in {} batches from polling", total_proofs, proof_batches.len()); - } - let mut unit_ids = Vec::new(); - let mut local_nonces = Vec::new(); - let mut result_hashes = Vec::new(); - for batch in proof_batches.into_iter() { - for proof in batch.proof_batches.into_iter() { - unit_ids.push(proof.cu_id); - local_nonces.push(proof.local_nonce); - result_hashes.push(proof.result_hash); - self.proof_tracker - .observe_proof(proof.cu_id, proof.id.idx) - .await; + let mut unit_ids = Vec::new(); + let mut local_nonces = Vec::new(); + let mut result_hashes = Vec::new(); + for batch in proof_batches.into_iter() { + for proof in batch.proof_batches.into_iter() { + unit_ids.push(proof.cu_id); + local_nonces.push(proof.local_nonce); + result_hashes.push(proof.result_hash); + self.proof_tracker + .observe_proof(proof.cu_id, proof.id.idx) + .await; + } } - } - self.submit_proofs(unit_ids, local_nonces, result_hashes) - .await?; + self.submit_proofs(unit_ids, local_nonces, result_hashes) + .await?; + } } Ok(()) } From fc2be191de0271e77f6d7c3e7559c085294f1cea Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Fri, 21 Jun 2024 12:10:06 +0300 Subject: [PATCH 09/24] add logs --- crates/chain-connector/src/connector.rs | 8 ++++---- crates/chain-listener/src/listener.rs | 2 ++ 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/crates/chain-connector/src/connector.rs b/crates/chain-connector/src/connector.rs index d3425815f5..207ae07503 100644 --- a/crates/chain-connector/src/connector.rs +++ b/crates/chain-connector/src/connector.rs @@ -309,7 +309,7 @@ impl HttpChainConnector { pub(crate) async fn get_deals(&self) -> eyre::Result> { let units = self.get_compute_units().await?; - tracing::debug!(target: "chain-connector", "Got {} compute units", units.len()); + tracing::debug!(target: "chain-connector", "get_deals: Got {} compute units", units.len()); let mut deals: BTreeMap>> = BTreeMap::new(); units @@ -325,9 +325,9 
@@ impl HttpChainConnector { if deals.is_empty() { return Ok(Vec::new()); } - tracing::debug!(target: "chain-connector", "Got {} deals: {:?}", deals.len(), deals); + tracing::debug!(target: "chain-connector", "get_deals: Got {} deals: {:?}", deals.len(), deals); let infos = self.get_deal_infos(deals.keys()).await?; - tracing::debug!(target: "chain-connector", "Got {} deals infos: {:?}", infos.len(), infos); + tracing::debug!(target: "chain-connector", "get_deals: Got {} deals infos: {:?}", infos.len(), infos); let deals = infos .into_iter() .zip(deals) @@ -343,7 +343,7 @@ impl HttpChainConnector { Err(err) => DealResult::with_error(deal_id, err.to_string()), }) .collect::<_>(); - tracing::debug!(target: "chain-connector", "Return deals: {:?}", deals); + tracing::debug!(target: "chain-connector", "get_deals: Return deals: {:?}", deals); Ok(deals) } diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index 01d27908a4..8646d4b051 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -1150,6 +1150,8 @@ impl ChainListener { self.submit_proofs(unit_ids, local_nonces, result_hashes) .await?; + } else { + tracing::debug!(target: "chain-listener", "No proofs found from polling"); } } Ok(()) From 721664e391232b7903f4f483350cfbbd8cf2521a Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Fri, 21 Jun 2024 19:10:24 +0300 Subject: [PATCH 10/24] try to log multiline error properly --- crates/chain-listener/src/listener.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index 8646d4b051..1299223d03 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -1198,7 +1198,7 @@ impl ChainListener { Ok(()) } else { // TODO: catch more contract asserts like "Proof is not valid" and "Proof is bigger than difficulty" - tracing::error!(target: "chain-listener", "Failed to submit proofs {err}"); + tracing::error!(target: "chain-listener", "Failed to submit proofs {}", err.to_string().replace("\n", " ")); tracing::error!(target: "chain-listener", "Units {:?} nonces {:?} result hashes {:?}", unit_ids, local_nonces, result_hashes); // In case of contract errors we just skip these proofs and continue Ok(()) From 2a27d76bbdb3224bdfbe1aa2f95bffc2a67260ae Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Fri, 21 Jun 2024 19:49:02 +0300 Subject: [PATCH 11/24] update ccp deps --- Cargo.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e3101d0dcb..f280b1dbc0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -173,9 +173,9 @@ enum_dispatch = "0.3.12" serde_with = "3.8.1" mockito = "1.2.0" clarity = "1.3.0" -cpu-utils = { git = "https://github.com/fluencelabs/capacity-commitment-prover.git", branch = "feat/VM-613-batching-api" } -ccp-shared = { git = "https://github.com/fluencelabs/capacity-commitment-prover.git", branch = "feat/VM-613-batching-api" } -ccp-rpc-client = { git = "https://github.com/fluencelabs/capacity-commitment-prover.git", branch = "feat/VM-613-batching-api" } +cpu-utils = { git = "ssh://git@github.com/fluencelabs/capacity-commitment-prover.git", branch = "feat/VM-613-batching-api" } +ccp-shared = { git = "ssh://git@github.com/fluencelabs/capacity-commitment-prover.git", branch = "feat/VM-613-batching-api" } +ccp-rpc-client = { git = "ssh://git@github.com/fluencelabs/capacity-commitment-prover.git", branch = 
"feat/VM-613-batching-api" } alloy-sol-types = "0.6.4" alloy-primitives = "0.6.4" alloy_serde_macro = "0.1.2" From e92a4cf65244aa07011bf5167cd6872a709911c0 Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Fri, 21 Jun 2024 20:09:29 +0300 Subject: [PATCH 12/24] fix logs --- Cargo.lock | 6 +++--- crates/chain-connector/src/connector.rs | 9 ++++++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7a378c6576..db97192dfa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1260,7 +1260,7 @@ dependencies = [ [[package]] name = "ccp-rpc-client" version = "0.11.0" -source = "git+https://github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#eb4648bb580e90687ff8c822442ba1de17cd35b3" +source = "git+ssh://git@github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#eb4648bb580e90687ff8c822442ba1de17cd35b3" dependencies = [ "ccp-shared", "hex", @@ -1271,7 +1271,7 @@ dependencies = [ [[package]] name = "ccp-shared" version = "0.11.0" -source = "git+https://github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#eb4648bb580e90687ff8c822442ba1de17cd35b3" +source = "git+ssh://git@github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#eb4648bb580e90687ff8c822442ba1de17cd35b3" dependencies = [ "hex", "newtype_derive", @@ -1731,7 +1731,7 @@ dependencies = [ [[package]] name = "cpu-utils" version = "0.11.0" -source = "git+https://github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#eb4648bb580e90687ff8c822442ba1de17cd35b3" +source = "git+ssh://git@github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#eb4648bb580e90687ff8c822442ba1de17cd35b3" dependencies = [ "ccp-shared", "ccp_core_affinity", diff --git a/crates/chain-connector/src/connector.rs b/crates/chain-connector/src/connector.rs index 207ae07503..7ab6f2d17b 100644 --- a/crates/chain-connector/src/connector.rs +++ b/crates/chain-connector/src/connector.rs @@ -482,16 +482,19 @@ impl HttpChainConnector { let tx = tx .sign(&self.config.wallet_key, Some(self.config.network_id)) .to_bytes(); - let tx = hex::encode(tx); + let signed_tx = hex::encode(tx); tracing::info!(target: "chain-connector", - "Sending tx to {to} from {} signed {tx}", + "Sending tx to {to} from {} data {tx}", self.config.wallet_key.to_address() ); let resp: String = process_response( self.client - .request("eth_sendRawTransaction", rpc_params![format!("0x{}", tx)]) + .request( + "eth_sendRawTransaction", + rpc_params![format!("0x{}", signed_tx)], + ) .await, )?; Ok(resp) From 95ecb26e4a647eee402adcf572c37bba2770d497 Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Fri, 21 Jun 2024 20:23:48 +0300 Subject: [PATCH 13/24] change deps --- Cargo.lock | 6 +++--- Cargo.toml | 6 +++--- crates/chain-connector/src/connector.rs | 10 +++++----- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index db97192dfa..7a378c6576 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1260,7 +1260,7 @@ dependencies = [ [[package]] name = "ccp-rpc-client" version = "0.11.0" -source = "git+ssh://git@github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#eb4648bb580e90687ff8c822442ba1de17cd35b3" +source = "git+https://github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#eb4648bb580e90687ff8c822442ba1de17cd35b3" dependencies = [ "ccp-shared", "hex", @@ -1271,7 +1271,7 @@ dependencies = [ 
[[package]] name = "ccp-shared" version = "0.11.0" -source = "git+ssh://git@github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#eb4648bb580e90687ff8c822442ba1de17cd35b3" +source = "git+https://github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#eb4648bb580e90687ff8c822442ba1de17cd35b3" dependencies = [ "hex", "newtype_derive", @@ -1731,7 +1731,7 @@ dependencies = [ [[package]] name = "cpu-utils" version = "0.11.0" -source = "git+ssh://git@github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#eb4648bb580e90687ff8c822442ba1de17cd35b3" +source = "git+https://github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#eb4648bb580e90687ff8c822442ba1de17cd35b3" dependencies = [ "ccp-shared", "ccp_core_affinity", diff --git a/Cargo.toml b/Cargo.toml index f280b1dbc0..e3101d0dcb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -173,9 +173,9 @@ enum_dispatch = "0.3.12" serde_with = "3.8.1" mockito = "1.2.0" clarity = "1.3.0" -cpu-utils = { git = "ssh://git@github.com/fluencelabs/capacity-commitment-prover.git", branch = "feat/VM-613-batching-api" } -ccp-shared = { git = "ssh://git@github.com/fluencelabs/capacity-commitment-prover.git", branch = "feat/VM-613-batching-api" } -ccp-rpc-client = { git = "ssh://git@github.com/fluencelabs/capacity-commitment-prover.git", branch = "feat/VM-613-batching-api" } +cpu-utils = { git = "https://github.com/fluencelabs/capacity-commitment-prover.git", branch = "feat/VM-613-batching-api" } +ccp-shared = { git = "https://github.com/fluencelabs/capacity-commitment-prover.git", branch = "feat/VM-613-batching-api" } +ccp-rpc-client = { git = "https://github.com/fluencelabs/capacity-commitment-prover.git", branch = "feat/VM-613-batching-api" } alloy-sol-types = "0.6.4" alloy-primitives = "0.6.4" alloy_serde_macro = "0.1.2" diff --git a/crates/chain-connector/src/connector.rs b/crates/chain-connector/src/connector.rs index 7ab6f2d17b..15dcae5341 100644 --- a/crates/chain-connector/src/connector.rs +++ b/crates/chain-connector/src/connector.rs @@ -473,20 +473,20 @@ impl HttpChainConnector { gas_limit: Uint256::from_le_bytes(&gas_limit.to_le_bytes_vec()), to: to.parse()?, value: 0u32.into(), - data, + data: data.clone(), signature: None, // Not signed. Yet. 
max_fee_per_gas: Uint256::from_le_bytes(&max_fee_per_gas.to_le_bytes_vec()), access_list: vec![], }; - let tx = tx + let signed_tx = tx .sign(&self.config.wallet_key, Some(self.config.network_id)) .to_bytes(); - let signed_tx = hex::encode(tx); + let signed_tx = hex::encode(signed_tx); tracing::info!(target: "chain-connector", - "Sending tx to {to} from {} data {tx}", - self.config.wallet_key.to_address() + "Sending tx to {to} from {} data {}", + self.config.wallet_key.to_address(), hex::encode(&data) ); let resp: String = process_response( From c95eda417da043999dc0633a175ca6c70a430a1b Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Mon, 24 Jun 2024 19:50:32 +0300 Subject: [PATCH 14/24] fix bug --- crates/chain-listener/src/listener.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index 1299223d03..6c7cb80009 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -1355,7 +1355,7 @@ impl ChainListener { /// Returns true if global nonce was updated async fn set_global_nonce(&mut self, global_nonce: GlobalNonce) -> bool { - self.global_nonce = self.global_nonce; + self.global_nonce = global_nonce; self.proof_tracker.set_global_nonce(global_nonce).await } From 90730ab6c1916a0fe0cb65e99bc08085c0b684ba Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Mon, 24 Jun 2024 20:02:20 +0300 Subject: [PATCH 15/24] update deps --- Cargo.lock | 6 +++--- Cargo.toml | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7a378c6576..db97192dfa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1260,7 +1260,7 @@ dependencies = [ [[package]] name = "ccp-rpc-client" version = "0.11.0" -source = "git+https://github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#eb4648bb580e90687ff8c822442ba1de17cd35b3" +source = "git+ssh://git@github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#eb4648bb580e90687ff8c822442ba1de17cd35b3" dependencies = [ "ccp-shared", "hex", @@ -1271,7 +1271,7 @@ dependencies = [ [[package]] name = "ccp-shared" version = "0.11.0" -source = "git+https://github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#eb4648bb580e90687ff8c822442ba1de17cd35b3" +source = "git+ssh://git@github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#eb4648bb580e90687ff8c822442ba1de17cd35b3" dependencies = [ "hex", "newtype_derive", @@ -1731,7 +1731,7 @@ dependencies = [ [[package]] name = "cpu-utils" version = "0.11.0" -source = "git+https://github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#eb4648bb580e90687ff8c822442ba1de17cd35b3" +source = "git+ssh://git@github.com/fluencelabs/capacity-commitment-prover.git?branch=feat/VM-613-batching-api#eb4648bb580e90687ff8c822442ba1de17cd35b3" dependencies = [ "ccp-shared", "ccp_core_affinity", diff --git a/Cargo.toml b/Cargo.toml index e3101d0dcb..f280b1dbc0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -173,9 +173,9 @@ enum_dispatch = "0.3.12" serde_with = "3.8.1" mockito = "1.2.0" clarity = "1.3.0" -cpu-utils = { git = "https://github.com/fluencelabs/capacity-commitment-prover.git", branch = "feat/VM-613-batching-api" } -ccp-shared = { git = "https://github.com/fluencelabs/capacity-commitment-prover.git", branch = "feat/VM-613-batching-api" } -ccp-rpc-client = { git = 
"https://github.com/fluencelabs/capacity-commitment-prover.git", branch = "feat/VM-613-batching-api" } +cpu-utils = { git = "ssh://git@github.com/fluencelabs/capacity-commitment-prover.git", branch = "feat/VM-613-batching-api" } +ccp-shared = { git = "ssh://git@github.com/fluencelabs/capacity-commitment-prover.git", branch = "feat/VM-613-batching-api" } +ccp-rpc-client = { git = "ssh://git@github.com/fluencelabs/capacity-commitment-prover.git", branch = "feat/VM-613-batching-api" } alloy-sol-types = "0.6.4" alloy-primitives = "0.6.4" alloy_serde_macro = "0.1.2" From 73d957f6a6389cba803cc2c3244f82771d22b8bf Mon Sep 17 00:00:00 2001 From: enjenjenje Date: Mon, 24 Jun 2024 19:21:14 +0200 Subject: [PATCH 16/24] chore: set up login to git --- .cargo/config.toml | 1 + .github/workflows/build.yml | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/.cargo/config.toml b/.cargo/config.toml index de68640c7d..e4bd3b2c2a 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,5 +1,6 @@ [net] retry = 50 +git-fetch-with-cli = true [registries] fluence = { index = "git://crates.fluence.dev/index" } diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 2a6881df3c..201de69c3b 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -40,6 +40,11 @@ jobs: repository: fluencelabs/nox ref: ${{ inputs.ref }} + - name: Set Githib Token to pull from Fluence private repo + run: | + git config --global url."https://${{ secrets.FLUENCEBOT_RELEASE_PLEASE_PAT }}@github.com".insteadOf ssh://git@github.com + + - name: Get PR labels id: labels uses: joerick/pr-labels-action@v1.0.9 From 9c405a105c69564cb05398b7efc2b08b5578c502 Mon Sep 17 00:00:00 2001 From: enjenjenje Date: Mon, 24 Jun 2024 19:29:51 +0200 Subject: [PATCH 17/24] chore: set up login to git --- .github/workflows/build.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 201de69c3b..413f2ea498 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -42,7 +42,9 @@ jobs: - name: Set Githib Token to pull from Fluence private repo run: | - git config --global url."https://${{ secrets.FLUENCEBOT_RELEASE_PLEASE_PAT }}@github.com".insteadOf ssh://git@github.com + git config --global credential.helper store + echo "https://git:${{ secrets.FLUENCEBOT_RELEASE_PLEASE_PAT }}@github.com" > ~/.git-credentials + git config --global url."https://git:${{ secrets.FLUENCEBOT_RELEASE_PLEASE_PAT }}@github.com".insteadOf ssh://git@github.com - name: Get PR labels From 5aa2850e773d9244807474799b670acfb52fc108 Mon Sep 17 00:00:00 2001 From: enjenjenje Date: Mon, 24 Jun 2024 19:42:14 +0200 Subject: [PATCH 18/24] chore: set up login to git --- .github/workflows/build.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 413f2ea498..3680684c34 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -43,8 +43,8 @@ jobs: - name: Set Githib Token to pull from Fluence private repo run: | git config --global credential.helper store - echo "https://git:${{ secrets.FLUENCEBOT_RELEASE_PLEASE_PAT }}@github.com" > ~/.git-credentials - git config --global url."https://git:${{ secrets.FLUENCEBOT_RELEASE_PLEASE_PAT }}@github.com".insteadOf ssh://git@github.com + echo "https://git:${{ secrets.FLUENCEBOT_PRIV_KEY }}@github.com" > ~/.git-credentials + git config --global url."https://${{ secrets.FLUENCEBOT_PRIV_KEY }}@github.com".insteadOf 
ssh://git@github.com - name: Get PR labels From 5f76359bfd028a7e2fc6e6cf15e92b017dc865b1 Mon Sep 17 00:00:00 2001 From: enjenjenje Date: Mon, 24 Jun 2024 19:52:42 +0200 Subject: [PATCH 19/24] chore: set up login to git --- .cargo/config.toml | 1 - .github/workflows/build.yml | 9 ++++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index e4bd3b2c2a..de68640c7d 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,6 +1,5 @@ [net] retry = 50 -git-fetch-with-cli = true [registries] fluence = { index = "git://crates.fluence.dev/index" } diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 3680684c34..c787d058e4 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -40,13 +40,12 @@ jobs: repository: fluencelabs/nox ref: ${{ inputs.ref }} - - name: Set Githib Token to pull from Fluence private repo + - name: Setup Git + # Fix our git override using the generated token from the create_token step + use the "x-access-token". run: | - git config --global credential.helper store - echo "https://git:${{ secrets.FLUENCEBOT_PRIV_KEY }}@github.com" > ~/.git-credentials - git config --global url."https://${{ secrets.FLUENCEBOT_PRIV_KEY }}@github.com".insteadOf ssh://git@github.com - + git config --global url."https://x-access-token:${{ secrets.FLUENCEBOT_RELEASE_PLEASE_PAT }}@github.com".insteadOf ssh://git@github.com + - name: Get PR labels id: labels uses: joerick/pr-labels-action@v1.0.9 From 3d4026cb2386f4cf9eb2902b17b5ed11e06022fd Mon Sep 17 00:00:00 2001 From: enjenjenje Date: Mon, 24 Jun 2024 19:55:43 +0200 Subject: [PATCH 20/24] chore: set up login to git --- .cargo/config.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/.cargo/config.toml b/.cargo/config.toml index de68640c7d..e4bd3b2c2a 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,5 +1,6 @@ [net] retry = 50 +git-fetch-with-cli = true [registries] fluence = { index = "git://crates.fluence.dev/index" } From 78c7406c2914449a3833f5173c9743ff0100f402 Mon Sep 17 00:00:00 2001 From: enjenjenje Date: Mon, 24 Jun 2024 20:12:53 +0200 Subject: [PATCH 21/24] chore: set up login to git --- .github/workflows/build.yml | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index c787d058e4..75c6fec20c 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -40,10 +40,9 @@ jobs: repository: fluencelabs/nox ref: ${{ inputs.ref }} - - name: Setup Git - # Fix our git override using the generated token from the create_token step + use the "x-access-token". 
- run: | - git config --global url."https://x-access-token:${{ secrets.FLUENCEBOT_RELEASE_PLEASE_PAT }}@github.com".insteadOf ssh://git@github.com + - uses: webfactory/ssh-agent@v0.9.0 + with: + ssh-private-key: ${{ secrets.FLUENCEBOT_PRIVATE_KEY }} - name: Get PR labels From 81b8a54df39f62262e5840c9185c65fed0093036 Mon Sep 17 00:00:00 2001 From: enjenjenje Date: Mon, 24 Jun 2024 20:20:19 +0200 Subject: [PATCH 22/24] chore: set up login to git --- .github/workflows/build.yml | 7 +++++-- .github/workflows/e2e.yml | 4 +++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 75c6fec20c..a910d840cd 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -11,6 +11,10 @@ on: description: "Cargo dependencies map" type: string default: "null" + secrets: + github_priv_key: + description: "github private key" + required: true jobs: build: @@ -42,9 +46,8 @@ jobs: - uses: webfactory/ssh-agent@v0.9.0 with: - ssh-private-key: ${{ secrets.FLUENCEBOT_PRIVATE_KEY }} + ssh-private-key: ${{ secrets.github_priv_key }} - - name: Get PR labels id: labels uses: joerick/pr-labels-action@v1.0.9 diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index c76c4d01cc..b12de686c7 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -40,7 +40,9 @@ jobs: uses: ./.github/workflows/build.yml with: ref: ${{ github.ref }} - + secrets: + github_priv_key: ${{ secrets.FLUENCEBOT_PRIVATE_KEY }} + nox-snapshot: name: "nox" needs: nox From e05476e5002b7ddb123c7984ab6a199c333ec49b Mon Sep 17 00:00:00 2001 From: enjenjenje Date: Mon, 24 Jun 2024 20:26:04 +0200 Subject: [PATCH 23/24] chore: set up login to git --- .cargo/config.toml | 1 - .github/workflows/build.yml | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index e4bd3b2c2a..de68640c7d 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,6 +1,5 @@ [net] retry = 50 -git-fetch-with-cli = true [registries] fluence = { index = "git://crates.fluence.dev/index" } diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index a910d840cd..4b561cc48a 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -44,9 +44,9 @@ jobs: repository: fluencelabs/nox ref: ${{ inputs.ref }} - - uses: webfactory/ssh-agent@v0.9.0 - with: - ssh-private-key: ${{ secrets.github_priv_key }} + - name: Setup Git + run: | + git config --global url."https://x-access-token:${{ secrets.github_priv_key }}@github.com".insteadOf ssh://git@github.com - name: Get PR labels id: labels From fb8febadcc6f7bf6949d1875d89043b111dfd088 Mon Sep 17 00:00:00 2001 From: Alexey Proshutinskiy Date: Sun, 7 Jul 2024 14:31:43 +0300 Subject: [PATCH 24/24] fix --- crates/chain-listener/src/listener.rs | 31 +++++++++++----------- crates/chain-listener/src/proof_tracker.rs | 2 +- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index 050694d2b3..b41e921476 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -1,20 +1,17 @@ /* - * Nox Fluence Peer + * Copyright 2024 Fluence DAO * - * Copyright (C) 2024 Fluence DAO + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation version 3 of the - * License. + * http://www.apache.org/licenses/LICENSE-2.0 * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ use alloy_primitives::{Address, FixedBytes, Uint, U256, U64}; @@ -1381,8 +1378,12 @@ impl ChainListener { let mut batch_request = HashMap::new(); for cu_id in self.cc_compute_units.keys() { let sent_proofs_count = self.proof_tracker.get_proof_counter(cu_id); - let proofs_needed = - U64::from(self.max_proofs_per_epoch - sent_proofs_count).as_limbs()[0] as usize; + let proofs_needed = U64::from( + self.max_proofs_per_epoch + .checked_add(-sent_proofs_count) + .unwrap_or(Uint::ZERO), + ) + .as_limbs()[0] as usize; if proofs_needed > 0 { let request = BatchRequest { diff --git a/crates/chain-listener/src/proof_tracker.rs b/crates/chain-listener/src/proof_tracker.rs index 28d5571c64..d82daefc19 100644 --- a/crates/chain-listener/src/proof_tracker.rs +++ b/crates/chain-listener/src/proof_tracker.rs @@ -119,7 +119,7 @@ impl ProofTracker { /// Returns true if the global nonce has changed pub async fn set_global_nonce(&mut self, global_nonce: GlobalNonce) -> bool { if self.global_nonce != global_nonce { - tracing::info!(target: "chain-listener", "Global changed, was {}, new global nonce is {global_nonce}", self.global_nonce); + tracing::info!(target: "chain-listener", "Global nonce changed, was {}, new global nonce is {global_nonce}", self.global_nonce); self.global_nonce = global_nonce; tracing::info!(target: "chain-listener", "Resetting proof id counter"); self.last_submitted_proof_ids.clear();
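The final patch also hardens the per-epoch proof budget: instead of subtracting `sent_proofs_count` from `self.max_proofs_per_epoch` directly (which can underflow once a compute unit has already submitted its quota), it computes the remainder with a checked operation and falls back to zero. The sketch below illustrates the same guard with plain u64 counters; this is a simplification for readability, not the listener's actual code, which operates on alloy U256/U64 values.

    // Illustrative only: underflow-safe "how many proofs are still needed" calculation.
    // `max_proofs_per_epoch` and `sent_proofs_count` stand in for the U256 counters
    // tracked by the chain listener and the proof tracker.
    fn proofs_needed(max_proofs_per_epoch: u64, sent_proofs_count: u64) -> usize {
        // checked_sub returns None when the counter already meets or exceeds the budget,
        // which maps to "no more proofs needed" instead of an underflow panic.
        max_proofs_per_epoch
            .checked_sub(sent_proofs_count)
            .unwrap_or(0) as usize
    }

    fn main() {
        assert_eq!(proofs_needed(5, 2), 3);
        // Before the fix this case would underflow; now it simply yields zero,
        // so no batch request is issued for an exhausted compute unit.
        assert_eq!(proofs_needed(5, 7), 0);
    }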