Skip to content

Commit

Permalink
Merge pull request #3 from michaelsproul/bn-proposer-alt
Browse files Browse the repository at this point in the history
Fix closure lifetimes by using functions
  • Loading branch information
AgeManning committed Jul 18, 2022
2 parents fbf533b + 58de945 commit 55b3cfb
Showing 1 changed file with 116 additions and 103 deletions.
219 changes: 116 additions & 103 deletions validator_client/src/block_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ use crate::{http_metrics::metrics, validator_store::ValidatorStore};
use environment::RuntimeContext;
use eth2::types::Graffiti;
use eth2::BeaconNodeHttpClient;
use slog::{crit, debug, error, info, trace, warn};
use slog::{crit, debug, error, info, trace, warn, Logger};
use slot_clock::SlotClock;
use std::ops::Deref;
use std::sync::Arc;
use tokio::sync::mpsc;
use types::{
BlindedPayload, BlockType, Epoch, EthSpec, ExecPayload, FullPayload, PublicKeyBytes, Slot,
BeaconBlock, BlindedPayload, BlockType, Epoch, EthSpec, ExecPayload, FullPayload,
PublicKeyBytes, SignatureBytes, SignedBeaconBlock, Slot,
};

#[derive(Debug)]
Expand Down Expand Up @@ -295,6 +296,101 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
Ok(())
}

/// Attempt to construct a `BeaconBlock` using a single beacon node.
async fn get_block_from_node<'a, Payload: ExecPayload<E>>(
beacon_node: &'a BeaconNodeHttpClient,
slot: Slot,
randao_reveal: &SignatureBytes,
graffiti: &Option<Graffiti>,
proposer_index: Option<u64>,
) -> Result<BeaconBlock<E, Payload>, BlockError> {
let _get_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BEACON_BLOCK_HTTP_GET],
);
let block = match Payload::block_type() {
BlockType::Full => {
beacon_node
.get_validator_blocks::<E, Payload>(slot, randao_reveal, graffiti.as_ref())
.await
.map_err(|e| {
BlockError::Recoverable(format!(
"Error from beacon node when producing block: {:?}",
e
))
})?
.data
}
BlockType::Blinded => {
beacon_node
.get_validator_blinded_blocks::<E, Payload>(
slot,
randao_reveal,
graffiti.as_ref(),
)
.await
.map_err(|e| {
BlockError::Recoverable(format!(
"Error from beacon node when producing block: {:?}",
e
))
})?
.data
}
};

if proposer_index != Some(block.proposer_index()) {
return Err(BlockError::Recoverable(
"Proposer index does not match block proposer. Beacon chain re-orged".to_string(),
));
}

Ok(block)
}

/// Attempt to publish a `SignedBeaconBlock` to a single beacon node.
async fn publish_block_to_node<'a, Payload: ExecPayload<E>>(
beacon_node: &'a BeaconNodeHttpClient,
signed_block: &'a SignedBeaconBlock<E, Payload>,
log: &'a Logger,
) -> Result<(), BlockError> {
let _post_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BEACON_BLOCK_HTTP_POST],
);

match Payload::block_type() {
BlockType::Full => beacon_node
.post_beacon_blocks(signed_block)
.await
.map_err(|e| {
BlockError::Irrecoverable(format!(
"Error from beacon node when publishing block: {:?}",
e
))
})?,
BlockType::Blinded => beacon_node
.post_beacon_blinded_blocks(signed_block)
.await
.map_err(|e| {
BlockError::Irrecoverable(format!(
"Error from beacon node when publishing block: {:?}",
e
))
})?,
}

info!(
log,
"Successfully published block";
"deposits" => signed_block.message().body().deposits().len(),
"attestations" => signed_block.message().body().attestations().len(),
"graffiti" => signed_block.message().body().graffiti().as_utf8_lossy(),
"slot" => signed_block.slot().as_u64(),
);
Ok(())
}

