From 4f083d90bc9bcceab1fad5be54a1d69c50a66f03 Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Wed, 2 Apr 2025 15:42:37 -0700 Subject: [PATCH 01/12] name rewards_info helpers and use static values Tests where the values were different were testing percentages rather than anything specific to EpochRewardInfo. From 33afd52ea3cf1a291865e989d4e0e3c47e891923 Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Wed, 2 Apr 2025 17:54:53 -0700 Subject: [PATCH 02/12] Cleanup hex_boosting tests - reward_poc_dc now takes a FileSinkClient non reference so it will drop when rewarding is done. - Add AsPubkeyMap trait for converting Vec to a PublicKeyBinary HashMap. --- mobile_verifier/src/rewarder.rs | 10 +- .../tests/integrations/common/mod.rs | 75 +- .../tests/integrations/hex_boosting.rs | 674 ++++++------------ .../tests/integrations/rewarder_poc_dc.rs | 18 +- 4 files changed, 259 insertions(+), 518 deletions(-) diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 8be1d3076..fa8d2227b 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -286,7 +286,7 @@ where let poc_dc_shares = reward_poc_and_dc( &self.pool, &self.hex_service_client, - &self.mobile_rewards, + self.mobile_rewards.clone(), &self.speedtest_averages, &reward_info, price_info.clone(), @@ -394,7 +394,7 @@ where pub async fn reward_poc_and_dc( pool: &Pool, hex_service_client: &impl HexBoostingInfoResolver, - mobile_rewards: &FileSinkClient, + mobile_rewards: FileSinkClient, speedtest_avg_sink: &FileSinkClient, reward_info: &EpochRewardInfo, price_info: PriceInfo, @@ -420,7 +420,7 @@ pub async fn reward_poc_and_dc( // reward dc before poc so that we can calculate the unallocated dc reward // and carry this into the poc pool let dc_unallocated_amount = reward_dc( - mobile_rewards, + &mobile_rewards, reward_info, transfer_rewards, &reward_shares, @@ -431,7 +431,7 @@ pub async fn reward_poc_and_dc( let (poc_unallocated_amount, calculated_poc_reward_shares) = reward_poc( pool, hex_service_client, - mobile_rewards, + &mobile_rewards, speedtest_avg_sink, reward_info, reward_shares, @@ -444,7 +444,7 @@ pub async fn reward_poc_and_dc( .unwrap_or(0); write_unallocated_reward( - mobile_rewards, + &mobile_rewards, UnallocatedRewardType::Poc, poc_unallocated_amount, reward_info, diff --git a/mobile_verifier/tests/integrations/common/mod.rs b/mobile_verifier/tests/integrations/common/mod.rs index 19fdc83cc..ddfb66a33 100644 --- a/mobile_verifier/tests/integrations/common/mod.rs +++ b/mobile_verifier/tests/integrations/common/mod.rs @@ -121,36 +121,6 @@ impl MockFileSinkReceiver { } impl MockFileSinkReceiver { - pub async fn receive_radio_reward_v1(&mut self) -> RadioReward { - match self.receive("receive_radio_reward_v1").await { - Some(mobile_reward) => match mobile_reward.reward { - Some(MobileReward::RadioReward(r)) => r, - err => panic!("failed to get radio reward: {err:?}"), - }, - None => panic!("failed to receive radio reward"), - } - } - - pub async fn receive_radio_reward(&mut self) -> RadioRewardV2 { - // NOTE(mj): When v1 rewards stop being written, remove this receiver - // and the comparison. - let radio_reward_v1 = self.receive_radio_reward_v1().await; - match self.receive("receive_radio_reward").await { - Some(mobile_reward) => match mobile_reward.reward { - Some(MobileReward::RadioRewardV2(reward)) => { - assert_eq!( - reward.total_poc_reward(), - radio_reward_v1.poc_reward, - "mismatch in poc rewards between v1 and v2" - ); - reward - } - err => panic!("failed to get radio reward: {err:?}"), - }, - None => panic!("failed to receive radio reward"), - } - } - pub async fn receive_service_provider_reward(&mut self) -> ServiceProviderReward { match self.receive("receive_service_provider_reward").await { Some(mobile_reward) => match mobile_reward.reward { @@ -212,6 +182,10 @@ pub trait RadioRewardV2Ext { } impl RadioRewardV2Ext for RadioRewardV2 { + fn hotspot_key_string(&self) -> String { + PublicKeyBinary::from(self.hotspot_key.to_vec()).to_string() + } + fn boosted_hexes(&self) -> Vec { self.covered_hexes.to_vec() } @@ -417,6 +391,20 @@ impl MobileRewardShareMessages { MobileReward::PromotionReward(inner) => self.promotion_reward.push(inner), } } + + pub fn unallocated_amount_or_default(&self) -> u64 { + self.unallocated + .iter() + .map(|reward| reward.amount) + .sum::() + } + + pub fn total_poc_rewards(&self) -> u64 { + self.radio_reward_v2 + .iter() + .map(|reward| reward.total_poc_reward()) + .sum() + } } #[async_trait::async_trait] @@ -485,3 +473,30 @@ impl NonBlockingFileSinkReceiver { } } } + +// Allows converting from a Vec to HashMap +// +// This trait assumes there will not be multiple entries +// in the Vec for a given String. +pub trait AsStringKeyedMap { + fn as_keyed_map(&self, key_func: impl Fn(&V) -> String) -> HashMap + where + Self: Sized; +} + +impl AsStringKeyedMap for Vec { + fn as_keyed_map(&self, key_func: impl Fn(&V) -> String) -> HashMap + where + Self: Sized, + { + let mut map = HashMap::new(); + for item in self { + let key = key_func(item); + if map.contains_key(&key) { + panic!("Duplicate string key found: {}", key); + } + map.insert(key, item.clone()); + } + map + } +} diff --git a/mobile_verifier/tests/integrations/hex_boosting.rs b/mobile_verifier/tests/integrations/hex_boosting.rs index 4aaa2ae24..8057cd420 100644 --- a/mobile_verifier/tests/integrations/hex_boosting.rs +++ b/mobile_verifier/tests/integrations/hex_boosting.rs @@ -1,5 +1,5 @@ use crate::common::{ - self, default_price_info, reward_info_24_hours, MockFileSinkReceiver, MockHexBoostingClient, + self, default_price_info, reward_info_24_hours, AsStringKeyedMap, MockHexBoostingClient, RadioRewardV2Ext, }; use chrono::{DateTime, Duration as ChronoDuration, Duration, Utc}; @@ -9,15 +9,11 @@ use file_store::{ unique_connections::{UniqueConnectionReq, UniqueConnectionsIngestReport}, }; use helium_crypto::PublicKeyBinary; -use helium_proto::services::{ - poc_lora::UnallocatedRewardType, - poc_mobile::{ - CoverageObjectValidity, HeartbeatValidity, LocationSource, MobileRewardShare, - RadioRewardV2, SeniorityUpdateReason, SignalLevel, UnallocatedReward, - }, +use helium_proto::services::poc_mobile::{ + CoverageObjectValidity, HeartbeatValidity, LocationSource, SeniorityUpdateReason, SignalLevel, }; use hextree::Cell; -use mobile_config::boosted_hex_info::BoostedHexInfo; +use mobile_config::{boosted_hex_info::BoostedHexInfo, sub_dao_epoch_reward_info::EpochRewardInfo}; use mobile_verifier::{ cell_type::CellType, coverage::CoverageObject, @@ -55,7 +51,7 @@ async fn update_assignments(pool: &PgPool) -> anyhow::Result<()> { #[sqlx::test] async fn test_poc_with_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -137,54 +133,28 @@ async fn test_poc_with_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes); - let total_poc_emissions = - reward_shares::get_scheduled_tokens_for_poc(reward_info.epoch_emissions) - .to_u64() - .unwrap(); - let price_info = default_price_info(); - let (_, rewards) = tokio::join!( - // run rewards for poc and dc - rewarder::reward_poc_and_dc( - &pool, - &hex_boosting_client, - &mobile_rewards_client, - &speedtest_avg_client, - &reward_info, - price_info - ), - receive_expected_rewards_maybe_unallocated( - &mut mobile_rewards, - ExpectUnallocated::NoWhenValue(total_poc_emissions) - ) - ); + rewarder::reward_poc_and_dc( + &pool, + &hex_boosting_client, + mobile_rewards_client, + &speedtest_avg_client, + &reward_info, + price_info, + ) + .await?; - let Ok((poc_rewards, unallocated_reward)) = rewards else { - panic!("no rewards received"); - }; + let rewards = mobile_rewards.finish().await?; - let mut poc_rewards = poc_rewards.iter(); - let hotspot_2 = poc_rewards.next().unwrap(); - let hotspot_1 = poc_rewards.next().unwrap(); - let hotspot_3 = poc_rewards.next().unwrap(); - assert_eq!( - None, - poc_rewards.next(), - "Received more hotspots than expected in rewards" - ); - assert_eq!( - HOTSPOT_2.to_string(), - PublicKeyBinary::from(hotspot_2.hotspot_key.clone()).to_string() - ); - assert_eq!( - HOTSPOT_1.to_string(), - PublicKeyBinary::from(hotspot_1.hotspot_key.clone()).to_string() - ); - assert_eq!( - HOTSPOT_3.to_string(), - PublicKeyBinary::from(hotspot_3.hotspot_key.clone()).to_string() - ); + let poc_rewards = rewards + .radio_reward_v2 + .as_keyed_map(|v| v.hotspot_key_string()); + let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); + let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); + let hotspot_3 = poc_rewards.get(HOTSPOT_3).expect("hotspot 3"); + + assert_eq!(poc_rewards.len(), 3); // Calculating expected rewards let (regular_poc, boosted_poc) = get_poc_allocation_buckets(reward_info.epoch_emissions); @@ -227,17 +197,10 @@ async fn test_poc_with_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { assert_eq!(0x8a1fb49642dffff_u64, hotspot_2.nth_boosted_hex(0).location); assert_eq!(0x8a1fb466d2dffff_u64, hotspot_1.nth_boosted_hex(0).location); - // confirm the total rewards allocated matches expectations - let poc_sum = - hotspot_1.total_poc_reward() + hotspot_2.total_poc_reward() + hotspot_3.total_poc_reward(); - let total = poc_sum + unallocated_reward.amount; - assert_eq!(total_poc_emissions, total); - - // confirm the rewarded percentage amount matches expectations - - let percent = (Decimal::from(total) / reward_info.epoch_emissions) - .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); - assert_eq!(percent, dec!(0.6)); + // confirm the total rewards allocated matches emissions + // and the rewarded percentage amount matches percentage + let total = rewards.total_poc_rewards() + rewards.unallocated_amount_or_default(); + assert_total_matches_emissions(total, &reward_info); Ok(()) } @@ -248,7 +211,7 @@ async fn test_poc_boosted_hexes_thresholds_not_met(pool: PgPool) -> anyhow::Resu // this simulates the case where we have radios in boosted hexes but where the coverage // thresholds for the radios have not been met // the end result is that no boosting takes place, the radios are awarded non boosted reward values - let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let now = Utc::now(); let epoch = (now - ChronoDuration::hours(24))..now; @@ -331,68 +294,46 @@ async fn test_poc_boosted_hexes_thresholds_not_met(pool: PgPool) -> anyhow::Resu let reward_info = reward_info_24_hours(); let price_info = default_price_info(); - let (_, rewards) = tokio::join!( - // run rewards for poc and dc - rewarder::reward_poc_and_dc( - &pool, - &hex_boosting_client, - &mobile_rewards_client, - &speedtest_avg_client, - &reward_info, - price_info - ), - receive_expected_rewards(&mut mobile_rewards) - ); - if let Ok((poc_rewards, unallocated_reward)) = rewards { - // assert poc reward outputs - let exp_reward_1 = 16438356164383; - let exp_reward_2 = 16438356164383; - let exp_reward_3 = 16438356164383; - - assert_eq!(exp_reward_1, poc_rewards[0].total_poc_reward()); - assert_eq!( - HOTSPOT_2.to_string(), - PublicKeyBinary::from(poc_rewards[0].hotspot_key.clone()).to_string() - ); - assert_eq!(exp_reward_2, poc_rewards[1].total_poc_reward()); - assert_eq!( - HOTSPOT_1.to_string(), - PublicKeyBinary::from(poc_rewards[1].hotspot_key.clone()).to_string() - ); - assert_eq!(exp_reward_3, poc_rewards[2].total_poc_reward()); - assert_eq!( - HOTSPOT_3.to_string(), - PublicKeyBinary::from(poc_rewards[2].hotspot_key.clone()).to_string() - ); + // run rewards for poc and dc + rewarder::reward_poc_and_dc( + &pool, + &hex_boosting_client, + mobile_rewards_client, + &speedtest_avg_client, + &reward_info, + price_info, + ) + .await?; - // assert the number of boosted hexes for each radio - assert_eq!(0, poc_rewards[0].boosted_hexes_len()); - assert_eq!(0, poc_rewards[1].boosted_hexes_len()); - assert_eq!(0, poc_rewards[2].boosted_hexes_len()); + let rewards = mobile_rewards.finish().await?; - // confirm the total rewards allocated matches expectations - let poc_sum: u64 = poc_rewards.iter().map(|r| r.total_poc_reward()).sum(); - let unallocated_sum: u64 = unallocated_reward.amount; - let total = poc_sum + unallocated_sum; + let poc_rewards = rewards + .radio_reward_v2 + .as_keyed_map(|v| v.hotspot_key_string()); + let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); + let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); + let hotspot_3 = poc_rewards.get(HOTSPOT_3).expect("hotspot 3"); + + assert_eq!(16_438_356_164_383, hotspot_1.total_poc_reward()); + assert_eq!(16_438_356_164_383, hotspot_2.total_poc_reward()); + assert_eq!(16_438_356_164_383, hotspot_3.total_poc_reward()); + + // assert the number of boosted hexes for each radio + assert_eq!(0, hotspot_1.boosted_hexes_len()); + assert_eq!(0, hotspot_2.boosted_hexes_len()); + assert_eq!(0, hotspot_3.boosted_hexes_len()); + + // confirm the total rewards allocated matches emissions + // and the rewarded percentage amount matches percentage + let total = rewards.total_poc_rewards() + rewards.unallocated_amount_or_default(); + assert_total_matches_emissions(total, &reward_info); - let expected_sum = reward_shares::get_scheduled_tokens_for_poc(reward_info.epoch_emissions) - .to_u64() - .unwrap(); - assert_eq!(expected_sum, total); - - // confirm the rewarded percentage amount matches expectations - let percent = (Decimal::from(total) / reward_info.epoch_emissions) - .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); - assert_eq!(percent, dec!(0.6)); - } else { - panic!("no rewards received"); - }; Ok(()) } #[sqlx::test] async fn test_poc_with_multi_coverage_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -486,57 +427,25 @@ async fn test_poc_with_multi_coverage_boosted_hexes(pool: PgPool) -> anyhow::Res }, ]; - let total_poc_emissions = - reward_shares::get_scheduled_tokens_for_poc(reward_info.epoch_emissions) - .to_u64() - .unwrap(); - - let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes); - - let price_info = default_price_info(); - - let (_, rewards) = tokio::join!( - // run rewards for poc and dc - rewarder::reward_poc_and_dc( - &pool, - &hex_boosting_client, - &mobile_rewards_client, - &speedtest_avg_client, - &reward_info, - price_info - ), - receive_expected_rewards_maybe_unallocated( - &mut mobile_rewards, - ExpectUnallocated::NoWhenValue(total_poc_emissions) - ) - ); + // run rewards for poc and dc + rewarder::reward_poc_and_dc( + &pool, + &MockHexBoostingClient::new(boosted_hexes), + mobile_rewards_client, + &speedtest_avg_client, + &reward_info, + default_price_info(), + ) + .await?; - let Ok((poc_rewards, unallocated_reward)) = rewards else { - panic!("no rewards received"); - }; + let rewards = mobile_rewards.finish().await?; - let mut poc_rewards = poc_rewards.iter(); - let hotspot_2 = poc_rewards.next().unwrap(); // 1 boost at 20x - let hotspot_1 = poc_rewards.next().unwrap(); // 2 boost at 10x - let hotspot_3 = poc_rewards.next().unwrap(); // no boost - assert_eq!( - None, - poc_rewards.next(), - "Received more hotspots than expected in rewards" - ); - - assert_eq!( - HOTSPOT_2.to_string(), - PublicKeyBinary::from(hotspot_2.hotspot_key.clone()).to_string() - ); - assert_eq!( - HOTSPOT_1.to_string(), - PublicKeyBinary::from(hotspot_1.hotspot_key.clone()).to_string() - ); - assert_eq!( - HOTSPOT_3.to_string(), - PublicKeyBinary::from(hotspot_3.hotspot_key.clone()).to_string() - ); + let poc_rewards = rewards + .radio_reward_v2 + .as_keyed_map(|v| v.hotspot_key_string()); + let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); + let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); + let hotspot_3 = poc_rewards.get(HOTSPOT_3).expect("hotspot 3"); // Calculating expected rewards // - 2 covered hexes boosted at 10x @@ -594,23 +503,17 @@ async fn test_poc_with_multi_coverage_boosted_hexes(pool: PgPool) -> anyhow::Res assert_eq!(0x8a1fb46622d7fff_u64, hotspot_1_boosted_hexes[1].location); assert_eq!(0x8a1fb49642dffff_u64, hotspot_2.nth_boosted_hex(0).location); - // confirm the total rewards allocated matches expectations - let poc_sum = - hotspot_1.total_poc_reward() + hotspot_2.total_poc_reward() + hotspot_3.total_poc_reward(); - let total = poc_sum + unallocated_reward.amount; - assert_eq!(total_poc_emissions, total); - - // confirm the rewarded percentage amount matches expectations - let percent = (Decimal::from(total) / reward_info.epoch_emissions) - .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); - assert_eq!(percent, dec!(0.6)); + // confirm the total rewards allocated matches emissions + // and the rewarded percentage amount matches percentage + let total = rewards.total_poc_rewards() + rewards.unallocated_amount_or_default(); + assert_total_matches_emissions(total, &reward_info); Ok(()) } #[sqlx::test] async fn test_expired_boosted_hex(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -667,73 +570,50 @@ async fn test_expired_boosted_hex(pool: PgPool) -> anyhow::Result<()> { }, ]; - let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes); + // run rewards for poc and dc + rewarder::reward_poc_and_dc( + &pool, + &MockHexBoostingClient::new(boosted_hexes), + mobile_rewards_client, + &speedtest_avg_client, + &reward_info, + default_price_info(), + ) + .await?; - let price_info = default_price_info(); + let rewards = mobile_rewards.finish().await?; - let (_, rewards) = tokio::join!( - // run rewards for poc and dc - rewarder::reward_poc_and_dc( - &pool, - &hex_boosting_client, - &mobile_rewards_client, - &speedtest_avg_client, - &reward_info, - price_info - ), - receive_expected_rewards(&mut mobile_rewards) - ); - if let Ok((poc_rewards, unallocated_reward)) = rewards { - // assert poc reward outputs - let exp_reward_1 = 16_438_356_164_383; - let exp_reward_2 = 16_438_356_164_383; - let exp_reward_3 = 16_438_356_164_383; - - assert_eq!(exp_reward_1, poc_rewards[0].total_poc_reward()); - assert_eq!( - HOTSPOT_2.to_string(), - PublicKeyBinary::from(poc_rewards[0].hotspot_key.clone()).to_string() - ); - assert_eq!(exp_reward_2, poc_rewards[1].total_poc_reward()); - assert_eq!( - HOTSPOT_1.to_string(), - PublicKeyBinary::from(poc_rewards[1].hotspot_key.clone()).to_string() - ); - assert_eq!(exp_reward_3, poc_rewards[2].total_poc_reward()); - assert_eq!( - HOTSPOT_3.to_string(), - PublicKeyBinary::from(poc_rewards[2].hotspot_key.clone()).to_string() - ); + let poc_rewards = rewards + .radio_reward_v2 + .as_keyed_map(|v| v.hotspot_key_string()); + let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); + let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); + let hotspot_3 = poc_rewards.get(HOTSPOT_3).expect("hotspot 3"); - // assert the number of boosted hexes for each radio - // all will be zero as the boost period has expired for the single boosted hex - assert_eq!(0, poc_rewards[0].boosted_hexes_len()); - assert_eq!(0, poc_rewards[1].boosted_hexes_len()); - assert_eq!(0, poc_rewards[2].boosted_hexes_len()); + assert_eq!(poc_rewards.len(), 3); - // confirm the total rewards allocated matches expectations - let poc_sum: u64 = poc_rewards.iter().map(|r| r.total_poc_reward()).sum(); - let unallocated_sum: u64 = unallocated_reward.amount; - let total = poc_sum + unallocated_sum; + // assert poc reward outputs + assert_eq!(16_438_356_164_383, hotspot_1.total_poc_reward()); + assert_eq!(16_438_356_164_383, hotspot_2.total_poc_reward()); + assert_eq!(16_438_356_164_383, hotspot_3.total_poc_reward()); + + // assert the number of boosted hexes for each radio + // all will be zero as the boost period has expired for the single boosted hex + assert_eq!(0, hotspot_1.boosted_hexes_len()); + assert_eq!(0, hotspot_2.boosted_hexes_len()); + assert_eq!(0, hotspot_3.boosted_hexes_len()); + + // confirm the total rewards allocated matches emissions + // and the rewarded percentage amount matches percentage + let total = rewards.total_poc_rewards() + rewards.unallocated_amount_or_default(); + assert_total_matches_emissions(total, &reward_info); - let expected_sum = reward_shares::get_scheduled_tokens_for_poc(reward_info.epoch_emissions) - .to_u64() - .unwrap(); - assert_eq!(expected_sum, total); - - // confirm the rewarded percentage amount matches expectations - let percent = (Decimal::from(total) / reward_info.epoch_emissions) - .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); - assert_eq!(percent, dec!(0.6)); - } else { - panic!("no rewards received"); - }; Ok(()) } #[sqlx::test] async fn test_reduced_location_score_with_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -796,55 +676,27 @@ async fn test_reduced_location_score_with_boosted_hexes(pool: PgPool) -> anyhow: }, ]; - let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes); - let total_poc_emissions = - reward_shares::get_scheduled_tokens_for_poc(reward_info.epoch_emissions) - .to_u64() - .unwrap(); - - let price_info = default_price_info(); + // run rewards for poc and dc + rewarder::reward_poc_and_dc( + &pool, + &MockHexBoostingClient::new(boosted_hexes), + mobile_rewards_client, + &speedtest_avg_client, + &reward_info, + default_price_info(), + ) + .await?; - let (_, rewards) = tokio::join!( - // run rewards for poc and dc - rewarder::reward_poc_and_dc( - &pool, - &hex_boosting_client, - &mobile_rewards_client, - &speedtest_avg_client, - &reward_info, - price_info - ), - receive_expected_rewards_maybe_unallocated( - &mut mobile_rewards, - ExpectUnallocated::NoWhenValue(total_poc_emissions) - ) - ); + let rewards = mobile_rewards.finish().await?; - let Ok((poc_rewards, unallocated_reward)) = rewards else { - panic!("no rewards received"); - }; + let poc_rewards = rewards + .radio_reward_v2 + .as_keyed_map(|v| v.hotspot_key_string()); + let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); // full location trust 1 boost + let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); // full location NO boosts + let hotspot_3 = poc_rewards.get(HOTSPOT_3).expect("hotspot 3"); // reduced location trust 1 boost - let mut poc_rewards = poc_rewards.iter(); - let hotspot_2 = poc_rewards.next().unwrap(); // full location trust NO boosts - let hotspot_1 = poc_rewards.next().unwrap(); // full location trust 1 boost - let hotspot_3 = poc_rewards.next().unwrap(); // reduced location trust 1 boost - assert_eq!( - None, - poc_rewards.next(), - "Received more hotspots than expected in rewards" - ); - assert_eq!( - HOTSPOT_1.to_string(), - PublicKeyBinary::from(hotspot_1.hotspot_key.clone()).to_string() - ); - assert_eq!( - HOTSPOT_2.to_string(), - PublicKeyBinary::from(hotspot_2.hotspot_key.clone()).to_string() - ); - assert_eq!( - HOTSPOT_3.to_string(), - PublicKeyBinary::from(hotspot_3.hotspot_key.clone()).to_string() - ); + assert_eq!(poc_rewards.len(), 3); // Calculating expected rewards let (regular_poc, boosted_poc) = get_poc_allocation_buckets(reward_info.epoch_emissions); @@ -889,20 +741,10 @@ async fn test_reduced_location_score_with_boosted_hexes(pool: PgPool) -> anyhow: assert_eq!(2, hotspot_1.nth_boosted_hex(0).boosted_multiplier); assert_eq!(0x8a1fb466d2dffff_u64, hotspot_1.nth_boosted_hex(0).location); - // confirm the total rewards allocated matches expectations - let poc_sum = - hotspot_1.total_poc_reward() + hotspot_2.total_poc_reward() + hotspot_3.total_poc_reward(); - let total = poc_sum + unallocated_reward.amount; - - let expected_sum = reward_shares::get_scheduled_tokens_for_poc(reward_info.epoch_emissions) - .to_u64() - .unwrap(); - assert_eq!(expected_sum, total); - - // confirm the rewarded percentage amount matches expectations - let percent = (Decimal::from(total) / reward_info.epoch_emissions) - .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); - assert_eq!(percent, dec!(0.6)); + // confirm the total rewards allocated matches emissions + // and the rewarded percentage amount matches percentage + let total = rewards.total_poc_rewards() + rewards.unallocated_amount_or_default(); + assert_total_matches_emissions(total, &reward_info); Ok(()) } @@ -911,7 +753,7 @@ async fn test_reduced_location_score_with_boosted_hexes(pool: PgPool) -> anyhow: async fn test_distance_from_asserted_removes_boosting_but_not_location_trust( pool: PgPool, ) -> anyhow::Result<()> { - let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -977,55 +819,27 @@ async fn test_distance_from_asserted_removes_boosting_but_not_location_trust( }, ]; - let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes); - let total_poc_emissions = - reward_shares::get_scheduled_tokens_for_poc(reward_info.epoch_emissions) - .to_u64() - .unwrap(); - - let price_info = default_price_info(); + // run rewards for poc and dc + rewarder::reward_poc_and_dc( + &pool, + &MockHexBoostingClient::new(boosted_hexes), + mobile_rewards_client, + &speedtest_avg_client, + &reward_info, + default_price_info(), + ) + .await?; - let (_, rewards) = tokio::join!( - // run rewards for poc and dc - rewarder::reward_poc_and_dc( - &pool, - &hex_boosting_client, - &mobile_rewards_client, - &speedtest_avg_client, - &reward_info, - price_info - ), - receive_expected_rewards_maybe_unallocated( - &mut mobile_rewards, - ExpectUnallocated::NoWhenValue(total_poc_emissions) - ) - ); + let rewards = mobile_rewards.finish().await?; - let Ok((poc_rewards, unallocated_reward)) = rewards else { - panic!("no rewards received"); - }; + let poc_rewards = rewards + .radio_reward_v2 + .as_keyed_map(|v| v.hotspot_key_string()); + let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); // full location trust 1 boost + let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); // full location trust NO boosts + let hotspot_3 = poc_rewards.get(HOTSPOT_3).expect("hotspot 3"); // reduced location trust 1 boost - let mut poc_rewards = poc_rewards.iter(); - let hotspot_2 = poc_rewards.next().unwrap(); // full location trust NO boosts - let hotspot_1 = poc_rewards.next().unwrap(); // full location trust 1 boost - let hotspot_3 = poc_rewards.next().unwrap(); // reduced location trust 1 boost - assert_eq!( - None, - poc_rewards.next(), - "Received more hotspots than expected in rewards" - ); - assert_eq!( - HOTSPOT_1.to_string(), - PublicKeyBinary::from(hotspot_1.hotspot_key.clone()).to_string() - ); - assert_eq!( - HOTSPOT_2.to_string(), - PublicKeyBinary::from(hotspot_2.hotspot_key.clone()).to_string() - ); - assert_eq!( - HOTSPOT_3.to_string(), - PublicKeyBinary::from(hotspot_3.hotspot_key.clone()).to_string() - ); + assert_eq!(poc_rewards.len(), 3); // Calculating expected rewards let (regular_poc, boosted_poc) = get_poc_allocation_buckets(reward_info.epoch_emissions); @@ -1070,27 +884,17 @@ async fn test_distance_from_asserted_removes_boosting_but_not_location_trust( assert_eq!(2, hotspot_1.nth_boosted_hex(0).boosted_multiplier); assert_eq!(0x8a1fb466d2dffff_u64, hotspot_1.nth_boosted_hex(0).location); - // confirm the total rewards allocated matches expectations - let poc_sum = - hotspot_1.total_poc_reward() + hotspot_2.total_poc_reward() + hotspot_3.total_poc_reward(); - let total = poc_sum + unallocated_reward.amount; - - let expected_sum = reward_shares::get_scheduled_tokens_for_poc(reward_info.epoch_emissions) - .to_u64() - .unwrap(); - assert_eq!(expected_sum, total); - - // confirm the rewarded percentage amount matches expectations - let percent = (Decimal::from(total) / reward_info.epoch_emissions) - .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); - assert_eq!(percent, dec!(0.6)); + // confirm the total rewards allocated matches emissions + // and the rewarded percentage amount matches percentage + let total = rewards.total_poc_rewards() + rewards.unallocated_amount_or_default(); + assert_total_matches_emissions(total, &reward_info); Ok(()) } #[sqlx::test] async fn test_poc_with_wifi_and_multi_coverage_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -1181,54 +985,27 @@ async fn test_poc_with_wifi_and_multi_coverage_boosted_hexes(pool: PgPool) -> an }, ]; - let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes); - let total_poc_emissions = - reward_shares::get_scheduled_tokens_for_poc(reward_info.epoch_emissions) - .to_u64() - .unwrap(); + // run rewards for poc and dc + rewarder::reward_poc_and_dc( + &pool, + &MockHexBoostingClient::new(boosted_hexes), + mobile_rewards_client, + &speedtest_avg_client, + &reward_info, + default_price_info(), + ) + .await?; - let price_info = default_price_info(); + let rewards = mobile_rewards.finish().await?; - let (_, rewards) = tokio::join!( - // run rewards for poc and dc - rewarder::reward_poc_and_dc( - &pool, - &hex_boosting_client, - &mobile_rewards_client, - &speedtest_avg_client, - &reward_info, - price_info - ), - receive_expected_rewards_maybe_unallocated( - &mut mobile_rewards, - ExpectUnallocated::NoWhenValue(total_poc_emissions) - ) - ); - let Ok((poc_rewards, unallocated_reward)) = rewards else { - panic!("no rewards received"); - }; + let poc_rewards = rewards + .radio_reward_v2 + .as_keyed_map(|v| v.hotspot_key_string()); + let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); // 2 boosts at 10x + let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); // 1 boost at 20x + let hotspot_4 = poc_rewards.get(HOTSPOT_4).expect("hotspot 4"); // no boosts - let mut poc_rewards = poc_rewards.iter(); - let hotspot_2 = poc_rewards.next().unwrap(); // 1 boost at 20x - let hotspot_1 = poc_rewards.next().unwrap(); // 2 boosts at 10x - let hotspot_3 = poc_rewards.next().unwrap(); // no boosts - assert_eq!( - None, - poc_rewards.next(), - "Received more hotspots than expected in rewards" - ); - assert_eq!( - HOTSPOT_2.to_string(), - PublicKeyBinary::from(hotspot_2.hotspot_key.clone()).to_string() - ); - assert_eq!( - HOTSPOT_1.to_string(), - PublicKeyBinary::from(hotspot_1.hotspot_key.clone()).to_string() - ); - assert_eq!( - HOTSPOT_4.to_string(), - PublicKeyBinary::from(hotspot_3.hotspot_key.clone()).to_string() - ); + assert_eq!(poc_rewards.len(), 3); // Calculating expected rewards let (regular_poc, boosted_poc) = get_poc_allocation_buckets(reward_info.epoch_emissions); @@ -1258,12 +1035,12 @@ async fn test_poc_with_wifi_and_multi_coverage_boosted_hexes(pool: PgPool) -> an assert_eq!(exp_reward_1, hotspot_1.total_poc_reward()); assert_eq!(exp_reward_2, hotspot_2.total_poc_reward()); - assert_eq!(exp_reward_3, hotspot_3.total_poc_reward()); + assert_eq!(exp_reward_3, hotspot_4.total_poc_reward()); // assert the number of boosted hexes for each radio assert_eq!(1, hotspot_2.boosted_hexes_len()); assert_eq!(2, hotspot_1.boosted_hexes_len()); - assert_eq!(1, hotspot_3.boosted_hexes_len()); + assert_eq!(1, hotspot_4.boosted_hexes_len()); // assert the hex boost multiplier values // as hotspot 3 has 2 covered hexes, it should have 2 boosted hexes @@ -1280,20 +1057,10 @@ async fn test_poc_with_wifi_and_multi_coverage_boosted_hexes(pool: PgPool) -> an assert_eq!(0x8a1fb46622d7fff_u64, hotspot_1_boosted_hexes[1].location); assert_eq!(0x8a1fb49642dffff_u64, hotspot_2.nth_boosted_hex(0).location); - // confirm the total rewards allocated matches expectations - let poc_sum = - hotspot_1.total_poc_reward() + hotspot_2.total_poc_reward() + hotspot_3.total_poc_reward(); - let total = poc_sum + unallocated_reward.amount; - - let expected_sum = reward_shares::get_scheduled_tokens_for_poc(reward_info.epoch_emissions) - .to_u64() - .unwrap(); - assert_eq!(expected_sum, total); - - // confirm the rewarded percentage amount matches expectations - let percent = (Decimal::from(total) / reward_info.epoch_emissions) - .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); - assert_eq!(percent, dec!(0.6)); + // confirm the total rewards allocated matches emissions + // and the rewarded percentage amount matches percentage + let total = rewards.total_poc_rewards() + rewards.unallocated_amount_or_default(); + assert_total_matches_emissions(total, &reward_info); Ok(()) } @@ -1302,58 +1069,6 @@ fn rounded(num: Decimal) -> u64 { num.to_u64().unwrap_or_default() } -async fn receive_expected_rewards( - mobile_rewards: &mut MockFileSinkReceiver, -) -> anyhow::Result<(Vec, UnallocatedReward)> { - receive_expected_rewards_maybe_unallocated(mobile_rewards, ExpectUnallocated::Yes).await -} - -enum ExpectUnallocated { - Yes, - NoWhenValue(u64), -} - -async fn receive_expected_rewards_maybe_unallocated( - mobile_rewards: &mut MockFileSinkReceiver, - expect_unallocated: ExpectUnallocated, -) -> anyhow::Result<(Vec, UnallocatedReward)> { - // get the filestore outputs from rewards run - let radio_reward1 = mobile_rewards.receive_radio_reward().await; - let radio_reward2 = mobile_rewards.receive_radio_reward().await; - let radio_reward3 = mobile_rewards.receive_radio_reward().await; - - // ordering is not guaranteed, so stick the rewards into a vec and sort - let mut poc_rewards = vec![radio_reward1, radio_reward2, radio_reward3]; - poc_rewards.sort_by(|a, b| b.hotspot_key.cmp(&a.hotspot_key)); - - let unallocated_poc_reward = match expect_unallocated { - ExpectUnallocated::Yes => mobile_rewards.receive_unallocated_reward().await, - ExpectUnallocated::NoWhenValue(max_emission) => { - let total: u64 = poc_rewards.iter().map(|p| p.total_poc_reward()).sum(); - let emitted_is_total = total == max_emission; - tracing::info!( - emitted_is_total, - total, - max_emission, - "receiving expected rewards unallocated amount" - ); - if emitted_is_total { - UnallocatedReward { - reward_type: UnallocatedRewardType::Poc.into(), - amount: 0, - } - } else { - mobile_rewards.receive_unallocated_reward().await - } - } - }; - - // should be no further msgs - mobile_rewards.assert_no_messages(); - - Ok((poc_rewards, unallocated_poc_reward)) -} - async fn seed_heartbeats_v1( ts: DateTime, txn: &mut Transaction<'_, Postgres>, @@ -1860,3 +1575,20 @@ fn get_poc_allocation_buckets(total_emissions: Decimal) -> (Decimal, Decimal) { (regular_poc, boosted_poc) } + +fn assert_total_matches_emissions(total: u64, reward_info: &EpochRewardInfo) { + // confirm the total rewards allocated matches expectations + let total_poc_emissions = + reward_shares::get_scheduled_tokens_for_poc(reward_info.epoch_emissions) + .to_u64() + .unwrap(); + assert_eq!( + total_poc_emissions, total, + "total does not match expected emissions" + ); + + // confirm the rewarded percentage amount matches expectations + let percent = (Decimal::from(total) / reward_info.epoch_emissions) + .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); + assert_eq!(percent, dec!(0.6), "POC and DC is always 60%"); +} diff --git a/mobile_verifier/tests/integrations/rewarder_poc_dc.rs b/mobile_verifier/tests/integrations/rewarder_poc_dc.rs index 692e457aa..73e975f16 100644 --- a/mobile_verifier/tests/integrations/rewarder_poc_dc.rs +++ b/mobile_verifier/tests/integrations/rewarder_poc_dc.rs @@ -63,13 +63,12 @@ async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { rewarder::reward_poc_and_dc( &pool, &hex_boosting_client, - &mobile_rewards_client, + mobile_rewards_client, &speedtest_avg_client, &reward_info, price_info, ) .await?; - drop(mobile_rewards_client); let rewards = mobile_rewards.finish().await?; let poc_rewards = rewards.radio_reward_v2; @@ -152,13 +151,12 @@ async fn test_qualified_wifi_poc_rewards(pool: PgPool) -> anyhow::Result<()> { rewarder::reward_poc_and_dc( &pool, &hex_boosting_client, - &mobile_rewards_client, + mobile_rewards_client, &speedtest_avg_client, &reward_info, price_info, ) .await?; - drop(mobile_rewards_client); let msgs = mobile_rewards.finish().await?; let poc_rewards = msgs.radio_reward_v2; @@ -215,13 +213,12 @@ async fn test_sp_banned_radio(pool: PgPool) -> anyhow::Result<()> { let _rewarder = rewarder::reward_poc_and_dc( &pool, &hex_boosting_client, - &mobile_rewards_client, + mobile_rewards_client, &speedtest_avg_client, &reward_info, price_info.clone(), ) .await?; - drop(mobile_rewards_client); let msgs = mobile_rewards.finish().await?; assert_eq!(msgs.gateway_reward.len(), 3); @@ -239,13 +236,12 @@ async fn test_sp_banned_radio(pool: PgPool) -> anyhow::Result<()> { let _rewarder = rewarder::reward_poc_and_dc( &pool, &hex_boosting_client, - &mobile_rewards_client, + mobile_rewards_client, &speedtest_avg_client, &reward_info, price_info, ) .await?; - drop(mobile_rewards_client); let msgs = mobile_rewards.finish().await?; assert_eq!(msgs.gateway_reward.len(), 3); @@ -290,13 +286,12 @@ async fn test_all_banned_radio(pool: PgPool) -> anyhow::Result<()> { rewarder::reward_poc_and_dc( &pool, &hex_boosting_client, - &mobile_rewards_client, + mobile_rewards_client, &speedtest_avg_client, &reward_info, price_info, ) .await?; - drop(mobile_rewards_client); let rewards = mobile_rewards.finish().await?; let poc_rewards = rewards.radio_reward_v2; @@ -346,13 +341,12 @@ async fn test_data_banned_radio_still_receives_poc(pool: PgPool) -> anyhow::Resu rewarder::reward_poc_and_dc( &pool, &hex_boosting_client, - &mobile_rewards_client, + mobile_rewards_client, &speedtest_avg_client, &reward_info, price_info, ) .await?; - drop(mobile_rewards_client); let rewards = mobile_rewards.finish().await?; let poc_rewards = rewards.radio_reward_v2; From cb70284c6299e8f878fcc5496e6b48cb8dad0d62 Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Thu, 3 Apr 2025 14:53:28 -0700 Subject: [PATCH 03/12] Cleanup mapper reward tests --- mobile_verifier/src/rewarder.rs | 6 +- .../tests/integrations/common/mod.rs | 31 ++-- .../tests/integrations/rewarder_mappers.rs | 133 ++++++------------ 3 files changed, 67 insertions(+), 103 deletions(-) diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index fa8d2227b..437942cee 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -294,7 +294,7 @@ where .await?; // process rewards for mappers - reward_mappers(&self.pool, &self.mobile_rewards, &reward_info).await?; + reward_mappers(&self.pool, self.mobile_rewards.clone(), &reward_info).await?; // process rewards for service providers let dc_sessions = service_provider::get_dc_sessions( @@ -552,7 +552,7 @@ pub async fn reward_dc( pub async fn reward_mappers( pool: &Pool, - mobile_rewards: &FileSinkClient, + mobile_rewards: FileSinkClient, reward_info: &EpochRewardInfo, ) -> anyhow::Result<()> { let rewardable_mapping_activity = subscriber_mapping_activity::db::rewardable_mapping_activity( @@ -587,7 +587,7 @@ pub async fn reward_mappers( .unwrap_or(0) - allocated_mapping_rewards; write_unallocated_reward( - mobile_rewards, + &mobile_rewards, UnallocatedRewardType::Mapper, unallocated_mapping_reward_amount, reward_info, diff --git a/mobile_verifier/tests/integrations/common/mod.rs b/mobile_verifier/tests/integrations/common/mod.rs index ddfb66a33..579360918 100644 --- a/mobile_verifier/tests/integrations/common/mod.rs +++ b/mobile_verifier/tests/integrations/common/mod.rs @@ -1,7 +1,7 @@ use chrono::{DateTime, Duration, Utc}; use file_store::{ file_sink::{FileSinkClient, Message as SinkMessage}, - traits::TimestampEncode, + traits::{MsgBytes, TimestampEncode}, }; use futures::{stream, StreamExt}; use helium_crypto::PublicKeyBinary; @@ -131,16 +131,6 @@ impl MockFileSinkReceiver { } } - pub async fn receive_subscriber_reward(&mut self) -> SubscriberReward { - match self.receive("receive_subscriber_reward").await { - Some(mobile_reward) => match mobile_reward.reward { - Some(MobileReward::SubscriberReward(r)) => r, - _ => panic!("failed to get subscriber reward"), - }, - None => panic!("failed to receive subscriber reward"), - } - } - pub async fn receive_promotion_reward(&mut self) -> PromotionReward { match self.receive("receive_promotion_reward").await { Some(mobile_reward) => match mobile_reward.reward { @@ -173,7 +163,19 @@ pub fn create_file_sink() -> (FileSinkClient, MockFileSinkReceiver) { ) } +pub trait SubscriberRewardExt { + fn subscriber_id_string(&self) -> String; +} + +impl SubscriberRewardExt for SubscriberReward { + fn subscriber_id_string(&self) -> String { + use helium_proto::Message; + String::decode(self.subscriber_id.as_bytes()).expect("decode subscriber id") + } +} + pub trait RadioRewardV2Ext { + fn hotspot_key_string(&self) -> String; fn boosted_hexes(&self) -> Vec; fn nth_boosted_hex(&self, index: usize) -> radio_reward_v2::CoveredHex; fn boosted_hexes_len(&self) -> usize; @@ -405,6 +407,13 @@ impl MobileRewardShareMessages { .map(|reward| reward.total_poc_reward()) .sum() } + + pub fn total_sub_discovery_amount(&self) -> u64 { + self.subscriber_reward + .iter() + .map(|reward| reward.discovery_location_amount) + .sum() + } } #[async_trait::async_trait] diff --git a/mobile_verifier/tests/integrations/rewarder_mappers.rs b/mobile_verifier/tests/integrations/rewarder_mappers.rs index 9a2969022..49d89e692 100644 --- a/mobile_verifier/tests/integrations/rewarder_mappers.rs +++ b/mobile_verifier/tests/integrations/rewarder_mappers.rs @@ -1,13 +1,8 @@ -use crate::common::{self, reward_info_24_hours, MockFileSinkReceiver}; +use crate::common::{self, reward_info_24_hours, AsStringKeyedMap, SubscriberRewardExt}; use chrono::{DateTime, Duration as ChronoDuration, Utc}; use futures::{stream, StreamExt}; use helium_crypto::PublicKeyBinary; -use helium_proto::{ - services::poc_mobile::{ - MobileRewardShare, SubscriberReward, UnallocatedReward, UnallocatedRewardType, - }, - Message, -}; +use helium_proto::{services::poc_mobile::UnallocatedRewardType, Message}; use mobile_verifier::{ reward_shares, rewarder, subscriber_mapping_activity::{self, SubscriberMappingActivity}, @@ -15,7 +10,10 @@ use mobile_verifier::{ use rust_decimal::prelude::*; use rust_decimal_macros::dec; use sqlx::{PgPool, Postgres, Transaction}; -use std::{str::FromStr, string::ToString}; +use std::{ + str::{self, FromStr}, + string::ToString, +}; const SUBSCRIBER_1: &str = "subscriber1"; const SUBSCRIBER_2: &str = "subscriber2"; @@ -24,8 +22,7 @@ const HOTSPOT_1: &str = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6"; #[sqlx::test] async fn test_mapper_rewards(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); - + let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); let reward_info = reward_info_24_hours(); // seed db @@ -33,91 +30,49 @@ async fn test_mapper_rewards(pool: PgPool) -> anyhow::Result<()> { seed_mapping_data(reward_info.epoch_period.end, &mut txn).await?; txn.commit().await.expect("db txn failed"); - let (_, rewards) = tokio::join!( - rewarder::reward_mappers(&pool, &mobile_rewards_client, &reward_info), - receive_expected_rewards(&mut mobile_rewards) - ); + rewarder::reward_mappers(&pool, mobile_rewards_client, &reward_info).await?; - if let Ok((subscriber_rewards, unallocated_reward)) = rewards { - // assert the mapper rewards - // all 3 subscribers will have an equal share, - // requirement is 1 qualifying mapping criteria report per epoch - // subscriber 1 has two qualifying mapping criteria reports, - // other two subscribers one qualifying mapping criteria reports - assert_eq!( - SUBSCRIBER_1.to_string().encode_to_vec(), - subscriber_rewards[0].subscriber_id - ); - assert_eq!( - 5_479_452_054_794, - subscriber_rewards[0].discovery_location_amount - ); - - assert_eq!( - SUBSCRIBER_2.to_string().encode_to_vec(), - subscriber_rewards[1].subscriber_id - ); - assert_eq!( - 5_479_452_054_794, - subscriber_rewards[2].discovery_location_amount - ); - - assert_eq!( - SUBSCRIBER_3.to_string().encode_to_vec(), - subscriber_rewards[2].subscriber_id - ); - assert_eq!( - 5_479_452_054_794, - subscriber_rewards[2].discovery_location_amount - ); - - // confirm our unallocated amount - assert_eq!( - UnallocatedRewardType::Mapper as i32, - unallocated_reward.reward_type - ); - assert_eq!(1, unallocated_reward.amount); - - // confirm the total rewards allocated matches expectations - let expected_sum = - reward_shares::get_scheduled_tokens_for_mappers(reward_info.epoch_emissions) - .to_u64() - .unwrap(); - let subscriber_sum = subscriber_rewards[0].discovery_location_amount - + subscriber_rewards[1].discovery_location_amount - + subscriber_rewards[2].discovery_location_amount - + unallocated_reward.amount; - assert_eq!(expected_sum, subscriber_sum); - - // confirm the rewarded percentage amount matches expectations - let percent = (Decimal::from(subscriber_sum) / reward_info.epoch_emissions) - .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); - assert_eq!(percent, dec!(0.2)); - } else { - panic!("no rewards received"); - }; - Ok(()) -} + let rewards = mobile_rewards.finish().await?; + let subscriber_rewards = rewards + .subscriber_reward + .as_keyed_map(|reward| reward.subscriber_id_string()); + + // assert the mapper rewards + // all 3 subscribers will have an equal share, + // requirement is 1 qualifying mapping criteria report per epoch + // subscriber 1 has two qualifying mapping criteria reports, + // other two subscribers one qualifying mapping criteria reports + let sub_reward_1 = subscriber_rewards.get(SUBSCRIBER_1).expect("sub 1"); + assert_eq!(5_479_452_054_794, sub_reward_1.discovery_location_amount); -async fn receive_expected_rewards( - mobile_rewards: &mut MockFileSinkReceiver, -) -> anyhow::Result<(Vec, UnallocatedReward)> { - // get the filestore outputs from rewards run - // we will have 3 radio rewards, 1 wifi radio and 2 cbrs radios - let subscriber_reward1 = mobile_rewards.receive_subscriber_reward().await; - let subscriber_reward2 = mobile_rewards.receive_subscriber_reward().await; - let subscriber_reward3 = mobile_rewards.receive_subscriber_reward().await; - let mut subscriber_rewards = vec![subscriber_reward1, subscriber_reward2, subscriber_reward3]; + let sub_reward_2 = subscriber_rewards.get(SUBSCRIBER_2).expect("sub 2"); + assert_eq!(5_479_452_054_794, sub_reward_2.discovery_location_amount); - subscriber_rewards.sort_by(|a, b| a.subscriber_id.cmp(&b.subscriber_id)); + let sub_reward_3 = subscriber_rewards.get(SUBSCRIBER_3).expect("sub 3"); + assert_eq!(5_479_452_054_794, sub_reward_3.discovery_location_amount); - // expect one unallocated reward - let unallocated_reward = mobile_rewards.receive_unallocated_reward().await; + // confirm our unallocated amount + let unallocated_reward = rewards.unallocated.first().expect("unallocated"); + assert_eq!( + UnallocatedRewardType::Mapper as i32, + unallocated_reward.reward_type + ); + assert_eq!(1, unallocated_reward.amount); - // should be no further msgs - mobile_rewards.assert_no_messages(); + // confirm the total rewards allocated matches expectations + let expected_sum = reward_shares::get_scheduled_tokens_for_mappers(reward_info.epoch_emissions) + .to_u64() + .unwrap(); + let subscriber_sum = + rewards.total_sub_discovery_amount() + rewards.unallocated_amount_or_default(); + assert_eq!(expected_sum, subscriber_sum); - Ok((subscriber_rewards, unallocated_reward)) + // confirm the rewarded percentage amount matches expectations + let percent = (Decimal::from(subscriber_sum) / reward_info.epoch_emissions) + .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); + assert_eq!(percent, dec!(0.2)); + + Ok(()) } async fn seed_mapping_data( From bdf7c70abccc8ee1bae98d0cc2190c310f12c570 Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Thu, 3 Apr 2025 14:56:37 -0700 Subject: [PATCH 04/12] Cleanup oracle reward test --- mobile_verifier/src/rewarder.rs | 6 +- .../tests/integrations/rewarder_oracles.rs | 66 +++++++------------ 2 files changed, 27 insertions(+), 45 deletions(-) diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 437942cee..8ef1cd37d 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -316,7 +316,7 @@ where .await?; // process rewards for oracles - reward_oracles(&self.mobile_rewards, &reward_info).await?; + reward_oracles(self.mobile_rewards.clone(), &reward_info).await?; self.speedtest_averages.commit().await?; let written_files = self.mobile_rewards.commit().await?.await??; @@ -598,7 +598,7 @@ pub async fn reward_mappers( } pub async fn reward_oracles( - mobile_rewards: &FileSinkClient, + mobile_rewards: FileSinkClient, reward_info: &EpochRewardInfo, ) -> anyhow::Result<()> { // atm 100% of oracle rewards are assigned to 'unallocated' @@ -611,7 +611,7 @@ pub async fn reward_oracles( .unwrap_or(0) - allocated_oracle_rewards; write_unallocated_reward( - mobile_rewards, + &mobile_rewards, UnallocatedRewardType::Oracle, unallocated_oracle_reward_amount, reward_info, diff --git a/mobile_verifier/tests/integrations/rewarder_oracles.rs b/mobile_verifier/tests/integrations/rewarder_oracles.rs index 1f517ee93..d91bb6a34 100644 --- a/mobile_verifier/tests/integrations/rewarder_oracles.rs +++ b/mobile_verifier/tests/integrations/rewarder_oracles.rs @@ -1,7 +1,5 @@ -use crate::common::{self, reward_info_24_hours, MockFileSinkReceiver}; -use helium_proto::services::poc_mobile::{ - MobileRewardShare, UnallocatedReward, UnallocatedRewardType, -}; +use crate::common::{self, reward_info_24_hours}; +use helium_proto::services::poc_mobile::UnallocatedRewardType; use mobile_verifier::{reward_shares, rewarder}; use rust_decimal::prelude::*; use rust_decimal_macros::dec; @@ -9,49 +7,33 @@ use sqlx::PgPool; #[sqlx::test] async fn test_oracle_rewards(_pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); let reward_info = reward_info_24_hours(); - let (_, rewards) = tokio::join!( - // run rewards for oracles - rewarder::reward_oracles(&mobile_rewards_client, &reward_info), - receive_expected_rewards(&mut mobile_rewards) + // run rewards for oracles + rewarder::reward_oracles(mobile_rewards_client, &reward_info).await?; + + let rewards = mobile_rewards.finish().await?; + let unallocated_reward = rewards.unallocated.first().expect("Unallocated"); + + assert_eq!( + UnallocatedRewardType::Oracle as i32, + unallocated_reward.reward_type ); - if let Ok(unallocated_reward) = rewards { - assert_eq!( - UnallocatedRewardType::Oracle as i32, - unallocated_reward.reward_type - ); - // confirm our unallocated amount - assert_eq!(3_287_671_232_876, unallocated_reward.amount); - - // confirm the total rewards allocated matches expectations - let expected_sum = - reward_shares::get_scheduled_tokens_for_oracles(reward_info.epoch_emissions) - .to_u64() - .unwrap(); - assert_eq!(expected_sum, unallocated_reward.amount); - - // confirm the rewarded percentage amount matches expectations - let percent = (Decimal::from(unallocated_reward.amount) / reward_info.epoch_emissions) - .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); - assert_eq!(percent, dec!(0.04)); - } else { - panic!("no rewards received"); - }; - Ok(()) -} + // confirm our unallocated amount + assert_eq!(3_287_671_232_876, unallocated_reward.amount); -async fn receive_expected_rewards( - mobile_rewards: &mut MockFileSinkReceiver, -) -> anyhow::Result { - // expect one unallocated reward - // as oracle rewards are currently 100% unallocated - let unallocated_reward = mobile_rewards.receive_unallocated_reward().await; + // confirm the total rewards allocated matches expectations + let expected_sum = reward_shares::get_scheduled_tokens_for_oracles(reward_info.epoch_emissions) + .to_u64() + .unwrap(); + assert_eq!(expected_sum, unallocated_reward.amount); - // should be no further msgs - mobile_rewards.assert_no_messages(); + // confirm the rewarded percentage amount matches expectations + let percent = (Decimal::from(unallocated_reward.amount) / reward_info.epoch_emissions) + .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); + assert_eq!(percent, dec!(0.04)); - Ok(unallocated_reward) + Ok(()) } From 8994161819dc7bdb4c01ec3db4ddb7f58cac9133 Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Thu, 3 Apr 2025 15:12:12 -0700 Subject: [PATCH 05/12] Cleanup Service Provider reward tests --- mobile_verifier/src/rewarder.rs | 6 +- .../tests/integrations/common/mod.rs | 64 +------- .../tests/integrations/rewarder_sp_rewards.rs | 145 +++++++----------- 3 files changed, 57 insertions(+), 158 deletions(-) diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 8ef1cd37d..6520a9964 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -309,7 +309,7 @@ where reward_service_providers( dc_sessions, sp_promotions.clone(), - &self.mobile_rewards, + self.mobile_rewards.clone(), &reward_info, price_info.price_per_bone, ) @@ -623,7 +623,7 @@ pub async fn reward_oracles( pub async fn reward_service_providers( dc_sessions: ServiceProviderDCSessions, sp_promotions: ServiceProviderPromotions, - mobile_rewards: &FileSinkClient, + mobile_rewards: FileSinkClient, reward_info: &EpochRewardInfo, hnt_bone_price: Decimal, ) -> anyhow::Result<()> { @@ -651,7 +651,7 @@ pub async fn reward_service_providers( // write out any unallocated service provider reward write_unallocated_reward( - mobile_rewards, + &mobile_rewards, UnallocatedRewardType::ServiceProvider, unallocated_sp_rewards, reward_info, diff --git a/mobile_verifier/tests/integrations/common/mod.rs b/mobile_verifier/tests/integrations/common/mod.rs index 579360918..a958f9c2c 100644 --- a/mobile_verifier/tests/integrations/common/mod.rs +++ b/mobile_verifier/tests/integrations/common/mod.rs @@ -25,10 +25,7 @@ use rust_decimal::{prelude::ToPrimitive, Decimal}; use rust_decimal_macros::dec; use sqlx::PgPool; use std::{collections::HashMap, str::FromStr, sync::Arc}; -use tokio::{ - sync::{mpsc::error::TryRecvError, RwLock}, - time::{timeout, Timeout}, -}; +use tokio::{sync::RwLock, time::Timeout}; use tonic::async_trait; pub const EPOCH_ADDRESS: &str = "112E7TxoNHV46M6tiPA8N1MkeMeQxc9ztb4JQLXBVAAUfq1kJLoF"; @@ -86,29 +83,6 @@ pub struct MockFileSinkReceiver { pub receiver: tokio::sync::mpsc::Receiver>, } -impl MockFileSinkReceiver { - pub async fn receive(&mut self, caller: &str) -> Option { - match timeout(seconds(2), self.receiver.recv()).await { - Ok(Some(SinkMessage::Data(on_write_tx, msg))) => { - let _ = on_write_tx.send(Ok(())); - Some(msg) - } - Ok(None) => None, - Err(e) => panic!("{caller}: timeout while waiting for message1 {:?}", e), - Ok(Some(unexpected_msg)) => { - println!("{caller}: ignoring unexpected msg {:?}", unexpected_msg); - None - } - } - } - - pub fn assert_no_messages(&mut self) { - let Err(TryRecvError::Empty) = self.receiver.try_recv() else { - panic!("receiver should have been empty") - }; - } -} - impl MockFileSinkReceiver { pub async fn get_all_speedtest_avgs(&mut self) -> Vec { let mut messages = vec![]; @@ -120,38 +94,6 @@ impl MockFileSinkReceiver { } } -impl MockFileSinkReceiver { - pub async fn receive_service_provider_reward(&mut self) -> ServiceProviderReward { - match self.receive("receive_service_provider_reward").await { - Some(mobile_reward) => match mobile_reward.reward { - Some(MobileReward::ServiceProviderReward(r)) => r, - _ => panic!("failed to get service provider reward"), - }, - None => panic!("failed to receive service provider reward"), - } - } - - pub async fn receive_promotion_reward(&mut self) -> PromotionReward { - match self.receive("receive_promotion_reward").await { - Some(mobile_reward) => match mobile_reward.reward { - Some(MobileReward::PromotionReward(r)) => r, - _ => panic!("failed to get promotion reward"), - }, - None => panic!("failed to receive promotion reward"), - } - } - - pub async fn receive_unallocated_reward(&mut self) -> UnallocatedReward { - match self.receive("receive_unallocated_reward").await { - Some(mobile_reward) => match mobile_reward.reward { - Some(MobileReward::UnallocatedReward(r)) => r, - _ => panic!("failed to get unallocated reward"), - }, - None => panic!("failed to receive unallocated reward"), - } - } -} - pub fn create_file_sink() -> (FileSinkClient, MockFileSinkReceiver) { let (tx, rx) = tokio::sync::mpsc::channel(20); ( @@ -226,10 +168,6 @@ impl RadioRewardV2Ext for RadioRewardV2 { } } -pub fn seconds(s: u64) -> std::time::Duration { - std::time::Duration::from_secs(s) -} - pub fn mock_hex_boost_data_default( ) -> HexBoostData { HexBoostData::builder() diff --git a/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs b/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs index 38ca0fa51..df90d6097 100644 --- a/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs +++ b/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs @@ -4,17 +4,14 @@ use std::string::ToString; use async_trait::async_trait; use chrono::{DateTime, Duration, Utc}; use helium_proto::{ - service_provider_promotions::Promotion, - services::poc_mobile::{ - MobileRewardShare, ServiceProviderReward, UnallocatedReward, UnallocatedRewardType, - }, + service_provider_promotions::Promotion, services::poc_mobile::UnallocatedRewardType, ServiceProvider, ServiceProviderPromotions, }; use rust_decimal::prelude::*; use rust_decimal_macros::dec; use sqlx::{PgPool, Postgres, Transaction}; -use crate::common::{self, reward_info_24_hours, MockFileSinkReceiver}; +use crate::common::{self, reward_info_24_hours, AsStringKeyedMap}; use mobile_config::client::{carrier_service_client::CarrierServiceVerifier, ClientError}; use mobile_verifier::{data_session, reward_shares, rewarder, service_provider}; @@ -73,7 +70,7 @@ async fn test_service_provider_rewards(pool: PgPool) -> anyhow::Result<()> { let mut valid_sps = HashMap::::new(); valid_sps.insert(PAYER_1.to_string(), SP_1.to_string()); let carrier_client = MockCarrierServiceClient::new(valid_sps); - let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); let reward_info = reward_info_24_hours(); @@ -89,45 +86,38 @@ async fn test_service_provider_rewards(pool: PgPool) -> anyhow::Result<()> { .list_incentive_promotions(&reward_info.epoch_period.start) .await?; - let (_, rewards) = tokio::join!( - rewarder::reward_service_providers( - dc_sessions, - sp_promotions.into(), - &mobile_rewards_client, - &reward_info, - dec!(0.0001), - ), - receive_expected_rewards(&mut mobile_rewards) + rewarder::reward_service_providers( + dc_sessions, + sp_promotions.into(), + mobile_rewards_client, + &reward_info, + dec!(0.0001), + ) + .await?; + + let rewards = mobile_rewards.finish().await?; + + let sp_reward = rewards.sp_reward.first().expect("sp 1 reward"); + assert_eq!(5_999, sp_reward.amount); + + let unallocated_reward = rewards.unallocated.first().expect("unallocated"); + assert_eq!( + UnallocatedRewardType::ServiceProvider as i32, + unallocated_reward.reward_type ); - if let Ok((sp_reward, unallocated_reward)) = rewards { - assert_eq!( - SP_1.to_string(), - ServiceProvider::try_from(sp_reward.service_provider_id) - .unwrap() - .to_string() - ); - assert_eq!(5_999, sp_reward.amount); - - assert_eq!( - UnallocatedRewardType::ServiceProvider as i32, - unallocated_reward.reward_type - ); - assert_eq!(8_219_178_076_192, unallocated_reward.amount); - - // confirm the total rewards allocated matches expectations - let expected_sum = - reward_shares::get_scheduled_tokens_for_service_providers(reward_info.epoch_emissions) - .to_u64() - .unwrap(); - assert_eq!(expected_sum, sp_reward.amount + unallocated_reward.amount); - - // confirm the rewarded percentage amount matches expectations - let percent = (Decimal::from(unallocated_reward.amount) / reward_info.epoch_emissions) - .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); - assert_eq!(percent, dec!(0.1)); - } else { - panic!("no rewards received"); - } + assert_eq!(8_219_178_076_192, unallocated_reward.amount); + + // confirm the total rewards allocated matches expectations + let expected_sum = + reward_shares::get_scheduled_tokens_for_service_providers(reward_info.epoch_emissions) + .to_u64() + .unwrap(); + assert_eq!(expected_sum, sp_reward.amount + unallocated_reward.amount); + + // confirm the rewarded percentage amount matches expectations + let percent = (Decimal::from(unallocated_reward.amount) / reward_info.epoch_emissions) + .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); + assert_eq!(percent, dec!(0.1)); Ok(()) } @@ -189,7 +179,7 @@ async fn test_service_provider_promotion_rewards(pool: PgPool) -> anyhow::Result let reward_info = reward_info_24_hours(); - let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); let mut txn = pool.begin().await?; seed_hotspot_data(reward_info.epoch_period.end, &mut txn).await?; // DC transferred == 6,000 reward amount @@ -203,50 +193,38 @@ async fn test_service_provider_promotion_rewards(pool: PgPool) -> anyhow::Result .list_incentive_promotions(&reward_info.epoch_period.start) .await?; - let (_, rewards) = tokio::join!( - rewarder::reward_service_providers( - dc_sessions, - sp_promotions.into(), - &mobile_rewards_client, - &reward_info, - dec!(0.00001) - ), - async move { - let mut promos = vec![ - mobile_rewards.receive_promotion_reward().await, - mobile_rewards.receive_promotion_reward().await, - mobile_rewards.receive_promotion_reward().await, - ]; - // sort by awarded amount least -> most - promos.sort_by_key(|a| a.service_provider_amount); - - let sp_reward = mobile_rewards.receive_service_provider_reward().await; - let unallocated = mobile_rewards.receive_unallocated_reward().await; - - mobile_rewards.assert_no_messages(); - - (promos, sp_reward, unallocated) - } - ); + rewarder::reward_service_providers( + dc_sessions, + sp_promotions.into(), + mobile_rewards_client, + &reward_info, + dec!(0.00001), + ) + .await?; + + let rewards = mobile_rewards.finish().await?; - let (promos, sp_reward, unallocated) = rewards; - let promo_reward_1 = promos[0].clone(); - let promo_reward_2 = promos[1].clone(); - let promo_reward_3 = promos[2].clone(); + let promo_rewards = rewards + .promotion_reward + .as_keyed_map(|reward| reward.entity.to_owned()); // 1 share + let promo_reward_1 = promo_rewards.get("one").expect("promo 1"); assert_eq!(promo_reward_1.service_provider_amount, 1_499); assert_eq!(promo_reward_1.matched_amount, 1_499); // 2 shares + let promo_reward_2 = promo_rewards.get("two").expect("promo 2"); assert_eq!(promo_reward_2.service_provider_amount, 2999); assert_eq!(promo_reward_2.matched_amount, 2999); // 3 shares + let promo_reward_3 = promo_rewards.get("three").expect("promo 3"); assert_eq!(promo_reward_3.service_provider_amount, 4_499); assert_eq!(promo_reward_3.matched_amount, 4_499); // dc_percentage * total_sp_allocation rounded down + let sp_reward = rewards.sp_reward.first().expect("sp 1 reward"); assert_eq!(sp_reward.amount, 50_999); let unallocated_sp_rewards = get_unallocated_sp_rewards(reward_info.epoch_emissions); @@ -256,29 +234,12 @@ async fn test_service_provider_promotion_rewards(pool: PgPool) -> anyhow::Result - 8_998 // matched promotion + 2; // rounding + let unallocated = rewards.unallocated.first().expect("unallocated"); assert_eq!(unallocated.amount, expected_unallocated); Ok(()) } -async fn receive_expected_rewards( - mobile_rewards: &mut MockFileSinkReceiver, -) -> anyhow::Result<(ServiceProviderReward, UnallocatedReward)> { - // get the filestore outputs from rewards run - // we will have 3 radio rewards, 1 wifi radio and 2 cbrs radios - let sp_reward1 = mobile_rewards.receive_service_provider_reward().await; - // let sp_reward2 = mobile_rewards.receive_service_provider_reward().await; - // dump the sp rewards into a vec and sort to get a deteminstic order - - // expect one unallocated reward - let unallocated_reward = mobile_rewards.receive_unallocated_reward().await; - - // should be no further msgs - mobile_rewards.assert_no_messages(); - - Ok((sp_reward1, unallocated_reward)) -} - async fn seed_hotspot_data( ts: DateTime, txn: &mut Transaction<'_, Postgres>, From 81bb592680491541b1a7db1148f7f01afe115b17 Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Thu, 3 Apr 2025 15:28:41 -0700 Subject: [PATCH 06/12] Non blocking receiver for speedtests add wait until written for speedtest avgs --- mobile_verifier/src/speedtests_average.rs | 7 ++++-- .../tests/integrations/common/mod.rs | 20 ++++++++++++--- .../tests/integrations/speedtests.rs | 25 +++++++++++-------- 3 files changed, 36 insertions(+), 16 deletions(-) diff --git a/mobile_verifier/src/speedtests_average.rs b/mobile_verifier/src/speedtests_average.rs index bd54f708b..06216b331 100644 --- a/mobile_verifier/src/speedtests_average.rs +++ b/mobile_verifier/src/speedtests_average.rs @@ -92,7 +92,7 @@ impl SpeedtestAverage { pub async fn write( &self, filesink: &FileSinkClient, - ) -> file_store::Result { + ) -> anyhow::Result<()> { filesink .write( proto::SpeedtestAvg { @@ -116,7 +116,10 @@ impl SpeedtestAverage { }, &[("validity", self.validity.as_str_name())], ) - .await?; + .await? + // wait to be written + .await??; + Ok(()) } diff --git a/mobile_verifier/tests/integrations/common/mod.rs b/mobile_verifier/tests/integrations/common/mod.rs index a958f9c2c..88d527ba3 100644 --- a/mobile_verifier/tests/integrations/common/mod.rs +++ b/mobile_verifier/tests/integrations/common/mod.rs @@ -371,7 +371,7 @@ impl TestTimeoutExt for F { impl NonBlockingFileSinkReceiver { pub async fn finish(self) -> anyhow::Result { - // make sure channel is closed and done be written to + // make sure channel is closed and done being written to if let Err(err) = self.channel_closed.notified().timeout_2_secs().await { panic!("file sink receiver channel was never closed: {err:?}"); } @@ -392,6 +392,20 @@ impl NonBlockingFileSinkReceiver { } } +impl NonBlockingFileSinkReceiver { + pub async fn finish(self) -> anyhow::Result> { + // make sure the channel is closed and done being written to + if let Err(err) = self.channel_closed.notified().timeout_2_secs().await { + panic!("file sink receiver channel was never closed: {err:?}"); + } + + let lock = Arc::try_unwrap(self.msgs).expect("no locks on messages"); + let msgs = lock.into_inner(); + + Ok(msgs) + } +} + impl NonBlockingFileSinkReceiver { fn new(mut receiver: tokio::sync::mpsc::Receiver>) -> Self { let channel_closed = Arc::new(tokio::sync::Notify::new()); @@ -404,10 +418,10 @@ impl NonBlockingFileSinkReceiver { while let Some(msg) = receiver.recv().await { match msg { SinkMessage::Data(sender, msg) => { - sender.send(Ok(())).unwrap(); + sender.send(Ok(())).expect("ack file data"); inner_msgs.write().await.push(msg); } - SinkMessage::Commit(_sender) => todo!(), + SinkMessage::Commit(_sender) => (), SinkMessage::Rollback(_sender) => todo!(), } } diff --git a/mobile_verifier/tests/integrations/speedtests.rs b/mobile_verifier/tests/integrations/speedtests.rs index 5bdc961eb..40e0168ca 100644 --- a/mobile_verifier/tests/integrations/speedtests.rs +++ b/mobile_verifier/tests/integrations/speedtests.rs @@ -54,17 +54,9 @@ async fn speedtests_average_should_only_include_last_48_hours( ) -> anyhow::Result<()> { let (_tx, rx) = tokio::sync::mpsc::channel(2); let gateway_info_resolver = MockGatewayInfoResolver {}; - let (speedtest_avg_client, mut speedtest_avg_receiver) = common::create_file_sink(); + let (speedtest_avg_client, speedtest_avg_receiver) = common::create_nonblocking_file_sink(); let (verified_client, _verified_receiver) = common::create_file_sink(); - let daemon = SpeedtestDaemon::new( - pool, - gateway_info_resolver, - rx, - speedtest_avg_client, - verified_client, - ); - let hotspot: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6".parse()?; @@ -77,9 +69,20 @@ async fn speedtests_average_should_only_include_last_48_hours( speedtest(&hotspot, "2024-01-06 01:00:00", 10, 100, 10), ]); - assert!(daemon.process_file(stream).await.is_ok()); + // Drop the daemon when it's done running + { + let daemon = SpeedtestDaemon::new( + pool, + gateway_info_resolver, + rx, + speedtest_avg_client, + verified_client, + ); + + daemon.process_file(stream).await?; + } - let avgs = speedtest_avg_receiver.get_all_speedtest_avgs().await; + let avgs = speedtest_avg_receiver.finish().await?; assert_eq!(6, avgs.len()); assert_eq!(SpeedtestAvgValidity::TooFewSamples, avgs[0].validity()); From 25c2339f951e4d4a67f04bba7148d76381164497 Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Thu, 3 Apr 2025 15:30:26 -0700 Subject: [PATCH 07/12] All tests uses nonblocking receiver --- .../tests/integrations/common/mod.rs | 26 ------------------- .../tests/integrations/hex_boosting.rs | 14 +++++----- .../tests/integrations/rewarder_poc_dc.rs | 12 ++++----- .../tests/integrations/speedtests.rs | 2 +- 4 files changed, 14 insertions(+), 40 deletions(-) diff --git a/mobile_verifier/tests/integrations/common/mod.rs b/mobile_verifier/tests/integrations/common/mod.rs index 88d527ba3..ff7ee7cf2 100644 --- a/mobile_verifier/tests/integrations/common/mod.rs +++ b/mobile_verifier/tests/integrations/common/mod.rs @@ -79,32 +79,6 @@ impl SubDaoEpochRewardInfoResolver for MockSubDaoRewardsClient { } } -pub struct MockFileSinkReceiver { - pub receiver: tokio::sync::mpsc::Receiver>, -} - -impl MockFileSinkReceiver { - pub async fn get_all_speedtest_avgs(&mut self) -> Vec { - let mut messages = vec![]; - while let Ok(SinkMessage::Data(on_write_tx, msg)) = self.receiver.try_recv() { - let _ = on_write_tx.send(Ok(())); - messages.push(msg); - } - messages - } -} - -pub fn create_file_sink() -> (FileSinkClient, MockFileSinkReceiver) { - let (tx, rx) = tokio::sync::mpsc::channel(20); - ( - FileSinkClient { - sender: tx, - metric: "metric".into(), - }, - MockFileSinkReceiver { receiver: rx }, - ) -} - pub trait SubscriberRewardExt { fn subscriber_id_string(&self) -> String; } diff --git a/mobile_verifier/tests/integrations/hex_boosting.rs b/mobile_verifier/tests/integrations/hex_boosting.rs index 8057cd420..e75ad240d 100644 --- a/mobile_verifier/tests/integrations/hex_boosting.rs +++ b/mobile_verifier/tests/integrations/hex_boosting.rs @@ -52,7 +52,7 @@ async fn update_assignments(pool: &PgPool) -> anyhow::Result<()> { #[sqlx::test] async fn test_poc_with_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); let reward_info = reward_info_24_hours(); @@ -212,7 +212,7 @@ async fn test_poc_boosted_hexes_thresholds_not_met(pool: PgPool) -> anyhow::Resu // thresholds for the radios have not been met // the end result is that no boosting takes place, the radios are awarded non boosted reward values let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); let now = Utc::now(); let epoch = (now - ChronoDuration::hours(24))..now; let boost_period_length = Duration::days(30); @@ -334,7 +334,7 @@ async fn test_poc_boosted_hexes_thresholds_not_met(pool: PgPool) -> anyhow::Resu #[sqlx::test] async fn test_poc_with_multi_coverage_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); let reward_info = reward_info_24_hours(); @@ -514,7 +514,7 @@ async fn test_poc_with_multi_coverage_boosted_hexes(pool: PgPool) -> anyhow::Res #[sqlx::test] async fn test_expired_boosted_hex(pool: PgPool) -> anyhow::Result<()> { let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); let reward_info = reward_info_24_hours(); let boost_period_length = Duration::days(30); @@ -614,7 +614,7 @@ async fn test_expired_boosted_hex(pool: PgPool) -> anyhow::Result<()> { #[sqlx::test] async fn test_reduced_location_score_with_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); let reward_info = reward_info_24_hours(); let boost_period_length = Duration::days(30); @@ -754,7 +754,7 @@ async fn test_distance_from_asserted_removes_boosting_but_not_location_trust( pool: PgPool, ) -> anyhow::Result<()> { let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); let reward_info = reward_info_24_hours(); let boost_period_length = Duration::days(30); @@ -895,7 +895,7 @@ async fn test_distance_from_asserted_removes_boosting_but_not_location_trust( #[sqlx::test] async fn test_poc_with_wifi_and_multi_coverage_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); let reward_info = reward_info_24_hours(); diff --git a/mobile_verifier/tests/integrations/rewarder_poc_dc.rs b/mobile_verifier/tests/integrations/rewarder_poc_dc.rs index 73e975f16..c6888325d 100644 --- a/mobile_verifier/tests/integrations/rewarder_poc_dc.rs +++ b/mobile_verifier/tests/integrations/rewarder_poc_dc.rs @@ -44,7 +44,7 @@ const PAYER_1: &str = "11eX55faMbqZB7jzN4p67m6w7ScPMH6ubnvCjCPLh72J49PaJEL"; #[sqlx::test] async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); let reward_info = reward_info_24_hours(); @@ -117,7 +117,7 @@ async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { #[sqlx::test] async fn test_qualified_wifi_poc_rewards(pool: PgPool) -> anyhow::Result<()> { let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); let reward_info = reward_info_24_hours(); @@ -190,7 +190,7 @@ async fn test_qualified_wifi_poc_rewards(pool: PgPool) -> anyhow::Result<()> { #[sqlx::test] async fn test_sp_banned_radio(pool: PgPool) -> anyhow::Result<()> { let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); let reward_info = reward_info_24_hours(); @@ -226,7 +226,7 @@ async fn test_sp_banned_radio(pool: PgPool) -> anyhow::Result<()> { // ============================================================== let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); // SP ban radio, zeroed rewards are filtered out let mut txn = pool.begin().await?; @@ -253,7 +253,7 @@ async fn test_sp_banned_radio(pool: PgPool) -> anyhow::Result<()> { #[sqlx::test] async fn test_all_banned_radio(pool: PgPool) -> anyhow::Result<()> { let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); let reward_info = reward_info_24_hours(); @@ -309,7 +309,7 @@ async fn test_all_banned_radio(pool: PgPool) -> anyhow::Result<()> { #[sqlx::test] async fn test_data_banned_radio_still_receives_poc(pool: PgPool) -> anyhow::Result<()> { let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); let reward_info = reward_info_24_hours(); diff --git a/mobile_verifier/tests/integrations/speedtests.rs b/mobile_verifier/tests/integrations/speedtests.rs index 40e0168ca..aef5c2b7b 100644 --- a/mobile_verifier/tests/integrations/speedtests.rs +++ b/mobile_verifier/tests/integrations/speedtests.rs @@ -55,7 +55,7 @@ async fn speedtests_average_should_only_include_last_48_hours( let (_tx, rx) = tokio::sync::mpsc::channel(2); let gateway_info_resolver = MockGatewayInfoResolver {}; let (speedtest_avg_client, speedtest_avg_receiver) = common::create_nonblocking_file_sink(); - let (verified_client, _verified_receiver) = common::create_file_sink(); + let (verified_client, _verified_receiver) = common::create_nonblocking_file_sink(); let hotspot: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6".parse()?; From 0ae3c4c0149d242e4a2e5ce6d5da36dda664862f Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Thu, 3 Apr 2025 15:32:16 -0700 Subject: [PATCH 08/12] create_nonblocking_file_sink -> create_file_sink now that all are non blocking --- .../tests/integrations/common/mod.rs | 13 ++++----- .../tests/integrations/hex_boosting.rs | 28 +++++++++---------- .../tests/integrations/rewarder_mappers.rs | 2 +- .../tests/integrations/rewarder_oracles.rs | 2 +- .../tests/integrations/rewarder_poc_dc.rs | 24 ++++++++-------- .../tests/integrations/rewarder_sp_rewards.rs | 4 +-- .../tests/integrations/speedtests.rs | 4 +-- 7 files changed, 38 insertions(+), 39 deletions(-) diff --git a/mobile_verifier/tests/integrations/common/mod.rs b/mobile_verifier/tests/integrations/common/mod.rs index ff7ee7cf2..108619459 100644 --- a/mobile_verifier/tests/integrations/common/mod.rs +++ b/mobile_verifier/tests/integrations/common/mod.rs @@ -264,20 +264,19 @@ pub fn default_price_info() -> PriceInfo { // Non-blocking version is file sink testing. // Requires the FileSinkClient to be dropped when all writing is done, or panic!. -pub fn create_nonblocking_file_sink( -) -> (FileSinkClient, NonBlockingFileSinkReceiver) { +pub fn create_file_sink() -> (FileSinkClient, FileSinkReceiver) { let (tx, rx) = tokio::sync::mpsc::channel(999); ( FileSinkClient { sender: tx, metric: "metric".into(), }, - NonBlockingFileSinkReceiver::new(rx), + FileSinkReceiver::new(rx), ) } #[derive(Debug)] -pub struct NonBlockingFileSinkReceiver { +pub struct FileSinkReceiver { msgs: Arc>>, channel_closed: Arc, } @@ -343,7 +342,7 @@ impl TestTimeoutExt for F { } } -impl NonBlockingFileSinkReceiver { +impl FileSinkReceiver { pub async fn finish(self) -> anyhow::Result { // make sure channel is closed and done being written to if let Err(err) = self.channel_closed.notified().timeout_2_secs().await { @@ -366,7 +365,7 @@ impl NonBlockingFileSinkReceiver { } } -impl NonBlockingFileSinkReceiver { +impl FileSinkReceiver { pub async fn finish(self) -> anyhow::Result> { // make sure the channel is closed and done being written to if let Err(err) = self.channel_closed.notified().timeout_2_secs().await { @@ -380,7 +379,7 @@ impl NonBlockingFileSinkReceiver { } } -impl NonBlockingFileSinkReceiver { +impl FileSinkReceiver { fn new(mut receiver: tokio::sync::mpsc::Receiver>) -> Self { let channel_closed = Arc::new(tokio::sync::Notify::new()); let closer = channel_closed.clone(); diff --git a/mobile_verifier/tests/integrations/hex_boosting.rs b/mobile_verifier/tests/integrations/hex_boosting.rs index e75ad240d..ff3fc599e 100644 --- a/mobile_verifier/tests/integrations/hex_boosting.rs +++ b/mobile_verifier/tests/integrations/hex_boosting.rs @@ -51,8 +51,8 @@ async fn update_assignments(pool: &PgPool) -> anyhow::Result<()> { #[sqlx::test] async fn test_poc_with_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -211,8 +211,8 @@ async fn test_poc_boosted_hexes_thresholds_not_met(pool: PgPool) -> anyhow::Resu // this simulates the case where we have radios in boosted hexes but where the coverage // thresholds for the radios have not been met // the end result is that no boosting takes place, the radios are awarded non boosted reward values - let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let now = Utc::now(); let epoch = (now - ChronoDuration::hours(24))..now; let boost_period_length = Duration::days(30); @@ -333,8 +333,8 @@ async fn test_poc_boosted_hexes_thresholds_not_met(pool: PgPool) -> anyhow::Resu #[sqlx::test] async fn test_poc_with_multi_coverage_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -513,8 +513,8 @@ async fn test_poc_with_multi_coverage_boosted_hexes(pool: PgPool) -> anyhow::Res #[sqlx::test] async fn test_expired_boosted_hex(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); let boost_period_length = Duration::days(30); @@ -613,8 +613,8 @@ async fn test_expired_boosted_hex(pool: PgPool) -> anyhow::Result<()> { #[sqlx::test] async fn test_reduced_location_score_with_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); let boost_period_length = Duration::days(30); @@ -753,8 +753,8 @@ async fn test_reduced_location_score_with_boosted_hexes(pool: PgPool) -> anyhow: async fn test_distance_from_asserted_removes_boosting_but_not_location_trust( pool: PgPool, ) -> anyhow::Result<()> { - let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); let boost_period_length = Duration::days(30); @@ -894,8 +894,8 @@ async fn test_distance_from_asserted_removes_boosting_but_not_location_trust( #[sqlx::test] async fn test_poc_with_wifi_and_multi_coverage_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); diff --git a/mobile_verifier/tests/integrations/rewarder_mappers.rs b/mobile_verifier/tests/integrations/rewarder_mappers.rs index 49d89e692..456d74185 100644 --- a/mobile_verifier/tests/integrations/rewarder_mappers.rs +++ b/mobile_verifier/tests/integrations/rewarder_mappers.rs @@ -22,7 +22,7 @@ const HOTSPOT_1: &str = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6"; #[sqlx::test] async fn test_mapper_rewards(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); let reward_info = reward_info_24_hours(); // seed db diff --git a/mobile_verifier/tests/integrations/rewarder_oracles.rs b/mobile_verifier/tests/integrations/rewarder_oracles.rs index d91bb6a34..4d97e7217 100644 --- a/mobile_verifier/tests/integrations/rewarder_oracles.rs +++ b/mobile_verifier/tests/integrations/rewarder_oracles.rs @@ -7,7 +7,7 @@ use sqlx::PgPool; #[sqlx::test] async fn test_oracle_rewards(_pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); let reward_info = reward_info_24_hours(); diff --git a/mobile_verifier/tests/integrations/rewarder_poc_dc.rs b/mobile_verifier/tests/integrations/rewarder_poc_dc.rs index c6888325d..43854d1b0 100644 --- a/mobile_verifier/tests/integrations/rewarder_poc_dc.rs +++ b/mobile_verifier/tests/integrations/rewarder_poc_dc.rs @@ -43,8 +43,8 @@ const PAYER_1: &str = "11eX55faMbqZB7jzN4p67m6w7ScPMH6ubnvCjCPLh72J49PaJEL"; #[sqlx::test] async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -116,8 +116,8 @@ async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { #[sqlx::test] async fn test_qualified_wifi_poc_rewards(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -189,8 +189,8 @@ async fn test_qualified_wifi_poc_rewards(pool: PgPool) -> anyhow::Result<()> { #[sqlx::test] async fn test_sp_banned_radio(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -225,8 +225,8 @@ async fn test_sp_banned_radio(pool: PgPool) -> anyhow::Result<()> { assert_eq!(msgs.radio_reward_v2.len(), 3); // ============================================================== - let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); // SP ban radio, zeroed rewards are filtered out let mut txn = pool.begin().await?; @@ -252,8 +252,8 @@ async fn test_sp_banned_radio(pool: PgPool) -> anyhow::Result<()> { #[sqlx::test] async fn test_all_banned_radio(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -308,8 +308,8 @@ async fn test_all_banned_radio(pool: PgPool) -> anyhow::Result<()> { #[sqlx::test] async fn test_data_banned_radio_still_receives_poc(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); - let (speedtest_avg_client, _speedtest_avg_server) = common::create_nonblocking_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); diff --git a/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs b/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs index df90d6097..178afacb3 100644 --- a/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs +++ b/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs @@ -70,7 +70,7 @@ async fn test_service_provider_rewards(pool: PgPool) -> anyhow::Result<()> { let mut valid_sps = HashMap::::new(); valid_sps.insert(PAYER_1.to_string(), SP_1.to_string()); let carrier_client = MockCarrierServiceClient::new(valid_sps); - let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -179,7 +179,7 @@ async fn test_service_provider_promotion_rewards(pool: PgPool) -> anyhow::Result let reward_info = reward_info_24_hours(); - let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); let mut txn = pool.begin().await?; seed_hotspot_data(reward_info.epoch_period.end, &mut txn).await?; // DC transferred == 6,000 reward amount diff --git a/mobile_verifier/tests/integrations/speedtests.rs b/mobile_verifier/tests/integrations/speedtests.rs index aef5c2b7b..f5bb726ef 100644 --- a/mobile_verifier/tests/integrations/speedtests.rs +++ b/mobile_verifier/tests/integrations/speedtests.rs @@ -54,8 +54,8 @@ async fn speedtests_average_should_only_include_last_48_hours( ) -> anyhow::Result<()> { let (_tx, rx) = tokio::sync::mpsc::channel(2); let gateway_info_resolver = MockGatewayInfoResolver {}; - let (speedtest_avg_client, speedtest_avg_receiver) = common::create_nonblocking_file_sink(); - let (verified_client, _verified_receiver) = common::create_nonblocking_file_sink(); + let (speedtest_avg_client, speedtest_avg_receiver) = common::create_file_sink(); + let (verified_client, _verified_receiver) = common::create_file_sink(); let hotspot: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6".parse()?; From b44994333b4081ffbcb2ba3a68a721f1e6c75d86 Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Thu, 3 Apr 2025 15:36:44 -0700 Subject: [PATCH 09/12] rename message fields in the plural unallocated is kept as a vec even though there should only be one out of convenience of implementation. And in the case where a rewarder can output multiple types of unallocated for a single message type. --- .../tests/integrations/common/mod.rs | 28 +++++++++---------- .../tests/integrations/hex_boosting.rs | 14 +++++----- .../tests/integrations/rewarder_mappers.rs | 2 +- .../tests/integrations/rewarder_poc_dc.rs | 24 ++++++++-------- .../tests/integrations/rewarder_sp_rewards.rs | 6 ++-- 5 files changed, 37 insertions(+), 37 deletions(-) diff --git a/mobile_verifier/tests/integrations/common/mod.rs b/mobile_verifier/tests/integrations/common/mod.rs index 108619459..93d3aecbd 100644 --- a/mobile_verifier/tests/integrations/common/mod.rs +++ b/mobile_verifier/tests/integrations/common/mod.rs @@ -283,25 +283,25 @@ pub struct FileSinkReceiver { #[derive(Default, Debug)] pub struct MobileRewardShareMessages { - pub gateway_reward: Vec, - pub radio_reward: Vec, - pub subscriber_reward: Vec, - pub sp_reward: Vec, + pub gateway_rewards: Vec, + pub radio_rewards: Vec, + pub subscriber_rewards: Vec, + pub sp_rewards: Vec, pub unallocated: Vec, - pub radio_reward_v2: Vec, - pub promotion_reward: Vec, + pub radio_reward_v2s: Vec, + pub promotion_rewards: Vec, } impl MobileRewardShareMessages { fn insert(&mut self, item: MobileReward) { match item { - MobileReward::GatewayReward(inner) => self.gateway_reward.push(inner), - MobileReward::RadioReward(inner) => self.radio_reward.push(inner), - MobileReward::SubscriberReward(inner) => self.subscriber_reward.push(inner), - MobileReward::ServiceProviderReward(inner) => self.sp_reward.push(inner), + MobileReward::GatewayReward(inner) => self.gateway_rewards.push(inner), + MobileReward::RadioReward(inner) => self.radio_rewards.push(inner), + MobileReward::SubscriberReward(inner) => self.subscriber_rewards.push(inner), + MobileReward::ServiceProviderReward(inner) => self.sp_rewards.push(inner), MobileReward::UnallocatedReward(inner) => self.unallocated.push(inner), - MobileReward::RadioRewardV2(inner) => self.radio_reward_v2.push(inner), - MobileReward::PromotionReward(inner) => self.promotion_reward.push(inner), + MobileReward::RadioRewardV2(inner) => self.radio_reward_v2s.push(inner), + MobileReward::PromotionReward(inner) => self.promotion_rewards.push(inner), } } @@ -313,14 +313,14 @@ impl MobileRewardShareMessages { } pub fn total_poc_rewards(&self) -> u64 { - self.radio_reward_v2 + self.radio_reward_v2s .iter() .map(|reward| reward.total_poc_reward()) .sum() } pub fn total_sub_discovery_amount(&self) -> u64 { - self.subscriber_reward + self.subscriber_rewards .iter() .map(|reward| reward.discovery_location_amount) .sum() diff --git a/mobile_verifier/tests/integrations/hex_boosting.rs b/mobile_verifier/tests/integrations/hex_boosting.rs index ff3fc599e..7aad8cbc7 100644 --- a/mobile_verifier/tests/integrations/hex_boosting.rs +++ b/mobile_verifier/tests/integrations/hex_boosting.rs @@ -148,7 +148,7 @@ async fn test_poc_with_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { let rewards = mobile_rewards.finish().await?; let poc_rewards = rewards - .radio_reward_v2 + .radio_reward_v2s .as_keyed_map(|v| v.hotspot_key_string()); let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); @@ -308,7 +308,7 @@ async fn test_poc_boosted_hexes_thresholds_not_met(pool: PgPool) -> anyhow::Resu let rewards = mobile_rewards.finish().await?; let poc_rewards = rewards - .radio_reward_v2 + .radio_reward_v2s .as_keyed_map(|v| v.hotspot_key_string()); let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); @@ -441,7 +441,7 @@ async fn test_poc_with_multi_coverage_boosted_hexes(pool: PgPool) -> anyhow::Res let rewards = mobile_rewards.finish().await?; let poc_rewards = rewards - .radio_reward_v2 + .radio_reward_v2s .as_keyed_map(|v| v.hotspot_key_string()); let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); @@ -584,7 +584,7 @@ async fn test_expired_boosted_hex(pool: PgPool) -> anyhow::Result<()> { let rewards = mobile_rewards.finish().await?; let poc_rewards = rewards - .radio_reward_v2 + .radio_reward_v2s .as_keyed_map(|v| v.hotspot_key_string()); let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); @@ -690,7 +690,7 @@ async fn test_reduced_location_score_with_boosted_hexes(pool: PgPool) -> anyhow: let rewards = mobile_rewards.finish().await?; let poc_rewards = rewards - .radio_reward_v2 + .radio_reward_v2s .as_keyed_map(|v| v.hotspot_key_string()); let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); // full location trust 1 boost let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); // full location NO boosts @@ -833,7 +833,7 @@ async fn test_distance_from_asserted_removes_boosting_but_not_location_trust( let rewards = mobile_rewards.finish().await?; let poc_rewards = rewards - .radio_reward_v2 + .radio_reward_v2s .as_keyed_map(|v| v.hotspot_key_string()); let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); // full location trust 1 boost let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); // full location trust NO boosts @@ -999,7 +999,7 @@ async fn test_poc_with_wifi_and_multi_coverage_boosted_hexes(pool: PgPool) -> an let rewards = mobile_rewards.finish().await?; let poc_rewards = rewards - .radio_reward_v2 + .radio_reward_v2s .as_keyed_map(|v| v.hotspot_key_string()); let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); // 2 boosts at 10x let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); // 1 boost at 20x diff --git a/mobile_verifier/tests/integrations/rewarder_mappers.rs b/mobile_verifier/tests/integrations/rewarder_mappers.rs index 456d74185..5b97519b1 100644 --- a/mobile_verifier/tests/integrations/rewarder_mappers.rs +++ b/mobile_verifier/tests/integrations/rewarder_mappers.rs @@ -34,7 +34,7 @@ async fn test_mapper_rewards(pool: PgPool) -> anyhow::Result<()> { let rewards = mobile_rewards.finish().await?; let subscriber_rewards = rewards - .subscriber_reward + .subscriber_rewards .as_keyed_map(|reward| reward.subscriber_id_string()); // assert the mapper rewards diff --git a/mobile_verifier/tests/integrations/rewarder_poc_dc.rs b/mobile_verifier/tests/integrations/rewarder_poc_dc.rs index 43854d1b0..10491059f 100644 --- a/mobile_verifier/tests/integrations/rewarder_poc_dc.rs +++ b/mobile_verifier/tests/integrations/rewarder_poc_dc.rs @@ -71,8 +71,8 @@ async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { .await?; let rewards = mobile_rewards.finish().await?; - let poc_rewards = rewards.radio_reward_v2; - let dc_rewards = rewards.gateway_reward; + let poc_rewards = rewards.radio_reward_v2s; + let dc_rewards = rewards.gateway_rewards; let unallocated_reward = rewards.unallocated.first(); let poc_sum: u64 = poc_rewards.iter().map(|r| r.total_poc_reward()).sum(); @@ -159,8 +159,8 @@ async fn test_qualified_wifi_poc_rewards(pool: PgPool) -> anyhow::Result<()> { .await?; let msgs = mobile_rewards.finish().await?; - let poc_rewards = msgs.radio_reward_v2; - let dc_rewards = msgs.gateway_reward; + let poc_rewards = msgs.radio_reward_v2s; + let dc_rewards = msgs.gateway_rewards; // expecting single radio with poc rewards, no unallocated assert_eq!(poc_rewards.len(), 1); @@ -221,8 +221,8 @@ async fn test_sp_banned_radio(pool: PgPool) -> anyhow::Result<()> { .await?; let msgs = mobile_rewards.finish().await?; - assert_eq!(msgs.gateway_reward.len(), 3); - assert_eq!(msgs.radio_reward_v2.len(), 3); + assert_eq!(msgs.gateway_rewards.len(), 3); + assert_eq!(msgs.radio_reward_v2s.len(), 3); // ============================================================== let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); @@ -244,8 +244,8 @@ async fn test_sp_banned_radio(pool: PgPool) -> anyhow::Result<()> { .await?; let msgs = mobile_rewards.finish().await?; - assert_eq!(msgs.gateway_reward.len(), 3); - assert_eq!(msgs.radio_reward_v2.len(), 2); + assert_eq!(msgs.gateway_rewards.len(), 3); + assert_eq!(msgs.radio_reward_v2s.len(), 2); Ok(()) } @@ -294,9 +294,9 @@ async fn test_all_banned_radio(pool: PgPool) -> anyhow::Result<()> { .await?; let rewards = mobile_rewards.finish().await?; - let poc_rewards = rewards.radio_reward_v2; + let poc_rewards = rewards.radio_reward_v2s; - let dc_rewards = rewards.gateway_reward; + let dc_rewards = rewards.gateway_rewards; // expecting single radio with poc rewards, no unallocated assert_eq!(poc_rewards.len(), 2); @@ -349,8 +349,8 @@ async fn test_data_banned_radio_still_receives_poc(pool: PgPool) -> anyhow::Resu .await?; let rewards = mobile_rewards.finish().await?; - let poc_rewards = rewards.radio_reward_v2; - let dc_rewards = rewards.gateway_reward; + let poc_rewards = rewards.radio_reward_v2s; + let dc_rewards = rewards.gateway_rewards; assert_eq!(poc_rewards.len(), 3); assert_eq!(dc_rewards.len(), 0); diff --git a/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs b/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs index 178afacb3..7608b62d9 100644 --- a/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs +++ b/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs @@ -97,7 +97,7 @@ async fn test_service_provider_rewards(pool: PgPool) -> anyhow::Result<()> { let rewards = mobile_rewards.finish().await?; - let sp_reward = rewards.sp_reward.first().expect("sp 1 reward"); + let sp_reward = rewards.sp_rewards.first().expect("sp 1 reward"); assert_eq!(5_999, sp_reward.amount); let unallocated_reward = rewards.unallocated.first().expect("unallocated"); @@ -205,7 +205,7 @@ async fn test_service_provider_promotion_rewards(pool: PgPool) -> anyhow::Result let rewards = mobile_rewards.finish().await?; let promo_rewards = rewards - .promotion_reward + .promotion_rewards .as_keyed_map(|reward| reward.entity.to_owned()); // 1 share @@ -224,7 +224,7 @@ async fn test_service_provider_promotion_rewards(pool: PgPool) -> anyhow::Result assert_eq!(promo_reward_3.matched_amount, 4_499); // dc_percentage * total_sp_allocation rounded down - let sp_reward = rewards.sp_reward.first().expect("sp 1 reward"); + let sp_reward = rewards.sp_rewards.first().expect("sp 1 reward"); assert_eq!(sp_reward.amount, 50_999); let unallocated_sp_rewards = get_unallocated_sp_rewards(reward_info.epoch_emissions); From 164051dc4164cf25e1cafbef69667191f8602988 Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Mon, 7 Apr 2025 10:12:21 -0700 Subject: [PATCH 10/12] Await writing speedtests output --- mobile_verifier/src/speedtests.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index 46fc690cf..0cf68c112 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -205,7 +205,8 @@ where }; self.verified_speedtest_file_sink .write(proto, &[("result", result.as_str_name())]) - .await?; + .await? + .await??; Ok(()) } } From 6ca03ee7f0042543a2525d2703b741395718fad0 Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Wed, 9 Apr 2025 13:37:34 -0700 Subject: [PATCH 11/12] remove str::self import --- mobile_verifier/tests/integrations/rewarder_mappers.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/mobile_verifier/tests/integrations/rewarder_mappers.rs b/mobile_verifier/tests/integrations/rewarder_mappers.rs index 5b97519b1..e192272a2 100644 --- a/mobile_verifier/tests/integrations/rewarder_mappers.rs +++ b/mobile_verifier/tests/integrations/rewarder_mappers.rs @@ -10,10 +10,7 @@ use mobile_verifier::{ use rust_decimal::prelude::*; use rust_decimal_macros::dec; use sqlx::{PgPool, Postgres, Transaction}; -use std::{ - str::{self, FromStr}, - string::ToString, -}; +use std::{str::FromStr, string::ToString}; const SUBSCRIBER_1: &str = "subscriber1"; const SUBSCRIBER_2: &str = "subscriber2"; From e53ec7668bd19cac8151bfe11859bf894c3f459e Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Fri, 11 Apr 2025 13:14:01 -0700 Subject: [PATCH 12/12] Auto implement AsStringKeyedMap for any type that can provide a key with AsStringKeyedMapKey --- .../tests/integrations/common/mod.rs | 48 +++++++++++-------- .../tests/integrations/hex_boosting.rs | 28 +++-------- .../tests/integrations/rewarder_mappers.rs | 6 +-- .../tests/integrations/rewarder_sp_rewards.rs | 5 +- .../tests/integrations/speedtests.rs | 2 +- 5 files changed, 38 insertions(+), 51 deletions(-) diff --git a/mobile_verifier/tests/integrations/common/mod.rs b/mobile_verifier/tests/integrations/common/mod.rs index 93d3aecbd..200da75c0 100644 --- a/mobile_verifier/tests/integrations/common/mod.rs +++ b/mobile_verifier/tests/integrations/common/mod.rs @@ -79,19 +79,7 @@ impl SubDaoEpochRewardInfoResolver for MockSubDaoRewardsClient { } } -pub trait SubscriberRewardExt { - fn subscriber_id_string(&self) -> String; -} - -impl SubscriberRewardExt for SubscriberReward { - fn subscriber_id_string(&self) -> String { - use helium_proto::Message; - String::decode(self.subscriber_id.as_bytes()).expect("decode subscriber id") - } -} - pub trait RadioRewardV2Ext { - fn hotspot_key_string(&self) -> String; fn boosted_hexes(&self) -> Vec; fn nth_boosted_hex(&self, index: usize) -> radio_reward_v2::CoveredHex; fn boosted_hexes_len(&self) -> usize; @@ -100,10 +88,6 @@ pub trait RadioRewardV2Ext { } impl RadioRewardV2Ext for RadioRewardV2 { - fn hotspot_key_string(&self) -> String { - PublicKeyBinary::from(self.hotspot_key.to_vec()).to_string() - } - fn boosted_hexes(&self) -> Vec { self.covered_hexes.to_vec() } @@ -327,7 +311,6 @@ impl MobileRewardShareMessages { } } -#[async_trait::async_trait] trait TestTimeoutExt where Self: Sized, @@ -413,19 +396,42 @@ impl FileSinkReceiver { // This trait assumes there will not be multiple entries // in the Vec for a given String. pub trait AsStringKeyedMap { - fn as_keyed_map(&self, key_func: impl Fn(&V) -> String) -> HashMap + fn as_keyed_map(&self) -> HashMap where Self: Sized; } -impl AsStringKeyedMap for Vec { - fn as_keyed_map(&self, key_func: impl Fn(&V) -> String) -> HashMap +pub trait AsStringKeyedMapKey { + fn key(&self) -> String; +} + +impl AsStringKeyedMapKey for RadioRewardV2 { + fn key(&self) -> String { + PublicKeyBinary::from(self.hotspot_key.to_vec()).to_string() + } +} + +impl AsStringKeyedMapKey for SubscriberReward { + fn key(&self) -> String { + use helium_proto::Message; + String::decode(self.subscriber_id.as_bytes()).expect("decode subscriber id") + } +} + +impl AsStringKeyedMapKey for PromotionReward { + fn key(&self) -> String { + self.entity.to_owned() + } +} + +impl AsStringKeyedMap for Vec { + fn as_keyed_map(&self) -> HashMap where Self: Sized, { let mut map = HashMap::new(); for item in self { - let key = key_func(item); + let key = item.key(); if map.contains_key(&key) { panic!("Duplicate string key found: {}", key); } diff --git a/mobile_verifier/tests/integrations/hex_boosting.rs b/mobile_verifier/tests/integrations/hex_boosting.rs index 7aad8cbc7..bd646250f 100644 --- a/mobile_verifier/tests/integrations/hex_boosting.rs +++ b/mobile_verifier/tests/integrations/hex_boosting.rs @@ -147,9 +147,7 @@ async fn test_poc_with_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { let rewards = mobile_rewards.finish().await?; - let poc_rewards = rewards - .radio_reward_v2s - .as_keyed_map(|v| v.hotspot_key_string()); + let poc_rewards = rewards.radio_reward_v2s.as_keyed_map(); let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); let hotspot_3 = poc_rewards.get(HOTSPOT_3).expect("hotspot 3"); @@ -307,9 +305,7 @@ async fn test_poc_boosted_hexes_thresholds_not_met(pool: PgPool) -> anyhow::Resu let rewards = mobile_rewards.finish().await?; - let poc_rewards = rewards - .radio_reward_v2s - .as_keyed_map(|v| v.hotspot_key_string()); + let poc_rewards = rewards.radio_reward_v2s.as_keyed_map(); let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); let hotspot_3 = poc_rewards.get(HOTSPOT_3).expect("hotspot 3"); @@ -440,9 +436,7 @@ async fn test_poc_with_multi_coverage_boosted_hexes(pool: PgPool) -> anyhow::Res let rewards = mobile_rewards.finish().await?; - let poc_rewards = rewards - .radio_reward_v2s - .as_keyed_map(|v| v.hotspot_key_string()); + let poc_rewards = rewards.radio_reward_v2s.as_keyed_map(); let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); let hotspot_3 = poc_rewards.get(HOTSPOT_3).expect("hotspot 3"); @@ -583,9 +577,7 @@ async fn test_expired_boosted_hex(pool: PgPool) -> anyhow::Result<()> { let rewards = mobile_rewards.finish().await?; - let poc_rewards = rewards - .radio_reward_v2s - .as_keyed_map(|v| v.hotspot_key_string()); + let poc_rewards = rewards.radio_reward_v2s.as_keyed_map(); let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); let hotspot_3 = poc_rewards.get(HOTSPOT_3).expect("hotspot 3"); @@ -689,9 +681,7 @@ async fn test_reduced_location_score_with_boosted_hexes(pool: PgPool) -> anyhow: let rewards = mobile_rewards.finish().await?; - let poc_rewards = rewards - .radio_reward_v2s - .as_keyed_map(|v| v.hotspot_key_string()); + let poc_rewards = rewards.radio_reward_v2s.as_keyed_map(); let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); // full location trust 1 boost let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); // full location NO boosts let hotspot_3 = poc_rewards.get(HOTSPOT_3).expect("hotspot 3"); // reduced location trust 1 boost @@ -832,9 +822,7 @@ async fn test_distance_from_asserted_removes_boosting_but_not_location_trust( let rewards = mobile_rewards.finish().await?; - let poc_rewards = rewards - .radio_reward_v2s - .as_keyed_map(|v| v.hotspot_key_string()); + let poc_rewards = rewards.radio_reward_v2s.as_keyed_map(); let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); // full location trust 1 boost let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); // full location trust NO boosts let hotspot_3 = poc_rewards.get(HOTSPOT_3).expect("hotspot 3"); // reduced location trust 1 boost @@ -998,9 +986,7 @@ async fn test_poc_with_wifi_and_multi_coverage_boosted_hexes(pool: PgPool) -> an let rewards = mobile_rewards.finish().await?; - let poc_rewards = rewards - .radio_reward_v2s - .as_keyed_map(|v| v.hotspot_key_string()); + let poc_rewards = rewards.radio_reward_v2s.as_keyed_map(); let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); // 2 boosts at 10x let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); // 1 boost at 20x let hotspot_4 = poc_rewards.get(HOTSPOT_4).expect("hotspot 4"); // no boosts diff --git a/mobile_verifier/tests/integrations/rewarder_mappers.rs b/mobile_verifier/tests/integrations/rewarder_mappers.rs index e192272a2..8507ec5dc 100644 --- a/mobile_verifier/tests/integrations/rewarder_mappers.rs +++ b/mobile_verifier/tests/integrations/rewarder_mappers.rs @@ -1,4 +1,4 @@ -use crate::common::{self, reward_info_24_hours, AsStringKeyedMap, SubscriberRewardExt}; +use crate::common::{self, reward_info_24_hours, AsStringKeyedMap}; use chrono::{DateTime, Duration as ChronoDuration, Utc}; use futures::{stream, StreamExt}; use helium_crypto::PublicKeyBinary; @@ -30,9 +30,7 @@ async fn test_mapper_rewards(pool: PgPool) -> anyhow::Result<()> { rewarder::reward_mappers(&pool, mobile_rewards_client, &reward_info).await?; let rewards = mobile_rewards.finish().await?; - let subscriber_rewards = rewards - .subscriber_rewards - .as_keyed_map(|reward| reward.subscriber_id_string()); + let subscriber_rewards = rewards.subscriber_rewards.as_keyed_map(); // assert the mapper rewards // all 3 subscribers will have an equal share, diff --git a/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs b/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs index 7608b62d9..29ea857d0 100644 --- a/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs +++ b/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs @@ -203,10 +203,7 @@ async fn test_service_provider_promotion_rewards(pool: PgPool) -> anyhow::Result .await?; let rewards = mobile_rewards.finish().await?; - - let promo_rewards = rewards - .promotion_rewards - .as_keyed_map(|reward| reward.entity.to_owned()); + let promo_rewards = rewards.promotion_rewards.as_keyed_map(); // 1 share let promo_reward_1 = promo_rewards.get("one").expect("promo 1"); diff --git a/mobile_verifier/tests/integrations/speedtests.rs b/mobile_verifier/tests/integrations/speedtests.rs index f5bb726ef..8e701e0d0 100644 --- a/mobile_verifier/tests/integrations/speedtests.rs +++ b/mobile_verifier/tests/integrations/speedtests.rs @@ -69,7 +69,7 @@ async fn speedtests_average_should_only_include_last_48_hours( speedtest(&hotspot, "2024-01-06 01:00:00", 10, 100, 10), ]); - // Drop the daemon when it's done running + // Drop the daemon when it's done running to close the channel { let daemon = SpeedtestDaemon::new( pool,