Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
19bfe73
Add `random_eth_address_seed`
emlautarom1 Mar 4, 2026
0c3c579
Make `Record::new` accept argument as ref
emlautarom1 Mar 4, 2026
76df81a
Prefer to use `&[]` as argument
emlautarom1 Mar 4, 2026
d4a1177
Prefer `impl AsRef<str>` when possible
emlautarom1 Mar 4, 2026
6dc0d60
Prefer `&mut` over ownership
emlautarom1 Mar 4, 2026
52bcd30
Expose function
emlautarom1 Mar 4, 2026
129859a
Use inner data directly
emlautarom1 Mar 4, 2026
a28dffc
Initial `test_cluster`
emlautarom1 Mar 4, 2026
0f556cc
Add missing ref
emlautarom1 Mar 4, 2026
04ec923
Apply clippy suggestions
emlautarom1 Mar 4, 2026
0cfe82d
Remove +1
emlautarom1 Mar 4, 2026
42a6ba8
Make `now` be local without nanos
emlautarom1 Mar 4, 2026
25bb2a7
Add `fetch_definition`
emlautarom1 Mar 4, 2026
c33ef86
Update lockfile
emlautarom1 Mar 4, 2026
09a4b97
Fix formatting
emlautarom1 Mar 4, 2026
4e441af
Fix clippy suggestions
emlautarom1 Mar 4, 2026
0b26324
Remove import
emlautarom1 Mar 4, 2026
8aa7a25
Ignore clippy warning
emlautarom1 Mar 4, 2026
bb230ab
Add `create_validator_keys_dir`
emlautarom1 Mar 5, 2026
41fde25
Make signature more generic
emlautarom1 Mar 5, 2026
0213e47
Merge branch 'main' into emlautarom1/cluster-helpers
emlautarom1 Mar 5, 2026
9bbe2eb
Apply timeout to the entire operation
emlautarom1 Mar 5, 2026
cc9d2ca
Add more context to the error
emlautarom1 Mar 5, 2026
070451c
Truncate to u8
emlautarom1 Mar 5, 2026
3a83e36
Update function name and docs
emlautarom1 Mar 5, 2026
30c49f4
Revert operation order change
emlautarom1 Mar 10, 2026
0bad326
feat(p2p): add upgrade to quic (#259)
varex83 Mar 9, 2026
e7c654b
feat(p2p): add force direct connections (#260)
varex83 Mar 9, 2026
0ada038
fix: update quinn-proto package (#275)
iamquang95 Mar 10, 2026
7594a3c
Merge remote-tracking branch 'origin/main' into emlautarom1/cluster-h…
emlautarom1 Mar 10, 2026
27d5241
Fix clippy warnings
emlautarom1 Mar 10, 2026
ca177d9
Improve random management
emlautarom1 Mar 11, 2026
86cd6c4
Use explicit reqwest features
emlautarom1 Mar 11, 2026
3055450
Fix typo
emlautarom1 Mar 12, 2026
9d174eb
Use `helpers::to_0x_hex`
emlautarom1 Mar 12, 2026
75637ba
Use `Vec::with_capacity`
emlautarom1 Mar 12, 2026
e8e11f0
Use `test cluster` as name
emlautarom1 Mar 12, 2026
971afcd
Remove qualified use
emlautarom1 Mar 12, 2026
d38831a
Fix clippy lints
emlautarom1 Mar 12, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
547 changes: 281 additions & 266 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion crates/cli/src/commands/create_enr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub fn run(args: CreateEnrArgs) -> Result<()> {

let key = k1::new_saved_priv_key(&args.data_dir)?;

let record = Record::new(key, Vec::new())?;
let record = Record::new(&key, Vec::new())?;
let key_path = k1::key_path(&args.data_dir);

let mut writer = io::stdout();
Expand Down
2 changes: 1 addition & 1 deletion crates/cli/src/commands/enr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub fn run(args: EnrArgs) -> Result<()> {
}
};

let record = Record::new(key.clone(), vec![])?;
let record = Record::new(&key, vec![])?;

writeln!(writer, "{}", record)?;

Expand Down
2 changes: 1 addition & 1 deletion crates/cli/src/commands/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ pub(crate) async fn publish_result_to_obol_api(
private_key_file: impl AsRef<Path>,
) -> CliResult<()> {
let private_key = load_or_generate_key(private_key_file.as_ref()).await?;
let enr = Record::new(private_key.clone(), vec![])?;
let enr = Record::new(&private_key, vec![])?;
let sign_data_bytes = serde_json::to_vec(&data)?;
let hash = hash_ssz(&sign_data_bytes)?;
let sig = sign(&private_key, &hash)?;
Expand Down
5 changes: 4 additions & 1 deletion crates/cluster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pluto-eth2util.workspace = true
pluto-eth1wrap.workspace = true
pluto-k1util.workspace = true
k256.workspace = true
tokio.workspace = true
reqwest = { workspace = true, features = ["json"] }

[build-dependencies]
prost-build.workspace = true
Expand All @@ -34,7 +36,8 @@ prost-build.workspace = true
test-case.workspace = true
pluto-testutil.workspace = true
rand.workspace = true
tokio.workspace = true
tempfile.workspace = true
wiremock.workspace = true

[lints]
workspace = true
27 changes: 17 additions & 10 deletions crates/cluster/src/definition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
ssz_hasher::Hasher,
version::{CURRENT_VERSION, DKG_ALGO, versions::*},
};
use chrono::{DateTime, Utc};
use chrono::{DateTime, Timelike, Utc};
use libp2p::PeerId;
use pluto_eth1wrap::{EthClient, EthClientError};
use pluto_eth2util::enr::{Record, RecordError};
Expand Down Expand Up @@ -394,7 +394,12 @@ impl Definition {
uuid: uuid.to_string(),
name,
version: CURRENT_VERSION.to_string(),
timestamp: Utc::now().to_string(),
// TODO: This is very error prone and should be replaced with a controlled timestamp in
// UTC.
timestamp: chrono::Local::now()
Comment thread
emlautarom1 marked this conversation as resolved.
.with_nanosecond(0)
.expect("nanoseconds = 0")
.to_rfc3339(),
Comment on lines +399 to +402
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is required to match the Go implementation: requires TZ and explicitly set nanoseconds to 0, otherwise the string gets too long and SSZ serialization fails (exceeds 32 byte limit)

num_validators,
threshold,
dkg_algorithm: DKG_ALGO.to_string(),
Expand Down Expand Up @@ -441,7 +446,9 @@ impl Definition {
return Err(InvalidGasLimitError::GasLimitNotSet.into());
}

def.set_definition_hashes()
def.set_definition_hashes()?;

Ok(def)
}

/// Returns the timestamp of the definition.
Expand Down Expand Up @@ -479,7 +486,7 @@ impl Definition {
// there are no EIP712 signatures before v1.3.0. For definition versions
// earlier than v1.3.0, error if either config signature or enr signature for
// any operator is present.
if !Self::support_eip712_sigs(self.version.as_str()) {
if !Self::support_eip712_sigs(&self.version) {
return if Self::eip712_sigs_present(&self.operators) {
Err(DefinitionError::OlderVersionSignaturesNotSupported)
} else {
Expand Down Expand Up @@ -664,18 +671,18 @@ impl Definition {
}

/// Sets the definition hashes.
pub fn set_definition_hashes(mut self) -> Result<Self, DefinitionError> {
pub fn set_definition_hashes(&mut self) -> Result<(), DefinitionError> {
let config_hash =
hash_definition(&self, true).map_err(|e| DefinitionError::SSZError(Box::new(e)))?;
hash_definition(self, true).map_err(|e| DefinitionError::SSZError(Box::new(e)))?;

self.config_hash = config_hash.to_vec();

let definition_hash =
hash_definition(&self, false).map_err(|e| DefinitionError::SSZError(Box::new(e)))?;
hash_definition(self, false).map_err(|e| DefinitionError::SSZError(Box::new(e)))?;

self.definition_hash = definition_hash.to_vec();

Ok(self)
Ok(())
}

/// `verify_hashes` returns an error if hashes populated from json object
Expand Down Expand Up @@ -707,8 +714,8 @@ impl Definition {
/// Returns true if the provided definition version supports EIP712
/// signatures. Note that Definition versions prior to v1.3.0 don't
/// support EIP712 signatures.
pub(crate) fn support_eip712_sigs(version: &str) -> bool {
!matches!(version, V1_0 | V1_1 | V1_2)
pub fn support_eip712_sigs(version: impl AsRef<str>) -> bool {
!matches!(version.as_ref(), V1_0 | V1_1 | V1_2)
}

fn eip712_sigs_present(operators: &[Operator]) -> bool {
Expand Down
3 changes: 2 additions & 1 deletion crates/cluster/src/eip712sigs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ pub(crate) fn digest_eip712(
Ok(digest)
}

fn sign_eip712(
/// Returns the EIP712 signature for the primary type.
pub(crate) fn sign_eip712(
secret_key: &SecretKey,
typ: &EIP712Type,
definition: &Definition,
Expand Down
204 changes: 199 additions & 5 deletions crates/cluster/src/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use chrono::{DateTime, Utc};
use pluto_crypto::tbls::Tbls;
use pluto_eth2util::helpers::{checksum_address, public_key_to_address};
use pluto_k1util::K1UtilError;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::{DeserializeAs, SerializeAs};
use std::borrow::Cow;
use std::{borrow::Cow, path::PathBuf};

use crate::{definition::ADDRESS_LEN, ssz::SSZError, ssz_hasher::HashWalker};

type VerifySigResult<T> = std::result::Result<T, VerifySigError>;
use crate::{
definition::{self, ADDRESS_LEN, Definition},
eip712sigs, operator,
ssz::SSZError,
ssz_hasher::HashWalker,
};

/// Error type returned by `verify_sig`.
#[derive(Debug, thiserror::Error)]
Expand All @@ -22,13 +26,65 @@ pub enum VerifySigError {
}

/// Returns true if the signature matches the digest and expected address.
pub fn verify_sig(expected_addr: &str, digest: &[u8], sig: &[u8]) -> VerifySigResult<bool> {
pub fn verify_sig(
expected_addr: &str,
digest: &[u8],
sig: &[u8],
) -> std::result::Result<bool, VerifySigError> {
let expected_addr = checksum_address(expected_addr)?;
let recovered = pluto_k1util::recover(digest, sig)?;
let actual_addr = public_key_to_address(&recovered);
Ok(expected_addr == actual_addr)
}

/// Error type returned by `fetch_definition`.
#[derive(Debug, thiserror::Error)]
pub enum FetchError {
Comment thread
emlautarom1 marked this conversation as resolved.
/// Timeout while fetching the definition.
#[error("timeout {0}")]
Timeout(#[from] tokio::time::error::Elapsed),

/// HTTP error while fetching the definition.
#[error("HTTP error {0}")]
Http(#[from] reqwest::Error),
Comment thread
emlautarom1 marked this conversation as resolved.
}

/// Fetch cluster definition file from a remote URI.
pub async fn fetch_definition(
url: impl reqwest::IntoUrl,
) -> std::result::Result<Definition, FetchError> {
let definition = tokio::time::timeout(std::time::Duration::from_secs(10), async {
let response = reqwest::get(url).await?.error_for_status()?;
response.json::<Definition>().await
})
.await??;

Ok(definition)
}

/// Creates a new directory for validator keys.
/// If the directory "validator_keys" exists, it checks if the directory is
/// empty.
pub async fn create_validator_keys_dir(parent_dir: &std::path::Path) -> std::io::Result<PathBuf> {
let vk_dir = parent_dir.join("validator_keys");

if let Err(e) = tokio::fs::create_dir(&vk_dir).await {
if e.kind() != std::io::ErrorKind::AlreadyExists {
return Err(e);
}

let mut entries = tokio::fs::read_dir(&vk_dir).await?;
if entries.next_entry().await?.is_some() {
return Err(std::io::Error::new(
std::io::ErrorKind::AlreadyExists,
"validator_keys directory exists and is not empty",
));
}
}

Ok(vk_dir)
}

/// EthHex represents byte slices that are json formatted as 0x prefixed hex.
/// Can be used both as a standalone type and with serde_as.
#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -239,8 +295,69 @@ pub fn to_0x_hex(bytes: &[u8]) -> String {
format!("0x{}", hex::encode(bytes))
}

/// Signs the creator's config hash.
pub fn sign_creator(
secret: &k256::SecretKey,
definition: &mut definition::Definition,
) -> Result<(), eip712sigs::EIP712Error> {
let config_signature = eip712sigs::sign_eip712(
secret,
&eip712sigs::eip712_creator_config_hash(),
definition,
&operator::Operator::default(),
)?;

definition.creator.config_signature = config_signature;

Ok(())
}

/// Signs the operator's config hash and enr.
pub fn sign_operator(
secret: &k256::SecretKey,
definition: &definition::Definition,
operator: &mut operator::Operator,
) -> Result<(), crate::eip712sigs::EIP712Error> {
let config_signature = crate::eip712sigs::sign_eip712(
secret,
&crate::eip712sigs::get_operator_eip712_type(&definition.version),
definition,
operator,
)?;

let enr_signature = crate::eip712sigs::sign_eip712(
secret,
&crate::eip712sigs::eip712_enr(),
definition,
operator,
)?;

operator.config_signature = config_signature;
operator.enr_signature = enr_signature;

Ok(())
}

/// Returns a BLS aggregate signature of the message signed by all the shares.
pub fn agg_sign(
secrets: &[Vec<pluto_crypto::types::PrivateKey>],
message: &[u8],
) -> Result<pluto_crypto::types::Signature, pluto_crypto::types::Error> {
let blst = pluto_crypto::blst_impl::BlstImpl;

let sigs = secrets
.iter()
.flat_map(|shares| shares.iter())
.map(|share| blst.sign(share, message))
.collect::<Result<Vec<_>, _>>()?;

blst.aggregate(&sigs)
}

#[cfg(test)]
mod tests {
use crate::test_cluster;

use super::*;
use serde_with::serde_as;

Expand Down Expand Up @@ -330,4 +447,81 @@ mod tests {
assert!(json.contains("\"0x010203\""));
assert!(json.contains("[4,5,6]"));
}

#[tokio::test]
async fn fetch_definition_valid() {
let (lock, ..) = test_cluster::new_for_test(1, 2, 3, 0);
let expected_definition = lock.definition.clone();

let server = wiremock::MockServer::start().await;
wiremock::Mock::given(wiremock::matchers::method("GET"))
.and(wiremock::matchers::path("/validDef"))
.respond_with(wiremock::ResponseTemplate::new(200).set_body_json(lock.definition))
.mount(&server)
.await;

let actual_definition = super::fetch_definition(format!("{}/validDef", &server.uri()))
.await
.unwrap();

assert_eq!(actual_definition, expected_definition);
}

#[tokio::test]
async fn fetch_definition_invalid() {
let server = wiremock::MockServer::start().await;
wiremock::Mock::given(wiremock::matchers::method("GET"))
.and(wiremock::matchers::path("/invalidDef"))
.respond_with(
wiremock::ResponseTemplate::new(200).set_body_raw("r#{}#", "application/json"),
)
.mount(&server)
.await;

let response = super::fetch_definition(format!("{}/invalidDef", &server.uri())).await;

assert!(matches!(response, Err(super::FetchError::Http(e)) if e.is_decode()));
}

#[tokio::test]
async fn fetch_definition_non_200() {
let server = wiremock::MockServer::start().await;
wiremock::Mock::given(wiremock::matchers::method("GET"))
.and(wiremock::matchers::path("/non_ok"))
.respond_with(wiremock::ResponseTemplate::new(500))
.mount(&server)
.await;

let response = super::fetch_definition(format!("{}/non_ok", &server.uri())).await;

assert!(matches!(response, Err(super::FetchError::Http(e)) if e.is_status()));
}

#[tokio::test]
async fn create_validator_keys_dir() {
let tmp = tempfile::tempdir().unwrap();
let parent_dir = tmp.path();

// First attempt must succeed.
let dir = super::create_validator_keys_dir(parent_dir).await.unwrap();
assert!(dir.starts_with(parent_dir));
assert!(dir.ends_with("validator_keys"));

// Second attempt shall succeed as long as the dir is empty.
let dir2 = super::create_validator_keys_dir(parent_dir).await.unwrap();
assert_eq!(dir, dir2);

// Create a file in the directory to make it non-empty.
tokio::fs::write(dir.join("file"), b"data").await.unwrap();
let err = super::create_validator_keys_dir(parent_dir)
.await
.unwrap_err();
assert!(matches!(err, e if e.kind() == std::io::ErrorKind::AlreadyExists));

// Parent directory does not exist
let err = super::create_validator_keys_dir(&parent_dir.join("nonexistent"))
.await
.unwrap_err();
assert!(matches!(err, e if e.kind() == std::io::ErrorKind::NotFound));
}
}
1 change: 1 addition & 0 deletions crates/cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub mod ssz;
/// Cluster SSZ hashing management and coordination.
pub mod ssz_hasher;
/// Cluster test cluster management and coordination.
#[cfg(test)]
pub mod test_cluster;
/// Cluster version management and coordination.
pub mod version;
4 changes: 2 additions & 2 deletions crates/cluster/src/manifest/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ mod tests {

for i in 0..operator_amt {
let k1_key = generate_insecure_k1_key(i);
let enr = Record::new(k1_key.clone(), vec![]).unwrap();
let enr = Record::new(&k1_key, vec![]).unwrap();

operators.push(Operator {
address: format!("0x{:040x}", i),
Expand Down Expand Up @@ -457,7 +457,7 @@ mod tests {
let k1_key0 = generate_insecure_k1_key(1);
let k1_key_unknown = generate_insecure_k1_key(200);

let enr0 = Record::new(k1_key0, vec![]).unwrap();
let enr0 = Record::new(&k1_key0, vec![]).unwrap();

let cluster = Cluster {
operators: vec![Operator {
Expand Down
Loading
Loading