diff --git a/mobile_verifier/migrations/45_drop_grandfathered_radio_threshold.sql b/mobile_verifier/migrations/45_drop_grandfathered_radio_threshold.sql new file mode 100644 index 000000000..6480b3d3d --- /dev/null +++ b/mobile_verifier/migrations/45_drop_grandfathered_radio_threshold.sql @@ -0,0 +1 @@ +DROP TABLE grandfathered_radio_threshold; diff --git a/mobile_verifier/migrations/46_radio_threshold_rework_unique_idx.sql b/mobile_verifier/migrations/46_radio_threshold_rework_unique_idx.sql new file mode 100644 index 000000000..0ac4a22b4 --- /dev/null +++ b/mobile_verifier/migrations/46_radio_threshold_rework_unique_idx.sql @@ -0,0 +1,7 @@ +DELETE FROM radio_threshold WHERE cbsd_id IS NOT NULL; + +DROP INDEX radio_threshold_hotspot_pubkey_cbsd_id_idx; + +ALTER TABLE radio_threshold DROP COLUMN cbsd_id; + +CREATE UNIQUE INDEX radio_threshold_hotspot_pubkey_idx ON radio_threshold (hotspot_pubkey); diff --git a/mobile_verifier/migrations/47_drop_cbrs_heartbeats.sql b/mobile_verifier/migrations/47_drop_cbrs_heartbeats.sql new file mode 100644 index 000000000..03439ccb1 --- /dev/null +++ b/mobile_verifier/migrations/47_drop_cbrs_heartbeats.sql @@ -0,0 +1 @@ +DROP TABLE cbrs_heartbeats; diff --git a/mobile_verifier/migrations/48_drop_old_hex_coverage.sql b/mobile_verifier/migrations/48_drop_old_hex_coverage.sql new file mode 100644 index 000000000..c40b8493d --- /dev/null +++ b/mobile_verifier/migrations/48_drop_old_hex_coverage.sql @@ -0,0 +1 @@ +DROP TABLE old_hex_coverage; diff --git a/mobile_verifier/migrations/49_delete_cbrs_from_seniority.sql b/mobile_verifier/migrations/49_delete_cbrs_from_seniority.sql new file mode 100644 index 000000000..c93a25ec9 --- /dev/null +++ b/mobile_verifier/migrations/49_delete_cbrs_from_seniority.sql @@ -0,0 +1 @@ +DELETE FROM seniority WHERE radio_type = 'cbrs'; diff --git a/mobile_verifier/migrations/50_delete_cbrs_from_coverage_objects.sql b/mobile_verifier/migrations/50_delete_cbrs_from_coverage_objects.sql new file mode 100644 index 000000000..ddf295ddb --- /dev/null +++ b/mobile_verifier/migrations/50_delete_cbrs_from_coverage_objects.sql @@ -0,0 +1 @@ +DELETE FROM coverage_objects WHERE radio_type = 'cbrs'; diff --git a/mobile_verifier/src/heartbeats/mod.rs b/mobile_verifier/src/heartbeats/mod.rs index 41540076a..d2be62810 100644 --- a/mobile_verifier/src/heartbeats/mod.rs +++ b/mobile_verifier/src/heartbeats/mod.rs @@ -2,7 +2,7 @@ pub mod last_location; pub mod wifi; use crate::{ - cell_type::{CellType, CellTypeLabel}, + cell_type::CellType, coverage::{self, CoverageClaimTimeCache, CoverageObjectCache, CoverageObjectMeta}, geofence::GeofenceValidator, seniority::{Seniority, SeniorityUpdate}, @@ -205,8 +205,7 @@ impl From for Heartbeat { #[derive(Debug, Clone, PartialEq, sqlx::FromRow)] pub struct HeartbeatReward { pub hotspot_key: PublicKeyBinary, - pub cell_type: CellType, - pub distances_to_asserted: Option>, + pub distances_to_asserted: Vec, pub trust_score_multipliers: Vec, pub coverage_object: Uuid, } @@ -216,11 +215,8 @@ impl HeartbeatReward { KeyType::Wifi(&self.hotspot_key) } - pub fn id(&self) -> anyhow::Result { - match self.cell_type.to_label() { - CellTypeLabel::Wifi => Ok(self.hotspot_key.to_string()), - _ => Err(anyhow!("failed to derive label from cell type")), - } + pub fn id(&self) -> String { + self.hotspot_key.to_string() } pub fn validated<'a>( @@ -234,16 +230,21 @@ impl HeartbeatReward { .fetch(exec) } - pub fn iter_distances_and_scores(&self) -> impl Iterator { - let fallback: Vec = std::iter::repeat(0) - .take(self.trust_score_multipliers.len()) - .collect(); + pub fn iter_distances_and_scores( + &self, + ) -> anyhow::Result> { + // This should never happen if valid_radio.sql is not touched + if self.trust_score_multipliers.len() != self.distances_to_asserted.len() { + return Err(anyhow!( + "Mismatched lengths between distances_to_asserted and trust_score_multipliers" + )); + } - self.distances_to_asserted + Ok(self + .distances_to_asserted .clone() - .unwrap_or(fallback) .into_iter() - .zip(self.trust_score_multipliers.clone()) + .zip(self.trust_score_multipliers.clone())) } } @@ -629,11 +630,6 @@ pub async fn clear_heartbeats( tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, timestamp: &DateTime, ) -> Result<(), sqlx::Error> { - sqlx::query("DELETE FROM cbrs_heartbeats WHERE truncated_timestamp < $1;") - .bind(timestamp) - .execute(&mut *tx) - .await?; - sqlx::query("DELETE FROM wifi_heartbeats WHERE truncated_timestamp < $1;") .bind(timestamp) .execute(&mut *tx) @@ -650,6 +646,38 @@ mod test { use file_store::wifi_heartbeat::WifiHeartbeat; use proto::SeniorityUpdateReason::*; + #[test] + fn test_iter_distances_and_scores_with_matching_lengths() { + let reward = HeartbeatReward { + hotspot_key: PublicKeyBinary::from(vec![1, 2, 3]), + distances_to_asserted: vec![10, 20, 30], + trust_score_multipliers: vec![dec!(0.25), dec!(0.5), dec!(1.0)], + coverage_object: uuid::Uuid::new_v4(), + }; + + let result = reward.iter_distances_and_scores(); + assert!(result.is_ok(), "Expected successful iteration"); + + let pairs: Vec<(i64, Decimal)> = result.unwrap().collect(); + assert_eq!(pairs.len(), 3, "Expected 3 pairs in the iterator"); + assert_eq!(pairs[0], (10, dec!(0.25))); + assert_eq!(pairs[1], (20, dec!(0.5))); + assert_eq!(pairs[2], (30, dec!(1.0))); + } + + #[test] + fn test_iter_distances_and_scores_with_mismatched_lengths() { + let reward = HeartbeatReward { + hotspot_key: PublicKeyBinary::from(vec![1, 2, 3]), + distances_to_asserted: vec![10, 20], // Only 2 elements + trust_score_multipliers: vec![dec!(0.25), dec!(0.5), dec!(1.0)], // 3 elements + coverage_object: uuid::Uuid::new_v4(), + }; + + let result = reward.iter_distances_and_scores(); + assert!(result.is_err(), "Expected error due to mismatched lengths"); + } + fn heartbeat(timestamp: DateTime, coverage_object: Uuid) -> ValidatedHeartbeat { ValidatedHeartbeat { cell_type: CellType::CellTypeNone, diff --git a/mobile_verifier/src/heartbeats/valid_radios.sql b/mobile_verifier/src/heartbeats/valid_radios.sql index e64af1c43..aebcc70f1 100644 --- a/mobile_verifier/src/heartbeats/valid_radios.sql +++ b/mobile_verifier/src/heartbeats/valid_radios.sql @@ -1,42 +1,6 @@ -WITH latest_cbrs_hotspot AS ( - SELECT DISTINCT ON (cbsd_id) - cbsd_id, - hotspot_key - FROM - cbrs_heartbeats - WHERE - truncated_timestamp >= $1 - AND truncated_timestamp < $2 - ORDER BY - cbsd_id, - latest_timestamp DESC -), -heartbeats AS ( - SELECT - lch.hotspot_key, - ch.cbsd_id, - ch.cell_type, - CASE WHEN count(*) >= $3 THEN - 1.0 - ELSE - 0.0 - END AS heartbeat_multiplier, - NULL as distances_to_asserted, - ARRAY_AGG(ch.location_trust_score_multiplier) as trust_score_multipliers - FROM - cbrs_heartbeats ch - INNER JOIN latest_cbrs_hotspot lch ON ch.cbsd_id = lch.cbsd_id - WHERE - ch.truncated_timestamp >= $1 - AND ch.truncated_timestamp < $2 - GROUP BY - ch.cbsd_id, - lch.hotspot_key, - ch.cell_type - UNION +WITH heartbeats AS ( SELECT hotspot_key, - NULL AS cbsd_id, cell_type, CASE WHEN count(*) >= $3 THEN 1.0 @@ -54,35 +18,21 @@ heartbeats AS ( hotspot_key, cell_type ), -latest_uuids AS (( SELECT DISTINCT ON (hotspot_key, - cbsd_id) - hotspot_key, - cbsd_id, - coverage_object - FROM - cbrs_heartbeats ch - WHERE - truncated_timestamp >= $1 - AND truncated_timestamp < $2 - ORDER BY - hotspot_key, - cbsd_id, - truncated_timestamp DESC) - UNION ( SELECT DISTINCT ON (hotspot_key) - hotspot_key, - NULL AS cbsd_id, - coverage_object - FROM - wifi_heartbeats wh - WHERE - truncated_timestamp >= $1 - AND truncated_timestamp < $2 - ORDER BY - hotspot_key, - truncated_timestamp DESC)) +latest_uuids AS ( + SELECT DISTINCT ON (hotspot_key) + hotspot_key, + coverage_object + FROM + wifi_heartbeats wh + WHERE + truncated_timestamp >= $1 + AND truncated_timestamp < $2 + ORDER BY + hotspot_key, + truncated_timestamp DESC +) SELECT hb.hotspot_key, - hb.cbsd_id, hb.cell_type, hb.distances_to_asserted, hb.trust_score_multipliers, @@ -90,8 +40,5 @@ SELECT FROM heartbeats hb INNER JOIN latest_uuids u ON hb.hotspot_key = u.hotspot_key - AND (hb.cbsd_id = u.cbsd_id - OR (hb.cbsd_id IS NULL - AND u.cbsd_id IS NULL)) WHERE hb.heartbeat_multiplier = 1.0 diff --git a/mobile_verifier/src/radio_threshold.rs b/mobile_verifier/src/radio_threshold.rs index 3a5d97e9b..ef5d62d34 100644 --- a/mobile_verifier/src/radio_threshold.rs +++ b/mobile_verifier/src/radio_threshold.rs @@ -25,7 +25,7 @@ use helium_proto::services::{ }, }; use mobile_config::client::authorization_client::AuthorizationVerifier; -use sqlx::{FromRow, PgPool, Pool, Postgres, Row, Transaction}; +use sqlx::{PgPool, Pool, Postgres, Transaction}; use std::{collections::HashSet, ops::Range}; use task_manager::{ManagedTask, TaskManager}; use tokio::sync::mpsc::Receiver; @@ -175,7 +175,7 @@ where .map(anyhow::Ok) .try_fold(transaction, |mut transaction, ingest_report| async move { // verify the report - let verified_report_status = self.verify_report(&ingest_report.report).await?; + let verified_report_status = self.verify_report(&ingest_report.report).await; // if the report is valid then save to the db // and thus available to the rewarder @@ -254,17 +254,8 @@ where async fn verify_report( &self, report: &RadioThresholdReportReq, - ) -> anyhow::Result { - let is_legacy = self.verify_legacy(&report.hotspot_pubkey, &None).await?; - let report_validity = self.do_report_verifications(report).await; - let final_validity = if is_legacy - && report_validity == RadioThresholdReportVerificationStatus::ThresholdReportStatusValid - { - RadioThresholdReportVerificationStatus::ThresholdReportStatusLegacyValid - } else { - report_validity - }; - Ok(final_validity) + ) -> RadioThresholdReportVerificationStatus { + self.do_report_verifications(report).await } async fn verify_invalid_report( @@ -293,43 +284,23 @@ where .await .unwrap_or_default() } - - async fn verify_legacy( - &self, - hotspot_key: &PublicKeyBinary, - cbsd_id: &Option, - ) -> Result { - // check if the radio has been grandfathered in, meaning a radio which has received - // boosted rewards prior to the data component of hip84 going live - // if true then it is assigned a status reason of Legacy - // TODO: remove this handling after the grandfathering period - let row = sqlx::query(" select exists(select 1 from grandfathered_radio_threshold where hotspot_pubkey = $1 and (cbsd_id is null or cbsd_id = $2)) ") - .bind(hotspot_key) - .bind(cbsd_id) - .fetch_one(&self.pool) - .await?; - Ok(row.get("exists")) - } } pub async fn save( ingest_report: &RadioThresholdIngestReport, db: &mut Transaction<'_, Postgres>, ) -> Result<(), sqlx::Error> { - let cbsd_id: Option = None; - sqlx::query( r#" INSERT INTO radio_threshold ( hotspot_pubkey, - cbsd_id, bytes_threshold, subscriber_threshold, threshold_timestamp, threshold_met, recv_timestamp) - VALUES ($1, $2, $3, $4, $5, true, $6) - ON CONFLICT (hotspot_pubkey, COALESCE(cbsd_id, '')) + VALUES ($1, $2, $3, $4, true, $5) + ON CONFLICT (hotspot_pubkey) DO UPDATE SET bytes_threshold = EXCLUDED.bytes_threshold, subscriber_threshold = EXCLUDED.subscriber_threshold, @@ -339,7 +310,6 @@ pub async fn save( "#, ) .bind(ingest_report.report.hotspot_pubkey.to_string()) - .bind(cbsd_id) .bind(ingest_report.report.bytes_threshold as i64) .bind(ingest_report.report.subscriber_threshold as i32) .bind(ingest_report.report.threshold_timestamp) @@ -349,11 +319,6 @@ pub async fn save( Ok(()) } -#[derive(FromRow, Debug)] -pub struct RadioThreshold { - hotspot_pubkey: PublicKeyBinary, -} - #[derive(Debug, Clone, Default)] pub struct VerifiedRadioThresholds { gateways: HashSet, @@ -373,17 +338,16 @@ pub async fn verified_radio_thresholds( pool: &sqlx::Pool, reward_period: &Range>, ) -> Result { - let mut rows = sqlx::query_as::<_, RadioThreshold>( - "SELECT hotspot_pubkey - FROM radio_threshold WHERE threshold_timestamp < $1 and cbsd_id IS NULL", + let gateways = sqlx::query_scalar::<_, PublicKeyBinary>( + "SELECT hotspot_pubkey FROM radio_threshold WHERE threshold_timestamp < $1", ) .bind(reward_period.end) - .fetch(pool); - let mut map = VerifiedRadioThresholds::default(); - while let Some(row) = rows.try_next().await? { - map.insert(row.hotspot_pubkey); - } - Ok(map) + .fetch_all(pool) + .await? + .into_iter() + .collect::>(); + + Ok(VerifiedRadioThresholds { gateways }) } pub async fn delete( @@ -392,8 +356,7 @@ pub async fn delete( ) -> Result<(), sqlx::Error> { sqlx::query( r#" - DELETE FROM radio_threshold - WHERE hotspot_pubkey = $1 AND cbsd_id is null + DELETE FROM radio_threshold WHERE hotspot_pubkey = $1 "#, ) .bind(ingest_report.report.hotspot_pubkey.to_string()) @@ -401,3 +364,362 @@ pub async fn delete( .await?; Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + use chrono::Utc; + use file_store::mobile_radio_threshold::{RadioThresholdIngestReport, RadioThresholdReportReq}; + use helium_crypto::{KeyTag, Keypair}; + use rand::rngs::OsRng; + use sqlx::{prelude::FromRow, Row}; + + fn generate_keypair() -> Keypair { + Keypair::generate(KeyTag::default(), &mut OsRng) + } + + #[sqlx::test] + async fn test_save_radio_threshold(pool: PgPool) { + let now = Utc::now(); + let hotspot_keypair = generate_keypair(); + let hotspot_pubkey = PublicKeyBinary::from(hotspot_keypair.public_key().to_owned()); + let carrier_keypair = generate_keypair(); + let carrier_pubkey = PublicKeyBinary::from(carrier_keypair.public_key().to_owned()); + + let report = RadioThresholdReportReq { + hotspot_pubkey: hotspot_pubkey.clone(), + carrier_pub_key: carrier_pubkey, + bytes_threshold: 1000, + subscriber_threshold: 50, + threshold_timestamp: now, + }; + + let ingest_report = RadioThresholdIngestReport { + received_timestamp: now, + report, + }; + + let mut transaction = pool.begin().await.unwrap(); + save(&ingest_report, &mut transaction).await.unwrap(); + transaction.commit().await.unwrap(); + + #[derive(Debug, FromRow, PartialEq)] + struct TestRadioThreshold { + hotspot_pubkey: PublicKeyBinary, + bytes_threshold: i64, + subscriber_threshold: i32, + threshold_met: bool, + threshold_timestamp: DateTime, + } + + let result = sqlx::query_as::<_, TestRadioThreshold>( + r#" + SELECT + hotspot_pubkey, + bytes_threshold, + subscriber_threshold, + threshold_timestamp, + threshold_met + FROM radio_threshold + WHERE hotspot_pubkey = $1 + "#, + ) + .bind(&hotspot_pubkey) + .fetch_one(&pool) + .await + .unwrap(); + + pub fn nanos_trunc(ts: DateTime) -> DateTime { + use chrono::{Duration, DurationRound}; + ts.duration_trunc(Duration::nanoseconds(1000)).unwrap() + } + + assert_eq!( + result, + TestRadioThreshold { + hotspot_pubkey, + bytes_threshold: 1000, + subscriber_threshold: 50, + threshold_met: true, + threshold_timestamp: nanos_trunc(now) + } + ); + } + + #[sqlx::test] + async fn test_save_radio_threshold_update_existing(pool: PgPool) { + let now = Utc::now(); + let hotspot_keypair = generate_keypair(); + let hotspot_pubkey = PublicKeyBinary::from(hotspot_keypair.public_key().to_owned()); + let carrier_keypair = generate_keypair(); + let carrier_pubkey = PublicKeyBinary::from(carrier_keypair.public_key().to_owned()); + + let initial_report = RadioThresholdReportReq { + hotspot_pubkey: hotspot_pubkey.clone(), + carrier_pub_key: carrier_pubkey.clone(), + bytes_threshold: 1000, + subscriber_threshold: 50, + threshold_timestamp: now, + }; + + let initial_ingest_report = RadioThresholdIngestReport { + received_timestamp: now, + report: initial_report, + }; + + // Save initial report + let mut transaction = pool.begin().await.unwrap(); + save(&initial_ingest_report, &mut transaction) + .await + .unwrap(); + transaction.commit().await.unwrap(); + + // Create an updated report with different values + let updated_now = Utc::now(); + let updated_report = RadioThresholdReportReq { + hotspot_pubkey: hotspot_pubkey.clone(), + carrier_pub_key: carrier_pubkey, + bytes_threshold: 2000, // Changed value + subscriber_threshold: 100, // Changed value + threshold_timestamp: updated_now, + }; + + let updated_ingest_report = RadioThresholdIngestReport { + received_timestamp: updated_now, + report: updated_report, + }; + + // Save updated report - should update the existing record + let mut transaction = pool.begin().await.unwrap(); + save(&updated_ingest_report, &mut transaction) + .await + .unwrap(); + transaction.commit().await.unwrap(); + + #[derive(Debug, FromRow, PartialEq)] + struct TestRadioThreshold { + hotspot_pubkey: PublicKeyBinary, + bytes_threshold: i64, + subscriber_threshold: i32, + threshold_met: bool, + threshold_timestamp: DateTime, + } + + let result = sqlx::query_as::<_, TestRadioThreshold>( + r#" + SELECT + hotspot_pubkey, + bytes_threshold, + subscriber_threshold, + threshold_timestamp, + threshold_met + FROM radio_threshold + WHERE hotspot_pubkey = $1 + "#, + ) + .bind(&hotspot_pubkey) + .fetch_one(&pool) + .await + .unwrap(); + + pub fn nanos_trunc(ts: DateTime) -> DateTime { + use chrono::{Duration, DurationRound}; + ts.duration_trunc(Duration::nanoseconds(1000)).unwrap() + } + + assert_eq!( + result, + TestRadioThreshold { + hotspot_pubkey, + bytes_threshold: 2000, + subscriber_threshold: 100, + threshold_met: true, + threshold_timestamp: nanos_trunc(updated_now) + } + ); + } + + #[sqlx::test] + async fn test_delete_radio_threshold(pool: PgPool) { + let now = Utc::now(); + let hotspot_keypair = generate_keypair(); + let hotspot_pubkey = PublicKeyBinary::from(hotspot_keypair.public_key().to_owned()); + let hotspot_keypair_2 = generate_keypair(); + let hotspot_pubkey_2 = PublicKeyBinary::from(hotspot_keypair_2.public_key().to_owned()); + let carrier_keypair = generate_keypair(); + let carrier_pubkey = PublicKeyBinary::from(carrier_keypair.public_key().to_owned()); + + let report = RadioThresholdReportReq { + hotspot_pubkey: hotspot_pubkey.clone(), + carrier_pub_key: carrier_pubkey.clone(), + bytes_threshold: 1000, + subscriber_threshold: 50, + threshold_timestamp: now, + }; + + let ingest_report = RadioThresholdIngestReport { + received_timestamp: now, + report, + }; + + let report_2 = RadioThresholdReportReq { + hotspot_pubkey: hotspot_pubkey_2.clone(), + carrier_pub_key: carrier_pubkey.clone(), + bytes_threshold: 1000, + subscriber_threshold: 50, + threshold_timestamp: now, + }; + + let ingest_report_2 = RadioThresholdIngestReport { + received_timestamp: now, + report: report_2, + }; + + // Save the radio threshold + let mut transaction = pool.begin().await.unwrap(); + save(&ingest_report, &mut transaction).await.unwrap(); + save(&ingest_report_2, &mut transaction).await.unwrap(); + transaction.commit().await.unwrap(); + + // Verify the record exists + let count_before = sqlx::query( + r#" + SELECT COUNT(*) as count + FROM radio_threshold + "#, + ) + .fetch_one(&pool) + .await + .unwrap() + .get::("count"); + + assert_eq!(count_before, 2, "Records should exist before deletion"); + + // Create an invalidated radio threshold report + let invalidation_report = InvalidatedRadioThresholdReportReq { + hotspot_pubkey: hotspot_pubkey.clone(), + carrier_pub_key: carrier_pubkey, + reason: 0.try_into().unwrap(), + timestamp: now, + }; + + let invalidated_ingest_report = InvalidatedRadioThresholdIngestReport { + received_timestamp: Utc::now(), + report: invalidation_report, + }; + + // Delete the radio threshold + let mut transaction = pool.begin().await.unwrap(); + delete(&invalidated_ingest_report, &mut transaction) + .await + .unwrap(); + transaction.commit().await.unwrap(); + + // Verify the record (2) still exists + let count_after = sqlx::query( + r#" + SELECT COUNT(*) as count + FROM radio_threshold + WHERE hotspot_pubkey = $1 + "#, + ) + .bind(hotspot_pubkey_2.to_string()) + .fetch_one(&pool) + .await + .unwrap() + .get::("count"); + + assert_eq!(count_after, 1, "Record (2) should still exists"); + + // Verify the record (1) was deleted + let count_after = sqlx::query( + r#" + SELECT COUNT(*) as count + FROM radio_threshold + WHERE hotspot_pubkey = $1 + "#, + ) + .bind(hotspot_pubkey.to_string()) + .fetch_one(&pool) + .await + .unwrap() + .get::("count"); + + assert_eq!(count_after, 0, "Record (1) should be deleted"); + } + + #[sqlx::test] + async fn test_verified_radio_thresholds(pool: PgPool) { + // Setup: Create radio thresholds with different timestamps + let now = Utc::now(); + + // Create two hotspot keypairs + let hotspot_keypair1 = generate_keypair(); + let hotspot_pubkey1 = PublicKeyBinary::from(hotspot_keypair1.public_key().to_owned()); + + let hotspot_keypair2 = generate_keypair(); + let hotspot_pubkey2 = PublicKeyBinary::from(hotspot_keypair2.public_key().to_owned()); + + let carrier_keypair = generate_keypair(); + let carrier_pubkey = PublicKeyBinary::from(carrier_keypair.public_key().to_owned()); + + // Create radio threshold reports with different timestamps + // Hotspot 1 - before the reward period end (should be verified) + let report1 = RadioThresholdReportReq { + hotspot_pubkey: hotspot_pubkey1.clone(), + carrier_pub_key: carrier_pubkey.clone(), + bytes_threshold: 1000, + subscriber_threshold: 50, + threshold_timestamp: now - chrono::Duration::hours(5), // Before the reward period end + }; + + // Hotspot 2 - after the reward period end (should not be verified) + let report2 = RadioThresholdReportReq { + hotspot_pubkey: hotspot_pubkey2.clone(), + carrier_pub_key: carrier_pubkey.clone(), + bytes_threshold: 2000, + subscriber_threshold: 100, + threshold_timestamp: now + chrono::Duration::hours(2), // After the reward period end + }; + + // Create and save ingest reports + let ingest_report1 = RadioThresholdIngestReport { + received_timestamp: now, + report: report1, + }; + + let ingest_report2 = RadioThresholdIngestReport { + received_timestamp: now, + report: report2, + }; + + // Save both reports + let mut transaction = pool.begin().await.unwrap(); + save(&ingest_report1, &mut transaction).await.unwrap(); + save(&ingest_report2, &mut transaction).await.unwrap(); + transaction.commit().await.unwrap(); + + // Define reward period for testing + let reward_period = (now - chrono::Duration::hours(24))..now; + + // Call the function under test + let verified_thresholds = verified_radio_thresholds(&pool, &reward_period) + .await + .unwrap(); + + // Verify results + assert!( + verified_thresholds.is_verified(hotspot_pubkey1.clone()), + "Hotspot 1 should be verified (threshold timestamp before period end)" + ); + + assert!( + !verified_thresholds.is_verified(hotspot_pubkey2.clone()), + "Hotspot 2 should not be verified (threshold timestamp after period end)" + ); + + // Verify total count + let count = verified_thresholds.gateways.len(); + assert_eq!(count, 1, "Should have exactly 1 verified hotspot"); + } +} diff --git a/mobile_verifier/src/reward_shares.rs b/mobile_verifier/src/reward_shares.rs index 4b0c0d715..19d13dc57 100644 --- a/mobile_verifier/src/reward_shares.rs +++ b/mobile_verifier/src/reward_shares.rs @@ -381,7 +381,7 @@ impl CoverageShares { .await?; let trust_scores: Vec = heartbeat - .iter_distances_and_scores() + .iter_distances_and_scores()? .map(|(distance, trust_score)| LocationTrust { meters_to_asserted: distance as u32, trust_score, @@ -726,7 +726,6 @@ mod test { use hex_assignments::{assignment::HexAssignments, Assignment}; use crate::{ - cell_type::CellType, coverage::{CoveredHexStream, HexCoverage}, data_session::{self, HotspotDataSession, HotspotReward}, heartbeats::{HeartbeatReward, KeyType, OwnedKeyType}, @@ -1180,8 +1179,7 @@ mod test { let heartbeat_rewards = vec![HeartbeatReward { hotspot_key: gw1.clone(), coverage_object: cov_obj_1, - cell_type: CellType::NovaGenericWifiOutdoor, - distances_to_asserted: None, + distances_to_asserted: vec![1], trust_score_multipliers: vec![dec!(1.0)], }] .into_iter() @@ -1311,29 +1309,25 @@ mod test { HeartbeatReward { hotspot_key: gw10.clone(), coverage_object: cov_obj_10, - cell_type: CellType::NovaGenericWifiIndoor, - distances_to_asserted: Some(vec![0]), + distances_to_asserted: vec![1], trust_score_multipliers: vec![dec!(1.0)], }, HeartbeatReward { hotspot_key: gw20.clone(), coverage_object: cov_obj_20, - cell_type: CellType::NovaGenericWifiIndoor, - distances_to_asserted: Some(vec![0, 250, 250]), + distances_to_asserted: vec![0, 250, 250], trust_score_multipliers: vec![dec!(1.0), dec!(0.25), dec!(0.25)], }, HeartbeatReward { hotspot_key: gw21.clone(), coverage_object: cov_obj_21, - cell_type: CellType::NovaGenericWifiIndoor, - distances_to_asserted: Some(vec![0]), + distances_to_asserted: vec![0], trust_score_multipliers: vec![dec!(1.0)], }, HeartbeatReward { hotspot_key: gw30.clone(), coverage_object: cov_obj_30, - cell_type: CellType::NovaGenericWifiIndoor, - distances_to_asserted: Some(vec![0]), + distances_to_asserted: vec![0], trust_score_multipliers: vec![dec!(1.0)], }, ] @@ -1492,17 +1486,15 @@ mod test { // add qualified wifi indoor HB HeartbeatReward { hotspot_key: gw1.clone(), - cell_type: CellType::NovaGenericWifiOutdoor, coverage_object: g1_cov_obj, - distances_to_asserted: Some(vec![0]), + distances_to_asserted: vec![0], trust_score_multipliers: vec![dec!(1.0)], }, // add unqualified wifi indoor HB HeartbeatReward { hotspot_key: gw2.clone(), - cell_type: CellType::NovaGenericWifiOutdoor, coverage_object: g2_cov_obj, - distances_to_asserted: None, + distances_to_asserted: vec![0], trust_score_multipliers: vec![dec!(1.0)], }, ] diff --git a/mobile_verifier/tests/integrations/heartbeats.rs b/mobile_verifier/tests/integrations/heartbeats.rs index a64a61b67..9b48acd55 100644 --- a/mobile_verifier/tests/integrations/heartbeats.rs +++ b/mobile_verifier/tests/integrations/heartbeats.rs @@ -51,7 +51,6 @@ async fn test_save_wifi_heartbeat(pool: PgPool) -> anyhow::Result<()> { #[sqlx::test] async fn only_fetch_latest_hotspot(pool: PgPool) -> anyhow::Result<()> { let coverage_object = Uuid::new_v4(); - let cell_type = CellType::NovaGenericWifiIndoor; let hotspot_1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6".parse()?; let hotspot_2: PublicKeyBinary = @@ -102,9 +101,8 @@ VALUES heartbeat_reward, vec![HeartbeatReward { hotspot_key: hotspot_2, - cell_type, trust_score_multipliers: vec![Decimal::ONE; 13], - distances_to_asserted: Some(vec![0; 13]), + distances_to_asserted: vec![0; 13], coverage_object, }] ); @@ -191,9 +189,8 @@ VALUES heartbeat_reward, vec![HeartbeatReward { hotspot_key: hotspot, - cell_type: CellType::NovaGenericWifiIndoor, trust_score_multipliers: vec![Decimal::ONE; 12], - distances_to_asserted: Some(vec![0; 12]), + distances_to_asserted: vec![0; 12], coverage_object: latest_coverage_object, }] ); @@ -241,7 +238,6 @@ VALUES heartbeat_reward, vec![HeartbeatReward { hotspot_key: hotspot, - cell_type: CellType::NovaGenericWifiIndoor, trust_score_multipliers: vec![ dec!(1.0), dec!(1.0), @@ -256,7 +252,7 @@ VALUES dec!(0.25), dec!(0.25) ], - distances_to_asserted: Some(vec![0; 12]), + distances_to_asserted: vec![0; 12], coverage_object: latest_coverage_object, }] );