Skip to content

Commit

Permalink
[stateless_validation] Implement distribution of state witness parts (n…
Browse files Browse the repository at this point in the history
…ear#11115)

This PR implements the following
- Compressing state witness and creating Reed Solomon encoding parts out
of it.
- Sending the parts over the network to the appropriate part owners.
- Forwarding parts received by owners to other chunk validators.

Future PRs would handle the following
- Validation logic for the parts
- Part management and reconstruction of state witness from parts.
- Add tests after end to end implementation
  • Loading branch information
Shreyan Gupta committed Apr 24, 2024
1 parent 02a315a commit 6aa1aad
Show file tree
Hide file tree
Showing 8 changed files with 256 additions and 28 deletions.
159 changes: 139 additions & 20 deletions chain/client/src/stateless_validation/state_witness_actions.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
use std::collections::HashMap;
use std::sync::Arc;

use itertools::Itertools;
use near_async::messaging::CanSend;
use near_async::time::Clock;
use near_chain::Error;
use near_epoch_manager::EpochManagerAdapter;
use near_network::types::{NetworkRequests, PeerManagerAdapter, PeerManagerMessageRequest};
use near_primitives::reed_solomon::ReedSolomonWrapper;
use near_primitives::sharding::ShardChunkHeader;
use near_primitives::stateless_validation::{
ChunkStateWitness, ChunkStateWitnessAck, EncodedChunkStateWitness, PartialEncodedStateWitness,
SignedEncodedChunkStateWitness,
};
use near_primitives::types::{AccountId, EpochId};
use near_primitives::validator_signer::ValidatorSigner;

use crate::metrics;
Expand All @@ -25,6 +30,9 @@ pub struct StateWitnessActions {
epoch_manager: Arc<dyn EpochManagerAdapter>,
/// Tracks a collection of state witnesses sent from chunk producers to chunk validators.
state_witness_tracker: ChunkStateWitnessTracker,
/// Reed Solomon encoder for encoding state witness parts.
/// We keep one wrapper for each length of chunk_validators to avoid re-creating the encoder.
rs_map: HashMap<usize, ReedSolomonWrapper>,
}

impl StateWitnessActions {
Expand All @@ -39,48 +47,125 @@ impl StateWitnessActions {
my_signer,
epoch_manager,
state_witness_tracker: ChunkStateWitnessTracker::new(clock),
rs_map: HashMap::new(),
}
}

pub fn handle_distribute_state_witness_request(
&mut self,
msg: DistributeStateWitnessRequest,
) -> Result<(), Error> {
let DistributeStateWitnessRequest { state_witness } = msg;
let DistributeStateWitnessRequest { epoch_id, chunk_header, state_witness } = msg;

let signed_witness = create_signed_witness(&state_witness, self.my_signer.as_ref())?;

let mut chunk_validators = self
let chunk_validators = self
.epoch_manager
.get_chunk_validator_assignments(
&state_witness.epoch_id,
state_witness.chunk_header.shard_id(),
state_witness.chunk_header.height_created(),
&epoch_id,
chunk_header.shard_id(),
chunk_header.height_created(),
)?
.ordered_chunk_validators();

tracing::debug!(
target: "stateless_validation",
"Sending chunk state witness for chunk {:?} to chunk validators {:?}",
state_witness.chunk_header.chunk_hash(),
chunk_header.chunk_hash(),
chunk_validators,
);

let witness_bytes = compress_witness(&state_witness)?;

// Record the witness in order to match the incoming acks for measuring round-trip times.
// See process_chunk_state_witness_ack for the handling of the ack messages.
self.state_witness_tracker.record_witness_sent(
&state_witness,
signed_witness.witness_bytes.size_bytes(),
witness_bytes.size_bytes(),
chunk_validators.len(),
);

// TODO(stateless_validation): Replace with call to send_state_witness_parts after full implementation
self.send_state_witness(witness_bytes, chunk_validators);

Ok(())
}

// TODO(stateless_validation): Deprecate once we send state witness in parts.
// This is the original way of sending out state witness where the chunk producer sends the whole witness
// to all chunk validators.
fn send_state_witness(
&self,
witness_bytes: EncodedChunkStateWitness,
mut chunk_validators: Vec<AccountId>,
) {
// Remove ourselves from the list of chunk validators. Network can't send messages to ourselves.
chunk_validators.retain(|validator| validator != self.my_signer.validator_id());

let signed_witness = SignedEncodedChunkStateWitness {
signature: self.my_signer.sign_chunk_state_witness(&witness_bytes),
witness_bytes,
};

self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::ChunkStateWitness(chunk_validators, signed_witness),
));
}

