From 611418bc41102b80666bb5e374935b0352f782ff Mon Sep 17 00:00:00 2001 From: Raj Raorane <41839716+Raj-RR1@users.noreply.github.com> Date: Tue, 18 Jul 2023 20:28:00 +0530 Subject: [PATCH] partial-updates consensus-transition --- .../manual-seal/src/error.rs | 12 +- .../manual-seal/src/finalize_block.rs | 4 +- .../manual-seal/src/lib.rs | 121 +++++++++++++++++- .../manual-seal/src/rpc.rs | 6 +- .../manual-seal/src/seal_block.rs | 18 +-- 5 files changed, 142 insertions(+), 19 deletions(-) diff --git a/node/consensus-transition/manual-seal/src/error.rs b/node/consensus-transition/manual-seal/src/error.rs index 7c321120..e7614082 100644 --- a/node/consensus-transition/manual-seal/src/error.rs +++ b/node/consensus-transition/manual-seal/src/error.rs @@ -20,6 +20,10 @@ //! This is suitable for a testing environment. use futures::channel::{mpsc::SendError, oneshot}; +use jsonrpsee::{ + core::Error as JsonRpseeError, + types::error::{CallError, ErrorObject}, +}; use sc_consensus::ImportResult; use sp_blockchain::Error as BlockchainError; use sp_consensus::Error as ConsensusError; @@ -71,7 +75,7 @@ pub enum Error { SendError(#[from] SendError), /// Some other error. #[error("Other error: {0}")] - Other(#[from] Box), + Other(#[from] Box), } impl From for Error { @@ -111,3 +115,9 @@ impl From for jsonrpc_core::Error { } } } + +impl From for JsonRpseeError { + fn from(err: Error) -> Self { + CallError::Custom(ErrorObject::owned(err.to_code(), err.to_string(), None::<()>)).into() + } +} \ No newline at end of file diff --git a/node/consensus-transition/manual-seal/src/finalize_block.rs b/node/consensus-transition/manual-seal/src/finalize_block.rs index d134ce77..cee4d59b 100644 --- a/node/consensus-transition/manual-seal/src/finalize_block.rs +++ b/node/consensus-transition/manual-seal/src/finalize_block.rs @@ -20,7 +20,7 @@ use crate::rpc; use sc_client_api::backend::{Backend as ClientBackend, Finalizer}; -use sp_runtime::{generic::BlockId, traits::Block as BlockT, Justification}; +use sp_runtime::{traits::Block as BlockT, Justification}; use std::{marker::PhantomData, sync::Arc}; /// params for block finalization. @@ -46,7 +46,7 @@ where { let FinalizeBlockParams { hash, mut sender, justification, finalizer, .. } = params; - match finalizer.finalize_block(BlockId::Hash(hash), justification, true) { + match finalizer.finalize_block(hash, justification, true) { Err(e) => { log::warn!("Failed to finalize block {}", e); rpc::send_result(&mut sender, Err(e.into())) diff --git a/node/consensus-transition/manual-seal/src/lib.rs b/node/consensus-transition/manual-seal/src/lib.rs index a8d2634a..fca14467 100644 --- a/node/consensus-transition/manual-seal/src/lib.rs +++ b/node/consensus-transition/manual-seal/src/lib.rs @@ -81,7 +81,7 @@ where } /// Params required to start the instant sealing authorship task. -pub struct ManualSealParams, TP, SC, CS, CIDP> { +pub struct ManualSealParams, TP, SC, CS, CIDP, P> { /// Block import instance for well. importing blocks. pub block_import: BI, @@ -103,14 +103,14 @@ pub struct ManualSealParams, TP, SC, C /// Digest provider for inclusion in blocks. pub consensus_data_provider: - Option>>>, + Option>>>, /// Something that can create the inherent data providers. pub create_inherent_data_providers: CIDP, } /// Params required to start the manual sealing authorship task. -pub struct InstantSealParams, TP, SC, CIDP> { +pub struct InstantSealParams, TP, SC, CIDP, P> { /// Block import instance for well. importing blocks. pub block_import: BI, @@ -128,14 +128,26 @@ pub struct InstantSealParams, TP, SC, /// Digest provider for inclusion in blocks. pub consensus_data_provider: - Option>>>, + Option>>>, /// Something that can create the inherent data providers. pub create_inherent_data_providers: CIDP, } +/// Params required to start the delayed finalization task. +pub struct DelayedFinalizeParams { + /// Block import instance. + pub client: Arc, + + /// Handle for spawning delayed finalization tasks. + pub spawn_handle: S, + + /// The delay in seconds before a block is finalized. + pub delay_sec: u64, +} + /// Creates the background authorship task for the manual seal engine. -pub async fn run_manual_seal( +pub async fn run_manual_seal( ManualSealParams { mut block_import, mut env, @@ -161,6 +173,7 @@ pub async fn run_manual_seal( TransactionFor: 'static, TP: TransactionPool, CIDP: CreateInherentDataProviders, + P: Send + Sync + 'static, { while let Some(command) = commands_stream.next().await { match command { @@ -198,7 +211,7 @@ pub async fn run_manual_seal( /// runs the background authorship task for the instant seal engine. /// instant-seal creates a new block for every transaction imported into /// the transaction pool. -pub async fn run_instant_seal( +pub async fn run_instant_seal( InstantSealParams { block_import, env, @@ -207,7 +220,7 @@ pub async fn run_instant_seal( select_chain, consensus_data_provider, create_inherent_data_providers, - }: InstantSealParams, + }: InstantSealParams, ) where B: BlockT + 'static, BI: BlockImport> @@ -222,6 +235,7 @@ pub async fn run_instant_seal( TransactionFor: 'static, TP: TransactionPool, CIDP: CreateInherentDataProviders, + P: Send + Sync + 'static, { // instant-seal creates blocks as soon as transactions are imported // into the transaction pool. @@ -245,6 +259,99 @@ pub async fn run_instant_seal( .await } +/// Runs the background authorship task for the instant seal engine. +/// instant-seal creates a new block for every transaction imported into +/// the transaction pool. +/// +/// This function will finalize the block immediately as well. If you don't +/// want this behavior use `run_instant_seal` instead. +pub async fn run_instant_seal_and_finalize( + InstantSealParams { + block_import, + env, + client, + pool, + select_chain, + consensus_data_provider, + create_inherent_data_providers, + }: InstantSealParams, +) where + B: BlockT + 'static, + BI: BlockImport> + + Send + + Sync + + 'static, + C: HeaderBackend + Finalizer + ProvideRuntimeApi + 'static, + CB: ClientBackend + 'static, + E: Environment + 'static, + E::Proposer: Proposer>, + SC: SelectChain + 'static, + TransactionFor: 'static, + TP: TransactionPool, + CIDP: CreateInherentDataProviders, + P: Send + Sync + 'static, +{ + // Creates and finalizes blocks as soon as transactions are imported + // into the transaction pool. + let commands_stream = pool.import_notification_stream().map(|_| EngineCommand::SealNewBlock { + create_empty: false, + finalize: true, + parent_hash: None, + sender: None, + }); + + run_manual_seal(ManualSealParams { + block_import, + env, + client, + pool, + commands_stream, + select_chain, + consensus_data_provider, + create_inherent_data_providers, + }) + .await +} + +/// Creates a future for delayed finalization of manual sealed blocks. +/// +/// The future needs to be spawned in the background alongside the +/// [`run_manual_seal`]/[`run_instant_seal`] future. It is required that +/// [`EngineCommand::SealNewBlock`] is send with `finalize = false` to not finalize blocks directly +/// after building them. This also means that delayed finality can not be used with +/// [`run_instant_seal_and_finalize`]. +pub async fn run_delayed_finalize( + DelayedFinalizeParams { client, spawn_handle, delay_sec }: DelayedFinalizeParams, +) where + B: BlockT + 'static, + CB: ClientBackend + 'static, + C: HeaderBackend + Finalizer + ProvideRuntimeApi + BlockchainEvents + 'static, + S: SpawnNamed, +{ + let mut block_import_stream = client.import_notification_stream(); + + while let Some(notification) = block_import_stream.next().await { + let delay = Delay::new(Duration::from_secs(delay_sec)); + let cloned_client = client.clone(); + spawn_handle.spawn( + "delayed-finalize", + None, + Box::pin(async move { + delay.await; + finalize_block(FinalizeBlockParams { + hash: notification.hash, + sender: None, + justification: None, + finalizer: cloned_client, + _phantom: PhantomData, + }) + .await + }), + ); + } +} + + #[cfg(test)] mod tests { use super::*; diff --git a/node/consensus-transition/manual-seal/src/rpc.rs b/node/consensus-transition/manual-seal/src/rpc.rs index 4a8dcbc0..d91ca8d9 100644 --- a/node/consensus-transition/manual-seal/src/rpc.rs +++ b/node/consensus-transition/manual-seal/src/rpc.rs @@ -23,6 +23,10 @@ use futures::{ channel::{mpsc, oneshot}, FutureExt, SinkExt, TryFutureExt, }; +use jsonrpsee::{ + core::{async_trait, Error as JsonRpseeError, RpcResult}, + proc_macros::rpc, +}; use jsonrpc_core::Error; use jsonrpc_derive::rpc; use sc_consensus::ImportedAux; @@ -30,7 +34,7 @@ use serde::{Deserialize, Serialize}; use sp_runtime::EncodedJustification; /// Future's type for jsonrpc -type FutureResult = jsonrpc_core::BoxFuture>; +//type FutureResult = jsonrpc_core::BoxFuture>; /// sender passed to the authorship task to report errors or successes. pub type Sender = Option>>; diff --git a/node/consensus-transition/manual-seal/src/seal_block.rs b/node/consensus-transition/manual-seal/src/seal_block.rs index 202b54fe..9a1ae267 100644 --- a/node/consensus-transition/manual-seal/src/seal_block.rs +++ b/node/consensus-transition/manual-seal/src/seal_block.rs @@ -36,7 +36,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; pub const MAX_PROPOSAL_DURATION: u64 = 10; /// params for sealing a new block -pub struct SealBlockParams<'a, B: BlockT, BI, SC, C: ProvideRuntimeApi, E, TP, CIDP> { +pub struct SealBlockParams<'a, B: BlockT, BI, SC, C: ProvideRuntimeApi, E, TP, CIDP, P> { /// if true, empty blocks(without extrinsics) will be created. /// otherwise, will return Error::EmptyTransactionPool. pub create_empty: bool, @@ -56,7 +56,7 @@ pub struct SealBlockParams<'a, B: BlockT, BI, SC, C: ProvideRuntimeApi, E, TP pub select_chain: &'a SC, /// Digest provider for inclusion in blocks. pub consensus_data_provider: - Option<&'a dyn ConsensusDataProvider>>, + Option<&'a dyn ConsensusDataProvider>>, /// block import object pub block_import: &'a mut BI, /// Something that can create the inherent data providers. @@ -64,7 +64,7 @@ pub struct SealBlockParams<'a, B: BlockT, BI, SC, C: ProvideRuntimeApi, E, TP } /// seals a new block with the given params -pub async fn seal_block( +pub async fn seal_block( SealBlockParams { create_empty, finalize, @@ -77,7 +77,7 @@ pub async fn seal_block( create_inherent_data_providers, consensus_data_provider: digest_provider, mut sender, - }: SealBlockParams<'_, B, BI, SC, C, E, TP, CIDP>, + }: SealBlockParams<'_, B, BI, SC, C, E, TP, CIDP, P>, ) where B: BlockT, BI: BlockImport> @@ -91,6 +91,7 @@ pub async fn seal_block( SC: SelectChain, TransactionFor: 'static, CIDP: CreateInherentDataProviders, + P: Send + Sync + 'static, { let future = async { if pool.status().ready == 0 && !create_empty { @@ -102,7 +103,7 @@ pub async fn seal_block( // or fetch the best_block. let parent = match parent_hash { Some(hash) => client - .header(BlockId::Hash(hash))? + .header(hash)? .ok_or_else(|| Error::BlockNotFound(format!("{}", hash)))?, None => select_chain.best_chain().await?, }; @@ -112,7 +113,7 @@ pub async fn seal_block( .await .map_err(|e| Error::Other(e))?; - let inherent_data = inherent_data_providers.create_inherent_data()?; + let inherent_data = inherent_data_providers.create_inherent_data().await?; let proposer = env.init(&parent).map_err(|err| Error::StringError(err.to_string())).await?; let inherents_len = inherent_data.len(); @@ -139,6 +140,7 @@ pub async fn seal_block( let (header, body) = proposal.block.deconstruct(); let mut params = BlockImportParams::new(BlockOrigin::Own, header.clone()); + let proof = proposal.proof; params.body = Some(body); params.finalized = finalize; params.fork_choice = Some(ForkChoiceStrategy::LongestChain); @@ -147,7 +149,7 @@ pub async fn seal_block( )); if let Some(digest_provider) = digest_provider { - digest_provider.append_block_import(&parent, &mut params, &inherent_data)?; + digest_provider.append_block_import(&parent, &mut params, &inherent_data, proof)?; } // Make sure we return the same post-hash that will be calculated when importing the block @@ -155,7 +157,7 @@ pub async fn seal_block( let mut post_header = header.clone(); post_header.digest_mut().logs.extend(params.post_digests.iter().cloned()); - match block_import.import_block(params, HashMap::new()).await? { + match block_import.import_block(params).await? { ImportResult::Imported(aux) => Ok(CreatedBlock { hash: ::Header::hash(&post_header), aux }), other => Err(other.into()),