diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 8be1d3076..6520a9964 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -286,7 +286,7 @@ where let poc_dc_shares = reward_poc_and_dc( &self.pool, &self.hex_service_client, - &self.mobile_rewards, + self.mobile_rewards.clone(), &self.speedtest_averages, &reward_info, price_info.clone(), @@ -294,7 +294,7 @@ where .await?; // process rewards for mappers - reward_mappers(&self.pool, &self.mobile_rewards, &reward_info).await?; + reward_mappers(&self.pool, self.mobile_rewards.clone(), &reward_info).await?; // process rewards for service providers let dc_sessions = service_provider::get_dc_sessions( @@ -309,14 +309,14 @@ where reward_service_providers( dc_sessions, sp_promotions.clone(), - &self.mobile_rewards, + self.mobile_rewards.clone(), &reward_info, price_info.price_per_bone, ) .await?; // process rewards for oracles - reward_oracles(&self.mobile_rewards, &reward_info).await?; + reward_oracles(self.mobile_rewards.clone(), &reward_info).await?; self.speedtest_averages.commit().await?; let written_files = self.mobile_rewards.commit().await?.await??; @@ -394,7 +394,7 @@ where pub async fn reward_poc_and_dc( pool: &Pool, hex_service_client: &impl HexBoostingInfoResolver, - mobile_rewards: &FileSinkClient, + mobile_rewards: FileSinkClient, speedtest_avg_sink: &FileSinkClient, reward_info: &EpochRewardInfo, price_info: PriceInfo, @@ -420,7 +420,7 @@ pub async fn reward_poc_and_dc( // reward dc before poc so that we can calculate the unallocated dc reward // and carry this into the poc pool let dc_unallocated_amount = reward_dc( - mobile_rewards, + &mobile_rewards, reward_info, transfer_rewards, &reward_shares, @@ -431,7 +431,7 @@ pub async fn reward_poc_and_dc( let (poc_unallocated_amount, calculated_poc_reward_shares) = reward_poc( pool, hex_service_client, - mobile_rewards, + &mobile_rewards, speedtest_avg_sink, reward_info, reward_shares, @@ -444,7 +444,7 @@ pub async fn reward_poc_and_dc( .unwrap_or(0); write_unallocated_reward( - mobile_rewards, + &mobile_rewards, UnallocatedRewardType::Poc, poc_unallocated_amount, reward_info, @@ -552,7 +552,7 @@ pub async fn reward_dc( pub async fn reward_mappers( pool: &Pool, - mobile_rewards: &FileSinkClient, + mobile_rewards: FileSinkClient, reward_info: &EpochRewardInfo, ) -> anyhow::Result<()> { let rewardable_mapping_activity = subscriber_mapping_activity::db::rewardable_mapping_activity( @@ -587,7 +587,7 @@ pub async fn reward_mappers( .unwrap_or(0) - allocated_mapping_rewards; write_unallocated_reward( - mobile_rewards, + &mobile_rewards, UnallocatedRewardType::Mapper, unallocated_mapping_reward_amount, reward_info, @@ -598,7 +598,7 @@ pub async fn reward_mappers( } pub async fn reward_oracles( - mobile_rewards: &FileSinkClient, + mobile_rewards: FileSinkClient, reward_info: &EpochRewardInfo, ) -> anyhow::Result<()> { // atm 100% of oracle rewards are assigned to 'unallocated' @@ -611,7 +611,7 @@ pub async fn reward_oracles( .unwrap_or(0) - allocated_oracle_rewards; write_unallocated_reward( - mobile_rewards, + &mobile_rewards, UnallocatedRewardType::Oracle, unallocated_oracle_reward_amount, reward_info, @@ -623,7 +623,7 @@ pub async fn reward_oracles( pub async fn reward_service_providers( dc_sessions: ServiceProviderDCSessions, sp_promotions: ServiceProviderPromotions, - mobile_rewards: &FileSinkClient, + mobile_rewards: FileSinkClient, reward_info: &EpochRewardInfo, hnt_bone_price: Decimal, ) -> anyhow::Result<()> { @@ -651,7 +651,7 @@ pub async fn reward_service_providers( // write out any unallocated service provider reward write_unallocated_reward( - mobile_rewards, + &mobile_rewards, UnallocatedRewardType::ServiceProvider, unallocated_sp_rewards, reward_info, diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index 46fc690cf..0cf68c112 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -205,7 +205,8 @@ where }; self.verified_speedtest_file_sink .write(proto, &[("result", result.as_str_name())]) - .await?; + .await? + .await??; Ok(()) } } diff --git a/mobile_verifier/src/speedtests_average.rs b/mobile_verifier/src/speedtests_average.rs index bd54f708b..06216b331 100644 --- a/mobile_verifier/src/speedtests_average.rs +++ b/mobile_verifier/src/speedtests_average.rs @@ -92,7 +92,7 @@ impl SpeedtestAverage { pub async fn write( &self, filesink: &FileSinkClient, - ) -> file_store::Result { + ) -> anyhow::Result<()> { filesink .write( proto::SpeedtestAvg { @@ -116,7 +116,10 @@ impl SpeedtestAverage { }, &[("validity", self.validity.as_str_name())], ) - .await?; + .await? + // wait to be written + .await??; + Ok(()) } diff --git a/mobile_verifier/tests/integrations/common/mod.rs b/mobile_verifier/tests/integrations/common/mod.rs index 19fdc83cc..200da75c0 100644 --- a/mobile_verifier/tests/integrations/common/mod.rs +++ b/mobile_verifier/tests/integrations/common/mod.rs @@ -1,7 +1,7 @@ use chrono::{DateTime, Duration, Utc}; use file_store::{ file_sink::{FileSinkClient, Message as SinkMessage}, - traits::TimestampEncode, + traits::{MsgBytes, TimestampEncode}, }; use futures::{stream, StreamExt}; use helium_crypto::PublicKeyBinary; @@ -25,10 +25,7 @@ use rust_decimal::{prelude::ToPrimitive, Decimal}; use rust_decimal_macros::dec; use sqlx::PgPool; use std::{collections::HashMap, str::FromStr, sync::Arc}; -use tokio::{ - sync::{mpsc::error::TryRecvError, RwLock}, - time::{timeout, Timeout}, -}; +use tokio::{sync::RwLock, time::Timeout}; use tonic::async_trait; pub const EPOCH_ADDRESS: &str = "112E7TxoNHV46M6tiPA8N1MkeMeQxc9ztb4JQLXBVAAUfq1kJLoF"; @@ -82,127 +79,6 @@ impl SubDaoEpochRewardInfoResolver for MockSubDaoRewardsClient { } } -pub struct MockFileSinkReceiver { - pub receiver: tokio::sync::mpsc::Receiver>, -} - -impl MockFileSinkReceiver { - pub async fn receive(&mut self, caller: &str) -> Option { - match timeout(seconds(2), self.receiver.recv()).await { - Ok(Some(SinkMessage::Data(on_write_tx, msg))) => { - let _ = on_write_tx.send(Ok(())); - Some(msg) - } - Ok(None) => None, - Err(e) => panic!("{caller}: timeout while waiting for message1 {:?}", e), - Ok(Some(unexpected_msg)) => { - println!("{caller}: ignoring unexpected msg {:?}", unexpected_msg); - None - } - } - } - - pub fn assert_no_messages(&mut self) { - let Err(TryRecvError::Empty) = self.receiver.try_recv() else { - panic!("receiver should have been empty") - }; - } -} - -impl MockFileSinkReceiver { - pub async fn get_all_speedtest_avgs(&mut self) -> Vec { - let mut messages = vec![]; - while let Ok(SinkMessage::Data(on_write_tx, msg)) = self.receiver.try_recv() { - let _ = on_write_tx.send(Ok(())); - messages.push(msg); - } - messages - } -} - -impl MockFileSinkReceiver { - pub async fn receive_radio_reward_v1(&mut self) -> RadioReward { - match self.receive("receive_radio_reward_v1").await { - Some(mobile_reward) => match mobile_reward.reward { - Some(MobileReward::RadioReward(r)) => r, - err => panic!("failed to get radio reward: {err:?}"), - }, - None => panic!("failed to receive radio reward"), - } - } - - pub async fn receive_radio_reward(&mut self) -> RadioRewardV2 { - // NOTE(mj): When v1 rewards stop being written, remove this receiver - // and the comparison. - let radio_reward_v1 = self.receive_radio_reward_v1().await; - match self.receive("receive_radio_reward").await { - Some(mobile_reward) => match mobile_reward.reward { - Some(MobileReward::RadioRewardV2(reward)) => { - assert_eq!( - reward.total_poc_reward(), - radio_reward_v1.poc_reward, - "mismatch in poc rewards between v1 and v2" - ); - reward - } - err => panic!("failed to get radio reward: {err:?}"), - }, - None => panic!("failed to receive radio reward"), - } - } - - pub async fn receive_service_provider_reward(&mut self) -> ServiceProviderReward { - match self.receive("receive_service_provider_reward").await { - Some(mobile_reward) => match mobile_reward.reward { - Some(MobileReward::ServiceProviderReward(r)) => r, - _ => panic!("failed to get service provider reward"), - }, - None => panic!("failed to receive service provider reward"), - } - } - - pub async fn receive_subscriber_reward(&mut self) -> SubscriberReward { - match self.receive("receive_subscriber_reward").await { - Some(mobile_reward) => match mobile_reward.reward { - Some(MobileReward::SubscriberReward(r)) => r, - _ => panic!("failed to get subscriber reward"), - }, - None => panic!("failed to receive subscriber reward"), - } - } - - pub async fn receive_promotion_reward(&mut self) -> PromotionReward { - match self.receive("receive_promotion_reward").await { - Some(mobile_reward) => match mobile_reward.reward { - Some(MobileReward::PromotionReward(r)) => r, - _ => panic!("failed to get promotion reward"), - }, - None => panic!("failed to receive promotion reward"), - } - } - - pub async fn receive_unallocated_reward(&mut self) -> UnallocatedReward { - match self.receive("receive_unallocated_reward").await { - Some(mobile_reward) => match mobile_reward.reward { - Some(MobileReward::UnallocatedReward(r)) => r, - _ => panic!("failed to get unallocated reward"), - }, - None => panic!("failed to receive unallocated reward"), - } - } -} - -pub fn create_file_sink() -> (FileSinkClient, MockFileSinkReceiver) { - let (tx, rx) = tokio::sync::mpsc::channel(20); - ( - FileSinkClient { - sender: tx, - metric: "metric".into(), - }, - MockFileSinkReceiver { receiver: rx }, - ) -} - pub trait RadioRewardV2Ext { fn boosted_hexes(&self) -> Vec; fn nth_boosted_hex(&self, index: usize) -> radio_reward_v2::CoveredHex; @@ -250,10 +126,6 @@ impl RadioRewardV2Ext for RadioRewardV2 { } } -pub fn seconds(s: u64) -> std::time::Duration { - std::time::Duration::from_secs(s) -} - pub fn mock_hex_boost_data_default( ) -> HexBoostData { HexBoostData::builder() @@ -376,50 +248,69 @@ pub fn default_price_info() -> PriceInfo { // Non-blocking version is file sink testing. // Requires the FileSinkClient to be dropped when all writing is done, or panic!. -pub fn create_nonblocking_file_sink( -) -> (FileSinkClient, NonBlockingFileSinkReceiver) { +pub fn create_file_sink() -> (FileSinkClient, FileSinkReceiver) { let (tx, rx) = tokio::sync::mpsc::channel(999); ( FileSinkClient { sender: tx, metric: "metric".into(), }, - NonBlockingFileSinkReceiver::new(rx), + FileSinkReceiver::new(rx), ) } #[derive(Debug)] -pub struct NonBlockingFileSinkReceiver { +pub struct FileSinkReceiver { msgs: Arc>>, channel_closed: Arc, } #[derive(Default, Debug)] pub struct MobileRewardShareMessages { - pub gateway_reward: Vec, - pub radio_reward: Vec, - pub subscriber_reward: Vec, - pub sp_reward: Vec, + pub gateway_rewards: Vec, + pub radio_rewards: Vec, + pub subscriber_rewards: Vec, + pub sp_rewards: Vec, pub unallocated: Vec, - pub radio_reward_v2: Vec, - pub promotion_reward: Vec, + pub radio_reward_v2s: Vec, + pub promotion_rewards: Vec, } impl MobileRewardShareMessages { fn insert(&mut self, item: MobileReward) { match item { - MobileReward::GatewayReward(inner) => self.gateway_reward.push(inner), - MobileReward::RadioReward(inner) => self.radio_reward.push(inner), - MobileReward::SubscriberReward(inner) => self.subscriber_reward.push(inner), - MobileReward::ServiceProviderReward(inner) => self.sp_reward.push(inner), + MobileReward::GatewayReward(inner) => self.gateway_rewards.push(inner), + MobileReward::RadioReward(inner) => self.radio_rewards.push(inner), + MobileReward::SubscriberReward(inner) => self.subscriber_rewards.push(inner), + MobileReward::ServiceProviderReward(inner) => self.sp_rewards.push(inner), MobileReward::UnallocatedReward(inner) => self.unallocated.push(inner), - MobileReward::RadioRewardV2(inner) => self.radio_reward_v2.push(inner), - MobileReward::PromotionReward(inner) => self.promotion_reward.push(inner), + MobileReward::RadioRewardV2(inner) => self.radio_reward_v2s.push(inner), + MobileReward::PromotionReward(inner) => self.promotion_rewards.push(inner), } } + + pub fn unallocated_amount_or_default(&self) -> u64 { + self.unallocated + .iter() + .map(|reward| reward.amount) + .sum::() + } + + pub fn total_poc_rewards(&self) -> u64 { + self.radio_reward_v2s + .iter() + .map(|reward| reward.total_poc_reward()) + .sum() + } + + pub fn total_sub_discovery_amount(&self) -> u64 { + self.subscriber_rewards + .iter() + .map(|reward| reward.discovery_location_amount) + .sum() + } } -#[async_trait::async_trait] trait TestTimeoutExt where Self: Sized, @@ -434,9 +325,9 @@ impl TestTimeoutExt for F { } } -impl NonBlockingFileSinkReceiver { +impl FileSinkReceiver { pub async fn finish(self) -> anyhow::Result { - // make sure channel is closed and done be written to + // make sure channel is closed and done being written to if let Err(err) = self.channel_closed.notified().timeout_2_secs().await { panic!("file sink receiver channel was never closed: {err:?}"); } @@ -457,7 +348,21 @@ impl NonBlockingFileSinkReceiver { } } -impl NonBlockingFileSinkReceiver { +impl FileSinkReceiver { + pub async fn finish(self) -> anyhow::Result> { + // make sure the channel is closed and done being written to + if let Err(err) = self.channel_closed.notified().timeout_2_secs().await { + panic!("file sink receiver channel was never closed: {err:?}"); + } + + let lock = Arc::try_unwrap(self.msgs).expect("no locks on messages"); + let msgs = lock.into_inner(); + + Ok(msgs) + } +} + +impl FileSinkReceiver { fn new(mut receiver: tokio::sync::mpsc::Receiver>) -> Self { let channel_closed = Arc::new(tokio::sync::Notify::new()); let closer = channel_closed.clone(); @@ -469,10 +374,10 @@ impl NonBlockingFileSinkReceiver { while let Some(msg) = receiver.recv().await { match msg { SinkMessage::Data(sender, msg) => { - sender.send(Ok(())).unwrap(); + sender.send(Ok(())).expect("ack file data"); inner_msgs.write().await.push(msg); } - SinkMessage::Commit(_sender) => todo!(), + SinkMessage::Commit(_sender) => (), SinkMessage::Rollback(_sender) => todo!(), } } @@ -485,3 +390,53 @@ impl NonBlockingFileSinkReceiver { } } } + +// Allows converting from a Vec to HashMap +// +// This trait assumes there will not be multiple entries +// in the Vec for a given String. +pub trait AsStringKeyedMap { + fn as_keyed_map(&self) -> HashMap + where + Self: Sized; +} + +pub trait AsStringKeyedMapKey { + fn key(&self) -> String; +} + +impl AsStringKeyedMapKey for RadioRewardV2 { + fn key(&self) -> String { + PublicKeyBinary::from(self.hotspot_key.to_vec()).to_string() + } +} + +impl AsStringKeyedMapKey for SubscriberReward { + fn key(&self) -> String { + use helium_proto::Message; + String::decode(self.subscriber_id.as_bytes()).expect("decode subscriber id") + } +} + +impl AsStringKeyedMapKey for PromotionReward { + fn key(&self) -> String { + self.entity.to_owned() + } +} + +impl AsStringKeyedMap for Vec { + fn as_keyed_map(&self) -> HashMap + where + Self: Sized, + { + let mut map = HashMap::new(); + for item in self { + let key = item.key(); + if map.contains_key(&key) { + panic!("Duplicate string key found: {}", key); + } + map.insert(key, item.clone()); + } + map + } +} diff --git a/mobile_verifier/tests/integrations/hex_boosting.rs b/mobile_verifier/tests/integrations/hex_boosting.rs index 4aaa2ae24..bd646250f 100644 --- a/mobile_verifier/tests/integrations/hex_boosting.rs +++ b/mobile_verifier/tests/integrations/hex_boosting.rs @@ -1,5 +1,5 @@ use crate::common::{ - self, default_price_info, reward_info_24_hours, MockFileSinkReceiver, MockHexBoostingClient, + self, default_price_info, reward_info_24_hours, AsStringKeyedMap, MockHexBoostingClient, RadioRewardV2Ext, }; use chrono::{DateTime, Duration as ChronoDuration, Duration, Utc}; @@ -9,15 +9,11 @@ use file_store::{ unique_connections::{UniqueConnectionReq, UniqueConnectionsIngestReport}, }; use helium_crypto::PublicKeyBinary; -use helium_proto::services::{ - poc_lora::UnallocatedRewardType, - poc_mobile::{ - CoverageObjectValidity, HeartbeatValidity, LocationSource, MobileRewardShare, - RadioRewardV2, SeniorityUpdateReason, SignalLevel, UnallocatedReward, - }, +use helium_proto::services::poc_mobile::{ + CoverageObjectValidity, HeartbeatValidity, LocationSource, SeniorityUpdateReason, SignalLevel, }; use hextree::Cell; -use mobile_config::boosted_hex_info::BoostedHexInfo; +use mobile_config::{boosted_hex_info::BoostedHexInfo, sub_dao_epoch_reward_info::EpochRewardInfo}; use mobile_verifier::{ cell_type::CellType, coverage::CoverageObject, @@ -55,7 +51,7 @@ async fn update_assignments(pool: &PgPool) -> anyhow::Result<()> { #[sqlx::test] async fn test_poc_with_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -137,54 +133,26 @@ async fn test_poc_with_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes); - let total_poc_emissions = - reward_shares::get_scheduled_tokens_for_poc(reward_info.epoch_emissions) - .to_u64() - .unwrap(); - let price_info = default_price_info(); - let (_, rewards) = tokio::join!( - // run rewards for poc and dc - rewarder::reward_poc_and_dc( - &pool, - &hex_boosting_client, - &mobile_rewards_client, - &speedtest_avg_client, - &reward_info, - price_info - ), - receive_expected_rewards_maybe_unallocated( - &mut mobile_rewards, - ExpectUnallocated::NoWhenValue(total_poc_emissions) - ) - ); + rewarder::reward_poc_and_dc( + &pool, + &hex_boosting_client, + mobile_rewards_client, + &speedtest_avg_client, + &reward_info, + price_info, + ) + .await?; - let Ok((poc_rewards, unallocated_reward)) = rewards else { - panic!("no rewards received"); - }; + let rewards = mobile_rewards.finish().await?; - let mut poc_rewards = poc_rewards.iter(); - let hotspot_2 = poc_rewards.next().unwrap(); - let hotspot_1 = poc_rewards.next().unwrap(); - let hotspot_3 = poc_rewards.next().unwrap(); - assert_eq!( - None, - poc_rewards.next(), - "Received more hotspots than expected in rewards" - ); - assert_eq!( - HOTSPOT_2.to_string(), - PublicKeyBinary::from(hotspot_2.hotspot_key.clone()).to_string() - ); - assert_eq!( - HOTSPOT_1.to_string(), - PublicKeyBinary::from(hotspot_1.hotspot_key.clone()).to_string() - ); - assert_eq!( - HOTSPOT_3.to_string(), - PublicKeyBinary::from(hotspot_3.hotspot_key.clone()).to_string() - ); + let poc_rewards = rewards.radio_reward_v2s.as_keyed_map(); + let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); + let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); + let hotspot_3 = poc_rewards.get(HOTSPOT_3).expect("hotspot 3"); + + assert_eq!(poc_rewards.len(), 3); // Calculating expected rewards let (regular_poc, boosted_poc) = get_poc_allocation_buckets(reward_info.epoch_emissions); @@ -227,17 +195,10 @@ async fn test_poc_with_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { assert_eq!(0x8a1fb49642dffff_u64, hotspot_2.nth_boosted_hex(0).location); assert_eq!(0x8a1fb466d2dffff_u64, hotspot_1.nth_boosted_hex(0).location); - // confirm the total rewards allocated matches expectations - let poc_sum = - hotspot_1.total_poc_reward() + hotspot_2.total_poc_reward() + hotspot_3.total_poc_reward(); - let total = poc_sum + unallocated_reward.amount; - assert_eq!(total_poc_emissions, total); - - // confirm the rewarded percentage amount matches expectations - - let percent = (Decimal::from(total) / reward_info.epoch_emissions) - .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); - assert_eq!(percent, dec!(0.6)); + // confirm the total rewards allocated matches emissions + // and the rewarded percentage amount matches percentage + let total = rewards.total_poc_rewards() + rewards.unallocated_amount_or_default(); + assert_total_matches_emissions(total, &reward_info); Ok(()) } @@ -248,7 +209,7 @@ async fn test_poc_boosted_hexes_thresholds_not_met(pool: PgPool) -> anyhow::Resu // this simulates the case where we have radios in boosted hexes but where the coverage // thresholds for the radios have not been met // the end result is that no boosting takes place, the radios are awarded non boosted reward values - let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let now = Utc::now(); let epoch = (now - ChronoDuration::hours(24))..now; @@ -331,68 +292,44 @@ async fn test_poc_boosted_hexes_thresholds_not_met(pool: PgPool) -> anyhow::Resu let reward_info = reward_info_24_hours(); let price_info = default_price_info(); - let (_, rewards) = tokio::join!( - // run rewards for poc and dc - rewarder::reward_poc_and_dc( - &pool, - &hex_boosting_client, - &mobile_rewards_client, - &speedtest_avg_client, - &reward_info, - price_info - ), - receive_expected_rewards(&mut mobile_rewards) - ); - if let Ok((poc_rewards, unallocated_reward)) = rewards { - // assert poc reward outputs - let exp_reward_1 = 16438356164383; - let exp_reward_2 = 16438356164383; - let exp_reward_3 = 16438356164383; - - assert_eq!(exp_reward_1, poc_rewards[0].total_poc_reward()); - assert_eq!( - HOTSPOT_2.to_string(), - PublicKeyBinary::from(poc_rewards[0].hotspot_key.clone()).to_string() - ); - assert_eq!(exp_reward_2, poc_rewards[1].total_poc_reward()); - assert_eq!( - HOTSPOT_1.to_string(), - PublicKeyBinary::from(poc_rewards[1].hotspot_key.clone()).to_string() - ); - assert_eq!(exp_reward_3, poc_rewards[2].total_poc_reward()); - assert_eq!( - HOTSPOT_3.to_string(), - PublicKeyBinary::from(poc_rewards[2].hotspot_key.clone()).to_string() - ); + // run rewards for poc and dc + rewarder::reward_poc_and_dc( + &pool, + &hex_boosting_client, + mobile_rewards_client, + &speedtest_avg_client, + &reward_info, + price_info, + ) + .await?; - // assert the number of boosted hexes for each radio - assert_eq!(0, poc_rewards[0].boosted_hexes_len()); - assert_eq!(0, poc_rewards[1].boosted_hexes_len()); - assert_eq!(0, poc_rewards[2].boosted_hexes_len()); + let rewards = mobile_rewards.finish().await?; - // confirm the total rewards allocated matches expectations - let poc_sum: u64 = poc_rewards.iter().map(|r| r.total_poc_reward()).sum(); - let unallocated_sum: u64 = unallocated_reward.amount; - let total = poc_sum + unallocated_sum; + let poc_rewards = rewards.radio_reward_v2s.as_keyed_map(); + let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); + let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); + let hotspot_3 = poc_rewards.get(HOTSPOT_3).expect("hotspot 3"); + + assert_eq!(16_438_356_164_383, hotspot_1.total_poc_reward()); + assert_eq!(16_438_356_164_383, hotspot_2.total_poc_reward()); + assert_eq!(16_438_356_164_383, hotspot_3.total_poc_reward()); + + // assert the number of boosted hexes for each radio + assert_eq!(0, hotspot_1.boosted_hexes_len()); + assert_eq!(0, hotspot_2.boosted_hexes_len()); + assert_eq!(0, hotspot_3.boosted_hexes_len()); + + // confirm the total rewards allocated matches emissions + // and the rewarded percentage amount matches percentage + let total = rewards.total_poc_rewards() + rewards.unallocated_amount_or_default(); + assert_total_matches_emissions(total, &reward_info); - let expected_sum = reward_shares::get_scheduled_tokens_for_poc(reward_info.epoch_emissions) - .to_u64() - .unwrap(); - assert_eq!(expected_sum, total); - - // confirm the rewarded percentage amount matches expectations - let percent = (Decimal::from(total) / reward_info.epoch_emissions) - .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); - assert_eq!(percent, dec!(0.6)); - } else { - panic!("no rewards received"); - }; Ok(()) } #[sqlx::test] async fn test_poc_with_multi_coverage_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -486,57 +423,23 @@ async fn test_poc_with_multi_coverage_boosted_hexes(pool: PgPool) -> anyhow::Res }, ]; - let total_poc_emissions = - reward_shares::get_scheduled_tokens_for_poc(reward_info.epoch_emissions) - .to_u64() - .unwrap(); - - let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes); - - let price_info = default_price_info(); - - let (_, rewards) = tokio::join!( - // run rewards for poc and dc - rewarder::reward_poc_and_dc( - &pool, - &hex_boosting_client, - &mobile_rewards_client, - &speedtest_avg_client, - &reward_info, - price_info - ), - receive_expected_rewards_maybe_unallocated( - &mut mobile_rewards, - ExpectUnallocated::NoWhenValue(total_poc_emissions) - ) - ); + // run rewards for poc and dc + rewarder::reward_poc_and_dc( + &pool, + &MockHexBoostingClient::new(boosted_hexes), + mobile_rewards_client, + &speedtest_avg_client, + &reward_info, + default_price_info(), + ) + .await?; - let Ok((poc_rewards, unallocated_reward)) = rewards else { - panic!("no rewards received"); - }; + let rewards = mobile_rewards.finish().await?; - let mut poc_rewards = poc_rewards.iter(); - let hotspot_2 = poc_rewards.next().unwrap(); // 1 boost at 20x - let hotspot_1 = poc_rewards.next().unwrap(); // 2 boost at 10x - let hotspot_3 = poc_rewards.next().unwrap(); // no boost - assert_eq!( - None, - poc_rewards.next(), - "Received more hotspots than expected in rewards" - ); - - assert_eq!( - HOTSPOT_2.to_string(), - PublicKeyBinary::from(hotspot_2.hotspot_key.clone()).to_string() - ); - assert_eq!( - HOTSPOT_1.to_string(), - PublicKeyBinary::from(hotspot_1.hotspot_key.clone()).to_string() - ); - assert_eq!( - HOTSPOT_3.to_string(), - PublicKeyBinary::from(hotspot_3.hotspot_key.clone()).to_string() - ); + let poc_rewards = rewards.radio_reward_v2s.as_keyed_map(); + let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); + let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); + let hotspot_3 = poc_rewards.get(HOTSPOT_3).expect("hotspot 3"); // Calculating expected rewards // - 2 covered hexes boosted at 10x @@ -594,23 +497,17 @@ async fn test_poc_with_multi_coverage_boosted_hexes(pool: PgPool) -> anyhow::Res assert_eq!(0x8a1fb46622d7fff_u64, hotspot_1_boosted_hexes[1].location); assert_eq!(0x8a1fb49642dffff_u64, hotspot_2.nth_boosted_hex(0).location); - // confirm the total rewards allocated matches expectations - let poc_sum = - hotspot_1.total_poc_reward() + hotspot_2.total_poc_reward() + hotspot_3.total_poc_reward(); - let total = poc_sum + unallocated_reward.amount; - assert_eq!(total_poc_emissions, total); - - // confirm the rewarded percentage amount matches expectations - let percent = (Decimal::from(total) / reward_info.epoch_emissions) - .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); - assert_eq!(percent, dec!(0.6)); + // confirm the total rewards allocated matches emissions + // and the rewarded percentage amount matches percentage + let total = rewards.total_poc_rewards() + rewards.unallocated_amount_or_default(); + assert_total_matches_emissions(total, &reward_info); Ok(()) } #[sqlx::test] async fn test_expired_boosted_hex(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -667,73 +564,48 @@ async fn test_expired_boosted_hex(pool: PgPool) -> anyhow::Result<()> { }, ]; - let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes); + // run rewards for poc and dc + rewarder::reward_poc_and_dc( + &pool, + &MockHexBoostingClient::new(boosted_hexes), + mobile_rewards_client, + &speedtest_avg_client, + &reward_info, + default_price_info(), + ) + .await?; - let price_info = default_price_info(); + let rewards = mobile_rewards.finish().await?; - let (_, rewards) = tokio::join!( - // run rewards for poc and dc - rewarder::reward_poc_and_dc( - &pool, - &hex_boosting_client, - &mobile_rewards_client, - &speedtest_avg_client, - &reward_info, - price_info - ), - receive_expected_rewards(&mut mobile_rewards) - ); - if let Ok((poc_rewards, unallocated_reward)) = rewards { - // assert poc reward outputs - let exp_reward_1 = 16_438_356_164_383; - let exp_reward_2 = 16_438_356_164_383; - let exp_reward_3 = 16_438_356_164_383; - - assert_eq!(exp_reward_1, poc_rewards[0].total_poc_reward()); - assert_eq!( - HOTSPOT_2.to_string(), - PublicKeyBinary::from(poc_rewards[0].hotspot_key.clone()).to_string() - ); - assert_eq!(exp_reward_2, poc_rewards[1].total_poc_reward()); - assert_eq!( - HOTSPOT_1.to_string(), - PublicKeyBinary::from(poc_rewards[1].hotspot_key.clone()).to_string() - ); - assert_eq!(exp_reward_3, poc_rewards[2].total_poc_reward()); - assert_eq!( - HOTSPOT_3.to_string(), - PublicKeyBinary::from(poc_rewards[2].hotspot_key.clone()).to_string() - ); + let poc_rewards = rewards.radio_reward_v2s.as_keyed_map(); + let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); + let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); + let hotspot_3 = poc_rewards.get(HOTSPOT_3).expect("hotspot 3"); - // assert the number of boosted hexes for each radio - // all will be zero as the boost period has expired for the single boosted hex - assert_eq!(0, poc_rewards[0].boosted_hexes_len()); - assert_eq!(0, poc_rewards[1].boosted_hexes_len()); - assert_eq!(0, poc_rewards[2].boosted_hexes_len()); + assert_eq!(poc_rewards.len(), 3); - // confirm the total rewards allocated matches expectations - let poc_sum: u64 = poc_rewards.iter().map(|r| r.total_poc_reward()).sum(); - let unallocated_sum: u64 = unallocated_reward.amount; - let total = poc_sum + unallocated_sum; + // assert poc reward outputs + assert_eq!(16_438_356_164_383, hotspot_1.total_poc_reward()); + assert_eq!(16_438_356_164_383, hotspot_2.total_poc_reward()); + assert_eq!(16_438_356_164_383, hotspot_3.total_poc_reward()); + + // assert the number of boosted hexes for each radio + // all will be zero as the boost period has expired for the single boosted hex + assert_eq!(0, hotspot_1.boosted_hexes_len()); + assert_eq!(0, hotspot_2.boosted_hexes_len()); + assert_eq!(0, hotspot_3.boosted_hexes_len()); + + // confirm the total rewards allocated matches emissions + // and the rewarded percentage amount matches percentage + let total = rewards.total_poc_rewards() + rewards.unallocated_amount_or_default(); + assert_total_matches_emissions(total, &reward_info); - let expected_sum = reward_shares::get_scheduled_tokens_for_poc(reward_info.epoch_emissions) - .to_u64() - .unwrap(); - assert_eq!(expected_sum, total); - - // confirm the rewarded percentage amount matches expectations - let percent = (Decimal::from(total) / reward_info.epoch_emissions) - .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); - assert_eq!(percent, dec!(0.6)); - } else { - panic!("no rewards received"); - }; Ok(()) } #[sqlx::test] async fn test_reduced_location_score_with_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -796,55 +668,25 @@ async fn test_reduced_location_score_with_boosted_hexes(pool: PgPool) -> anyhow: }, ]; - let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes); - let total_poc_emissions = - reward_shares::get_scheduled_tokens_for_poc(reward_info.epoch_emissions) - .to_u64() - .unwrap(); - - let price_info = default_price_info(); + // run rewards for poc and dc + rewarder::reward_poc_and_dc( + &pool, + &MockHexBoostingClient::new(boosted_hexes), + mobile_rewards_client, + &speedtest_avg_client, + &reward_info, + default_price_info(), + ) + .await?; - let (_, rewards) = tokio::join!( - // run rewards for poc and dc - rewarder::reward_poc_and_dc( - &pool, - &hex_boosting_client, - &mobile_rewards_client, - &speedtest_avg_client, - &reward_info, - price_info - ), - receive_expected_rewards_maybe_unallocated( - &mut mobile_rewards, - ExpectUnallocated::NoWhenValue(total_poc_emissions) - ) - ); + let rewards = mobile_rewards.finish().await?; - let Ok((poc_rewards, unallocated_reward)) = rewards else { - panic!("no rewards received"); - }; + let poc_rewards = rewards.radio_reward_v2s.as_keyed_map(); + let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); // full location trust 1 boost + let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); // full location NO boosts + let hotspot_3 = poc_rewards.get(HOTSPOT_3).expect("hotspot 3"); // reduced location trust 1 boost - let mut poc_rewards = poc_rewards.iter(); - let hotspot_2 = poc_rewards.next().unwrap(); // full location trust NO boosts - let hotspot_1 = poc_rewards.next().unwrap(); // full location trust 1 boost - let hotspot_3 = poc_rewards.next().unwrap(); // reduced location trust 1 boost - assert_eq!( - None, - poc_rewards.next(), - "Received more hotspots than expected in rewards" - ); - assert_eq!( - HOTSPOT_1.to_string(), - PublicKeyBinary::from(hotspot_1.hotspot_key.clone()).to_string() - ); - assert_eq!( - HOTSPOT_2.to_string(), - PublicKeyBinary::from(hotspot_2.hotspot_key.clone()).to_string() - ); - assert_eq!( - HOTSPOT_3.to_string(), - PublicKeyBinary::from(hotspot_3.hotspot_key.clone()).to_string() - ); + assert_eq!(poc_rewards.len(), 3); // Calculating expected rewards let (regular_poc, boosted_poc) = get_poc_allocation_buckets(reward_info.epoch_emissions); @@ -889,20 +731,10 @@ async fn test_reduced_location_score_with_boosted_hexes(pool: PgPool) -> anyhow: assert_eq!(2, hotspot_1.nth_boosted_hex(0).boosted_multiplier); assert_eq!(0x8a1fb466d2dffff_u64, hotspot_1.nth_boosted_hex(0).location); - // confirm the total rewards allocated matches expectations - let poc_sum = - hotspot_1.total_poc_reward() + hotspot_2.total_poc_reward() + hotspot_3.total_poc_reward(); - let total = poc_sum + unallocated_reward.amount; - - let expected_sum = reward_shares::get_scheduled_tokens_for_poc(reward_info.epoch_emissions) - .to_u64() - .unwrap(); - assert_eq!(expected_sum, total); - - // confirm the rewarded percentage amount matches expectations - let percent = (Decimal::from(total) / reward_info.epoch_emissions) - .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); - assert_eq!(percent, dec!(0.6)); + // confirm the total rewards allocated matches emissions + // and the rewarded percentage amount matches percentage + let total = rewards.total_poc_rewards() + rewards.unallocated_amount_or_default(); + assert_total_matches_emissions(total, &reward_info); Ok(()) } @@ -911,7 +743,7 @@ async fn test_reduced_location_score_with_boosted_hexes(pool: PgPool) -> anyhow: async fn test_distance_from_asserted_removes_boosting_but_not_location_trust( pool: PgPool, ) -> anyhow::Result<()> { - let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -977,55 +809,25 @@ async fn test_distance_from_asserted_removes_boosting_but_not_location_trust( }, ]; - let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes); - let total_poc_emissions = - reward_shares::get_scheduled_tokens_for_poc(reward_info.epoch_emissions) - .to_u64() - .unwrap(); - - let price_info = default_price_info(); + // run rewards for poc and dc + rewarder::reward_poc_and_dc( + &pool, + &MockHexBoostingClient::new(boosted_hexes), + mobile_rewards_client, + &speedtest_avg_client, + &reward_info, + default_price_info(), + ) + .await?; - let (_, rewards) = tokio::join!( - // run rewards for poc and dc - rewarder::reward_poc_and_dc( - &pool, - &hex_boosting_client, - &mobile_rewards_client, - &speedtest_avg_client, - &reward_info, - price_info - ), - receive_expected_rewards_maybe_unallocated( - &mut mobile_rewards, - ExpectUnallocated::NoWhenValue(total_poc_emissions) - ) - ); + let rewards = mobile_rewards.finish().await?; - let Ok((poc_rewards, unallocated_reward)) = rewards else { - panic!("no rewards received"); - }; + let poc_rewards = rewards.radio_reward_v2s.as_keyed_map(); + let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); // full location trust 1 boost + let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); // full location trust NO boosts + let hotspot_3 = poc_rewards.get(HOTSPOT_3).expect("hotspot 3"); // reduced location trust 1 boost - let mut poc_rewards = poc_rewards.iter(); - let hotspot_2 = poc_rewards.next().unwrap(); // full location trust NO boosts - let hotspot_1 = poc_rewards.next().unwrap(); // full location trust 1 boost - let hotspot_3 = poc_rewards.next().unwrap(); // reduced location trust 1 boost - assert_eq!( - None, - poc_rewards.next(), - "Received more hotspots than expected in rewards" - ); - assert_eq!( - HOTSPOT_1.to_string(), - PublicKeyBinary::from(hotspot_1.hotspot_key.clone()).to_string() - ); - assert_eq!( - HOTSPOT_2.to_string(), - PublicKeyBinary::from(hotspot_2.hotspot_key.clone()).to_string() - ); - assert_eq!( - HOTSPOT_3.to_string(), - PublicKeyBinary::from(hotspot_3.hotspot_key.clone()).to_string() - ); + assert_eq!(poc_rewards.len(), 3); // Calculating expected rewards let (regular_poc, boosted_poc) = get_poc_allocation_buckets(reward_info.epoch_emissions); @@ -1070,27 +872,17 @@ async fn test_distance_from_asserted_removes_boosting_but_not_location_trust( assert_eq!(2, hotspot_1.nth_boosted_hex(0).boosted_multiplier); assert_eq!(0x8a1fb466d2dffff_u64, hotspot_1.nth_boosted_hex(0).location); - // confirm the total rewards allocated matches expectations - let poc_sum = - hotspot_1.total_poc_reward() + hotspot_2.total_poc_reward() + hotspot_3.total_poc_reward(); - let total = poc_sum + unallocated_reward.amount; - - let expected_sum = reward_shares::get_scheduled_tokens_for_poc(reward_info.epoch_emissions) - .to_u64() - .unwrap(); - assert_eq!(expected_sum, total); - - // confirm the rewarded percentage amount matches expectations - let percent = (Decimal::from(total) / reward_info.epoch_emissions) - .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); - assert_eq!(percent, dec!(0.6)); + // confirm the total rewards allocated matches emissions + // and the rewarded percentage amount matches percentage + let total = rewards.total_poc_rewards() + rewards.unallocated_amount_or_default(); + assert_total_matches_emissions(total, &reward_info); Ok(()) } #[sqlx::test] async fn test_poc_with_wifi_and_multi_coverage_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -1181,54 +973,25 @@ async fn test_poc_with_wifi_and_multi_coverage_boosted_hexes(pool: PgPool) -> an }, ]; - let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes); - let total_poc_emissions = - reward_shares::get_scheduled_tokens_for_poc(reward_info.epoch_emissions) - .to_u64() - .unwrap(); + // run rewards for poc and dc + rewarder::reward_poc_and_dc( + &pool, + &MockHexBoostingClient::new(boosted_hexes), + mobile_rewards_client, + &speedtest_avg_client, + &reward_info, + default_price_info(), + ) + .await?; - let price_info = default_price_info(); + let rewards = mobile_rewards.finish().await?; - let (_, rewards) = tokio::join!( - // run rewards for poc and dc - rewarder::reward_poc_and_dc( - &pool, - &hex_boosting_client, - &mobile_rewards_client, - &speedtest_avg_client, - &reward_info, - price_info - ), - receive_expected_rewards_maybe_unallocated( - &mut mobile_rewards, - ExpectUnallocated::NoWhenValue(total_poc_emissions) - ) - ); - let Ok((poc_rewards, unallocated_reward)) = rewards else { - panic!("no rewards received"); - }; + let poc_rewards = rewards.radio_reward_v2s.as_keyed_map(); + let hotspot_1 = poc_rewards.get(HOTSPOT_1).expect("hotspot 1"); // 2 boosts at 10x + let hotspot_2 = poc_rewards.get(HOTSPOT_2).expect("hotspot 2"); // 1 boost at 20x + let hotspot_4 = poc_rewards.get(HOTSPOT_4).expect("hotspot 4"); // no boosts - let mut poc_rewards = poc_rewards.iter(); - let hotspot_2 = poc_rewards.next().unwrap(); // 1 boost at 20x - let hotspot_1 = poc_rewards.next().unwrap(); // 2 boosts at 10x - let hotspot_3 = poc_rewards.next().unwrap(); // no boosts - assert_eq!( - None, - poc_rewards.next(), - "Received more hotspots than expected in rewards" - ); - assert_eq!( - HOTSPOT_2.to_string(), - PublicKeyBinary::from(hotspot_2.hotspot_key.clone()).to_string() - ); - assert_eq!( - HOTSPOT_1.to_string(), - PublicKeyBinary::from(hotspot_1.hotspot_key.clone()).to_string() - ); - assert_eq!( - HOTSPOT_4.to_string(), - PublicKeyBinary::from(hotspot_3.hotspot_key.clone()).to_string() - ); + assert_eq!(poc_rewards.len(), 3); // Calculating expected rewards let (regular_poc, boosted_poc) = get_poc_allocation_buckets(reward_info.epoch_emissions); @@ -1258,12 +1021,12 @@ async fn test_poc_with_wifi_and_multi_coverage_boosted_hexes(pool: PgPool) -> an assert_eq!(exp_reward_1, hotspot_1.total_poc_reward()); assert_eq!(exp_reward_2, hotspot_2.total_poc_reward()); - assert_eq!(exp_reward_3, hotspot_3.total_poc_reward()); + assert_eq!(exp_reward_3, hotspot_4.total_poc_reward()); // assert the number of boosted hexes for each radio assert_eq!(1, hotspot_2.boosted_hexes_len()); assert_eq!(2, hotspot_1.boosted_hexes_len()); - assert_eq!(1, hotspot_3.boosted_hexes_len()); + assert_eq!(1, hotspot_4.boosted_hexes_len()); // assert the hex boost multiplier values // as hotspot 3 has 2 covered hexes, it should have 2 boosted hexes @@ -1280,20 +1043,10 @@ async fn test_poc_with_wifi_and_multi_coverage_boosted_hexes(pool: PgPool) -> an assert_eq!(0x8a1fb46622d7fff_u64, hotspot_1_boosted_hexes[1].location); assert_eq!(0x8a1fb49642dffff_u64, hotspot_2.nth_boosted_hex(0).location); - // confirm the total rewards allocated matches expectations - let poc_sum = - hotspot_1.total_poc_reward() + hotspot_2.total_poc_reward() + hotspot_3.total_poc_reward(); - let total = poc_sum + unallocated_reward.amount; - - let expected_sum = reward_shares::get_scheduled_tokens_for_poc(reward_info.epoch_emissions) - .to_u64() - .unwrap(); - assert_eq!(expected_sum, total); - - // confirm the rewarded percentage amount matches expectations - let percent = (Decimal::from(total) / reward_info.epoch_emissions) - .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); - assert_eq!(percent, dec!(0.6)); + // confirm the total rewards allocated matches emissions + // and the rewarded percentage amount matches percentage + let total = rewards.total_poc_rewards() + rewards.unallocated_amount_or_default(); + assert_total_matches_emissions(total, &reward_info); Ok(()) } @@ -1302,58 +1055,6 @@ fn rounded(num: Decimal) -> u64 { num.to_u64().unwrap_or_default() } -async fn receive_expected_rewards( - mobile_rewards: &mut MockFileSinkReceiver, -) -> anyhow::Result<(Vec, UnallocatedReward)> { - receive_expected_rewards_maybe_unallocated(mobile_rewards, ExpectUnallocated::Yes).await -} - -enum ExpectUnallocated { - Yes, - NoWhenValue(u64), -} - -async fn receive_expected_rewards_maybe_unallocated( - mobile_rewards: &mut MockFileSinkReceiver, - expect_unallocated: ExpectUnallocated, -) -> anyhow::Result<(Vec, UnallocatedReward)> { - // get the filestore outputs from rewards run - let radio_reward1 = mobile_rewards.receive_radio_reward().await; - let radio_reward2 = mobile_rewards.receive_radio_reward().await; - let radio_reward3 = mobile_rewards.receive_radio_reward().await; - - // ordering is not guaranteed, so stick the rewards into a vec and sort - let mut poc_rewards = vec![radio_reward1, radio_reward2, radio_reward3]; - poc_rewards.sort_by(|a, b| b.hotspot_key.cmp(&a.hotspot_key)); - - let unallocated_poc_reward = match expect_unallocated { - ExpectUnallocated::Yes => mobile_rewards.receive_unallocated_reward().await, - ExpectUnallocated::NoWhenValue(max_emission) => { - let total: u64 = poc_rewards.iter().map(|p| p.total_poc_reward()).sum(); - let emitted_is_total = total == max_emission; - tracing::info!( - emitted_is_total, - total, - max_emission, - "receiving expected rewards unallocated amount" - ); - if emitted_is_total { - UnallocatedReward { - reward_type: UnallocatedRewardType::Poc.into(), - amount: 0, - } - } else { - mobile_rewards.receive_unallocated_reward().await - } - } - }; - - // should be no further msgs - mobile_rewards.assert_no_messages(); - - Ok((poc_rewards, unallocated_poc_reward)) -} - async fn seed_heartbeats_v1( ts: DateTime, txn: &mut Transaction<'_, Postgres>, @@ -1860,3 +1561,20 @@ fn get_poc_allocation_buckets(total_emissions: Decimal) -> (Decimal, Decimal) { (regular_poc, boosted_poc) } + +fn assert_total_matches_emissions(total: u64, reward_info: &EpochRewardInfo) { + // confirm the total rewards allocated matches expectations + let total_poc_emissions = + reward_shares::get_scheduled_tokens_for_poc(reward_info.epoch_emissions) + .to_u64() + .unwrap(); + assert_eq!( + total_poc_emissions, total, + "total does not match expected emissions" + ); + + // confirm the rewarded percentage amount matches expectations + let percent = (Decimal::from(total) / reward_info.epoch_emissions) + .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); + assert_eq!(percent, dec!(0.6), "POC and DC is always 60%"); +} diff --git a/mobile_verifier/tests/integrations/rewarder_mappers.rs b/mobile_verifier/tests/integrations/rewarder_mappers.rs index 9a2969022..8507ec5dc 100644 --- a/mobile_verifier/tests/integrations/rewarder_mappers.rs +++ b/mobile_verifier/tests/integrations/rewarder_mappers.rs @@ -1,13 +1,8 @@ -use crate::common::{self, reward_info_24_hours, MockFileSinkReceiver}; +use crate::common::{self, reward_info_24_hours, AsStringKeyedMap}; use chrono::{DateTime, Duration as ChronoDuration, Utc}; use futures::{stream, StreamExt}; use helium_crypto::PublicKeyBinary; -use helium_proto::{ - services::poc_mobile::{ - MobileRewardShare, SubscriberReward, UnallocatedReward, UnallocatedRewardType, - }, - Message, -}; +use helium_proto::{services::poc_mobile::UnallocatedRewardType, Message}; use mobile_verifier::{ reward_shares, rewarder, subscriber_mapping_activity::{self, SubscriberMappingActivity}, @@ -24,8 +19,7 @@ const HOTSPOT_1: &str = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6"; #[sqlx::test] async fn test_mapper_rewards(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); - + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); let reward_info = reward_info_24_hours(); // seed db @@ -33,91 +27,47 @@ async fn test_mapper_rewards(pool: PgPool) -> anyhow::Result<()> { seed_mapping_data(reward_info.epoch_period.end, &mut txn).await?; txn.commit().await.expect("db txn failed"); - let (_, rewards) = tokio::join!( - rewarder::reward_mappers(&pool, &mobile_rewards_client, &reward_info), - receive_expected_rewards(&mut mobile_rewards) - ); + rewarder::reward_mappers(&pool, mobile_rewards_client, &reward_info).await?; - if let Ok((subscriber_rewards, unallocated_reward)) = rewards { - // assert the mapper rewards - // all 3 subscribers will have an equal share, - // requirement is 1 qualifying mapping criteria report per epoch - // subscriber 1 has two qualifying mapping criteria reports, - // other two subscribers one qualifying mapping criteria reports - assert_eq!( - SUBSCRIBER_1.to_string().encode_to_vec(), - subscriber_rewards[0].subscriber_id - ); - assert_eq!( - 5_479_452_054_794, - subscriber_rewards[0].discovery_location_amount - ); - - assert_eq!( - SUBSCRIBER_2.to_string().encode_to_vec(), - subscriber_rewards[1].subscriber_id - ); - assert_eq!( - 5_479_452_054_794, - subscriber_rewards[2].discovery_location_amount - ); - - assert_eq!( - SUBSCRIBER_3.to_string().encode_to_vec(), - subscriber_rewards[2].subscriber_id - ); - assert_eq!( - 5_479_452_054_794, - subscriber_rewards[2].discovery_location_amount - ); - - // confirm our unallocated amount - assert_eq!( - UnallocatedRewardType::Mapper as i32, - unallocated_reward.reward_type - ); - assert_eq!(1, unallocated_reward.amount); - - // confirm the total rewards allocated matches expectations - let expected_sum = - reward_shares::get_scheduled_tokens_for_mappers(reward_info.epoch_emissions) - .to_u64() - .unwrap(); - let subscriber_sum = subscriber_rewards[0].discovery_location_amount - + subscriber_rewards[1].discovery_location_amount - + subscriber_rewards[2].discovery_location_amount - + unallocated_reward.amount; - assert_eq!(expected_sum, subscriber_sum); - - // confirm the rewarded percentage amount matches expectations - let percent = (Decimal::from(subscriber_sum) / reward_info.epoch_emissions) - .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); - assert_eq!(percent, dec!(0.2)); - } else { - panic!("no rewards received"); - }; - Ok(()) -} + let rewards = mobile_rewards.finish().await?; + let subscriber_rewards = rewards.subscriber_rewards.as_keyed_map(); -async fn receive_expected_rewards( - mobile_rewards: &mut MockFileSinkReceiver, -) -> anyhow::Result<(Vec, UnallocatedReward)> { - // get the filestore outputs from rewards run - // we will have 3 radio rewards, 1 wifi radio and 2 cbrs radios - let subscriber_reward1 = mobile_rewards.receive_subscriber_reward().await; - let subscriber_reward2 = mobile_rewards.receive_subscriber_reward().await; - let subscriber_reward3 = mobile_rewards.receive_subscriber_reward().await; - let mut subscriber_rewards = vec![subscriber_reward1, subscriber_reward2, subscriber_reward3]; + // assert the mapper rewards + // all 3 subscribers will have an equal share, + // requirement is 1 qualifying mapping criteria report per epoch + // subscriber 1 has two qualifying mapping criteria reports, + // other two subscribers one qualifying mapping criteria reports + let sub_reward_1 = subscriber_rewards.get(SUBSCRIBER_1).expect("sub 1"); + assert_eq!(5_479_452_054_794, sub_reward_1.discovery_location_amount); - subscriber_rewards.sort_by(|a, b| a.subscriber_id.cmp(&b.subscriber_id)); + let sub_reward_2 = subscriber_rewards.get(SUBSCRIBER_2).expect("sub 2"); + assert_eq!(5_479_452_054_794, sub_reward_2.discovery_location_amount); - // expect one unallocated reward - let unallocated_reward = mobile_rewards.receive_unallocated_reward().await; + let sub_reward_3 = subscriber_rewards.get(SUBSCRIBER_3).expect("sub 3"); + assert_eq!(5_479_452_054_794, sub_reward_3.discovery_location_amount); - // should be no further msgs - mobile_rewards.assert_no_messages(); + // confirm our unallocated amount + let unallocated_reward = rewards.unallocated.first().expect("unallocated"); + assert_eq!( + UnallocatedRewardType::Mapper as i32, + unallocated_reward.reward_type + ); + assert_eq!(1, unallocated_reward.amount); + + // confirm the total rewards allocated matches expectations + let expected_sum = reward_shares::get_scheduled_tokens_for_mappers(reward_info.epoch_emissions) + .to_u64() + .unwrap(); + let subscriber_sum = + rewards.total_sub_discovery_amount() + rewards.unallocated_amount_or_default(); + assert_eq!(expected_sum, subscriber_sum); - Ok((subscriber_rewards, unallocated_reward)) + // confirm the rewarded percentage amount matches expectations + let percent = (Decimal::from(subscriber_sum) / reward_info.epoch_emissions) + .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); + assert_eq!(percent, dec!(0.2)); + + Ok(()) } async fn seed_mapping_data( diff --git a/mobile_verifier/tests/integrations/rewarder_oracles.rs b/mobile_verifier/tests/integrations/rewarder_oracles.rs index 1f517ee93..4d97e7217 100644 --- a/mobile_verifier/tests/integrations/rewarder_oracles.rs +++ b/mobile_verifier/tests/integrations/rewarder_oracles.rs @@ -1,7 +1,5 @@ -use crate::common::{self, reward_info_24_hours, MockFileSinkReceiver}; -use helium_proto::services::poc_mobile::{ - MobileRewardShare, UnallocatedReward, UnallocatedRewardType, -}; +use crate::common::{self, reward_info_24_hours}; +use helium_proto::services::poc_mobile::UnallocatedRewardType; use mobile_verifier::{reward_shares, rewarder}; use rust_decimal::prelude::*; use rust_decimal_macros::dec; @@ -9,49 +7,33 @@ use sqlx::PgPool; #[sqlx::test] async fn test_oracle_rewards(_pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); let reward_info = reward_info_24_hours(); - let (_, rewards) = tokio::join!( - // run rewards for oracles - rewarder::reward_oracles(&mobile_rewards_client, &reward_info), - receive_expected_rewards(&mut mobile_rewards) + // run rewards for oracles + rewarder::reward_oracles(mobile_rewards_client, &reward_info).await?; + + let rewards = mobile_rewards.finish().await?; + let unallocated_reward = rewards.unallocated.first().expect("Unallocated"); + + assert_eq!( + UnallocatedRewardType::Oracle as i32, + unallocated_reward.reward_type ); - if let Ok(unallocated_reward) = rewards { - assert_eq!( - UnallocatedRewardType::Oracle as i32, - unallocated_reward.reward_type - ); - // confirm our unallocated amount - assert_eq!(3_287_671_232_876, unallocated_reward.amount); - - // confirm the total rewards allocated matches expectations - let expected_sum = - reward_shares::get_scheduled_tokens_for_oracles(reward_info.epoch_emissions) - .to_u64() - .unwrap(); - assert_eq!(expected_sum, unallocated_reward.amount); - - // confirm the rewarded percentage amount matches expectations - let percent = (Decimal::from(unallocated_reward.amount) / reward_info.epoch_emissions) - .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); - assert_eq!(percent, dec!(0.04)); - } else { - panic!("no rewards received"); - }; - Ok(()) -} + // confirm our unallocated amount + assert_eq!(3_287_671_232_876, unallocated_reward.amount); -async fn receive_expected_rewards( - mobile_rewards: &mut MockFileSinkReceiver, -) -> anyhow::Result { - // expect one unallocated reward - // as oracle rewards are currently 100% unallocated - let unallocated_reward = mobile_rewards.receive_unallocated_reward().await; + // confirm the total rewards allocated matches expectations + let expected_sum = reward_shares::get_scheduled_tokens_for_oracles(reward_info.epoch_emissions) + .to_u64() + .unwrap(); + assert_eq!(expected_sum, unallocated_reward.amount); - // should be no further msgs - mobile_rewards.assert_no_messages(); + // confirm the rewarded percentage amount matches expectations + let percent = (Decimal::from(unallocated_reward.amount) / reward_info.epoch_emissions) + .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); + assert_eq!(percent, dec!(0.04)); - Ok(unallocated_reward) + Ok(()) } diff --git a/mobile_verifier/tests/integrations/rewarder_poc_dc.rs b/mobile_verifier/tests/integrations/rewarder_poc_dc.rs index 692e457aa..10491059f 100644 --- a/mobile_verifier/tests/integrations/rewarder_poc_dc.rs +++ b/mobile_verifier/tests/integrations/rewarder_poc_dc.rs @@ -43,7 +43,7 @@ const PAYER_1: &str = "11eX55faMbqZB7jzN4p67m6w7ScPMH6ubnvCjCPLh72J49PaJEL"; #[sqlx::test] async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -63,17 +63,16 @@ async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { rewarder::reward_poc_and_dc( &pool, &hex_boosting_client, - &mobile_rewards_client, + mobile_rewards_client, &speedtest_avg_client, &reward_info, price_info, ) .await?; - drop(mobile_rewards_client); let rewards = mobile_rewards.finish().await?; - let poc_rewards = rewards.radio_reward_v2; - let dc_rewards = rewards.gateway_reward; + let poc_rewards = rewards.radio_reward_v2s; + let dc_rewards = rewards.gateway_rewards; let unallocated_reward = rewards.unallocated.first(); let poc_sum: u64 = poc_rewards.iter().map(|r| r.total_poc_reward()).sum(); @@ -117,7 +116,7 @@ async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { #[sqlx::test] async fn test_qualified_wifi_poc_rewards(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -152,17 +151,16 @@ async fn test_qualified_wifi_poc_rewards(pool: PgPool) -> anyhow::Result<()> { rewarder::reward_poc_and_dc( &pool, &hex_boosting_client, - &mobile_rewards_client, + mobile_rewards_client, &speedtest_avg_client, &reward_info, price_info, ) .await?; - drop(mobile_rewards_client); let msgs = mobile_rewards.finish().await?; - let poc_rewards = msgs.radio_reward_v2; - let dc_rewards = msgs.gateway_reward; + let poc_rewards = msgs.radio_reward_v2s; + let dc_rewards = msgs.gateway_rewards; // expecting single radio with poc rewards, no unallocated assert_eq!(poc_rewards.len(), 1); @@ -191,7 +189,7 @@ async fn test_qualified_wifi_poc_rewards(pool: PgPool) -> anyhow::Result<()> { #[sqlx::test] async fn test_sp_banned_radio(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -215,20 +213,19 @@ async fn test_sp_banned_radio(pool: PgPool) -> anyhow::Result<()> { let _rewarder = rewarder::reward_poc_and_dc( &pool, &hex_boosting_client, - &mobile_rewards_client, + mobile_rewards_client, &speedtest_avg_client, &reward_info, price_info.clone(), ) .await?; - drop(mobile_rewards_client); let msgs = mobile_rewards.finish().await?; - assert_eq!(msgs.gateway_reward.len(), 3); - assert_eq!(msgs.radio_reward_v2.len(), 3); + assert_eq!(msgs.gateway_rewards.len(), 3); + assert_eq!(msgs.radio_reward_v2s.len(), 3); // ============================================================== - let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); // SP ban radio, zeroed rewards are filtered out @@ -239,24 +236,23 @@ async fn test_sp_banned_radio(pool: PgPool) -> anyhow::Result<()> { let _rewarder = rewarder::reward_poc_and_dc( &pool, &hex_boosting_client, - &mobile_rewards_client, + mobile_rewards_client, &speedtest_avg_client, &reward_info, price_info, ) .await?; - drop(mobile_rewards_client); let msgs = mobile_rewards.finish().await?; - assert_eq!(msgs.gateway_reward.len(), 3); - assert_eq!(msgs.radio_reward_v2.len(), 2); + assert_eq!(msgs.gateway_rewards.len(), 3); + assert_eq!(msgs.radio_reward_v2s.len(), 2); Ok(()) } #[sqlx::test] async fn test_all_banned_radio(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -290,18 +286,17 @@ async fn test_all_banned_radio(pool: PgPool) -> anyhow::Result<()> { rewarder::reward_poc_and_dc( &pool, &hex_boosting_client, - &mobile_rewards_client, + mobile_rewards_client, &speedtest_avg_client, &reward_info, price_info, ) .await?; - drop(mobile_rewards_client); let rewards = mobile_rewards.finish().await?; - let poc_rewards = rewards.radio_reward_v2; + let poc_rewards = rewards.radio_reward_v2s; - let dc_rewards = rewards.gateway_reward; + let dc_rewards = rewards.gateway_rewards; // expecting single radio with poc rewards, no unallocated assert_eq!(poc_rewards.len(), 2); @@ -313,7 +308,7 @@ async fn test_all_banned_radio(pool: PgPool) -> anyhow::Result<()> { #[sqlx::test] async fn test_data_banned_radio_still_receives_poc(pool: PgPool) -> anyhow::Result<()> { - let (mobile_rewards_client, mobile_rewards) = common::create_nonblocking_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -346,17 +341,16 @@ async fn test_data_banned_radio_still_receives_poc(pool: PgPool) -> anyhow::Resu rewarder::reward_poc_and_dc( &pool, &hex_boosting_client, - &mobile_rewards_client, + mobile_rewards_client, &speedtest_avg_client, &reward_info, price_info, ) .await?; - drop(mobile_rewards_client); let rewards = mobile_rewards.finish().await?; - let poc_rewards = rewards.radio_reward_v2; - let dc_rewards = rewards.gateway_reward; + let poc_rewards = rewards.radio_reward_v2s; + let dc_rewards = rewards.gateway_rewards; assert_eq!(poc_rewards.len(), 3); assert_eq!(dc_rewards.len(), 0); diff --git a/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs b/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs index 38ca0fa51..29ea857d0 100644 --- a/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs +++ b/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs @@ -4,17 +4,14 @@ use std::string::ToString; use async_trait::async_trait; use chrono::{DateTime, Duration, Utc}; use helium_proto::{ - service_provider_promotions::Promotion, - services::poc_mobile::{ - MobileRewardShare, ServiceProviderReward, UnallocatedReward, UnallocatedRewardType, - }, + service_provider_promotions::Promotion, services::poc_mobile::UnallocatedRewardType, ServiceProvider, ServiceProviderPromotions, }; use rust_decimal::prelude::*; use rust_decimal_macros::dec; use sqlx::{PgPool, Postgres, Transaction}; -use crate::common::{self, reward_info_24_hours, MockFileSinkReceiver}; +use crate::common::{self, reward_info_24_hours, AsStringKeyedMap}; use mobile_config::client::{carrier_service_client::CarrierServiceVerifier, ClientError}; use mobile_verifier::{data_session, reward_shares, rewarder, service_provider}; @@ -73,7 +70,7 @@ async fn test_service_provider_rewards(pool: PgPool) -> anyhow::Result<()> { let mut valid_sps = HashMap::::new(); valid_sps.insert(PAYER_1.to_string(), SP_1.to_string()); let carrier_client = MockCarrierServiceClient::new(valid_sps); - let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); let reward_info = reward_info_24_hours(); @@ -89,45 +86,38 @@ async fn test_service_provider_rewards(pool: PgPool) -> anyhow::Result<()> { .list_incentive_promotions(&reward_info.epoch_period.start) .await?; - let (_, rewards) = tokio::join!( - rewarder::reward_service_providers( - dc_sessions, - sp_promotions.into(), - &mobile_rewards_client, - &reward_info, - dec!(0.0001), - ), - receive_expected_rewards(&mut mobile_rewards) + rewarder::reward_service_providers( + dc_sessions, + sp_promotions.into(), + mobile_rewards_client, + &reward_info, + dec!(0.0001), + ) + .await?; + + let rewards = mobile_rewards.finish().await?; + + let sp_reward = rewards.sp_rewards.first().expect("sp 1 reward"); + assert_eq!(5_999, sp_reward.amount); + + let unallocated_reward = rewards.unallocated.first().expect("unallocated"); + assert_eq!( + UnallocatedRewardType::ServiceProvider as i32, + unallocated_reward.reward_type ); - if let Ok((sp_reward, unallocated_reward)) = rewards { - assert_eq!( - SP_1.to_string(), - ServiceProvider::try_from(sp_reward.service_provider_id) - .unwrap() - .to_string() - ); - assert_eq!(5_999, sp_reward.amount); - - assert_eq!( - UnallocatedRewardType::ServiceProvider as i32, - unallocated_reward.reward_type - ); - assert_eq!(8_219_178_076_192, unallocated_reward.amount); - - // confirm the total rewards allocated matches expectations - let expected_sum = - reward_shares::get_scheduled_tokens_for_service_providers(reward_info.epoch_emissions) - .to_u64() - .unwrap(); - assert_eq!(expected_sum, sp_reward.amount + unallocated_reward.amount); - - // confirm the rewarded percentage amount matches expectations - let percent = (Decimal::from(unallocated_reward.amount) / reward_info.epoch_emissions) - .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); - assert_eq!(percent, dec!(0.1)); - } else { - panic!("no rewards received"); - } + assert_eq!(8_219_178_076_192, unallocated_reward.amount); + + // confirm the total rewards allocated matches expectations + let expected_sum = + reward_shares::get_scheduled_tokens_for_service_providers(reward_info.epoch_emissions) + .to_u64() + .unwrap(); + assert_eq!(expected_sum, sp_reward.amount + unallocated_reward.amount); + + // confirm the rewarded percentage amount matches expectations + let percent = (Decimal::from(unallocated_reward.amount) / reward_info.epoch_emissions) + .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); + assert_eq!(percent, dec!(0.1)); Ok(()) } @@ -189,7 +179,7 @@ async fn test_service_provider_promotion_rewards(pool: PgPool) -> anyhow::Result let reward_info = reward_info_24_hours(); - let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); + let (mobile_rewards_client, mobile_rewards) = common::create_file_sink(); let mut txn = pool.begin().await?; seed_hotspot_data(reward_info.epoch_period.end, &mut txn).await?; // DC transferred == 6,000 reward amount @@ -203,50 +193,35 @@ async fn test_service_provider_promotion_rewards(pool: PgPool) -> anyhow::Result .list_incentive_promotions(&reward_info.epoch_period.start) .await?; - let (_, rewards) = tokio::join!( - rewarder::reward_service_providers( - dc_sessions, - sp_promotions.into(), - &mobile_rewards_client, - &reward_info, - dec!(0.00001) - ), - async move { - let mut promos = vec![ - mobile_rewards.receive_promotion_reward().await, - mobile_rewards.receive_promotion_reward().await, - mobile_rewards.receive_promotion_reward().await, - ]; - // sort by awarded amount least -> most - promos.sort_by_key(|a| a.service_provider_amount); - - let sp_reward = mobile_rewards.receive_service_provider_reward().await; - let unallocated = mobile_rewards.receive_unallocated_reward().await; - - mobile_rewards.assert_no_messages(); - - (promos, sp_reward, unallocated) - } - ); + rewarder::reward_service_providers( + dc_sessions, + sp_promotions.into(), + mobile_rewards_client, + &reward_info, + dec!(0.00001), + ) + .await?; - let (promos, sp_reward, unallocated) = rewards; - let promo_reward_1 = promos[0].clone(); - let promo_reward_2 = promos[1].clone(); - let promo_reward_3 = promos[2].clone(); + let rewards = mobile_rewards.finish().await?; + let promo_rewards = rewards.promotion_rewards.as_keyed_map(); // 1 share + let promo_reward_1 = promo_rewards.get("one").expect("promo 1"); assert_eq!(promo_reward_1.service_provider_amount, 1_499); assert_eq!(promo_reward_1.matched_amount, 1_499); // 2 shares + let promo_reward_2 = promo_rewards.get("two").expect("promo 2"); assert_eq!(promo_reward_2.service_provider_amount, 2999); assert_eq!(promo_reward_2.matched_amount, 2999); // 3 shares + let promo_reward_3 = promo_rewards.get("three").expect("promo 3"); assert_eq!(promo_reward_3.service_provider_amount, 4_499); assert_eq!(promo_reward_3.matched_amount, 4_499); // dc_percentage * total_sp_allocation rounded down + let sp_reward = rewards.sp_rewards.first().expect("sp 1 reward"); assert_eq!(sp_reward.amount, 50_999); let unallocated_sp_rewards = get_unallocated_sp_rewards(reward_info.epoch_emissions); @@ -256,29 +231,12 @@ async fn test_service_provider_promotion_rewards(pool: PgPool) -> anyhow::Result - 8_998 // matched promotion + 2; // rounding + let unallocated = rewards.unallocated.first().expect("unallocated"); assert_eq!(unallocated.amount, expected_unallocated); Ok(()) } -async fn receive_expected_rewards( - mobile_rewards: &mut MockFileSinkReceiver, -) -> anyhow::Result<(ServiceProviderReward, UnallocatedReward)> { - // get the filestore outputs from rewards run - // we will have 3 radio rewards, 1 wifi radio and 2 cbrs radios - let sp_reward1 = mobile_rewards.receive_service_provider_reward().await; - // let sp_reward2 = mobile_rewards.receive_service_provider_reward().await; - // dump the sp rewards into a vec and sort to get a deteminstic order - - // expect one unallocated reward - let unallocated_reward = mobile_rewards.receive_unallocated_reward().await; - - // should be no further msgs - mobile_rewards.assert_no_messages(); - - Ok((sp_reward1, unallocated_reward)) -} - async fn seed_hotspot_data( ts: DateTime, txn: &mut Transaction<'_, Postgres>, diff --git a/mobile_verifier/tests/integrations/speedtests.rs b/mobile_verifier/tests/integrations/speedtests.rs index 5bdc961eb..8e701e0d0 100644 --- a/mobile_verifier/tests/integrations/speedtests.rs +++ b/mobile_verifier/tests/integrations/speedtests.rs @@ -54,17 +54,9 @@ async fn speedtests_average_should_only_include_last_48_hours( ) -> anyhow::Result<()> { let (_tx, rx) = tokio::sync::mpsc::channel(2); let gateway_info_resolver = MockGatewayInfoResolver {}; - let (speedtest_avg_client, mut speedtest_avg_receiver) = common::create_file_sink(); + let (speedtest_avg_client, speedtest_avg_receiver) = common::create_file_sink(); let (verified_client, _verified_receiver) = common::create_file_sink(); - let daemon = SpeedtestDaemon::new( - pool, - gateway_info_resolver, - rx, - speedtest_avg_client, - verified_client, - ); - let hotspot: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6".parse()?; @@ -77,9 +69,20 @@ async fn speedtests_average_should_only_include_last_48_hours( speedtest(&hotspot, "2024-01-06 01:00:00", 10, 100, 10), ]); - assert!(daemon.process_file(stream).await.is_ok()); + // Drop the daemon when it's done running to close the channel + { + let daemon = SpeedtestDaemon::new( + pool, + gateway_info_resolver, + rx, + speedtest_avg_client, + verified_client, + ); + + daemon.process_file(stream).await?; + } - let avgs = speedtest_avg_receiver.get_all_speedtest_avgs().await; + let avgs = speedtest_avg_receiver.finish().await?; assert_eq!(6, avgs.len()); assert_eq!(SpeedtestAvgValidity::TooFewSamples, avgs[0].validity());