Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions crates/malachite-app/src/handlers/consensus_ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,28 @@ pub async fn handle(
engine: &Engine,
reply: Reply<(Height, HeightParams<ArcContext>)>,
) -> eyre::Result<()> {
// Create and attach the persistence meter before borrowing state fields,
// since set_persistence_meter requires &mut self.
{
let execution_config = &state.config().execution;
let meter = persistence_meter::create_with_fallback(
execution_config.persistence_backpressure,
engine.subscription_endpoint(),
execution_config.persistence_backpressure_threshold,
)
.await;

persistence_meter::seed_from_latest_block(meter.as_ref(), engine.eth.as_ref()).await;

state.set_persistence_meter(meter);
}

let (store, stats, metrics) = (state.store(), state.stats(), state.metrics());
let max_pending_proposals = max_pending_proposals(&state.config().value_sync);

let payload_validator = EnginePayloadValidator::new(engine, metrics);
let block_finalizer = EngineBlockFinalizer::new(engine, stats, metrics);

let execution_config = &state.config().execution;
let persistence_meter = persistence_meter::create_with_fallback(
execution_config.persistence_backpressure,
engine.subscription_endpoint(),
execution_config.persistence_backpressure_threshold,
)
.await;

// Seed with the current EL height. Safe at startup since all blocks are
// persisted before the node begins replaying.
persistence_meter::seed_from_latest_block(persistence_meter.as_ref(), engine.eth.as_ref())
.await;

let (next_height, next_validator_set, next_consensus_params, previous_block) =
on_consensus_ready(
metrics,
Expand All @@ -76,7 +79,7 @@ pub async fn handle(
block_finalizer,
engine.api.as_ref(),
engine.eth.as_ref(),
persistence_meter.as_ref(),
state.persistence_meter(),
max_pending_proposals,
)
.await?;
Expand Down
162 changes: 162 additions & 0 deletions crates/malachite-app/src/handlers/process_synced_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use bytes::Bytes;
use eyre::Context;
use ssz::Decode;
Expand All @@ -26,6 +28,7 @@ use malachitebft_app_channel::Reply;
use alloy_rpc_types_engine::ExecutionPayloadV3;
use arc_consensus_types::{Address, ArcContext, Height};
use arc_eth_engine::engine::Engine;
use arc_eth_engine::persistence_meter::PersistenceMeter;

use malachitebft_app_channel::app::types::core::Validity;

Expand All @@ -35,6 +38,9 @@ use crate::state::State;
use crate::store::repositories::{InvalidPayloadsRepository, UndecidedBlocksRepository};
use arc_consensus_db::invalid_payloads::InvalidPayload;

/// Timeout when blocked waiting for EL persistence to catch up during sync.
const SYNC_PERSISTENCE_WAIT_TIMEOUT: Duration = Duration::from_secs(30);

/// Handles the `ProcessSyncedValue` message from the consensus engine.
///
/// This is called when the consensus engine has received a value via sync for a given height and round.
Expand All @@ -56,6 +62,7 @@ pub async fn handle(
EnginePayloadValidator::new(engine, state.metrics()),
state.store(),
state.store(),
state.persistence_meter(),
height,
round,
proposer,
Expand Down Expand Up @@ -98,10 +105,12 @@ pub async fn handle(
///
/// Returns `Ok(None)` when the raw bytes cannot be SSZ-decoded (the error is logged
/// but not propagated).
#[allow(clippy::too_many_arguments)]
async fn on_process_synced_value(
engine: impl PayloadValidator,
undecided_blocks_repo: impl UndecidedBlocksRepository,
invalid_payloads_repo: impl InvalidPayloadsRepository,
persistence_meter: impl PersistenceMeter,
height: Height,
round: Round,
proposer: Address,
Expand Down Expand Up @@ -169,6 +178,19 @@ async fn on_process_synced_value(
)
})?;

if validity.is_valid() {
if let Err(e) = persistence_meter
.wait_for_persisted_block(height.as_u64(), SYNC_PERSISTENCE_WAIT_TIMEOUT)
.await
{
error!(
block_number = height.as_u64(),
%e,
"ProcessSyncedValue: persistence backpressure timed out, proceeding"
);
}
}

Ok(Some(proposal))
}

Expand All @@ -182,6 +204,8 @@ mod tests {
};

use arbitrary::{Arbitrary, Unstructured};
use arc_eth_engine::mocks::MockPersistenceMeter;
use arc_eth_engine::persistence_meter::NoopPersistenceMeter;
use bytes::Bytes;
use malachitebft_core_types::Validity;
use mockall::predicate::*;
Expand Down Expand Up @@ -226,6 +250,7 @@ mod tests {
engine,
undecided,
invalid,
NoopPersistenceMeter,
height,
round,
proposer,
Expand Down Expand Up @@ -277,6 +302,7 @@ mod tests {
engine,
undecided,
invalid,
NoopPersistenceMeter,
height,
round,
proposer,
Expand Down Expand Up @@ -317,6 +343,7 @@ mod tests {
engine,
undecided,
invalid,
NoopPersistenceMeter,
height,
round,
proposer,
Expand Down Expand Up @@ -350,6 +377,7 @@ mod tests {
engine,
undecided,
invalid,
NoopPersistenceMeter,
height,
round,
proposer,
Expand Down Expand Up @@ -388,6 +416,7 @@ mod tests {
engine,
undecided,
invalid,
NoopPersistenceMeter,
height,
round,
proposer,
Expand All @@ -398,4 +427,137 @@ mod tests {
assert!(result.is_err());
assert!(result.unwrap_err().downcast_ref::<io::Error>().is_some());
}

#[tokio::test]
async fn on_process_synced_value_calls_persistence_meter_for_valid_payload() {
let mut u = Unstructured::new(&[0u8; 512]);

let height = Height::new(42);
let round = Round::new(0);
let proposer = Address::new([0u8; 20]);
let payload = ExecutionPayloadV3::arbitrary(&mut u).unwrap();
let value_bytes = Bytes::from(payload.as_ssz_bytes());

let mut engine = MockPayloadValidator::new();
engine
.expect_validate_payload()
.returning(|_| Ok(PayloadValidationResult::Valid));

let mut undecided = MockUndecidedBlocksRepository::new();
undecided.expect_store().times(1).returning(|_| Ok(()));

let mut invalid = MockInvalidPayloadsRepository::new();
invalid.expect_append().times(0);

let mut persistence_meter = MockPersistenceMeter::new();
persistence_meter
.expect_wait_for_persisted_block()
.withf(|&block, _| block == 42)
.times(1)
.return_once(|_, _| Ok(()));

let proposal = on_process_synced_value(
engine,
undecided,
invalid,
persistence_meter,
height,
round,
proposer,
value_bytes,
)
.await
.expect("should succeed");

assert!(proposal.is_some());
assert_eq!(proposal.unwrap().validity, Validity::Valid);
}

#[tokio::test]
async fn on_process_synced_value_skips_persistence_meter_for_invalid_payload() {
let mut u = Unstructured::new(&[0u8; 512]);

let height = Height::new(42);
let round = Round::new(0);
let proposer = Address::new([0u8; 20]);
let payload = ExecutionPayloadV3::arbitrary(&mut u).unwrap();
let value_bytes = Bytes::from(payload.as_ssz_bytes());

let mut engine = MockPayloadValidator::new();
engine.expect_validate_payload().returning(|_| {
Ok(PayloadValidationResult::Invalid {
reason: "bad".into(),
})
});

let mut undecided = MockUndecidedBlocksRepository::new();
undecided.expect_store().times(1).returning(|_| Ok(()));

let mut invalid = MockInvalidPayloadsRepository::new();
invalid.expect_append().times(1).returning(|_| Ok(()));

let mut persistence_meter = MockPersistenceMeter::new();
persistence_meter.expect_wait_for_persisted_block().times(0);

let proposal = on_process_synced_value(
engine,
undecided,
invalid,
persistence_meter,
height,
round,
proposer,
value_bytes,
)
.await
.expect("should succeed");

assert!(proposal.is_some());
assert_eq!(proposal.unwrap().validity, Validity::Invalid);
}

#[tokio::test]
async fn on_process_synced_value_proceeds_when_persistence_meter_fails() {
let mut u = Unstructured::new(&[0u8; 512]);

let height = Height::new(7);
let round = Round::new(0);
let proposer = Address::new([0u8; 20]);
let payload = ExecutionPayloadV3::arbitrary(&mut u).unwrap();
let value_bytes = Bytes::from(payload.as_ssz_bytes());

let mut engine = MockPayloadValidator::new();
engine
.expect_validate_payload()
.returning(|_| Ok(PayloadValidationResult::Valid));

let mut undecided = MockUndecidedBlocksRepository::new();
undecided.expect_store().times(1).returning(|_| Ok(()));

let mut invalid = MockInvalidPayloadsRepository::new();
invalid.expect_append().times(0);

let mut persistence_meter = MockPersistenceMeter::new();
persistence_meter
.expect_wait_for_persisted_block()
.withf(|&block, _| block == 7)
.times(1)
.return_once(|_, _| Err(eyre::eyre!("persistence meter timeout")));

let proposal = on_process_synced_value(
engine,
undecided,
invalid,
persistence_meter,
height,
round,
proposer,
value_bytes,
)
.await
.expect("should succeed even when meter fails");

assert!(proposal.is_some());
assert_eq!(proposal.unwrap().validity, Validity::Valid);
}
}
13 changes: 13 additions & 0 deletions crates/malachite-app/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use arc_consensus_types::{
Address, AlloyAddress, ArcContext, BlockHash, Config, ConsensusParams, Height, ValidatorSet,
};
use arc_eth_engine::json_structures::ExecutionBlock;
use arc_eth_engine::persistence_meter::{NoopPersistenceMeter, PersistenceMeter};
use arc_signer::ArcSigningProvider;
use malachitebft_core_types::HeightParams;

Expand Down Expand Up @@ -135,6 +136,9 @@ pub struct State {
/// Timestamps of heights that received a synced value via ProcessSyncedValue.
synced_heights: HashMap<Height, SystemTime>,

/// Meters EL block persistence to apply backpressure during sync catch-up.
persistence_meter: Box<dyn PersistenceMeter>,

/// Consensus-layer chain spec (fork activation by height/time).
#[allow(dead_code)]
pub spec: ConsensusSpec,
Expand Down Expand Up @@ -199,6 +203,7 @@ impl State {
consensus_params: ConsensusParams::default(),
proposal_monitor: None,
synced_heights: HashMap::new(),
persistence_meter: Box::new(NoopPersistenceMeter),
spec,
metrics,
}
Expand Down Expand Up @@ -273,6 +278,14 @@ impl State {
self.consensus_params = consensus_params;
}

pub fn persistence_meter(&self) -> &dyn PersistenceMeter {
self.persistence_meter.as_ref()
}

pub fn set_persistence_meter(&mut self, meter: Box<dyn PersistenceMeter>) {
self.persistence_meter = meter;
}

/// Get mutable reference to the streams map
pub fn streams_map_mut(&mut self) -> &mut PartStreamsMap {
&mut self.streams_map
Expand Down
4 changes: 2 additions & 2 deletions crates/malachite-cli/src/cmd/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ pub struct StartCmd {
#[clap(
long = "execution-persistence-backpressure-threshold",
value_name = "BLOCKS",
default_value = "100"
default_value = "16"
)]
pub execution_persistence_backpressure_threshold: u64,

Expand Down Expand Up @@ -401,7 +401,7 @@ impl Default for StartCmd {
execution_endpoint: None,
execution_ws_endpoint: None,
execution_persistence_backpressure: false,
execution_persistence_backpressure_threshold: 100,
execution_persistence_backpressure_threshold: 16,
execution_jwt: None,
metrics: None,
rpc_addr: None,
Expand Down
2 changes: 1 addition & 1 deletion crates/types/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ pub struct ExecutionConfig {

impl ExecutionConfig {
const fn default_persistence_backpressure_threshold() -> u64 {
100
16
}
}

Expand Down
Loading