// Break the state witness into parts and send each part to the corresponding chunk validator owner.
// The chunk validator owner will then forward the part to all other chunk validators.
// Each chunk validator would collect the parts and reconstruct the state witness.
#[allow(unused)]
fn send_state_witness_parts(
&mut self,
epoch_id: EpochId,
chunk_header: ShardChunkHeader,
witness_bytes: EncodedChunkStateWitness,
chunk_validators: Vec<AccountId>,
) -> Result<(), Error> {
// Break the state witness into parts using Reed Solomon encoding.
let rs = self.rs_map.entry(chunk_validators.len()).or_insert_with(|| {
let total_parts = chunk_validators.len();
let data_parts = std::cmp::max(total_parts * 2 / 3, 1);
ReedSolomonWrapper::new(data_parts, total_parts - data_parts)
});
let (parts, encoded_length) = rs.encode(witness_bytes);

let validator_witness_tuple = chunk_validators
.iter()
.zip_eq(parts)
.enumerate()
.map(|(part_ord, (chunk_validator, part))| {
// It's fine to unwrap part here as we just constructed the parts above and we expect
// all of them to be present.
let partial_witness = PartialEncodedStateWitness::new(
epoch_id.clone(),
chunk_header.clone(),
part_ord,
part.unwrap().to_vec(),
encoded_length,
self.my_signer.as_ref(),
);
(chunk_validator.clone(), partial_witness)
})
.collect_vec();

// Since we can't send network message to ourselves, we need to send the PartialEncodedStateWitnessForward
// message for our part.
if let Some((_, partial_witness)) = validator_witness_tuple
.iter()
.find(|(validator, _)| validator == self.my_signer.validator_id())
{
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedStateWitnessForward(
chunk_validators,
partial_witness.clone(),
),
));
}

// Send the parts to the corresponding chunk validator owners.
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple),
));
Ok(())
}

Expand All @@ -98,37 +183,71 @@ impl StateWitnessActions {
&self,
partial_witness: PartialEncodedStateWitness,
) -> Result<(), Error> {
unimplemented!("{:?}", partial_witness)
// Validate the partial encoded state witness.
self.validate_partial_encoded_state_witness(&partial_witness)?;

// Store the partial encoded state witness for self.
self.store_partial_encoded_state_witness(&partial_witness)?;

// Forward the part to all the chunk validators.
let chunk_validators = self
.epoch_manager
.get_chunk_validator_assignments(
partial_witness.epoch_id(),
partial_witness.chunk_header().shard_id(),
partial_witness.chunk_header().height_created(),
)?
.ordered_chunk_validators();

self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedStateWitnessForward(chunk_validators, partial_witness),
));

Ok(())
}

/// Function to handle receiving partial_encoded_state_witness_forward message from chunk producer.
pub fn handle_partial_encoded_state_witness_forward(
&self,
partial_witness: PartialEncodedStateWitness,
) -> Result<(), Error> {
// Validate the partial encoded state witness.
self.validate_partial_encoded_state_witness(&partial_witness)?;

// Store the partial encoded state witness for self.
self.store_partial_encoded_state_witness(&partial_witness)?;

Ok(())
}

