Skip to content

Commit

Permalink
partial-updates consensus-transition
Browse files Browse the repository at this point in the history
  • Loading branch information
Raj-RR1 committed Jul 18, 2023
1 parent 96d10db commit 611418b
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 19 deletions.
12 changes: 11 additions & 1 deletion node/consensus-transition/manual-seal/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
//! This is suitable for a testing environment.

use futures::channel::{mpsc::SendError, oneshot};
use jsonrpsee::{
core::Error as JsonRpseeError,
types::error::{CallError, ErrorObject},
};
use sc_consensus::ImportResult;
use sp_blockchain::Error as BlockchainError;
use sp_consensus::Error as ConsensusError;
Expand Down Expand Up @@ -71,7 +75,7 @@ pub enum Error {
SendError(#[from] SendError),
/// Some other error.
#[error("Other error: {0}")]
Other(#[from] Box<dyn std::error::Error + Send>),
Other(#[from] Box<dyn std::error::Error + Send + Sync>),
}

impl From<ImportResult> for Error {
Expand Down Expand Up @@ -111,3 +115,9 @@ impl From<Error> for jsonrpc_core::Error {
}
}
}

impl From<Error> for JsonRpseeError {
fn from(err: Error) -> Self {
CallError::Custom(ErrorObject::owned(err.to_code(), err.to_string(), None::<()>)).into()
}
}
4 changes: 2 additions & 2 deletions node/consensus-transition/manual-seal/src/finalize_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

use crate::rpc;
use sc_client_api::backend::{Backend as ClientBackend, Finalizer};
use sp_runtime::{generic::BlockId, traits::Block as BlockT, Justification};
use sp_runtime::{traits::Block as BlockT, Justification};
use std::{marker::PhantomData, sync::Arc};

/// params for block finalization.
Expand All @@ -46,7 +46,7 @@ where
{
let FinalizeBlockParams { hash, mut sender, justification, finalizer, .. } = params;

match finalizer.finalize_block(BlockId::Hash(hash), justification, true) {
match finalizer.finalize_block(hash, justification, true) {
Err(e) => {
log::warn!("Failed to finalize block {}", e);
rpc::send_result(&mut sender, Err(e.into()))
Expand Down
121 changes: 114 additions & 7 deletions node/consensus-transition/manual-seal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ where
}

/// Params required to start the instant sealing authorship task.
pub struct ManualSealParams<B: BlockT, BI, E, C: ProvideRuntimeApi<B>, TP, SC, CS, CIDP> {
pub struct ManualSealParams<B: BlockT, BI, E, C: ProvideRuntimeApi<B>, TP, SC, CS, CIDP, P> {
/// Block import instance for well. importing blocks.
pub block_import: BI,

Expand All @@ -103,14 +103,14 @@ pub struct ManualSealParams<B: BlockT, BI, E, C: ProvideRuntimeApi<B>, TP, SC, C

/// Digest provider for inclusion in blocks.
pub consensus_data_provider:
Option<Box<dyn ConsensusDataProvider<B, Transaction = TransactionFor<C, B>>>>,
Option<Box<dyn ConsensusDataProvider<B, Proof=P, Transaction = TransactionFor<C, B>>>>,

/// Something that can create the inherent data providers.
pub create_inherent_data_providers: CIDP,
}

/// Params required to start the manual sealing authorship task.
pub struct InstantSealParams<B: BlockT, BI, E, C: ProvideRuntimeApi<B>, TP, SC, CIDP> {
pub struct InstantSealParams<B: BlockT, BI, E, C: ProvideRuntimeApi<B>, TP, SC, CIDP, P> {
/// Block import instance for well. importing blocks.
pub block_import: BI,

Expand All @@ -128,14 +128,26 @@ pub struct InstantSealParams<B: BlockT, BI, E, C: ProvideRuntimeApi<B>, TP, SC,

/// Digest provider for inclusion in blocks.
pub consensus_data_provider:
Option<Box<dyn ConsensusDataProvider<B, Transaction = TransactionFor<C, B>>>>,
Option<Box<dyn ConsensusDataProvider<B,Proof=P, Transaction = TransactionFor<C, B>>>>,

/// Something that can create the inherent data providers.
pub create_inherent_data_providers: CIDP,
}

/// Params required to start the delayed finalization task.
pub struct DelayedFinalizeParams<C, S> {
/// Block import instance.
pub client: Arc<C>,

/// Handle for spawning delayed finalization tasks.
pub spawn_handle: S,

/// The delay in seconds before a block is finalized.
pub delay_sec: u64,
}

/// Creates the background authorship task for the manual seal engine.
pub async fn run_manual_seal<B, BI, CB, E, C, TP, SC, CS, CIDP>(
pub async fn run_manual_seal<B, BI, CB, E, C, TP, SC, CS, CIDP, P>(
ManualSealParams {
mut block_import,
mut env,
Expand All @@ -161,6 +173,7 @@ pub async fn run_manual_seal<B, BI, CB, E, C, TP, SC, CS, CIDP>(
TransactionFor<C, B>: 'static,
TP: TransactionPool<Block = B>,
CIDP: CreateInherentDataProviders<B, ()>,
P: Send + Sync + 'static,
{
while let Some(command) = commands_stream.next().await {
match command {
Expand Down Expand Up @@ -198,7 +211,7 @@ pub async fn run_manual_seal<B, BI, CB, E, C, TP, SC, CS, CIDP>(
/// runs the background authorship task for the instant seal engine.
/// instant-seal creates a new block for every transaction imported into
/// the transaction pool.
pub async fn run_instant_seal<B, BI, CB, E, C, TP, SC, CIDP>(
pub async fn run_instant_seal<B, BI, CB, E, C, TP, SC, CIDP, P>(
InstantSealParams {
block_import,
env,
Expand All @@ -207,7 +220,7 @@ pub async fn run_instant_seal<B, BI, CB, E, C, TP, SC, CIDP>(
select_chain,
consensus_data_provider,
create_inherent_data_providers,
}: InstantSealParams<B, BI, E, C, TP, SC, CIDP>,
}: InstantSealParams<B, BI, E, C, TP, SC, CIDP, P>,
) where
B: BlockT + 'static,
BI: BlockImport<B, Error = sp_consensus::Error, Transaction = sp_api::TransactionFor<C, B>>
Expand All @@ -222,6 +235,7 @@ pub async fn run_instant_seal<B, BI, CB, E, C, TP, SC, CIDP>(
TransactionFor<C, B>: 'static,
TP: TransactionPool<Block = B>,
CIDP: CreateInherentDataProviders<B, ()>,
P: Send + Sync + 'static,
{
// instant-seal creates blocks as soon as transactions are imported
// into the transaction pool.
Expand All @@ -245,6 +259,99 @@ pub async fn run_instant_seal<B, BI, CB, E, C, TP, SC, CIDP>(
.await
}

/// Runs the background authorship task for the instant seal engine.
/// instant-seal creates a new block for every transaction imported into
/// the transaction pool.
///
/// This function will finalize the block immediately as well. If you don't
/// want this behavior use `run_instant_seal` instead.
pub async fn run_instant_seal_and_finalize<B, BI, CB, E, C, TP, SC, CIDP, P>(
InstantSealParams {
block_import,
env,
client,
pool,
select_chain,
consensus_data_provider,
create_inherent_data_providers,
}: InstantSealParams<B, BI, E, C, TP, SC, CIDP, P>,
) where
B: BlockT + 'static,
BI: BlockImport<B, Error = sp_consensus::Error, Transaction = sp_api::TransactionFor<C, B>>
+ Send
+ Sync
+ 'static,
C: HeaderBackend<B> + Finalizer<B, CB> + ProvideRuntimeApi<B> + 'static,
CB: ClientBackend<B> + 'static,
E: Environment<B> + 'static,
E::Proposer: Proposer<B, Proof = P, Transaction = TransactionFor<C, B>>,
SC: SelectChain<B> + 'static,
TransactionFor<C, B>: 'static,
TP: TransactionPool<Block = B>,
CIDP: CreateInherentDataProviders<B, ()>,
P: Send + Sync + 'static,
{
// Creates and finalizes blocks as soon as transactions are imported
// into the transaction pool.
let commands_stream = pool.import_notification_stream().map(|_| EngineCommand::SealNewBlock {
create_empty: false,
finalize: true,
parent_hash: None,
sender: None,
});

run_manual_seal(ManualSealParams {
block_import,
env,
client,
pool,
commands_stream,
select_chain,
consensus_data_provider,
create_inherent_data_providers,
})
.await
}

/// Creates a future for delayed finalization of manual sealed blocks.
///
/// The future needs to be spawned in the background alongside the
/// [`run_manual_seal`]/[`run_instant_seal`] future. It is required that
/// [`EngineCommand::SealNewBlock`] is send with `finalize = false` to not finalize blocks directly
/// after building them. This also means that delayed finality can not be used with
/// [`run_instant_seal_and_finalize`].
pub async fn run_delayed_finalize<B, CB, C, S>(
DelayedFinalizeParams { client, spawn_handle, delay_sec }: DelayedFinalizeParams<C, S>,
) where
B: BlockT + 'static,
CB: ClientBackend<B> + 'static,
C: HeaderBackend<B> + Finalizer<B, CB> + ProvideRuntimeApi<B> + BlockchainEvents<B> + 'static,
S: SpawnNamed,
{
let mut block_import_stream = client.import_notification_stream();

while let Some(notification) = block_import_stream.next().await {
let delay = Delay::new(Duration::from_secs(delay_sec));
let cloned_client = client.clone();
spawn_handle.spawn(
"delayed-finalize",
None,
Box::pin(async move {
delay.await;
finalize_block(FinalizeBlockParams {
hash: notification.hash,
sender: None,
justification: None,
finalizer: cloned_client,
_phantom: PhantomData,
})
.await
}),
);
}
}


#[cfg(test)]
mod tests {
use super::*;
Expand Down
6 changes: 5 additions & 1 deletion node/consensus-transition/manual-seal/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,18 @@ use futures::{
channel::{mpsc, oneshot},
FutureExt, SinkExt, TryFutureExt,
};
use jsonrpsee::{
core::{async_trait, Error as JsonRpseeError, RpcResult},
proc_macros::rpc,
};
use jsonrpc_core::Error;
use jsonrpc_derive::rpc;
use sc_consensus::ImportedAux;
use serde::{Deserialize, Serialize};
use sp_runtime::EncodedJustification;

/// Future's type for jsonrpc
type FutureResult<T> = jsonrpc_core::BoxFuture<Result<T, Error>>;
//type FutureResult<T> = jsonrpc_core::BoxFuture<Result<T, Error>>;
/// sender passed to the authorship task to report errors or successes.
pub type Sender<T> = Option<oneshot::Sender<std::result::Result<T, crate::Error>>>;

Expand Down
18 changes: 10 additions & 8 deletions node/consensus-transition/manual-seal/src/seal_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
pub const MAX_PROPOSAL_DURATION: u64 = 10;

/// params for sealing a new block
pub struct SealBlockParams<'a, B: BlockT, BI, SC, C: ProvideRuntimeApi<B>, E, TP, CIDP> {
pub struct SealBlockParams<'a, B: BlockT, BI, SC, C: ProvideRuntimeApi<B>, E, TP, CIDP, P> {
/// if true, empty blocks(without extrinsics) will be created.
/// otherwise, will return Error::EmptyTransactionPool.
pub create_empty: bool,
Expand All @@ -56,15 +56,15 @@ pub struct SealBlockParams<'a, B: BlockT, BI, SC, C: ProvideRuntimeApi<B>, E, TP
pub select_chain: &'a SC,
/// Digest provider for inclusion in blocks.
pub consensus_data_provider:
Option<&'a dyn ConsensusDataProvider<B, Transaction = TransactionFor<C, B>>>,
Option<&'a dyn ConsensusDataProvider<B, Proof = P, Transaction = TransactionFor<C, B>>>,
/// block import object
pub block_import: &'a mut BI,
/// Something that can create the inherent data providers.
pub create_inherent_data_providers: &'a CIDP,
}

/// seals a new block with the given params
pub async fn seal_block<B, BI, SC, C, E, TP, CIDP>(
pub async fn seal_block<B, BI, SC, C, E, TP, CIDP, P>(
SealBlockParams {
create_empty,
finalize,
Expand All @@ -77,7 +77,7 @@ pub async fn seal_block<B, BI, SC, C, E, TP, CIDP>(
create_inherent_data_providers,
consensus_data_provider: digest_provider,
mut sender,
}: SealBlockParams<'_, B, BI, SC, C, E, TP, CIDP>,
}: SealBlockParams<'_, B, BI, SC, C, E, TP, CIDP, P>,
) where
B: BlockT,
BI: BlockImport<B, Error = sp_consensus::Error, Transaction = sp_api::TransactionFor<C, B>>
Expand All @@ -91,6 +91,7 @@ pub async fn seal_block<B, BI, SC, C, E, TP, CIDP>(
SC: SelectChain<B>,
TransactionFor<C, B>: 'static,
CIDP: CreateInherentDataProviders<B, ()>,
P: Send + Sync + 'static,
{
let future = async {
if pool.status().ready == 0 && !create_empty {
Expand All @@ -102,7 +103,7 @@ pub async fn seal_block<B, BI, SC, C, E, TP, CIDP>(
// or fetch the best_block.
let parent = match parent_hash {
Some(hash) => client
.header(BlockId::Hash(hash))?
.header(hash)?
.ok_or_else(|| Error::BlockNotFound(format!("{}", hash)))?,
None => select_chain.best_chain().await?,
};
Expand All @@ -112,7 +113,7 @@ pub async fn seal_block<B, BI, SC, C, E, TP, CIDP>(
.await
.map_err(|e| Error::Other(e))?;

let inherent_data = inherent_data_providers.create_inherent_data()?;
let inherent_data = inherent_data_providers.create_inherent_data().await?;

let proposer = env.init(&parent).map_err(|err| Error::StringError(err.to_string())).await?;
let inherents_len = inherent_data.len();
Expand All @@ -139,6 +140,7 @@ pub async fn seal_block<B, BI, SC, C, E, TP, CIDP>(

let (header, body) = proposal.block.deconstruct();
let mut params = BlockImportParams::new(BlockOrigin::Own, header.clone());
let proof = proposal.proof;
params.body = Some(body);
params.finalized = finalize;
params.fork_choice = Some(ForkChoiceStrategy::LongestChain);
Expand All @@ -147,15 +149,15 @@ pub async fn seal_block<B, BI, SC, C, E, TP, CIDP>(
));

if let Some(digest_provider) = digest_provider {
digest_provider.append_block_import(&parent, &mut params, &inherent_data)?;
digest_provider.append_block_import(&parent, &mut params, &inherent_data, proof)?;
}

// Make sure we return the same post-hash that will be calculated when importing the block
// This is important in case the digest_provider added any signature, seal, ect.
let mut post_header = header.clone();
post_header.digest_mut().logs.extend(params.post_digests.iter().cloned());

match block_import.import_block(params, HashMap::new()).await? {
match block_import.import_block(params).await? {
ImportResult::Imported(aux) =>
Ok(CreatedBlock { hash: <B as BlockT>::Header::hash(&post_header), aux }),
other => Err(other.into()),
Expand Down

0 comments on commit 611418b

Please sign in to comment.