Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: isolate event-dispatcher serialization code #4545

Merged
merged 6 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
52 changes: 1 addition & 51 deletions stackslib/src/chainstate/stacks/boot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,57 +213,10 @@ fn hex_deserialize<'de, D: serde::Deserializer<'de>>(
Ok(bytes)
}

fn serialize_optional_u128_as_string<S>(
value: &Option<u128>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match value {
Some(v) => serializer.serialize_str(&v.to_string()),
None => serializer.serialize_none(),
}
}

fn deserialize_optional_u128_from_string<'de, D>(deserializer: D) -> Result<Option<u128>, D::Error>
where
D: serde::Deserializer<'de>,
{
let s: Option<String> = Option::deserialize(deserializer)?;
match s {
Some(str_val) => str_val
.parse::<u128>()
.map(Some)
.map_err(serde::de::Error::custom),
None => Ok(None),
}
}

fn serialize_u128_as_string<S>(value: &u128, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&value.to_string())
}

fn deserialize_u128_from_string<'de, D>(deserializer: D) -> Result<u128, D::Error>
where
D: serde::Deserializer<'de>,
{
use std::str::FromStr;
let s = String::deserialize(deserializer)?;
u128::from_str(&s).map_err(serde::de::Error::custom)
}

#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct NakamotoSignerEntry {
#[serde(serialize_with = "hex_serialize", deserialize_with = "hex_deserialize")]
pub signing_key: [u8; 33],
#[serde(
serialize_with = "serialize_u128_as_string",
deserialize_with = "deserialize_u128_from_string"
)]
pub stacked_amt: u128,
pub weight: u32,
}
Expand All @@ -275,10 +228,7 @@ pub struct RewardSet {
#[serde(skip_serializing_if = "Option::is_none", default)]
// only generated for nakamoto reward sets
pub signers: Option<Vec<NakamotoSignerEntry>>,
#[serde(
serialize_with = "serialize_optional_u128_as_string",
deserialize_with = "deserialize_optional_u128_from_string"
)]
#[serde(default)]
pub pox_ustx_threshold: Option<u128>,
}

Expand Down
115 changes: 95 additions & 20 deletions testnet/stacks-node/src/event_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use stacks::chainstate::burn::ConsensusHash;
use stacks::chainstate::coordinator::BlockEventDispatcher;
use stacks::chainstate::nakamoto::NakamotoBlock;
use stacks::chainstate::stacks::address::PoxAddress;
use stacks::chainstate::stacks::boot::RewardSetData;
use stacks::chainstate::stacks::boot::{
NakamotoSignerEntry, PoxStartCycleInfo, RewardSet, RewardSetData,
};
use stacks::chainstate::stacks::db::accounts::MinerReward;
use stacks::chainstate::stacks::db::unconfirmed::ProcessedUnconfirmedState;
use stacks::chainstate::stacks::db::{MinerRewardInfo, StacksHeaderInfo};
Expand All @@ -36,6 +38,7 @@ use stacks::net::api::postblock_proposal::{
};
use stacks::net::atlas::{Attachment, AttachmentInstance};
use stacks::net::stackerdb::StackerDBEventDispatcher;
use stacks::util::hash::to_hex;
use stacks_common::bitvec::BitVec;
use stacks_common::codec::StacksMessageCodec;
use stacks_common::types::chainstate::{BlockHeaderHash, BurnchainHeaderHash, StacksBlockId};
Expand Down Expand Up @@ -106,6 +109,79 @@ pub struct MinedNakamotoBlockEvent {
pub signer_bitvec: String,
}

fn serialize_u128_as_string<S>(value: &u128, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&value.to_string())
}

fn serialize_pox_addresses<S>(value: &Vec<PoxAddress>, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.collect_seq(value.iter().cloned().map(|a| a.to_b58()))
}

fn serialize_optional_u128_as_string<S>(
value: &Option<u128>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match value {
Some(v) => serializer.serialize_str(&v.to_string()),
None => serializer.serialize_none(),
}
}

fn hex_serialize<S: serde::Serializer>(addr: &[u8; 33], s: S) -> Result<S::Ok, S::Error> {
s.serialize_str(&to_hex(addr))
}

