Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 27 additions & 28 deletions chain/src/actors/syncer/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ use super::{
ingress::{Mailbox, Message},
Config,
};
use crate::actors::syncer::{
handler,
key::{self, MultiIndex, Value},
use crate::{
actors::syncer::{
handler,
key::{self, MultiIndex, Value},
},
Indexer,
};
use alto_client::Client;
use alto_types::{Block, Finalization, Finalized, Notarized};
use bytes::Bytes;
use commonware_cryptography::{bls12381, ed25519::PublicKey, sha256::Digest};
Expand Down Expand Up @@ -44,7 +46,7 @@ use std::{
use tracing::{debug, info, warn};

/// Application actor.
pub struct Actor<B: Blob, R: Rng + Spawner + Metrics + Clock + GClock + Storage<B>> {
pub struct Actor<B: Blob, R: Rng + Spawner + Metrics + Clock + GClock + Storage<B>, I: Indexer> {
context: R,
public_key: PublicKey,
public: bls12381::PublicKey,
Expand All @@ -53,7 +55,7 @@ pub struct Actor<B: Blob, R: Rng + Spawner + Metrics + Clock + GClock + Storage<
mailbox_size: usize,
backfill_quota: Quota,
activity_timeout: u64,
client: Option<Client>,
indexer: Option<I>,

// Blocks verified stored by view<>digest
verified: Archive<TwoCap, Digest, B, R>,
Expand All @@ -76,9 +78,9 @@ pub struct Actor<B: Blob, R: Rng + Spawner + Metrics + Clock + GClock + Storage<
contiguous_height: Gauge,
}

impl<B: Blob, R: Rng + Spawner + Metrics + Clock + GClock + Storage<B>> Actor<B, R> {
impl<B: Blob, R: Rng + Spawner + Metrics + Clock + GClock + Storage<B>, I: Indexer> Actor<B, R, I> {
/// Create a new application actor.
pub async fn init(context: R, config: Config) -> (Self, Mailbox) {
pub async fn init(context: R, config: Config<I>) -> (Self, Mailbox) {
// Initialize verified blocks
let verified_journal = Journal::init(
context.with_label("verified_journal"),
Expand Down Expand Up @@ -197,13 +199,6 @@ impl<B: Blob, R: Rng + Spawner + Metrics + Clock + GClock + Storage<B>> Actor<B,

// Initialize mailbox
let (sender, mailbox) = mpsc::channel(config.mailbox_size);

// Initialize client
let mut client = None;
if let Some(indexer) = config.indexer {
client = Some(Client::new(&indexer, config.identity.into()));
info!(indexer, "initialized indexer client");
}
(
Self {
context,
Expand All @@ -214,7 +209,7 @@ impl<B: Blob, R: Rng + Spawner + Metrics + Clock + GClock + Storage<B>> Actor<B,
mailbox_size: config.mailbox_size,
backfill_quota: config.backfill_quota,
activity_timeout: config.activity_timeout,
client,
indexer: config.indexer,

verified: verified_archive,
notarized: notarized_archive,
Expand Down Expand Up @@ -484,15 +479,16 @@ impl<B: Blob, R: Rng + Spawner + Metrics + Clock + GClock + Storage<B>> Actor<B,
}
Message::Notarized { proof, seed } => {
// Upload seed to indexer (if available)
if let Some(client) = self.client.as_ref() {
if let Some(indexer) = self.indexer.as_ref() {
self.context.with_label("indexer").spawn({
let client = client.clone();
let indexer = indexer.clone();
let view = proof.view;
move |_| async move {
let seed = seed.serialize().into();
let result = client.seed_upload(seed).await;
let result = indexer.seed_upload(seed).await;
if let Err(e) = result {
warn!(?e, "failed to upload seed");
return;
}
debug!(view, "seed uploaded to indexer");
}
Expand Down Expand Up @@ -526,15 +522,16 @@ impl<B: Blob, R: Rng + Spawner + Metrics + Clock + GClock + Storage<B>> Actor<B,
debug!(view, height, "notarized block stored");

// Upload to indexer (if available)
if let Some(client) = self.client.as_ref() {
if let Some(indexer) = self.indexer.as_ref() {
self.context.with_label("indexer").spawn({
let client = client.clone();
let indexer = indexer.clone();
move |_| async move {
let result = client
let result = indexer
.notarization_upload(notarization)
.await;
if let Err(e) = result {
warn!(?e, "failed to upload notarization");
return;
}
debug!(view, "notarization uploaded to indexer");
}
Expand All @@ -553,15 +550,16 @@ impl<B: Blob, R: Rng + Spawner + Metrics + Clock + GClock + Storage<B>> Actor<B,
}
Message::Finalized { proof, seed } => {
// Upload seed to indexer (if available)
if let Some(client) = self.client.as_ref() {
if let Some(indexer) = self.indexer.as_ref() {
self.context.with_label("indexer").spawn({
let client = client.clone();
let indexer = indexer.clone();
let view = proof.view;
move |_| async move {
let seed = seed.serialize().into();
let result = client.seed_upload(seed).await;
let result = indexer.seed_upload(seed).await;
if let Err(e) = result {
warn!(?e, "failed to upload seed");
return;
}
debug!(view, "seed uploaded to indexer");
}
Expand Down Expand Up @@ -625,16 +623,17 @@ impl<B: Blob, R: Rng + Spawner + Metrics + Clock + GClock + Storage<B>> Actor<B,
self.finalized_height.set(height as i64);

// Upload to indexer (if available)
if let Some(client) = self.client.as_ref() {
if let Some(indexer) = self.indexer.as_ref() {
self.context.with_label("indexer").spawn({
let client = client.clone();
let indexer = indexer.clone();
let finalization = Finalized::new(proof, block).serialize().into();
move |_| async move {
let result = client
let result = indexer
.finalization_upload(finalization)
.await;
if let Err(e) = result {
warn!(?e, "failed to upload finalization");
return;
}
debug!(height, "finalization uploaded to indexer");
}
Expand Down
6 changes: 4 additions & 2 deletions chain/src/actors/syncer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ pub use actor::Actor;
mod ingress;
pub use ingress::Mailbox;

use crate::Indexer;

/// Configuration for the syncer.
pub struct Config {
pub struct Config<I: Indexer> {
pub partition_prefix: String,

pub public_key: PublicKey,
Expand All @@ -30,5 +32,5 @@ pub struct Config {

pub activity_timeout: u64,

pub indexer: Option<String>,
pub indexer: Option<I>,
}
12 changes: 10 additions & 2 deletions chain/src/bin/validator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use alto_chain::{engine, Config};
use alto_client::Client;
use alto_types::P2P_NAMESPACE;
use axum::{routing::get, serve, Extension, Router};
use clap::{Arg, Command};
Expand Down Expand Up @@ -91,11 +92,12 @@ fn main() {
let threshold = quorum(peers_u32).expect("unable to derive quorum");
let identity = from_hex_formatted(&config.identity).expect("Could not parse identity");
let identity = poly::Public::deserialize(&identity, threshold).expect("Identity is invalid");
let identity_public = poly::public(&identity);
let public_key = signer.public_key();
let ip = peers.get(&public_key).expect("Could not find self in IPs");
info!(
?public_key,
identity = hex(&poly::public(&identity).serialize()),
identity = hex(&identity_public.serialize()),
?ip,
port = config.port,
"loaded config"
Expand Down Expand Up @@ -177,6 +179,12 @@ fn main() {
// Create network
let p2p = network.start();

// Create indexer
let mut indexer = None;
if let Some(uri) = config.indexer {
indexer = Some(Client::new(&uri, identity_public.into()));
}

// Create engine
let config = engine::Config {
partition_prefix: "engine".to_string(),
Expand All @@ -195,7 +203,7 @@ fn main() {
max_fetch_size: MAX_FETCH_SIZE,
fetch_concurrent: FETCH_CONCURRENT,
fetch_rate_per_peer: resolver_limit,
indexer: config.indexer,
indexer,
};
let engine = engine::Engine::new(context.with_label("engine"), config).await;

Expand Down
23 changes: 16 additions & 7 deletions chain/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::actors::{application, syncer};
use crate::{
actors::{application, syncer},
Indexer,
};
use alto_types::NAMESPACE;
use commonware_consensus::threshold_simplex::{self, Engine as Consensus, Prover};
use commonware_cryptography::{
Expand All @@ -17,7 +20,7 @@ use rand::{CryptoRng, Rng};
use std::time::Duration;
use tracing::{error, warn};

pub struct Config {
pub struct Config<I: Indexer> {
pub partition_prefix: String,
pub signer: Ed25519,
pub identity: Poly<group::Public>,
Expand All @@ -36,14 +39,18 @@ pub struct Config {
pub fetch_concurrent: usize,
pub fetch_rate_per_peer: Quota,

pub indexer: Option<String>,
pub indexer: Option<I>,
}

pub struct Engine<B: Blob, E: Clock + GClock + Rng + CryptoRng + Spawner + Storage<B> + Metrics> {
pub struct Engine<
B: Blob,
E: Clock + GClock + Rng + CryptoRng + Spawner + Storage<B> + Metrics,
I: Indexer,
> {
context: E,

application: application::Actor<E>,
syncer: syncer::Actor<B, E>,
syncer: syncer::Actor<B, E, I>,
syncer_mailbox: syncer::Mailbox,
consensus: Consensus<
B,
Expand All @@ -57,8 +64,10 @@ pub struct Engine<B: Blob, E: Clock + GClock + Rng + CryptoRng + Spawner + Stora
>,
}

impl<B: Blob, E: Clock + GClock + Rng + CryptoRng + Spawner + Storage<B> + Metrics> Engine<B, E> {
pub async fn new(context: E, cfg: Config) -> Self {
impl<B: Blob, E: Clock + GClock + Rng + CryptoRng + Spawner + Storage<B> + Metrics, I: Indexer>
Engine<B, E, I>
{
pub async fn new(context: E, cfg: Config<I>) -> Self {
// Create the application
let public = public(&cfg.identity);
let (application, supervisor, application_mailbox) = application::Actor::new(
Expand Down
Loading