Do not attempt to treat miner as a coordinator when updating DKG #4591

Merged
merged 9 commits on Apr 3, 2024
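At a high level, this change splits coordinator selection into two paths: DKG rounds always take their coordinator from the rotating coordinator selector, while signing rounds still treat the current miner as coordinator during the active reward cycle. The sketch below illustrates that split; `PublicKey`, `CoordinatorSelector`, and the `Signer` fields here are simplified stand-ins for illustration only, not the actual stacks-signer or wsts types.

```rust
/// Stand-in for the signer's public key type (not the real wsts type).
#[derive(Clone, Copy, Debug, PartialEq)]
struct PublicKey(u64);

/// Stand-in for the rotating coordinator selector used for DKG.
struct CoordinatorSelector {
    coordinator_ids: Vec<u32>,
    coordinator_index: usize,
    public_keys: Vec<PublicKey>,
}

impl CoordinatorSelector {
    /// Return the currently selected (signer index, public key) pair.
    fn get_coordinator(&self) -> (u32, PublicKey) {
        (
            self.coordinator_ids[self.coordinator_index],
            self.public_keys[self.coordinator_index],
        )
    }
}

/// Simplified signer state: just enough to show the two selection paths.
struct Signer {
    reward_cycle: u64,
    miner_key: Option<PublicKey>,
    coordinator_selector: CoordinatorSelector,
}

impl Signer {
    /// DKG always uses the coordinator selector, never the miner.
    fn get_coordinator_dkg(&self) -> (u32, PublicKey) {
        self.coordinator_selector.get_coordinator()
    }

    /// Signing uses the miner as coordinator in the active reward cycle and
    /// falls back to the selector otherwise (or if the miner key is unknown).
    fn get_coordinator_sign(&self, current_reward_cycle: u64) -> (Option<u32>, PublicKey) {
        if self.reward_cycle == current_reward_cycle {
            if let Some(miner) = self.miner_key {
                return (None, miner); // the miner has no signer index
            }
        }
        let (id, key) = self.coordinator_selector.get_coordinator();
        (Some(id), key)
    }
}

fn main() {
    let signer = Signer {
        reward_cycle: 100,
        miner_key: Some(PublicKey(7)),
        coordinator_selector: CoordinatorSelector {
            coordinator_ids: vec![3, 1, 2],
            coordinator_index: 0,
            public_keys: vec![PublicKey(30), PublicKey(10), PublicKey(20)],
        },
    };
    // DKG: selector-chosen signer, regardless of reward cycle.
    assert_eq!(signer.get_coordinator_dkg(), (3, PublicKey(30)));
    // Signing in the active cycle: the miner coordinates.
    assert_eq!(signer.get_coordinator_sign(100), (None, PublicKey(7)));
    // Signing for a different cycle: back to the selector.
    assert_eq!(signer.get_coordinator_sign(101), (Some(3), PublicKey(30)));
}
```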
stacks-signer/src/coordinator.rs (13 changes: 3 additions & 10 deletions)
@@ -91,17 +91,10 @@ impl CoordinatorSelector {
}
}
new_index
} else if ROTATE_COORDINATORS {
self.coordinator_index.saturating_add(1) % self.coordinator_ids.len()
} else {
if ROTATE_COORDINATORS {
let mut new_index = self.coordinator_index.saturating_add(1);
if new_index == self.coordinator_ids.len() {
// We have exhausted all potential coordinators. Go back to the start
new_index = 0;
}
new_index
} else {
self.coordinator_index
}
self.coordinator_index
};
self.coordinator_id = *self
.coordinator_ids
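The coordinator.rs refactor above collapses the manual wrap-around into a single modulo step. Below is a minimal sketch of why the two formulations rotate identically over a coordinator list of length `len`; the helper names `next_index_manual` and `next_index_modulo` are hypothetical, and the real logic operates inline on `self.coordinator_index` inside `CoordinatorSelector`.

```rust
/// Old formulation: increment, then reset to 0 once the end is reached.
fn next_index_manual(current: usize, len: usize) -> usize {
    let mut new_index = current.saturating_add(1);
    if new_index == len {
        // We have exhausted all potential coordinators. Go back to the start.
        new_index = 0;
    }
    new_index
}

/// New formulation: the same rotation expressed with a modulo.
fn next_index_modulo(current: usize, len: usize) -> usize {
    current.saturating_add(1) % len
}

fn main() {
    let len = 5;
    // For every valid starting index, both formulations pick the same successor.
    for current in 0..len {
        assert_eq!(next_index_manual(current, len), next_index_modulo(current, len));
    }
    println!("both rotations agree for all indices in 0..{len}");
}
```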
stacks-signer/src/main.rs (1 change: 1 addition & 0 deletions)
@@ -400,6 +400,7 @@ pub mod tests {
use super::{handle_generate_stacking_signature, *};
use crate::{GenerateStackingSignatureArgs, GlobalConfig};

#[allow(clippy::too_many_arguments)]
fn call_verify_signer_sig(
pox_addr: &PoxAddress,
reward_cycle: u128,
stacks-signer/src/runloop.rs (2 changes: 1 addition & 1 deletion)
@@ -392,7 +392,7 @@ impl SignerRunLoop<Vec<OperationResult>, RunLoopCommand> for RunLoop {
if signer.approved_aggregate_public_key.is_none() {
if let Err(e) = retry_with_exponential_backoff(|| {
signer
.update_dkg(&self.stacks_client, current_reward_cycle)
.update_dkg(&self.stacks_client)
.map_err(backoff::Error::transient)
}) {
error!("{signer}: failed to update DKG: {e}");
stacks-signer/src/signer.rs (120 changes: 65 additions & 55 deletions)
@@ -186,9 +186,11 @@ impl std::fmt::Display for Signer {
}

impl Signer {
/// Return the current coordinator. If in the active reward cycle, this is the miner,
/// so the first element of the tuple will be None (because the miner does not have a signer index).
fn get_coordinator(&self, current_reward_cycle: u64) -> (Option<u32>, PublicKey) {
/// Return the current coordinator.
/// If the current reward cycle is the active reward cycle, this is the miner,
/// so the first element of the tuple will be None (because the miner does not have a signer index).
/// Otherwise, the coordinator is the signer with the index returned by the coordinator selector.
fn get_coordinator_sign(&self, current_reward_cycle: u64) -> (Option<u32>, PublicKey) {
if self.reward_cycle == current_reward_cycle {
let Some(ref cur_miner) = self.miner_key else {
error!(
@@ -199,12 +201,18 @@ impl Signer {
return (Some(selected.0), selected.1);
};
// coordinator is the current miner.
(None, cur_miner.clone())
(None, *cur_miner)
} else {
let selected = self.coordinator_selector.get_coordinator();
return (Some(selected.0), selected.1);
(Some(selected.0), selected.1)
}
}

/// Get the current coordinator for executing DKG
/// This will always use the coordinator selector to determine the coordinator
fn get_coordinator_dkg(&self) -> (u32, PublicKey) {
self.coordinator_selector.get_coordinator()
}
}

impl From<SignerConfig> for Signer {
@@ -429,24 +437,36 @@ impl Signer {
stacks_client: &StacksClient,
current_reward_cycle: u64,
) {
let coordinator_id = self.get_coordinator(current_reward_cycle).0;
match &self.state {
State::Idle => {
let Some(command) = self.commands.front() else {
debug!("{self}: Nothing to process. Waiting for command...");
return;
};
let coordinator_id = if matches!(command, Command::Dkg) {
// We cannot execute a DKG command if we are not the coordinator
Some(self.get_coordinator_dkg().0)
} else {
self.get_coordinator_sign(current_reward_cycle).0
};
if coordinator_id != Some(self.signer_id) {
debug!(
"{self}: Coordinator is {coordinator_id:?}. Will not process any commands...",
);
return;
}
if let Some(command) = self.commands.pop_front() {
self.execute_command(stacks_client, &command);
} else {
debug!("{self}: Nothing to process. Waiting for command...",);
}
let command = self
.commands
.pop_front()
.expect("BUG: Already asserted that the command queue was not empty");
self.execute_command(stacks_client, &command);
}
State::OperationInProgress => {
// We cannot execute the next command until the current one is finished...
debug!("{self}: Waiting for coordinator {coordinator_id:?} operation to finish. Coordinator state = {:?}", self.coordinator.state);
debug!(
"{self}: Waiting for operation to finish. Coordinator state = {:?}",
self.coordinator.state
);
}
}
}
Expand All @@ -459,7 +479,6 @@ impl Signer {
res: Sender<Vec<OperationResult>>,
current_reward_cycle: u64,
) {
let coordinator_id = self.get_coordinator(current_reward_cycle).0;
let mut block_info = match block_validate_response {
BlockValidateResponse::Ok(block_validate_ok) => {
let signer_signature_hash = block_validate_ok.signer_signature_hash;
@@ -531,32 +550,13 @@ impl Signer {
sig: vec![],
};
self.handle_packets(stacks_client, res, &[packet], current_reward_cycle);
} else {
if block_info.valid.unwrap_or(false)
&& !block_info.signed_over
&& coordinator_id == Some(self.signer_id)
{
// We are the coordinator. Trigger a signing round for this block
debug!(
"{self}: attempt to trigger a signing round for block";
"signer_sighash" => %block_info.block.header.signer_signature_hash(),
"block_hash" => %block_info.block.header.block_hash(),
);
self.commands.push_back(Command::Sign {
block: block_info.block.clone(),
is_taproot: false,
merkle_root: None,
});
} else {
debug!(
"{self}: ignoring block.";
"block_hash" => block_info.block.header.block_hash(),
"valid" => block_info.valid,
"signed_over" => block_info.signed_over,
"coordinator_id" => coordinator_id,
);
}
}
debug!(
"{self}: Received a block validate response";
"block_hash" => block_info.block.header.block_hash(),
"valid" => block_info.valid,
"signed_over" => block_info.signed_over,
);
self.signer_db
.insert_block(self.reward_cycle, &block_info)
.unwrap_or_else(|_| panic!("{self}: Failed to insert block in DB"));
@@ -570,7 +570,6 @@ impl Signer {
messages: &[SignerMessage],
current_reward_cycle: u64,
) {
let coordinator_pubkey = self.get_coordinator(current_reward_cycle).1;
let packets: Vec<Packet> = messages
.iter()
.filter_map(|msg| match msg {
@@ -579,6 +578,11 @@ impl Signer {
| SignerMessage::Transactions(_) => None,
// TODO: if a signer tries to trigger DKG and we already have one set in the contract, ignore the request.
SignerMessage::Packet(packet) => {
let coordinator_pubkey = if Self::is_dkg_message(&packet.msg) {
self.get_coordinator_dkg().1
} else {
self.get_coordinator_sign(current_reward_cycle).1
};
self.verify_packet(stacks_client, packet.clone(), &coordinator_pubkey)
}
})
@@ -643,6 +647,19 @@ impl Signer {
}
}

/// Helper function for determining if the provided message is a DKG specific message
fn is_dkg_message(msg: &Message) -> bool {
matches!(
msg,
Message::DkgBegin(_)
| Message::DkgEnd(_)
| Message::DkgEndBegin(_)
| Message::DkgPrivateBegin(_)
| Message::DkgPrivateShares(_)
| Message::DkgPublicShares(_)
)
}

/// Process inbound packets as both a signer and a coordinator
/// Will send outbound packets and operation results as appropriate
fn handle_packets(
@@ -940,7 +957,7 @@ impl Signer {
};
self.signer_db
.insert_block(self.reward_cycle, &updated_block_info)
.expect(&format!("{self}: Failed to insert block in DB"));
.unwrap_or_else(|_| panic!("{self}: Failed to insert block in DB"));
let process_request = updated_block_info.vote.is_some();
if !process_request {
debug!("Failed to validate nonce request");
@@ -1003,14 +1020,12 @@ impl Signer {
) {
error!("{}: Failed to serialize DKGResults message for StackerDB, will continue operating.", self.signer_id;
"error" => %e);
} else {
if let Err(e) = self
.stackerdb
.send_message_bytes_with_retry(&MessageSlotID::DkgResults, dkg_results_bytes)
{
error!("{}: Failed to send DKGResults message to StackerDB, will continue operating.", self.signer_id;
} else if let Err(e) = self
.stackerdb
.send_message_bytes_with_retry(&MessageSlotID::DkgResults, dkg_results_bytes)
{
error!("{}: Failed to send DKGResults message to StackerDB, will continue operating.", self.signer_id;
"error" => %e);
}
}

let epoch = retry_with_exponential_backoff(|| {
Expand Down Expand Up @@ -1236,11 +1251,7 @@ impl Signer {
}

/// Update the DKG for the provided signer info, triggering it if required
pub fn update_dkg(
&mut self,
stacks_client: &StacksClient,
current_reward_cycle: u64,
) -> Result<(), ClientError> {
pub fn update_dkg(&mut self, stacks_client: &StacksClient) -> Result<(), ClientError> {
let reward_cycle = self.reward_cycle;
let old_dkg = self.approved_aggregate_public_key;
self.approved_aggregate_public_key =
@@ -1259,13 +1270,12 @@ impl Signer {
}
return Ok(());
};
if self.state != State::Idle
|| Some(self.signer_id) != self.get_coordinator(current_reward_cycle).0
{
if self.state != State::Idle || self.signer_id != self.get_coordinator_dkg().0 {
// We are not the coordinator or we are in the middle of an operation. Do not attempt to queue DKG
return Ok(());
}
debug!("{self}: Checking if old DKG vote transaction exists in StackerDB...");
debug!("{self}: Checking if old DKG vote transaction exists in StackerDB...");
jferrant marked this conversation as resolved.
Show resolved Hide resolved
// Have I already voted, but the vote is still pending in StackerDB? Check stackerdb for the same round number and reward cycle vote transaction
// Only get the account nonce of THIS signer as we only care about our own votes, not other signer votes
let signer_address = stacks_client.get_signer_address();
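One consequence of the signer.rs changes above is that packet verification now picks the coordinator key by message kind: DKG traffic is checked against the selector-chosen coordinator, while everything else is checked against the signing coordinator (the miner during the active reward cycle). A rough sketch of that routing follows, using a hypothetical trimmed-down `Message` enum and plain integers in place of the wsts message types and public keys; the helper `coordinator_key_for` is illustrative and does not exist in the codebase.

```rust
/// Hypothetical, trimmed-down stand-in for the wsts message variants.
#[allow(dead_code)]
enum Message {
    DkgBegin,
    DkgEnd,
    DkgEndBegin,
    DkgPrivateBegin,
    DkgPrivateShares,
    DkgPublicShares,
    NonceRequest,
    NonceResponse,
    SignatureShareRequest,
    SignatureShareResponse,
}

/// Mirror of the helper added in this PR: true only for DKG-specific traffic.
fn is_dkg_message(msg: &Message) -> bool {
    matches!(
        msg,
        Message::DkgBegin
            | Message::DkgEnd
            | Message::DkgEndBegin
            | Message::DkgPrivateBegin
            | Message::DkgPrivateShares
            | Message::DkgPublicShares
    )
}

/// Pick which coordinator key a packet should be verified against.
/// `dkg_coordinator` comes from the selector; `sign_coordinator` may be the miner.
fn coordinator_key_for(msg: &Message, dkg_coordinator: u64, sign_coordinator: u64) -> u64 {
    if is_dkg_message(msg) {
        dkg_coordinator
    } else {
        sign_coordinator
    }
}

fn main() {
    let dkg_key = 1; // selector-chosen coordinator (stand-in for a PublicKey)
    let sign_key = 2; // miner key during the active reward cycle

    // DKG messages verify against the selector's coordinator...
    assert_eq!(coordinator_key_for(&Message::DkgBegin, dkg_key, sign_key), 1);
    // ...while signing traffic verifies against the signing coordinator.
    assert_eq!(coordinator_key_for(&Message::NonceRequest, dkg_key, sign_key), 2);
}
```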
stacks-signer/src/signerdb.rs (6 changes: 3 additions & 3 deletions)
@@ -35,7 +35,7 @@ pub struct SignerDb {
db: Connection,
}

const CREATE_BLOCKS_TABLE: &'static str = "
const CREATE_BLOCKS_TABLE: &str = "
CREATE TABLE IF NOT EXISTS blocks (
reward_cycle INTEGER NOT NULL,
signer_signature_hash TEXT NOT NULL,
@@ -170,8 +170,8 @@ where
pub fn test_signer_db(db_path: &str) -> SignerDb {
use std::fs;

if fs::metadata(&db_path).is_ok() {
fs::remove_file(&db_path).unwrap();
if fs::metadata(db_path).is_ok() {
fs::remove_file(db_path).unwrap();
}
SignerDb::new(db_path).expect("Failed to create signer db")
}