fn validate_partial_encoded_state_witness(
&self,
partial_witness: &PartialEncodedStateWitness,
) -> Result<(), Error> {
unimplemented!("{:?}", partial_witness)
}

fn store_partial_encoded_state_witness(
&self,
partial_witness: &PartialEncodedStateWitness,
) -> Result<(), Error> {
unimplemented!("{:?}", partial_witness)
}
}

fn create_signed_witness(
witness: &ChunkStateWitness,
my_signer: &dyn ValidatorSigner,
) -> Result<SignedEncodedChunkStateWitness, Error> {
fn compress_witness(witness: &ChunkStateWitness) -> Result<EncodedChunkStateWitness, Error> {
let shard_id_label = witness.chunk_header.shard_id().to_string();
let encode_timer = metrics::CHUNK_STATE_WITNESS_ENCODE_TIME
.with_label_values(&[shard_id_label.as_str()])
.start_timer();
let (witness_bytes, raw_witness_size) = EncodedChunkStateWitness::encode(&witness)?;
encode_timer.observe_duration();
let signed_witness = SignedEncodedChunkStateWitness {
signature: my_signer.sign_chunk_state_witness(&witness_bytes),
witness_bytes,
};

metrics::CHUNK_STATE_WITNESS_TOTAL_SIZE
.with_label_values(&[shard_id_label.as_str()])
.observe(signed_witness.witness_bytes.size_bytes() as f64);
.observe(witness_bytes.size_bytes() as f64);
metrics::CHUNK_STATE_WITNESS_RAW_SIZE
.with_label_values(&[shard_id_label.as_str()])
.observe(raw_witness_size as f64);
Ok(signed_witness)
Ok(witness_bytes)
}
4 changes: 4 additions & 0 deletions chain/client/src/stateless_validation/state_witness_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use near_network::state_witness::{
use near_network::types::PeerManagerAdapter;
use near_o11y::{handler_debug_span, WithSpanContext};
use near_performance_metrics_macros::perf;
use near_primitives::sharding::ShardChunkHeader;
use near_primitives::stateless_validation::ChunkStateWitness;
use near_primitives::types::EpochId;
use near_primitives::validator_signer::ValidatorSigner;

use super::state_witness_actions::StateWitnessActions;
Expand Down Expand Up @@ -43,6 +45,8 @@ impl actix::Actor for StateWitnessActor {
#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
pub struct DistributeStateWitnessRequest {
pub epoch_id: EpochId,
pub chunk_header: ShardChunkHeader,
pub state_witness: ChunkStateWitness,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ impl Client {
);
}

self.state_witness_adapter.send(DistributeStateWitnessRequest { state_witness });
self.state_witness_adapter.send(DistributeStateWitnessRequest {
epoch_id: epoch_id.clone(),
chunk_header,
state_witness,
});
Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions chain/network/src/peer/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ impl PeerConfig {
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum Event {
ShardsManager(ShardsManagerRequestFromNetwork),
Client(ClientSenderForNetworkInput),
Expand Down
1 change: 1 addition & 0 deletions chain/network/src/peer_manager/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ impl actix::Handler<WithNetworkState> for PeerManagerActor {
}

#[derive(Debug, PartialEq, Eq, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum Event {
ShardsManager(ShardsManagerRequestFromNetwork),
Client(ClientSenderForNetworkInput),
Expand Down
1 change: 1 addition & 0 deletions chain/network/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ pub enum NetworkRequests {
TxStatus(AccountId, AccountId, CryptoHash),
/// A challenge to invalidate a block.
Challenge(Challenge),
/// TODO(stateless_validation): Deprecate once we send state witness in parts.
/// A chunk's state witness.
ChunkStateWitness(Vec<AccountId>, SignedEncodedChunkStateWitness),
/// Acknowledgement to a chunk's state witness, sent back to the originating chunk producer.
Expand Down
Loading

0 comments on commit 6aa1aad

Please sign in to comment.