#[derive(Debug, PartialEq, Clone, Serialize)]
pub struct RewardSetEventPayload {
#[serde(serialize_with = "serialize_pox_addresses")]
pub rewarded_addresses: Vec<PoxAddress>,
pub start_cycle_state: PoxStartCycleInfo,
#[serde(skip_serializing_if = "Option::is_none", default)]
// only generated for nakamoto reward sets
pub signers: Option<Vec<NakamotoSignerEntryPayload>>,
#[serde(serialize_with = "serialize_optional_u128_as_string")]
pub pox_ustx_threshold: Option<u128>,
}

#[derive(Debug, PartialEq, Clone, Serialize)]
pub struct NakamotoSignerEntryPayload {
#[serde(serialize_with = "hex_serialize")]
pub signing_key: [u8; 33],
#[serde(serialize_with = "serialize_u128_as_string")]
pub stacked_amt: u128,
pub weight: u32,
}

impl RewardSetEventPayload {
pub fn signer_entry_to_payload(entry: &NakamotoSignerEntry) -> NakamotoSignerEntryPayload {
NakamotoSignerEntryPayload {
signing_key: entry.signing_key,
stacked_amt: entry.stacked_amt,
weight: entry.weight,
}
}
pub fn from_reward_set(reward_set: &RewardSet) -> Self {
Self {
rewarded_addresses: reward_set.rewarded_addresses.clone(),
start_cycle_state: reward_set.start_cycle_state.clone(),
signers: reward_set
.signers
.as_ref()
.map(|signers| signers.iter().map(Self::signer_entry_to_payload).collect()),
pox_ustx_threshold: reward_set.pox_ustx_threshold,
}
}
}

impl EventObserver {
pub fn send_payload(&self, payload: &serde_json::Value, path: &str) {
let body = match serde_json::to_vec(&payload) {
Expand Down Expand Up @@ -410,8 +486,22 @@ impl EventObserver {
tx_index += 1;
}

let signer_bitvec_value = signer_bitvec_opt
.as_ref()
.map(|bitvec| serde_json::to_value(bitvec).unwrap_or_default())
.unwrap_or_default();

let (reward_set_value, cycle_number_value) = match &reward_set_data {
Some(data) => (
serde_json::to_value(&RewardSetEventPayload::from_reward_set(&data.reward_set))
.unwrap_or_default(),
serde_json::to_value(data.cycle_number).unwrap_or_default(),
),
None => (serde_json::Value::Null, serde_json::Value::Null),
};

// Wrap events
let mut payload = json!({
let payload = json!({
"block_hash": format!("0x{}", block.block_hash),
"block_height": metadata.stacks_block_height,
"burn_block_hash": format!("0x{}", metadata.burn_header_hash),
Expand All @@ -434,26 +524,11 @@ impl EventObserver {
"pox_v1_unlock_height": pox_constants.v1_unlock_height,
"pox_v2_unlock_height": pox_constants.v2_unlock_height,
"pox_v3_unlock_height": pox_constants.v3_unlock_height,
"signer_bitvec": signer_bitvec_value,
"reward_set": reward_set_value,
"cycle_number": cycle_number_value,
});

if let Some(signer_bitvec) = signer_bitvec_opt {
payload.as_object_mut().unwrap().insert(
"signer_bitvec".to_string(),
serde_json::to_value(signer_bitvec).unwrap_or_default(),
);
}

if let Some(reward_set_data) = reward_set_data {
payload.as_object_mut().unwrap().insert(
"reward_set".to_string(),
serde_json::to_value(&reward_set_data.reward_set).unwrap_or_default(),
);
payload.as_object_mut().unwrap().insert(
"cycle_number".to_string(),
serde_json::to_value(reward_set_data.cycle_number).unwrap_or_default(),
);
}

payload
}
}
Expand Down
5 changes: 4 additions & 1 deletion testnet/stacks-node/src/tests/nakamoto_integrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1606,7 +1606,10 @@ fn correct_burn_outs() {

let new_blocks_with_reward_set: Vec<serde_json::Value> = test_observer::get_blocks()
.into_iter()
.filter(|block| block.get("reward_set").is_some() && block.get("cycle_number").is_some())
.filter(|block| {
block.get("reward_set").map_or(false, |v| !v.is_null())
&& block.get("cycle_number").map_or(false, |v| !v.is_null())
})
.collect();
info!(
"Announced blocks that include reward sets: {:#?}",
Expand Down