-
Notifications
You must be signed in to change notification settings - Fork 105
refactor(validator): refactor gRPC server implementation #1959
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: next
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,75 @@ | ||
| use std::sync::atomic::Ordering; | ||
|
|
||
| use miden_node_proto::generated as grpc; | ||
| use miden_node_utils::ErrorReport; | ||
| use miden_protocol::block::ProposedBlock; | ||
| use miden_protocol::crypto::dsa::ecdsa_k256_keccak::Signature; | ||
| use miden_tx::utils::serde::{Deserializable, Serializable}; | ||
|
|
||
| use crate::block_validation::validate_block; | ||
| use crate::db::{load_chain_tip, upsert_block_header}; | ||
| use crate::server::ValidatorServer; | ||
|
|
||
| #[tonic::async_trait] | ||
| impl grpc::server::validator_api::SignBlock for ValidatorServer { | ||
| type Input = ProposedBlock; | ||
| type Output = Signature; | ||
|
|
||
| fn decode(request: grpc::blockchain::ProposedBlock) -> tonic::Result<Self::Input> { | ||
| ProposedBlock::read_from_bytes(&request.proposed_block).map_err(|err| { | ||
| tonic::Status::invalid_argument( | ||
| err.as_report_context("Failed to deserialize proposed block"), | ||
| ) | ||
| }) | ||
| } | ||
|
|
||
| fn encode(output: Self::Output) -> tonic::Result<grpc::blockchain::BlockSignature> { | ||
| Ok(grpc::blockchain::BlockSignature { signature: output.to_bytes() }) | ||
| } | ||
|
|
||
| async fn handle(&self, proposed_block: Self::Input) -> tonic::Result<Self::Output> { | ||
| // Serialize sign_block requests to prevent race conditions between loading the | ||
| // chain tip and persisting the validated block header. | ||
| let _permit = self.sign_block_semaphore.acquire().await.map_err(|err| { | ||
| tonic::Status::internal(format!("sign_block semaphore closed: {err}")) | ||
| })?; | ||
|
|
||
| // Load the current chain tip from the database. | ||
| let chain_tip = self | ||
| .db | ||
| .query("load_chain_tip", load_chain_tip) | ||
| .await | ||
| .map_err(|err| { | ||
| tonic::Status::internal(format!("Failed to load chain tip: {}", err.as_report())) | ||
| })? | ||
| .ok_or_else(|| tonic::Status::internal("Chain tip not found in database"))?; | ||
|
|
||
| // Validate the block against the current chain tip. | ||
| let (signature, header) = validate_block(proposed_block, &self.signer, &self.db, chain_tip) | ||
| .await | ||
| .map_err(|err| { | ||
| tonic::Status::invalid_argument(format!( | ||
| "Failed to validate block: {}", | ||
| err.as_report() | ||
| )) | ||
| })?; | ||
|
|
||
| // Persist the validated block header. | ||
| let new_block_num = header.block_num().as_u32(); | ||
| self.db | ||
| .transact("upsert_block_header", move |conn| upsert_block_header(conn, &header)) | ||
| .await | ||
| .map_err(|err| { | ||
| tonic::Status::internal(format!( | ||
| "Failed to persist block header: {}", | ||
| err.as_report() | ||
| )) | ||
| })?; | ||
|
|
||
| // Update the in-memory counters after successful persistence. | ||
| self.chain_tip.store(new_block_num, Ordering::Relaxed); | ||
| self.signed_blocks_count.fetch_add(1, Ordering::Relaxed); | ||
|
|
||
| Ok(signature) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| use std::sync::atomic::Ordering; | ||
|
|
||
| use miden_node_proto::generated as grpc; | ||
|
|
||
| use crate::server::ValidatorServer; | ||
|
|
||
| #[tonic::async_trait] | ||
| impl grpc::server::validator_api::Status for ValidatorServer { | ||
| type Input = (); | ||
| type Output = (); | ||
|
|
||
| async fn full(&self, _request: ()) -> tonic::Result<grpc::validator::ValidatorStatus> { | ||
| Ok(grpc::validator::ValidatorStatus { | ||
| version: env!("CARGO_PKG_VERSION").to_string(), | ||
| status: "OK".to_string(), | ||
| chain_tip: self.chain_tip.load(Ordering::Relaxed), | ||
| validated_transactions_count: self.validated_transactions_count.load(Ordering::Relaxed), | ||
| signed_blocks_count: self.signed_blocks_count.load(Ordering::Relaxed), | ||
| }) | ||
|
Comment on lines
+13
to
+19
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We just need to be careful when we're adding instrumentation at the top level, that this doesn't override anything important. |
||
| } | ||
|
|
||
| async fn handle(&self, _input: Self::Input) -> tonic::Result<Self::Output> { | ||
| unimplemented!() | ||
| } | ||
|
|
||
| fn decode(_request: ()) -> tonic::Result<Self::Input> { | ||
| unimplemented!() | ||
| } | ||
|
|
||
| fn encode(_output: Self::Output) -> tonic::Result<grpc::validator::ValidatorStatus> { | ||
| unimplemented!() | ||
| } | ||
|
Comment on lines
+22
to
+32
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| use std::sync::atomic::Ordering; | ||
|
|
||
| use miden_node_proto::generated as grpc; | ||
| use miden_node_utils::ErrorReport; | ||
| use miden_node_utils::tracing::OpenTelemetrySpanExt; | ||
| use miden_protocol::transaction::{ProvenTransaction, TransactionInputs}; | ||
| use miden_tx::utils::serde::Deserializable; | ||
| use tonic::Status; | ||
|
|
||
| use crate::db::insert_transaction; | ||
| use crate::server::ValidatorServer; | ||
| use crate::tx_validation::validate_transaction; | ||
|
|
||
| #[tonic::async_trait] | ||
| impl grpc::server::validator_api::SubmitProvenTransaction for ValidatorServer { | ||
| type Input = Input; | ||
| type Output = (); | ||
|
|
||
| async fn handle(&self, input: Self::Input) -> tonic::Result<Self::Output> { | ||
| tracing::Span::current().set_attribute("transaction.id", input.tx.id()); | ||
|
|
||
| // Validate the transaction. | ||
| let tx_info = validate_transaction(input.tx, input.inputs).await.map_err(|err| { | ||
| Status::invalid_argument(err.as_report_context("Invalid transaction")) | ||
| })?; | ||
|
|
||
| // Store the validated transaction. | ||
| let count = self | ||
| .db | ||
| .transact("insert_transaction", move |conn| insert_transaction(conn, &tx_info)) | ||
| .await | ||
| .map_err(|err| { | ||
| Status::internal(err.as_report_context("Failed to insert transaction")) | ||
| })?; | ||
|
|
||
| self.validated_transactions_count.fetch_add(count as u64, Ordering::Relaxed); | ||
| Ok(()) | ||
| } | ||
|
|
||
| fn decode(request: grpc::transaction::ProvenTransaction) -> tonic::Result<Self::Input> { | ||
| let tx = ProvenTransaction::read_from_bytes(&request.transaction).map_err(|err| { | ||
| Status::invalid_argument(err.as_report_context("Invalid proven transaction")) | ||
| })?; | ||
| let inputs = request | ||
| .transaction_inputs | ||
| .ok_or(Status::invalid_argument("Missing transaction inputs"))?; | ||
| let inputs = TransactionInputs::read_from_bytes(&inputs).map_err(|err| { | ||
| Status::invalid_argument(err.as_report_context("Invalid transaction inputs")) | ||
| })?; | ||
|
|
||
| Ok(Self::Input { tx, inputs }) | ||
| } | ||
|
|
||
| fn encode(output: Self::Output) -> tonic::Result<()> { | ||
| Ok(output) | ||
| } | ||
| } | ||
|
|
||
| pub struct Input { | ||
| tx: ProvenTransaction, | ||
| inputs: TransactionInputs, | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suspect all of our
encodecalls are infallible.. Should we change the API to replaceResult<T>withT? Maybe in the future we may need fallible encoding? Can't imagine why though..