/// Produce a block at the given slot for validator_pubkey
async fn publish_block<Payload: ExecPayload<E>>(
self,
Expand Down Expand Up @@ -334,62 +430,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
.or_else(|| self.validator_store.graffiti(&validator_pubkey))
.or(self.graffiti);

let randao_reveal_ref = &randao_reveal;
let self_ref = &self;
let proposer_index = self.validator_store.validator_index(&validator_pubkey);
let validator_pubkey_ref = &validator_pubkey;

// A closure to handle beacon block lookups.
let get_block_closure = |beacon_node: BeaconNodeHttpClient| async move {
let get_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BEACON_BLOCK_HTTP_GET],
);
let block = match Payload::block_type() {
BlockType::Full => {
beacon_node
.get_validator_blocks::<E, Payload>(
slot,
randao_reveal_ref,
graffiti.as_ref(),
)
.await
.map_err(|e| {
BlockError::Recoverable(format!(
"Error from beacon node when producing block: {:?}",
e
))
})?
.data
}
BlockType::Blinded => {
beacon_node
.get_validator_blinded_blocks::<E, Payload>(
slot,
randao_reveal_ref,
graffiti.as_ref(),
)
.await
.map_err(|e| {
BlockError::Recoverable(format!(
"Error from beacon node when producing block: {:?}",
e
))
})?
.data
}
};
drop(get_timer);

if proposer_index != Some(block.proposer_index()) {
return Err(BlockError::Recoverable(
"Proposer index does not match block proposer. Beacon chain re-orged"
.to_string(),
));
}

Ok::<_, BlockError>(block)
};

// Request block from first responsive proposer node if specified, if not specified attempt
// from the set of beacon nodes.
Expand All @@ -398,7 +439,13 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
if let Some(proposer_nodes) = &self.proposer_nodes {
if let Ok(block) = proposer_nodes
.first_success(RequireSynced::No, |client| {
get_block_closure(client.clone())
Self::get_block_from_node(
client,
slot,
&randao_reveal,
&graffiti,
proposer_index,
)
})
.await
{
Expand All @@ -408,64 +455,30 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
// Fallback to beacon nodes
self.beacon_nodes
.first_success(RequireSynced::No, |client| {
get_block_closure(client.clone())
Self::get_block_from_node(
client,
slot,
&randao_reveal,
&graffiti,
proposer_index,
)
})
.await
}
.await?;

let signed_block = self_ref
let signed_block = self
.validator_store
.sign_block::<Payload>(*validator_pubkey_ref, block, current_slot)
.sign_block::<Payload>(validator_pubkey, block, current_slot)
.await
.map_err(|e| BlockError::Recoverable(format!("Unable to sign block: {:?}", e)))?;
let signed_block_ref = &signed_block;

// Closure for handling block publication
let publish_block_closure = |beacon_node: BeaconNodeHttpClient| async move {
let _post_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BEACON_BLOCK_HTTP_POST],
);

match Payload::block_type() {
BlockType::Full => beacon_node
.post_beacon_blocks(signed_block_ref)
.await
.map_err(|e| {
BlockError::Irrecoverable(format!(
"Error from beacon node when publishing block: {:?}",
e
))
})?,
BlockType::Blinded => beacon_node
.post_beacon_blinded_blocks(signed_block_ref)
.await
.map_err(|e| {
BlockError::Irrecoverable(format!(
"Error from beacon node when publishing block: {:?}",
e
))
})?,
}

info!(
log,
"Successfully published block";
"deposits" => signed_block_ref.message().body().deposits().len(),
"attestations" => signed_block_ref.message().body().attestations().len(),
"graffiti" => ?graffiti.map(|g| g.as_utf8_lossy()),
"slot" => signed_block_ref.slot().as_u64(),
);
Ok::<_, BlockError>(())
};

// Publish block with first available publishing node, or fallback to a beacon node.
async {
if let Some(proposer_nodes) = &self.proposer_nodes {
if proposer_nodes
.first_success(RequireSynced::No, |client| {
publish_block_closure(client.clone())
Self::publish_block_to_node(client, &signed_block, log)
})
.await
.is_ok()
Expand All @@ -476,7 +489,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
// Fallback to beacon nodes
self.beacon_nodes
.first_success(RequireSynced::No, |client| {
publish_block_closure(client.clone())
Self::publish_block_to_node(client, &signed_block, log)
})
.await
}
Expand Down

0 comments on commit 55b3cfb

Please sign in to comment.