Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not attempt to treat miner as a coordinator when updating DKG #4591

Merged
merged 9 commits into from Apr 3, 2024
2 changes: 1 addition & 1 deletion stacks-signer/src/cli.rs
Expand Up @@ -264,7 +264,7 @@ fn parse_contract(contract: &str) -> Result<QualifiedContractIdentifier, String>
pub fn parse_pox_addr(pox_address_literal: &str) -> Result<PoxAddress, String> {
PoxAddress::from_b58(pox_address_literal).map_or_else(
|| Err(format!("Invalid pox address: {pox_address_literal}")),
|pox_address| Ok(pox_address),
Ok,
)
}

Expand Down
18 changes: 8 additions & 10 deletions stacks-signer/src/coordinator.rs
Expand Up @@ -91,17 +91,15 @@ impl CoordinatorSelector {
}
}
new_index
} else {
if ROTATE_COORDINATORS {
let mut new_index = self.coordinator_index.saturating_add(1);
if new_index == self.coordinator_ids.len() {
// We have exhausted all potential coordinators. Go back to the start
new_index = 0;
}
new_index
} else {
self.coordinator_index
} else if ROTATE_COORDINATORS {
let mut new_index = self.coordinator_index.saturating_add(1);
jferrant marked this conversation as resolved.
Show resolved Hide resolved
if new_index == self.coordinator_ids.len() {
// We have exhausted all potential coordinators. Go back to the start
new_index = 0;
}
new_index
} else {
self.coordinator_index
};
self.coordinator_id = *self
.coordinator_ids
Expand Down
1 change: 1 addition & 0 deletions stacks-signer/src/main.rs
Expand Up @@ -398,6 +398,7 @@ pub mod tests {
use super::{handle_generate_stacking_signature, *};
use crate::{GenerateStackingSignatureArgs, GlobalConfig};

#[allow(clippy::too_many_arguments)]
fn call_verify_signer_sig(
pox_addr: &PoxAddress,
reward_cycle: u128,
Expand Down
2 changes: 1 addition & 1 deletion stacks-signer/src/runloop.rs
Expand Up @@ -392,7 +392,7 @@ impl SignerRunLoop<Vec<OperationResult>, RunLoopCommand> for RunLoop {
if signer.approved_aggregate_public_key.is_none() {
if let Err(e) = retry_with_exponential_backoff(|| {
signer
.update_dkg(&self.stacks_client, current_reward_cycle)
.update_dkg(&self.stacks_client)
.map_err(backoff::Error::transient)
}) {
error!("{signer}: failed to update DKG: {e}");
Expand Down
117 changes: 63 additions & 54 deletions stacks-signer/src/signer.rs
Expand Up @@ -185,9 +185,11 @@ impl std::fmt::Display for Signer {
}

impl Signer {
/// Return the current coordinator. If in the active reward cycle, this is the miner,
/// so the first element of the tuple will be None (because the miner does not have a signer index).
fn get_coordinator(&self, current_reward_cycle: u64) -> (Option<u32>, PublicKey) {
/// Return the current coordinator.
/// If the current reward cycle is the active reward cycle, this is the miner,
/// so the first element of the tuple will be None (because the miner does not have a signer index).
/// Otherwise, the coordinator is the signer with the index returned by the coordinator selector.
fn get_coordinator_sign(&self, current_reward_cycle: u64) -> (Option<u32>, PublicKey) {
if self.reward_cycle == current_reward_cycle {
let Some(ref cur_miner) = self.miner_key else {
error!(
Expand All @@ -198,12 +200,18 @@ impl Signer {
return (Some(selected.0), selected.1);
};
// coordinator is the current miner.
(None, cur_miner.clone())
(None, *cur_miner)
} else {
let selected = self.coordinator_selector.get_coordinator();
return (Some(selected.0), selected.1);
(Some(selected.0), selected.1)
}
}

/// Get the current coordinator for executing DKG
/// This will always use the coordinator selector to determine the coordinator
fn get_coordinator_dkg(&self) -> (u32, PublicKey) {
self.coordinator_selector.get_coordinator()
}
}

impl From<SignerConfig> for Signer {
Expand Down Expand Up @@ -415,24 +423,34 @@ impl Signer {
stacks_client: &StacksClient,
current_reward_cycle: u64,
) {
let coordinator_id = self.get_coordinator(current_reward_cycle).0;
match &self.state {
State::Idle => {
let Some(command) = self.commands.pop_front() else {
jferrant marked this conversation as resolved.
Show resolved Hide resolved
debug!("{self}: Nothing to process. Waiting for command...");
return;
};
let coordinator_id = if matches!(command, Command::Dkg) {
// We cannot execute a DKG command if we are not the coordinator
Some(self.get_coordinator_dkg().0)
} else {
self.get_coordinator_sign(current_reward_cycle).0
};
if coordinator_id != Some(self.signer_id) {
debug!(
"{self}: Coordinator is {coordinator_id:?}. Will not process any commands...",
);
// Put the command back in the queue for later processing.
self.commands.push_front(command);
return;
}
if let Some(command) = self.commands.pop_front() {
self.execute_command(stacks_client, &command);
} else {
debug!("{self}: Nothing to process. Waiting for command...",);
}
self.execute_command(stacks_client, &command);
}
State::OperationInProgress => {
// We cannot execute the next command until the current one is finished...
debug!("{self}: Waiting for coordinator {coordinator_id:?} operation to finish. Coordinator state = {:?}", self.coordinator.state);
debug!(
"{self}: Waiting for operation to finish. Coordinator state = {:?}",
self.coordinator.state
);
}
}
}
Expand All @@ -445,7 +463,6 @@ impl Signer {
res: Sender<Vec<OperationResult>>,
current_reward_cycle: u64,
) {
let coordinator_id = self.get_coordinator(current_reward_cycle).0;
let mut block_info = match block_validate_response {
BlockValidateResponse::Ok(block_validate_ok) => {
let signer_signature_hash = block_validate_ok.signer_signature_hash;
Expand Down Expand Up @@ -517,32 +534,13 @@ impl Signer {
sig: vec![],
};
self.handle_packets(stacks_client, res, &[packet], current_reward_cycle);
} else {
if block_info.valid.unwrap_or(false)
&& !block_info.signed_over
&& coordinator_id == Some(self.signer_id)
{
// We are the coordinator. Trigger a signing round for this block
debug!(
"{self}: attempt to trigger a signing round for block";
"signer_sighash" => %block_info.block.header.signer_signature_hash(),
"block_hash" => %block_info.block.header.block_hash(),
);
self.commands.push_back(Command::Sign {
block: block_info.block.clone(),
is_taproot: false,
merkle_root: None,
});
} else {
debug!(
"{self}: ignoring block.";
"block_hash" => block_info.block.header.block_hash(),
"valid" => block_info.valid,
"signed_over" => block_info.signed_over,
"coordinator_id" => coordinator_id,
);
}
}
debug!(
"{self}: Received a block validate response";
"block_hash" => block_info.block.header.block_hash(),
"valid" => block_info.valid,
"signed_over" => block_info.signed_over,
);
self.signer_db
.insert_block(self.reward_cycle, &block_info)
.unwrap_or_else(|_| panic!("{self}: Failed to insert block in DB"));
Expand All @@ -556,7 +554,6 @@ impl Signer {
messages: &[SignerMessage],
current_reward_cycle: u64,
) {
let coordinator_pubkey = self.get_coordinator(current_reward_cycle).1;
let packets: Vec<Packet> = messages
.iter()
.filter_map(|msg| match msg {
Expand All @@ -565,6 +562,11 @@ impl Signer {
| SignerMessage::Transactions(_) => None,
// TODO: if a signer tries to trigger DKG and we already have one set in the contract, ignore the request.
SignerMessage::Packet(packet) => {
let coordinator_pubkey = if Self::is_dkg_message(&packet.msg) {
self.get_coordinator_dkg().1
} else {
self.get_coordinator_sign(current_reward_cycle).1
};
self.verify_packet(stacks_client, packet.clone(), &coordinator_pubkey)
}
})
Expand Down Expand Up @@ -629,6 +631,19 @@ impl Signer {
}
}

/// Helper function for determining if the provided message is a DKG specific message
fn is_dkg_message(msg: &Message) -> bool {
matches!(
msg,
Message::DkgBegin(_)
| Message::DkgEnd(_)
| Message::DkgEndBegin(_)
| Message::DkgPrivateBegin(_)
| Message::DkgPrivateShares(_)
| Message::DkgPublicShares(_)
)
}

/// Process inbound packets as both a signer and a coordinator
/// Will send outbound packets and operation results as appropriate
fn handle_packets(
Expand Down Expand Up @@ -923,7 +938,7 @@ impl Signer {
};
self.signer_db
.insert_block(self.reward_cycle, &updated_block_info)
.expect(&format!("{self}: Failed to insert block in DB"));
.unwrap_or_else(|_| panic!("{self}: Failed to insert block in DB"));
let process_request = updated_block_info.vote.is_some();
if !process_request {
debug!("Failed to validate nonce request");
Expand Down Expand Up @@ -986,14 +1001,12 @@ impl Signer {
) {
error!("{}: Failed to serialize DKGResults message for StackerDB, will continue operating.", self.signer_id;
"error" => %e);
} else {
if let Err(e) = self
.stackerdb
.send_message_bytes_with_retry(&MessageSlotID::DkgResults, dkg_results_bytes)
{
error!("{}: Failed to send DKGResults message to StackerDB, will continue operating.", self.signer_id;
} else if let Err(e) = self
.stackerdb
.send_message_bytes_with_retry(&MessageSlotID::DkgResults, dkg_results_bytes)
{
error!("{}: Failed to send DKGResults message to StackerDB, will continue operating.", self.signer_id;
"error" => %e);
}
}

let epoch = retry_with_exponential_backoff(|| {
Expand Down Expand Up @@ -1207,11 +1220,7 @@ impl Signer {
}

/// Update the DKG for the provided signer info, triggering it if required
pub fn update_dkg(
&mut self,
stacks_client: &StacksClient,
current_reward_cycle: u64,
) -> Result<(), ClientError> {
pub fn update_dkg(&mut self, stacks_client: &StacksClient) -> Result<(), ClientError> {
let reward_cycle = self.reward_cycle;
let old_dkg = self.approved_aggregate_public_key;
self.approved_aggregate_public_key =
Expand All @@ -1230,8 +1239,8 @@ impl Signer {
}
return Ok(());
};
let coordinator_id = self.get_coordinator(current_reward_cycle).0;
if Some(self.signer_id) == coordinator_id && self.state == State::Idle {
let coordinator_id = self.get_coordinator_dkg().0;
if self.signer_id == coordinator_id && self.state == State::Idle {
debug!("{self}: Checking if old vote transaction exists in StackerDB...");
// Have I already voted and have a pending transaction? Check stackerdb for the same round number and reward cycle vote transaction
// Only get the account nonce of THIS signer as we only care about our own votes, not other signer votes
Expand Down
18 changes: 9 additions & 9 deletions stacks-signer/src/signerdb.rs
Expand Up @@ -32,7 +32,7 @@ pub struct SignerDb {
db: Connection,
}

const CREATE_BLOCKS_TABLE: &'static str = "
const CREATE_BLOCKS_TABLE: &str = "
CREATE TABLE IF NOT EXISTS blocks (
reward_cycle INTEGER NOT NULL,
signer_signature_hash TEXT NOT NULL,
Expand Down Expand Up @@ -80,11 +80,11 @@ impl SignerDb {
let result: Option<String> = query_row(
&self.db,
"SELECT block_info FROM blocks WHERE reward_cycle = ? AND signer_signature_hash = ?",
&[&reward_cycle.to_string(), &format!("{}", hash)],
[&reward_cycle.to_string(), &format!("{}", hash)],
)?;
if let Some(block_info) = result {
let block_info: BlockInfo =
serde_json::from_str(&block_info).map_err(|e| DBError::SerializationError(e))?;
serde_json::from_str(&block_info).map_err(DBError::SerializationError)?;
Ok(Some(block_info))
} else {
Ok(None)
Expand Down Expand Up @@ -116,13 +116,13 @@ impl SignerDb {
self.db
.execute(
"INSERT OR REPLACE INTO blocks (reward_cycle, signer_signature_hash, block_info) VALUES (?1, ?2, ?3)",
&[reward_cycle.to_string(), format!("{}", hash), block_json],
[reward_cycle.to_string(), format!("{}", hash), block_json],
jferrant marked this conversation as resolved.
Show resolved Hide resolved
)
.map_err(|e| {
return DBError::Other(format!(
DBError::Other(format!(
"Unable to insert block into db: {:?}",
e.to_string()
));
))
})?;
Ok(())
}
Expand All @@ -136,7 +136,7 @@ impl SignerDb {
debug!("Deleting block_info: sighash = {hash}");
self.db.execute(
"DELETE FROM blocks WHERE reward_cycle = ? AND signer_signature_hash = ?",
&[reward_cycle.to_string(), format!("{}", hash)],
[reward_cycle.to_string(), format!("{}", hash)],
jferrant marked this conversation as resolved.
Show resolved Hide resolved
)?;

Ok(())
Expand All @@ -147,8 +147,8 @@ impl SignerDb {
pub fn test_signer_db(db_path: &str) -> SignerDb {
use std::fs;

if fs::metadata(&db_path).is_ok() {
fs::remove_file(&db_path).unwrap();
if fs::metadata(db_path).is_ok() {
fs::remove_file(db_path).unwrap();
}
SignerDb::new(db_path).expect("Failed to create signer db")
}
Expand Down