Skip to content

Commit

Permalink
Merge branch 'leon/batch_refactor' into 'master'
Browse files Browse the repository at this point in the history
chore(CON-960): Use BatchMessages instead of BatchPayload in batch delivery

This MR replaces the `BatchPayload` struct in `Batch` with `BatchMessages`.

`BatchMessages` are already parsed into the underlying message type, such as `Vec<SignedIngress>`.
Furthermore, `BatchPayload` was containing `canister_http` messages, which were not used by message routing at all, rather they were parsed into `Vec<Response>` by `batch_delivery.rs`. 

See merge request dfinity-lab/public/ic!11130
  • Loading branch information
Sawchord committed Mar 20, 2023
2 parents cfc4f05 + 9d2ef39 commit f28a3e0
Show file tree
Hide file tree
Showing 14 changed files with 81 additions and 83 deletions.
20 changes: 13 additions & 7 deletions rs/consensus/src/consensus/batch_delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,21 +125,27 @@ pub fn deliver_batches(
// This flag can only be true, if we've called deliver_batches with a height
// limit. In this case we also want to have a checkpoint for that last height.
let persist_batch = Some(h) == max_batch_height_to_deliver;
let requires_full_state_hash = block.payload.is_summary() || persist_batch;
let (batch_messages, batch_stats) = if block.payload.is_summary() {
(BatchMessages::default(), BatchStats::empty(h))
} else {
let batch_payload = BlockPayload::from(block.payload).into_data().batch;
let batch_stats = BatchStats::from_payload(h, &batch_payload);
let batch_messages = batch_payload.into_messages().unwrap();
(batch_messages, batch_stats)
};

let batch = Batch {
batch_number: h,
requires_full_state_hash: block.payload.is_summary() || persist_batch,
payload: if block.payload.is_summary() {
BatchPayload::default()
} else {
BlockPayload::from(block.payload).into_data().batch
},
requires_full_state_hash,
messages: batch_messages,
randomness,
ecdsa_subnet_public_keys: ecdsa_subnet_public_key.into_iter().collect(),
registry_version: block.context.registry_version,
time: block.context.time,
consensus_responses,
};
let batch_stats = BatchStats::from(&batch);

debug!(
log,
"replica {:?} delivered batch {:?} for block_hash {:?}",
Expand Down
36 changes: 20 additions & 16 deletions rs/consensus/src/consensus/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use ic_metrics::{
MetricsRegistry,
};
use ic_types::{
batch::Batch,
batch::BatchPayload,
consensus::{
ecdsa::{CompletedReshareRequest, CompletedSignature, EcdsaPayload, KeyTranscriptCreation},
Block, BlockProposal, ConsensusMessageHashable, HasHeight, HasRank,
},
CountBytes,
CountBytes, Height,
};
use prometheus::{
GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
Expand Down Expand Up @@ -145,6 +145,7 @@ impl From<&Block> for BlockStats {
}

// Batch payload stats
#[derive(Debug, Default)]
pub struct BatchStats {
pub batch_height: u64,
pub ingress_messages_delivered: usize,
Expand All @@ -156,21 +157,24 @@ pub struct BatchStats {
pub canister_http_divergences_delivered: usize,
}

impl From<&Batch> for BatchStats {
fn from(batch: &Batch) -> Self {
impl BatchStats {
pub(crate) fn from_payload(batch_height: Height, payload: &BatchPayload) -> Self {
Self {
batch_height: batch.batch_number.get(),
ingress_messages_delivered: batch.payload.ingress.message_count(),
ingress_message_bytes_delivered: batch.payload.ingress.count_bytes(),
xnet_bytes_delivered: batch.payload.xnet.size_bytes(),
ingress_ids: batch.payload.ingress.message_ids(),
canister_http_success_delivered: batch.payload.canister_http.responses.len(),
canister_http_timeouts_delivered: batch.payload.canister_http.timeouts.len(),
canister_http_divergences_delivered: batch
.payload
.canister_http
.divergence_responses
.len(),
batch_height: batch_height.get(),
ingress_messages_delivered: payload.ingress.message_count(),
ingress_message_bytes_delivered: payload.ingress.count_bytes(),
xnet_bytes_delivered: payload.xnet.size_bytes(),
ingress_ids: payload.ingress.message_ids(),
canister_http_success_delivered: payload.canister_http.responses.len(),
canister_http_timeouts_delivered: payload.canister_http.timeouts.len(),
canister_http_divergences_delivered: payload.canister_http.divergence_responses.len(),
}
}

pub(crate) fn empty(batch_height: Height) -> Self {
Self {
batch_height: batch_height.get(),
..Self::default()
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions rs/consensus/tests/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use ic_types::{
crypto::CryptoHash, malicious_flags::MaliciousFlags, replica_config::ReplicaConfig,
CryptoHashOfState, Height,
};
use std::convert::TryInto;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;

Expand Down Expand Up @@ -183,9 +182,9 @@ fn consensus_produces_expected_batches() {
*router.batches.write().unwrap() = Vec::new();
assert_eq!(batches.len(), 2);
assert_ne!(batches[0].batch_number, batches[1].batch_number);
let mut msgs: Vec<_> = batches[0].payload.ingress.clone().try_into().unwrap();
let mut msgs: Vec<_> = batches[0].messages.signed_ingress_msgs.clone();
assert_eq!(msgs.pop(), Some(ingress0));
let mut msgs: Vec<_> = batches[1].payload.ingress.clone().try_into().unwrap();
let mut msgs: Vec<_> = batches[1].messages.signed_ingress_msgs.clone();
assert_eq!(msgs.pop(), Some(ingress1));
})
}
13 changes: 5 additions & 8 deletions rs/determinism_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use ic_state_manager::StateManagerImpl;
use ic_test_utilities::types::messages::SignedIngressBuilder;
use ic_types::{
artifact::SignedIngress,
batch::{Batch, BatchPayload, IngressPayload},
batch::{Batch, BatchMessages},
ingress::{IngressState, IngressStatus, WasmResult},
messages::MessageId,
time::UNIX_EPOCH,
Expand All @@ -27,9 +27,9 @@ fn build_batch(message_routing: &dyn MessageRouting, msgs: Vec<SignedIngress>) -
Batch {
batch_number: message_routing.expected_batch_height(),
requires_full_state_hash: false,
payload: BatchPayload {
ingress: IngressPayload::from(msgs),
..BatchPayload::default()
messages: BatchMessages {
signed_ingress_msgs: msgs,
..BatchMessages::default()
},
randomness: Randomness::from([0; 32]),
ecdsa_subnet_public_keys: BTreeMap::new(),
Expand All @@ -43,10 +43,7 @@ fn build_batch_with_full_state_hash(message_routing: &dyn MessageRouting) -> Bat
Batch {
batch_number: message_routing.expected_batch_height(),
requires_full_state_hash: true,
payload: BatchPayload {
ingress: IngressPayload::from(vec![]),
..BatchPayload::default()
},
messages: BatchMessages::default(),
randomness: Randomness::from([0; 32]),
ecdsa_subnet_public_keys: BTreeMap::new(),
registry_version: RegistryVersion::from(1),
Expand Down
9 changes: 5 additions & 4 deletions rs/drun/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ use ic_test_utilities::consensus::fake::FakeVerifier;
use ic_test_utilities_registry::{
add_subnet_record, insert_initial_dkg_transcript, SubnetRecordBuilder,
};
use ic_types::batch::BatchMessages;
use ic_types::malicious_flags::MaliciousFlags;
use ic_types::{
batch::{Batch, BatchPayload, IngressPayload},
batch::Batch,
ingress::{IngressState, IngressStatus, WasmResult},
messages::{MessageId, SignedIngress},
replica_config::ReplicaConfig,
Expand Down Expand Up @@ -323,9 +324,9 @@ fn build_batch(message_routing: &dyn MessageRouting, msgs: Vec<SignedIngress>) -
Batch {
batch_number: message_routing.expected_batch_height(),
requires_full_state_hash: !msgs.is_empty(),
payload: BatchPayload {
ingress: IngressPayload::from(msgs),
..BatchPayload::default()
messages: BatchMessages {
signed_ingress_msgs: msgs,
..BatchMessages::default()
},
randomness: Randomness::from(get_random_seed()),
ecdsa_subnet_public_keys: BTreeMap::new(),
Expand Down
2 changes: 1 addition & 1 deletion rs/messaging/src/message_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ impl BatchProcessor for FakeBatchProcessorImpl {
state.metadata.batch_time = time;

// Get only ingress out of the batch_messages
let signed_ingress_msgs = batch.payload.into_messages().unwrap().signed_ingress_msgs;
let signed_ingress_msgs = batch.messages.signed_ingress_msgs;

// Treat all ingress messages as already executed.
let all_ingress_execution_results = signed_ingress_msgs.into_iter().map(|ingress| {
Expand Down
17 changes: 7 additions & 10 deletions rs/messaging/src/routing/demux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{routing::stream_handler::StreamHandler, scheduling::valid_set_rule::
use ic_interfaces_certified_stream_store::CertifiedStreamStore;
use ic_logger::{debug, trace, ReplicaLogger};
use ic_replicated_state::ReplicatedState;
use ic_types::{batch::BatchPayload, messages::SignedIngressContent};
use ic_types::{batch::BatchMessages, messages::SignedIngressContent};
use std::sync::Arc;

#[cfg(test)]
Expand All @@ -14,7 +14,7 @@ pub(crate) trait Demux: Send {
/// Process the provided payload. Splices off XNetMessages as appropriate
/// and (attempts) to induct the messages contained in the payload as
/// appropriate.
fn process_payload(&self, state: ReplicatedState, payload: BatchPayload) -> ReplicatedState;
fn process_payload(&self, state: ReplicatedState, messages: BatchMessages) -> ReplicatedState;
}

pub(crate) struct DemuxImpl<'a> {
Expand All @@ -41,16 +41,13 @@ impl<'a> DemuxImpl<'a> {
}

impl<'a> Demux for DemuxImpl<'a> {
fn process_payload(&self, state: ReplicatedState, payload: BatchPayload) -> ReplicatedState {
fn process_payload(
&self,
state: ReplicatedState,
batch_messages: BatchMessages,
) -> ReplicatedState {
trace!(self.log, "Processing Payload");

let batch_messages = payload.into_messages().unwrap_or_else(|err| {
unreachable!(
"Failed to retrieve messages from validated batch payload: {:?}",
err
)
});

let mut decoded_slices = BTreeMap::new();
for (subnet_id, certified_slice) in batch_messages.certified_stream_slices {
let slice = self
Expand Down
2 changes: 1 addition & 1 deletion rs/messaging/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl StateMachine for StateMachineImpl {

// Preprocess messages and add messages to the induction pool through the Demux.
let phase_timer = Timer::start();
let mut state_with_messages = self.demux.process_payload(state, batch.payload);
let mut state_with_messages = self.demux.process_payload(state, batch.messages);

// Append additional responses to the consensus queue.
state_with_messages
Expand Down
20 changes: 7 additions & 13 deletions rs/messaging/src/state_machine/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@ use ic_registry_subnet_features::SubnetFeatures;
use ic_registry_subnet_type::SubnetType;
use ic_replicated_state::{ReplicatedState, SubnetTopology};
use ic_test_utilities::{
state_manager::FakeStateManager,
types::batch::{BatchBuilder, IngressPayloadBuilder, PayloadBuilder},
types::ids::subnet_test_id,
state_manager::FakeStateManager, types::batch::BatchBuilder, types::ids::subnet_test_id,
types::messages::SignedIngressBuilder,
};
use ic_test_utilities_execution_environment::test_registry_settings;
use ic_test_utilities_logger::with_test_replica_logger;
use ic_types::crypto::canister_threshold_sig::MasterEcdsaPublicKey;
use ic_types::messages::SignedIngress;
use ic_types::{batch::BatchMessages, crypto::canister_threshold_sig::MasterEcdsaPublicKey};
use ic_types::{Height, PrincipalId, SubnetId};
use mockall::{mock, predicate::*, Sequence};
use std::collections::{BTreeMap, BTreeSet};
Expand Down Expand Up @@ -74,7 +72,7 @@ fn test_fixture(provided_batch: &Batch) -> StateMachineTestFixture {
.expect_process_payload()
.times(1)
.in_sequence(&mut seq)
.with(always(), eq(provided_batch.payload.clone()))
.with(always(), eq(provided_batch.messages.clone()))
.returning(|state, _| state);

let mut scheduler = Box::new(MockScheduler::new());
Expand Down Expand Up @@ -203,16 +201,12 @@ fn param_batch_test(batch_num: Height, in_count: u64) {
ingress_messages.push(in_msg);
}

let ingress_payload_builder = IngressPayloadBuilder::new();
let payload_builder = PayloadBuilder::new();
let batch_builder = BatchBuilder::new();

let provided_batch = batch_builder
.payload(
payload_builder
.ingress(ingress_payload_builder.msgs(ingress_messages).build())
.build(),
)
.messages(BatchMessages {
signed_ingress_msgs: ingress_messages,
..BatchMessages::default()
})
.batch_number(batch_num)
.build();

Expand Down
15 changes: 7 additions & 8 deletions rs/replay/src/player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ use ic_registry_transport::{
use ic_replica::setup::get_subnet_type;
use ic_replicated_state::ReplicatedState;
use ic_state_manager::StateManagerImpl;
use ic_types::batch::BatchMessages;
use ic_types::consensus::certification::CertificationShare;
use ic_types::malicious_flags::MaliciousFlags;
use ic_types::{
batch::{Batch, BatchPayload, IngressPayload},
batch::Batch,
consensus::{catchup::CUPWithOriginalProtobuf, CatchUpPackage, HasHeight, HasVersion},
ingress::{IngressState, IngressStatus, WasmResult},
messages::UserQuery,
Expand Down Expand Up @@ -714,7 +715,7 @@ impl Player {
let mut extra_batch = Batch {
batch_number,
requires_full_state_hash: true,
payload: BatchPayload::default(),
messages: BatchMessages::default(),
// Use a fake randomness here since we don't have random tape for extra messages
randomness,
ecdsa_subnet_public_keys: BTreeMap::new(),
Expand All @@ -728,12 +729,10 @@ impl Player {
return (context_time, None);
}
if !extra_msgs.is_empty() {
extra_batch.payload.ingress = IngressPayload::from(
extra_msgs
.iter()
.map(|fm| fm.ingress.clone())
.collect::<Vec<_>>(),
);
extra_batch.messages.signed_ingress_msgs = extra_msgs
.iter()
.map(|fm| fm.ingress.clone())
.collect::<Vec<_>>();
println!("extra_batch created with new ingress");
}
let batch_number = extra_batch.batch_number;
Expand Down
8 changes: 4 additions & 4 deletions rs/replica/benches/execution_only_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use ic_test_utilities::{
consensus::fake::FakeVerifier, mock_time, types::messages::SignedIngressBuilder,
};
use ic_types::{
batch::{Batch, BatchPayload, IngressPayload},
batch::{Batch, BatchMessages},
ingress::{IngressState, IngressStatus, WasmResult},
malicious_flags::MaliciousFlags,
messages::SignedIngress,
Expand Down Expand Up @@ -111,9 +111,9 @@ fn build_batch(message_routing: &dyn MessageRouting, msgs: Vec<SignedIngress>) -
Batch {
batch_number: message_routing.expected_batch_height(),
requires_full_state_hash: !msgs.is_empty(),
payload: BatchPayload {
ingress: IngressPayload::from(msgs),
..BatchPayload::default()
messages: BatchMessages {
signed_ingress_msgs: msgs,
..BatchMessages::default()
},
randomness: Randomness::from([0; 32]),
ecdsa_subnet_public_keys: BTreeMap::new(),
Expand Down
2 changes: 1 addition & 1 deletion rs/state_machine_tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ impl StateMachine {
let batch = Batch {
batch_number,
requires_full_state_hash: self.checkpoints_enabled.get(),
payload,
messages: payload.into_messages().unwrap(),
randomness: Randomness::from(seed),
ecdsa_subnet_public_keys: self.ecdsa_subnet_public_keys.clone(),
registry_version: self.registry_client.get_latest_version(),
Expand Down
10 changes: 5 additions & 5 deletions rs/test_utilities/src/types/batch/batch_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::BTreeMap;

use crate::util::mock_time;
use ic_types::{
batch::{Batch, BatchPayload},
batch::{Batch, BatchMessages},
Height, Randomness, RegistryVersion, Time,
};

Expand All @@ -17,7 +17,7 @@ impl Default for BatchBuilder {
batch: Batch {
batch_number: Height::from(0),
requires_full_state_hash: false,
payload: super::payload::PayloadBuilder::default().build(),
messages: BatchMessages::default(),
randomness: Randomness::from([0; 32]),
ecdsa_subnet_public_keys: BTreeMap::new(),
registry_version: RegistryVersion::from(1),
Expand All @@ -40,9 +40,9 @@ impl BatchBuilder {
self
}

/// Set the payload field to payload.
pub fn payload(mut self, payload: BatchPayload) -> Self {
self.batch.payload = payload;
/// Set the messages field to messages.
pub fn messages(mut self, messages: BatchMessages) -> Self {
self.batch.messages = messages;
self
}

Expand Down

0 comments on commit f28a3e0

Please sign in to comment.