From 32bc9416602a004c586850a07686d262d0ce4070 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Mon, 31 Mar 2025 12:02:26 +0300 Subject: [PATCH 01/23] Add test_save_radio_threshold tests --- mobile_verifier/src/radio_threshold.rs | 158 +++++++++++++++++++++++++ 1 file changed, 158 insertions(+) diff --git a/mobile_verifier/src/radio_threshold.rs b/mobile_verifier/src/radio_threshold.rs index 3a5d97e9b..15fc8fe85 100644 --- a/mobile_verifier/src/radio_threshold.rs +++ b/mobile_verifier/src/radio_threshold.rs @@ -401,3 +401,161 @@ pub async fn delete( .await?; Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + use chrono::Utc; + use file_store::mobile_radio_threshold::{RadioThresholdIngestReport, RadioThresholdReportReq}; + use helium_crypto::{KeyTag, Keypair}; + use rand::rngs::OsRng; + use sqlx::Row; + + fn generate_keypair() -> Keypair { + Keypair::generate(KeyTag::default(), &mut OsRng) + } + + #[sqlx::test] + async fn test_save_radio_threshold(pool: PgPool) { + let now = Utc::now(); + let hotspot_keypair = generate_keypair(); + let hotspot_pubkey = PublicKeyBinary::from(hotspot_keypair.public_key().to_owned()); + let carrier_keypair = generate_keypair(); + let carrier_pubkey = PublicKeyBinary::from(carrier_keypair.public_key().to_owned()); + + let report = RadioThresholdReportReq { + hotspot_pubkey: hotspot_pubkey.clone(), + carrier_pub_key: carrier_pubkey, + bytes_threshold: 1000, + subscriber_threshold: 50, + threshold_timestamp: now, + }; + + let ingest_report = RadioThresholdIngestReport { + received_timestamp: now, + report, + }; + + let mut transaction = pool.begin().await.unwrap(); + save(&ingest_report, &mut transaction).await.unwrap(); + transaction.commit().await.unwrap(); + + let result = sqlx::query( + r#" + SELECT + hotspot_pubkey, + cbsd_id, + bytes_threshold, + subscriber_threshold, + threshold_timestamp, + threshold_met + FROM radio_threshold + WHERE hotspot_pubkey = $1 + "#, + ) + .bind(hotspot_pubkey.to_string()) + .fetch_one(&pool) + .await + .unwrap(); + + assert_eq!( + result.get::("hotspot_pubkey"), + hotspot_pubkey.to_string() + ); + assert_eq!(result.get::, _>("cbsd_id"), None); + assert_eq!(result.get::("bytes_threshold"), 1000); + assert_eq!(result.get::("subscriber_threshold"), 50); + assert!(result.get::("threshold_met")); + + let stored_threshold_timestamp = result.get::, _>("threshold_timestamp"); + assert_eq!( + stored_threshold_timestamp.timestamp_millis(), + now.timestamp_millis() + ); + } + + #[sqlx::test] + async fn test_save_radio_threshold_update_existing(pool: PgPool) { + let now = Utc::now(); + let hotspot_keypair = generate_keypair(); + let hotspot_pubkey = PublicKeyBinary::from(hotspot_keypair.public_key().to_owned()); + let carrier_keypair = generate_keypair(); + let carrier_pubkey = PublicKeyBinary::from(carrier_keypair.public_key().to_owned()); + + let initial_report = RadioThresholdReportReq { + hotspot_pubkey: hotspot_pubkey.clone(), + carrier_pub_key: carrier_pubkey.clone(), + bytes_threshold: 1000, + subscriber_threshold: 50, + threshold_timestamp: now, + }; + + let initial_ingest_report = RadioThresholdIngestReport { + received_timestamp: now, + report: initial_report, + }; + + // Save initial report + let mut transaction = pool.begin().await.unwrap(); + save(&initial_ingest_report, &mut transaction) + .await + .unwrap(); + transaction.commit().await.unwrap(); + + // Create an updated report with different values + let updated_now = Utc::now(); + let updated_report = RadioThresholdReportReq { + hotspot_pubkey: hotspot_pubkey.clone(), + carrier_pub_key: carrier_pubkey, + bytes_threshold: 2000, // Changed value + subscriber_threshold: 100, // Changed value + threshold_timestamp: updated_now, + }; + + let updated_ingest_report = RadioThresholdIngestReport { + received_timestamp: updated_now, + report: updated_report, + }; + + // Save updated report - should update the existing record + let mut transaction = pool.begin().await.unwrap(); + save(&updated_ingest_report, &mut transaction) + .await + .unwrap(); + transaction.commit().await.unwrap(); + + // Verify - Query the database to confirm the record was updated + let result = sqlx::query( + r#" + SELECT + hotspot_pubkey, + cbsd_id, + bytes_threshold, + subscriber_threshold, + threshold_timestamp, + threshold_met + FROM radio_threshold + WHERE hotspot_pubkey = $1 + "#, + ) + .bind(hotspot_pubkey.to_string()) + .fetch_one(&pool) + .await + .unwrap(); + + // Assertions - should have the updated values + assert_eq!( + result.get::("hotspot_pubkey"), + hotspot_pubkey.to_string() + ); + assert_eq!(result.get::, _>("cbsd_id"), None); + assert_eq!(result.get::("bytes_threshold"), 2000); // Updated value + assert_eq!(result.get::("subscriber_threshold"), 100); // Updated value + assert!(result.get::("threshold_met")); + let stored_threshold_timestamp = result.get::, _>("threshold_timestamp"); + assert_eq!( + stored_threshold_timestamp.timestamp_millis(), + updated_now.timestamp_millis() + ); + } +} From e8666c024c9516d851e2f982108734db556a37e7 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Mon, 31 Mar 2025 12:26:46 +0300 Subject: [PATCH 02/23] Add test_delete_radio_threshold --- mobile_verifier/src/radio_threshold.rs | 109 +++++++++++++++++++++++++ 1 file changed, 109 insertions(+) diff --git a/mobile_verifier/src/radio_threshold.rs b/mobile_verifier/src/radio_threshold.rs index 15fc8fe85..9777ef965 100644 --- a/mobile_verifier/src/radio_threshold.rs +++ b/mobile_verifier/src/radio_threshold.rs @@ -558,4 +558,113 @@ mod tests { updated_now.timestamp_millis() ); } + + #[sqlx::test] + async fn test_delete_radio_threshold(pool: PgPool) { + let now = Utc::now(); + let hotspot_keypair = generate_keypair(); + let hotspot_pubkey = PublicKeyBinary::from(hotspot_keypair.public_key().to_owned()); + let hotspot_keypair_2 = generate_keypair(); + let hotspot_pubkey_2 = PublicKeyBinary::from(hotspot_keypair_2.public_key().to_owned()); + let carrier_keypair = generate_keypair(); + let carrier_pubkey = PublicKeyBinary::from(carrier_keypair.public_key().to_owned()); + + let report = RadioThresholdReportReq { + hotspot_pubkey: hotspot_pubkey.clone(), + carrier_pub_key: carrier_pubkey.clone(), + bytes_threshold: 1000, + subscriber_threshold: 50, + threshold_timestamp: now, + }; + + let ingest_report = RadioThresholdIngestReport { + received_timestamp: now, + report, + }; + + let report_2 = RadioThresholdReportReq { + hotspot_pubkey: hotspot_pubkey_2.clone(), + carrier_pub_key: carrier_pubkey.clone(), + bytes_threshold: 1000, + subscriber_threshold: 50, + threshold_timestamp: now, + }; + + let ingest_report_2 = RadioThresholdIngestReport { + received_timestamp: now, + report: report_2, + }; + + // Save the radio threshold + let mut transaction = pool.begin().await.unwrap(); + save(&ingest_report, &mut transaction).await.unwrap(); + save(&ingest_report_2, &mut transaction).await.unwrap(); + transaction.commit().await.unwrap(); + + // Verify the record exists + let count_before = sqlx::query( + r#" + SELECT COUNT(*) as count + FROM radio_threshold + "#, + ) + .fetch_one(&pool) + .await + .unwrap() + .get::("count"); + + assert_eq!(count_before, 2, "Records should exist before deletion"); + + // Create an invalidated radio threshold report + let invalidation_report = InvalidatedRadioThresholdReportReq { + hotspot_pubkey: hotspot_pubkey.clone(), + carrier_pub_key: carrier_pubkey, + reason: 0.try_into().unwrap(), + timestamp: now, + }; + + let invalidated_ingest_report = InvalidatedRadioThresholdIngestReport { + received_timestamp: Utc::now(), + report: invalidation_report, + }; + + // Delete the radio threshold + let mut transaction = pool.begin().await.unwrap(); + delete(&invalidated_ingest_report, &mut transaction) + .await + .unwrap(); + transaction.commit().await.unwrap(); + + // Verify the record (2) still exists + let count_after = sqlx::query( + r#" + SELECT COUNT(*) as count + FROM radio_threshold + WHERE hotspot_pubkey = $1 + "#, + ) + .bind(hotspot_pubkey_2.to_string()) + .fetch_one(&pool) + .await + .unwrap() + .get::("count"); + + assert_eq!(count_after, 1, "Record (2) should still exists"); + + // Verify the record (1) was deleted + let count_after = sqlx::query( + r#" + SELECT COUNT(*) as count + FROM radio_threshold + WHERE hotspot_pubkey = $1 + "#, + ) + .bind(hotspot_pubkey.to_string()) + .fetch_one(&pool) + .await + .unwrap() + .get::("count"); + + assert_eq!(count_after, 0, "Record (1) should be deleted"); + } } From baf11be89ca15464d8c730e47983137b095dfc8c Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Mon, 31 Mar 2025 12:40:33 +0300 Subject: [PATCH 03/23] Add test_verified_radio_thresholds --- mobile_verifier/src/radio_threshold.rs | 75 ++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/mobile_verifier/src/radio_threshold.rs b/mobile_verifier/src/radio_threshold.rs index 9777ef965..84f9e9bad 100644 --- a/mobile_verifier/src/radio_threshold.rs +++ b/mobile_verifier/src/radio_threshold.rs @@ -667,4 +667,79 @@ mod tests { assert_eq!(count_after, 0, "Record (1) should be deleted"); } + + #[sqlx::test] + async fn test_verified_radio_thresholds(pool: PgPool) { + // Setup: Create radio thresholds with different timestamps + let now = Utc::now(); + + // Create two hotspot keypairs + let hotspot_keypair1 = generate_keypair(); + let hotspot_pubkey1 = PublicKeyBinary::from(hotspot_keypair1.public_key().to_owned()); + + let hotspot_keypair2 = generate_keypair(); + let hotspot_pubkey2 = PublicKeyBinary::from(hotspot_keypair2.public_key().to_owned()); + + let carrier_keypair = generate_keypair(); + let carrier_pubkey = PublicKeyBinary::from(carrier_keypair.public_key().to_owned()); + + // Create radio threshold reports with different timestamps + // Hotspot 1 - before the reward period end (should be verified) + let report1 = RadioThresholdReportReq { + hotspot_pubkey: hotspot_pubkey1.clone(), + carrier_pub_key: carrier_pubkey.clone(), + bytes_threshold: 1000, + subscriber_threshold: 50, + threshold_timestamp: now - chrono::Duration::hours(5), // Before the reward period end + }; + + // Hotspot 2 - after the reward period end (should not be verified) + let report2 = RadioThresholdReportReq { + hotspot_pubkey: hotspot_pubkey2.clone(), + carrier_pub_key: carrier_pubkey.clone(), + bytes_threshold: 2000, + subscriber_threshold: 100, + threshold_timestamp: now + chrono::Duration::hours(2), // After the reward period end + }; + + // Create and save ingest reports + let ingest_report1 = RadioThresholdIngestReport { + received_timestamp: now, + report: report1, + }; + + let ingest_report2 = RadioThresholdIngestReport { + received_timestamp: now, + report: report2, + }; + + // Save both reports + let mut transaction = pool.begin().await.unwrap(); + save(&ingest_report1, &mut transaction).await.unwrap(); + save(&ingest_report2, &mut transaction).await.unwrap(); + transaction.commit().await.unwrap(); + + // Define reward period for testing + let reward_period = (now - chrono::Duration::hours(24))..now; + + // Call the function under test + let verified_thresholds = verified_radio_thresholds(&pool, &reward_period) + .await + .unwrap(); + + // Verify results + assert!( + verified_thresholds.is_verified(hotspot_pubkey1.clone()), + "Hotspot 1 should be verified (threshold timestamp before period end)" + ); + + assert!( + !verified_thresholds.is_verified(hotspot_pubkey2.clone()), + "Hotspot 2 should not be verified (threshold timestamp after period end)" + ); + + // Verify total count + let count = verified_thresholds.gateways.len(); + assert_eq!(count, 1, "Should have exactly 1 verified hotspot"); + } } From 54694b2f12eb3ba92b09f9327df3b8e00d75fc55 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Mon, 31 Mar 2025 12:52:26 +0300 Subject: [PATCH 04/23] Add test_verify_legacy --- mobile_verifier/src/radio_threshold.rs | 72 ++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/mobile_verifier/src/radio_threshold.rs b/mobile_verifier/src/radio_threshold.rs index 84f9e9bad..b0bb28520 100644 --- a/mobile_verifier/src/radio_threshold.rs +++ b/mobile_verifier/src/radio_threshold.rs @@ -742,4 +742,76 @@ mod tests { let count = verified_thresholds.gateways.len(); assert_eq!(count, 1, "Should have exactly 1 verified hotspot"); } + + #[sqlx::test] + async fn test_verify_legacy(pool: PgPool) { + // Create a test ingestor with authorization verifier + struct MockAuthVerifier; + + #[async_trait::async_trait] + impl AuthorizationVerifier for MockAuthVerifier { + type Error = std::io::Error; + + async fn verify_authorized_key( + &self, + _public_key: &PublicKeyBinary, + _role: NetworkKeyRole, + ) -> Result { + Ok(true) + } + } + + let ingestor = RadioThresholdIngestor::new( + pool.clone(), + tokio::sync::mpsc::channel(1).1, // Reports receiver (unused) + tokio::sync::mpsc::channel(1).1, // Invalid reports receiver (unused) + FileSinkClient::new(tokio::sync::mpsc::channel(1).0, "test"), // Verified sink (unused) + FileSinkClient::new(tokio::sync::mpsc::channel(1).0, "test"), // Invalid verified sink (unused) + MockAuthVerifier, + ); + + // Create test hotspot pubkey + let hotspot_keypair = generate_keypair(); + let hotspot_pubkey = PublicKeyBinary::from(hotspot_keypair.public_key().to_owned()); + + // Initially the hotspot should not be grandfathered + let is_legacy_before = ingestor + .verify_legacy(&hotspot_pubkey, &None) + .await + .unwrap(); + assert!( + !is_legacy_before, + "Hotspot should not be legacy before insertion" + ); + + // Insert a record into grandfathered_radio_threshold + sqlx::query( + r#" + INSERT INTO grandfathered_radio_threshold (hotspot_pubkey, cbsd_id) + VALUES ($1, NULL) + "#, + ) + .bind(hotspot_pubkey.to_string()) + .execute(&pool) + .await + .unwrap(); + + // Now the hotspot should be grandfathered + let is_legacy_after = ingestor + .verify_legacy(&hotspot_pubkey, &None) + .await + .unwrap(); + assert!(is_legacy_after, "Hotspot should be legacy after insertion"); + + // Test with a different hotspot that isn't grandfathered + let other_hotspot_keypair = generate_keypair(); + let other_hotspot_pubkey = + PublicKeyBinary::from(other_hotspot_keypair.public_key().to_owned()); + + let other_is_legacy = ingestor + .verify_legacy(&other_hotspot_pubkey, &None) + .await + .unwrap(); + assert!(!other_is_legacy, "Other hotspot should not be legacy"); + } } From 6fed9fdaf8c0cb981b13ed7e52a4c56eebc5ffbd Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Mon, 31 Mar 2025 18:29:28 +0300 Subject: [PATCH 05/23] Remove verify_legacy function --- mobile_verifier/src/radio_threshold.rs | 101 +------------------------ 1 file changed, 2 insertions(+), 99 deletions(-) diff --git a/mobile_verifier/src/radio_threshold.rs b/mobile_verifier/src/radio_threshold.rs index b0bb28520..2637f9092 100644 --- a/mobile_verifier/src/radio_threshold.rs +++ b/mobile_verifier/src/radio_threshold.rs @@ -25,7 +25,7 @@ use helium_proto::services::{ }, }; use mobile_config::client::authorization_client::AuthorizationVerifier; -use sqlx::{FromRow, PgPool, Pool, Postgres, Row, Transaction}; +use sqlx::{FromRow, PgPool, Pool, Postgres, Transaction}; use std::{collections::HashSet, ops::Range}; use task_manager::{ManagedTask, TaskManager}; use tokio::sync::mpsc::Receiver; @@ -255,16 +255,8 @@ where &self, report: &RadioThresholdReportReq, ) -> anyhow::Result { - let is_legacy = self.verify_legacy(&report.hotspot_pubkey, &None).await?; let report_validity = self.do_report_verifications(report).await; - let final_validity = if is_legacy - && report_validity == RadioThresholdReportVerificationStatus::ThresholdReportStatusValid - { - RadioThresholdReportVerificationStatus::ThresholdReportStatusLegacyValid - } else { - report_validity - }; - Ok(final_validity) + Ok(report_validity) } async fn verify_invalid_report( @@ -293,23 +285,6 @@ where .await .unwrap_or_default() } - - async fn verify_legacy( - &self, - hotspot_key: &PublicKeyBinary, - cbsd_id: &Option, - ) -> Result { - // check if the radio has been grandfathered in, meaning a radio which has received - // boosted rewards prior to the data component of hip84 going live - // if true then it is assigned a status reason of Legacy - // TODO: remove this handling after the grandfathering period - let row = sqlx::query(" select exists(select 1 from grandfathered_radio_threshold where hotspot_pubkey = $1 and (cbsd_id is null or cbsd_id = $2)) ") - .bind(hotspot_key) - .bind(cbsd_id) - .fetch_one(&self.pool) - .await?; - Ok(row.get("exists")) - } } pub async fn save( @@ -742,76 +717,4 @@ mod tests { let count = verified_thresholds.gateways.len(); assert_eq!(count, 1, "Should have exactly 1 verified hotspot"); } - - #[sqlx::test] - async fn test_verify_legacy(pool: PgPool) { - // Create a test ingestor with authorization verifier - struct MockAuthVerifier; - - #[async_trait::async_trait] - impl AuthorizationVerifier for MockAuthVerifier { - type Error = std::io::Error; - - async fn verify_authorized_key( - &self, - _public_key: &PublicKeyBinary, - _role: NetworkKeyRole, - ) -> Result { - Ok(true) - } - } - - let ingestor = RadioThresholdIngestor::new( - pool.clone(), - tokio::sync::mpsc::channel(1).1, // Reports receiver (unused) - tokio::sync::mpsc::channel(1).1, // Invalid reports receiver (unused) - FileSinkClient::new(tokio::sync::mpsc::channel(1).0, "test"), // Verified sink (unused) - FileSinkClient::new(tokio::sync::mpsc::channel(1).0, "test"), // Invalid verified sink (unused) - MockAuthVerifier, - ); - - // Create test hotspot pubkey - let hotspot_keypair = generate_keypair(); - let hotspot_pubkey = PublicKeyBinary::from(hotspot_keypair.public_key().to_owned()); - - // Initially the hotspot should not be grandfathered - let is_legacy_before = ingestor - .verify_legacy(&hotspot_pubkey, &None) - .await - .unwrap(); - assert!( - !is_legacy_before, - "Hotspot should not be legacy before insertion" - ); - - // Insert a record into grandfathered_radio_threshold - sqlx::query( - r#" - INSERT INTO grandfathered_radio_threshold (hotspot_pubkey, cbsd_id) - VALUES ($1, NULL) - "#, - ) - .bind(hotspot_pubkey.to_string()) - .execute(&pool) - .await - .unwrap(); - - // Now the hotspot should be grandfathered - let is_legacy_after = ingestor - .verify_legacy(&hotspot_pubkey, &None) - .await - .unwrap(); - assert!(is_legacy_after, "Hotspot should be legacy after insertion"); - - // Test with a different hotspot that isn't grandfathered - let other_hotspot_keypair = generate_keypair(); - let other_hotspot_pubkey = - PublicKeyBinary::from(other_hotspot_keypair.public_key().to_owned()); - - let other_is_legacy = ingestor - .verify_legacy(&other_hotspot_pubkey, &None) - .await - .unwrap(); - assert!(!other_is_legacy, "Other hotspot should not be legacy"); - } } From 31910d4cd2e375d9eece41a031046f4944435f8c Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Mon, 31 Mar 2025 18:46:57 +0300 Subject: [PATCH 06/23] Add migration. DROP TABLE grandfathered_radio_threshold --- .../migrations/41_drop_grandfathered_radio_threshold.sql | 1 + 1 file changed, 1 insertion(+) create mode 100644 mobile_verifier/migrations/41_drop_grandfathered_radio_threshold.sql diff --git a/mobile_verifier/migrations/41_drop_grandfathered_radio_threshold.sql b/mobile_verifier/migrations/41_drop_grandfathered_radio_threshold.sql new file mode 100644 index 000000000..6480b3d3d --- /dev/null +++ b/mobile_verifier/migrations/41_drop_grandfathered_radio_threshold.sql @@ -0,0 +1 @@ +DROP TABLE grandfathered_radio_threshold; From 473b4137ac5f8bbbafa05114a5743264a0c2bfeb Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Mon, 31 Mar 2025 21:09:18 +0300 Subject: [PATCH 07/23] DELETE cbsd_id from radio_threshold table and radio_threshold.rs module --- .../42_radio_threshold_rework_unique_idx.sql | 7 +++++++ mobile_verifier/src/radio_threshold.rs | 18 ++++-------------- 2 files changed, 11 insertions(+), 14 deletions(-) create mode 100644 mobile_verifier/migrations/42_radio_threshold_rework_unique_idx.sql diff --git a/mobile_verifier/migrations/42_radio_threshold_rework_unique_idx.sql b/mobile_verifier/migrations/42_radio_threshold_rework_unique_idx.sql new file mode 100644 index 000000000..0ac4a22b4 --- /dev/null +++ b/mobile_verifier/migrations/42_radio_threshold_rework_unique_idx.sql @@ -0,0 +1,7 @@ +DELETE FROM radio_threshold WHERE cbsd_id IS NOT NULL; + +DROP INDEX radio_threshold_hotspot_pubkey_cbsd_id_idx; + +ALTER TABLE radio_threshold DROP COLUMN cbsd_id; + +CREATE UNIQUE INDEX radio_threshold_hotspot_pubkey_idx ON radio_threshold (hotspot_pubkey); diff --git a/mobile_verifier/src/radio_threshold.rs b/mobile_verifier/src/radio_threshold.rs index 2637f9092..0bdbb1247 100644 --- a/mobile_verifier/src/radio_threshold.rs +++ b/mobile_verifier/src/radio_threshold.rs @@ -291,20 +291,17 @@ pub async fn save( ingest_report: &RadioThresholdIngestReport, db: &mut Transaction<'_, Postgres>, ) -> Result<(), sqlx::Error> { - let cbsd_id: Option = None; - sqlx::query( r#" INSERT INTO radio_threshold ( hotspot_pubkey, - cbsd_id, bytes_threshold, subscriber_threshold, threshold_timestamp, threshold_met, recv_timestamp) - VALUES ($1, $2, $3, $4, $5, true, $6) - ON CONFLICT (hotspot_pubkey, COALESCE(cbsd_id, '')) + VALUES ($1, $2, $3, $4, true, $5) + ON CONFLICT (hotspot_pubkey) DO UPDATE SET bytes_threshold = EXCLUDED.bytes_threshold, subscriber_threshold = EXCLUDED.subscriber_threshold, @@ -314,7 +311,6 @@ pub async fn save( "#, ) .bind(ingest_report.report.hotspot_pubkey.to_string()) - .bind(cbsd_id) .bind(ingest_report.report.bytes_threshold as i64) .bind(ingest_report.report.subscriber_threshold as i32) .bind(ingest_report.report.threshold_timestamp) @@ -349,8 +345,7 @@ pub async fn verified_radio_thresholds( reward_period: &Range>, ) -> Result { let mut rows = sqlx::query_as::<_, RadioThreshold>( - "SELECT hotspot_pubkey - FROM radio_threshold WHERE threshold_timestamp < $1 and cbsd_id IS NULL", + "SELECT hotspot_pubkey FROM radio_threshold WHERE threshold_timestamp < $1", ) .bind(reward_period.end) .fetch(pool); @@ -367,8 +362,7 @@ pub async fn delete( ) -> Result<(), sqlx::Error> { sqlx::query( r#" - DELETE FROM radio_threshold - WHERE hotspot_pubkey = $1 AND cbsd_id is null + DELETE FROM radio_threshold WHERE hotspot_pubkey = $1 "#, ) .bind(ingest_report.report.hotspot_pubkey.to_string()) @@ -419,7 +413,6 @@ mod tests { r#" SELECT hotspot_pubkey, - cbsd_id, bytes_threshold, subscriber_threshold, threshold_timestamp, @@ -437,7 +430,6 @@ mod tests { result.get::("hotspot_pubkey"), hotspot_pubkey.to_string() ); - assert_eq!(result.get::, _>("cbsd_id"), None); assert_eq!(result.get::("bytes_threshold"), 1000); assert_eq!(result.get::("subscriber_threshold"), 50); assert!(result.get::("threshold_met")); @@ -504,7 +496,6 @@ mod tests { r#" SELECT hotspot_pubkey, - cbsd_id, bytes_threshold, subscriber_threshold, threshold_timestamp, @@ -523,7 +514,6 @@ mod tests { result.get::("hotspot_pubkey"), hotspot_pubkey.to_string() ); - assert_eq!(result.get::, _>("cbsd_id"), None); assert_eq!(result.get::("bytes_threshold"), 2000); // Updated value assert_eq!(result.get::("subscriber_threshold"), 100); // Updated value assert!(result.get::("threshold_met")); From af95af300db7d061df4ae11224de6aa8b289489d Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Mon, 31 Mar 2025 21:30:37 +0300 Subject: [PATCH 08/23] Increace migrations number --- ...io_threshold.sql => 42_drop_grandfathered_radio_threshold.sql} | 0 ...rk_unique_idx.sql => 43_radio_threshold_rework_unique_idx.sql} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename mobile_verifier/migrations/{41_drop_grandfathered_radio_threshold.sql => 42_drop_grandfathered_radio_threshold.sql} (100%) rename mobile_verifier/migrations/{42_radio_threshold_rework_unique_idx.sql => 43_radio_threshold_rework_unique_idx.sql} (100%) diff --git a/mobile_verifier/migrations/41_drop_grandfathered_radio_threshold.sql b/mobile_verifier/migrations/42_drop_grandfathered_radio_threshold.sql similarity index 100% rename from mobile_verifier/migrations/41_drop_grandfathered_radio_threshold.sql rename to mobile_verifier/migrations/42_drop_grandfathered_radio_threshold.sql diff --git a/mobile_verifier/migrations/42_radio_threshold_rework_unique_idx.sql b/mobile_verifier/migrations/43_radio_threshold_rework_unique_idx.sql similarity index 100% rename from mobile_verifier/migrations/42_radio_threshold_rework_unique_idx.sql rename to mobile_verifier/migrations/43_radio_threshold_rework_unique_idx.sql From 980c64266fec09b8ed7bf8597f378f3e24f5570a Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 1 Apr 2025 12:52:38 +0300 Subject: [PATCH 09/23] Don't need to DELETE FROM cbrs_heartbeats anymore --- mobile_verifier/src/heartbeats/mod.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/mobile_verifier/src/heartbeats/mod.rs b/mobile_verifier/src/heartbeats/mod.rs index 41540076a..0e6af25bf 100644 --- a/mobile_verifier/src/heartbeats/mod.rs +++ b/mobile_verifier/src/heartbeats/mod.rs @@ -629,11 +629,6 @@ pub async fn clear_heartbeats( tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, timestamp: &DateTime, ) -> Result<(), sqlx::Error> { - sqlx::query("DELETE FROM cbrs_heartbeats WHERE truncated_timestamp < $1;") - .bind(timestamp) - .execute(&mut *tx) - .await?; - sqlx::query("DELETE FROM wifi_heartbeats WHERE truncated_timestamp < $1;") .bind(timestamp) .execute(&mut *tx) From b377c8617e42503961d9760a7f144f4f6dd63347 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 1 Apr 2025 13:12:49 +0300 Subject: [PATCH 10/23] Remove cbsd_id selecting from valid radios --- mobile_verifier/src/heartbeats/valid_radios.sql | 1 - 1 file changed, 1 deletion(-) diff --git a/mobile_verifier/src/heartbeats/valid_radios.sql b/mobile_verifier/src/heartbeats/valid_radios.sql index e64af1c43..644ebfeb8 100644 --- a/mobile_verifier/src/heartbeats/valid_radios.sql +++ b/mobile_verifier/src/heartbeats/valid_radios.sql @@ -82,7 +82,6 @@ latest_uuids AS (( SELECT DISTINCT ON (hotspot_key, truncated_timestamp DESC)) SELECT hb.hotspot_key, - hb.cbsd_id, hb.cell_type, hb.distances_to_asserted, hb.trust_score_multipliers, From 9b3ee25f37ed780e7455d368f683750f50c631a9 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 1 Apr 2025 13:36:27 +0300 Subject: [PATCH 11/23] Remove cbrs from valid_radios.sql --- .../src/heartbeats/valid_radios.sql | 79 ++++--------------- 1 file changed, 14 insertions(+), 65 deletions(-) diff --git a/mobile_verifier/src/heartbeats/valid_radios.sql b/mobile_verifier/src/heartbeats/valid_radios.sql index 644ebfeb8..999d7dac6 100644 --- a/mobile_verifier/src/heartbeats/valid_radios.sql +++ b/mobile_verifier/src/heartbeats/valid_radios.sql @@ -1,39 +1,4 @@ -WITH latest_cbrs_hotspot AS ( - SELECT DISTINCT ON (cbsd_id) - cbsd_id, - hotspot_key - FROM - cbrs_heartbeats - WHERE - truncated_timestamp >= $1 - AND truncated_timestamp < $2 - ORDER BY - cbsd_id, - latest_timestamp DESC -), -heartbeats AS ( - SELECT - lch.hotspot_key, - ch.cbsd_id, - ch.cell_type, - CASE WHEN count(*) >= $3 THEN - 1.0 - ELSE - 0.0 - END AS heartbeat_multiplier, - NULL as distances_to_asserted, - ARRAY_AGG(ch.location_trust_score_multiplier) as trust_score_multipliers - FROM - cbrs_heartbeats ch - INNER JOIN latest_cbrs_hotspot lch ON ch.cbsd_id = lch.cbsd_id - WHERE - ch.truncated_timestamp >= $1 - AND ch.truncated_timestamp < $2 - GROUP BY - ch.cbsd_id, - lch.hotspot_key, - ch.cell_type - UNION +WITH heartbeats AS ( SELECT hotspot_key, NULL AS cbsd_id, @@ -54,32 +19,19 @@ heartbeats AS ( hotspot_key, cell_type ), -latest_uuids AS (( SELECT DISTINCT ON (hotspot_key, - cbsd_id) - hotspot_key, - cbsd_id, - coverage_object - FROM - cbrs_heartbeats ch - WHERE - truncated_timestamp >= $1 - AND truncated_timestamp < $2 - ORDER BY - hotspot_key, - cbsd_id, - truncated_timestamp DESC) - UNION ( SELECT DISTINCT ON (hotspot_key) - hotspot_key, - NULL AS cbsd_id, - coverage_object - FROM - wifi_heartbeats wh - WHERE - truncated_timestamp >= $1 - AND truncated_timestamp < $2 - ORDER BY - hotspot_key, - truncated_timestamp DESC)) +latest_uuids AS ( + SELECT DISTINCT ON (hotspot_key) + hotspot_key, + coverage_object + FROM + wifi_heartbeats wh + WHERE + truncated_timestamp >= $1 + AND truncated_timestamp < $2 + ORDER BY + hotspot_key, + truncated_timestamp DESC +) SELECT hb.hotspot_key, hb.cell_type, @@ -89,8 +41,5 @@ SELECT FROM heartbeats hb INNER JOIN latest_uuids u ON hb.hotspot_key = u.hotspot_key - AND (hb.cbsd_id = u.cbsd_id - OR (hb.cbsd_id IS NULL - AND u.cbsd_id IS NULL)) WHERE hb.heartbeat_multiplier = 1.0 From 3b83e2ab7bfab4411efee9f522f0f5a822e6eeed Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 1 Apr 2025 13:41:36 +0300 Subject: [PATCH 12/23] Add migration. DROP TABLE cbrs_heartbeats --- mobile_verifier/migrations/44_drop_cbrs_heartbeats.sql | 1 + 1 file changed, 1 insertion(+) create mode 100644 mobile_verifier/migrations/44_drop_cbrs_heartbeats.sql diff --git a/mobile_verifier/migrations/44_drop_cbrs_heartbeats.sql b/mobile_verifier/migrations/44_drop_cbrs_heartbeats.sql new file mode 100644 index 000000000..03439ccb1 --- /dev/null +++ b/mobile_verifier/migrations/44_drop_cbrs_heartbeats.sql @@ -0,0 +1 @@ +DROP TABLE cbrs_heartbeats; From 217c6a5d3a722c2eb2ad434f0137e0dcf89427bb Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 1 Apr 2025 13:48:18 +0300 Subject: [PATCH 13/23] Remove cell_type from struct HeartbeatReward --- mobile_verifier/src/heartbeats/mod.rs | 11 +++-------- mobile_verifier/src/reward_shares.rs | 8 -------- mobile_verifier/tests/integrations/heartbeats.rs | 4 ---- 3 files changed, 3 insertions(+), 20 deletions(-) diff --git a/mobile_verifier/src/heartbeats/mod.rs b/mobile_verifier/src/heartbeats/mod.rs index 0e6af25bf..61afe3a54 100644 --- a/mobile_verifier/src/heartbeats/mod.rs +++ b/mobile_verifier/src/heartbeats/mod.rs @@ -2,13 +2,12 @@ pub mod last_location; pub mod wifi; use crate::{ - cell_type::{CellType, CellTypeLabel}, + cell_type::CellType, coverage::{self, CoverageClaimTimeCache, CoverageObjectCache, CoverageObjectMeta}, geofence::GeofenceValidator, seniority::{Seniority, SeniorityUpdate}, GatewayResolution, GatewayResolver, }; -use anyhow::anyhow; use chrono::{DateTime, Duration, DurationRound, RoundingError, Utc}; use file_store::{file_sink::FileSinkClient, wifi_heartbeat::WifiHeartbeatIngestReport}; use futures::stream::{Stream, StreamExt}; @@ -205,7 +204,6 @@ impl From for Heartbeat { #[derive(Debug, Clone, PartialEq, sqlx::FromRow)] pub struct HeartbeatReward { pub hotspot_key: PublicKeyBinary, - pub cell_type: CellType, pub distances_to_asserted: Option>, pub trust_score_multipliers: Vec, pub coverage_object: Uuid, @@ -216,11 +214,8 @@ impl HeartbeatReward { KeyType::Wifi(&self.hotspot_key) } - pub fn id(&self) -> anyhow::Result { - match self.cell_type.to_label() { - CellTypeLabel::Wifi => Ok(self.hotspot_key.to_string()), - _ => Err(anyhow!("failed to derive label from cell type")), - } + pub fn id(&self) -> String { + self.hotspot_key.to_string() } pub fn validated<'a>( diff --git a/mobile_verifier/src/reward_shares.rs b/mobile_verifier/src/reward_shares.rs index 4b0c0d715..f54a8fc4c 100644 --- a/mobile_verifier/src/reward_shares.rs +++ b/mobile_verifier/src/reward_shares.rs @@ -726,7 +726,6 @@ mod test { use hex_assignments::{assignment::HexAssignments, Assignment}; use crate::{ - cell_type::CellType, coverage::{CoveredHexStream, HexCoverage}, data_session::{self, HotspotDataSession, HotspotReward}, heartbeats::{HeartbeatReward, KeyType, OwnedKeyType}, @@ -1180,7 +1179,6 @@ mod test { let heartbeat_rewards = vec![HeartbeatReward { hotspot_key: gw1.clone(), coverage_object: cov_obj_1, - cell_type: CellType::NovaGenericWifiOutdoor, distances_to_asserted: None, trust_score_multipliers: vec![dec!(1.0)], }] @@ -1311,28 +1309,24 @@ mod test { HeartbeatReward { hotspot_key: gw10.clone(), coverage_object: cov_obj_10, - cell_type: CellType::NovaGenericWifiIndoor, distances_to_asserted: Some(vec![0]), trust_score_multipliers: vec![dec!(1.0)], }, HeartbeatReward { hotspot_key: gw20.clone(), coverage_object: cov_obj_20, - cell_type: CellType::NovaGenericWifiIndoor, distances_to_asserted: Some(vec![0, 250, 250]), trust_score_multipliers: vec![dec!(1.0), dec!(0.25), dec!(0.25)], }, HeartbeatReward { hotspot_key: gw21.clone(), coverage_object: cov_obj_21, - cell_type: CellType::NovaGenericWifiIndoor, distances_to_asserted: Some(vec![0]), trust_score_multipliers: vec![dec!(1.0)], }, HeartbeatReward { hotspot_key: gw30.clone(), coverage_object: cov_obj_30, - cell_type: CellType::NovaGenericWifiIndoor, distances_to_asserted: Some(vec![0]), trust_score_multipliers: vec![dec!(1.0)], }, @@ -1492,7 +1486,6 @@ mod test { // add qualified wifi indoor HB HeartbeatReward { hotspot_key: gw1.clone(), - cell_type: CellType::NovaGenericWifiOutdoor, coverage_object: g1_cov_obj, distances_to_asserted: Some(vec![0]), trust_score_multipliers: vec![dec!(1.0)], @@ -1500,7 +1493,6 @@ mod test { // add unqualified wifi indoor HB HeartbeatReward { hotspot_key: gw2.clone(), - cell_type: CellType::NovaGenericWifiOutdoor, coverage_object: g2_cov_obj, distances_to_asserted: None, trust_score_multipliers: vec![dec!(1.0)], diff --git a/mobile_verifier/tests/integrations/heartbeats.rs b/mobile_verifier/tests/integrations/heartbeats.rs index a64a61b67..a336c1a64 100644 --- a/mobile_verifier/tests/integrations/heartbeats.rs +++ b/mobile_verifier/tests/integrations/heartbeats.rs @@ -51,7 +51,6 @@ async fn test_save_wifi_heartbeat(pool: PgPool) -> anyhow::Result<()> { #[sqlx::test] async fn only_fetch_latest_hotspot(pool: PgPool) -> anyhow::Result<()> { let coverage_object = Uuid::new_v4(); - let cell_type = CellType::NovaGenericWifiIndoor; let hotspot_1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6".parse()?; let hotspot_2: PublicKeyBinary = @@ -102,7 +101,6 @@ VALUES heartbeat_reward, vec![HeartbeatReward { hotspot_key: hotspot_2, - cell_type, trust_score_multipliers: vec![Decimal::ONE; 13], distances_to_asserted: Some(vec![0; 13]), coverage_object, @@ -191,7 +189,6 @@ VALUES heartbeat_reward, vec![HeartbeatReward { hotspot_key: hotspot, - cell_type: CellType::NovaGenericWifiIndoor, trust_score_multipliers: vec![Decimal::ONE; 12], distances_to_asserted: Some(vec![0; 12]), coverage_object: latest_coverage_object, @@ -241,7 +238,6 @@ VALUES heartbeat_reward, vec![HeartbeatReward { hotspot_key: hotspot, - cell_type: CellType::NovaGenericWifiIndoor, trust_score_multipliers: vec![ dec!(1.0), dec!(1.0), From e321bb3e07b261a8410cf15fc3dbf99c5bde2c74 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 1 Apr 2025 14:42:50 +0300 Subject: [PATCH 14/23] Remove redundant cbsd_id from valid_radio.sql --- mobile_verifier/src/heartbeats/valid_radios.sql | 1 - 1 file changed, 1 deletion(-) diff --git a/mobile_verifier/src/heartbeats/valid_radios.sql b/mobile_verifier/src/heartbeats/valid_radios.sql index 999d7dac6..aebcc70f1 100644 --- a/mobile_verifier/src/heartbeats/valid_radios.sql +++ b/mobile_verifier/src/heartbeats/valid_radios.sql @@ -1,7 +1,6 @@ WITH heartbeats AS ( SELECT hotspot_key, - NULL AS cbsd_id, cell_type, CASE WHEN count(*) >= $3 THEN 1.0 From b4bcc679c0e274ab0f7dcd91617d572ddfa6a6bf Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 1 Apr 2025 15:20:34 +0300 Subject: [PATCH 15/23] Make distances_to_asserted not optional in HeartbeatReward --- mobile_verifier/src/heartbeats/mod.rs | 54 ++++++++++++++++--- mobile_verifier/src/reward_shares.rs | 16 +++--- .../tests/integrations/heartbeats.rs | 6 +-- 3 files changed, 57 insertions(+), 19 deletions(-) diff --git a/mobile_verifier/src/heartbeats/mod.rs b/mobile_verifier/src/heartbeats/mod.rs index 61afe3a54..d2be62810 100644 --- a/mobile_verifier/src/heartbeats/mod.rs +++ b/mobile_verifier/src/heartbeats/mod.rs @@ -8,6 +8,7 @@ use crate::{ seniority::{Seniority, SeniorityUpdate}, GatewayResolution, GatewayResolver, }; +use anyhow::anyhow; use chrono::{DateTime, Duration, DurationRound, RoundingError, Utc}; use file_store::{file_sink::FileSinkClient, wifi_heartbeat::WifiHeartbeatIngestReport}; use futures::stream::{Stream, StreamExt}; @@ -204,7 +205,7 @@ impl From for Heartbeat { #[derive(Debug, Clone, PartialEq, sqlx::FromRow)] pub struct HeartbeatReward { pub hotspot_key: PublicKeyBinary, - pub distances_to_asserted: Option>, + pub distances_to_asserted: Vec, pub trust_score_multipliers: Vec, pub coverage_object: Uuid, } @@ -229,16 +230,21 @@ impl HeartbeatReward { .fetch(exec) } - pub fn iter_distances_and_scores(&self) -> impl Iterator { - let fallback: Vec = std::iter::repeat(0) - .take(self.trust_score_multipliers.len()) - .collect(); + pub fn iter_distances_and_scores( + &self, + ) -> anyhow::Result> { + // This should never happen if valid_radio.sql is not touched + if self.trust_score_multipliers.len() != self.distances_to_asserted.len() { + return Err(anyhow!( + "Mismatched lengths between distances_to_asserted and trust_score_multipliers" + )); + } - self.distances_to_asserted + Ok(self + .distances_to_asserted .clone() - .unwrap_or(fallback) .into_iter() - .zip(self.trust_score_multipliers.clone()) + .zip(self.trust_score_multipliers.clone())) } } @@ -640,6 +646,38 @@ mod test { use file_store::wifi_heartbeat::WifiHeartbeat; use proto::SeniorityUpdateReason::*; + #[test] + fn test_iter_distances_and_scores_with_matching_lengths() { + let reward = HeartbeatReward { + hotspot_key: PublicKeyBinary::from(vec![1, 2, 3]), + distances_to_asserted: vec![10, 20, 30], + trust_score_multipliers: vec![dec!(0.25), dec!(0.5), dec!(1.0)], + coverage_object: uuid::Uuid::new_v4(), + }; + + let result = reward.iter_distances_and_scores(); + assert!(result.is_ok(), "Expected successful iteration"); + + let pairs: Vec<(i64, Decimal)> = result.unwrap().collect(); + assert_eq!(pairs.len(), 3, "Expected 3 pairs in the iterator"); + assert_eq!(pairs[0], (10, dec!(0.25))); + assert_eq!(pairs[1], (20, dec!(0.5))); + assert_eq!(pairs[2], (30, dec!(1.0))); + } + + #[test] + fn test_iter_distances_and_scores_with_mismatched_lengths() { + let reward = HeartbeatReward { + hotspot_key: PublicKeyBinary::from(vec![1, 2, 3]), + distances_to_asserted: vec![10, 20], // Only 2 elements + trust_score_multipliers: vec![dec!(0.25), dec!(0.5), dec!(1.0)], // 3 elements + coverage_object: uuid::Uuid::new_v4(), + }; + + let result = reward.iter_distances_and_scores(); + assert!(result.is_err(), "Expected error due to mismatched lengths"); + } + fn heartbeat(timestamp: DateTime, coverage_object: Uuid) -> ValidatedHeartbeat { ValidatedHeartbeat { cell_type: CellType::CellTypeNone, diff --git a/mobile_verifier/src/reward_shares.rs b/mobile_verifier/src/reward_shares.rs index f54a8fc4c..19d13dc57 100644 --- a/mobile_verifier/src/reward_shares.rs +++ b/mobile_verifier/src/reward_shares.rs @@ -381,7 +381,7 @@ impl CoverageShares { .await?; let trust_scores: Vec = heartbeat - .iter_distances_and_scores() + .iter_distances_and_scores()? .map(|(distance, trust_score)| LocationTrust { meters_to_asserted: distance as u32, trust_score, @@ -1179,7 +1179,7 @@ mod test { let heartbeat_rewards = vec![HeartbeatReward { hotspot_key: gw1.clone(), coverage_object: cov_obj_1, - distances_to_asserted: None, + distances_to_asserted: vec![1], trust_score_multipliers: vec![dec!(1.0)], }] .into_iter() @@ -1309,25 +1309,25 @@ mod test { HeartbeatReward { hotspot_key: gw10.clone(), coverage_object: cov_obj_10, - distances_to_asserted: Some(vec![0]), + distances_to_asserted: vec![1], trust_score_multipliers: vec![dec!(1.0)], }, HeartbeatReward { hotspot_key: gw20.clone(), coverage_object: cov_obj_20, - distances_to_asserted: Some(vec![0, 250, 250]), + distances_to_asserted: vec![0, 250, 250], trust_score_multipliers: vec![dec!(1.0), dec!(0.25), dec!(0.25)], }, HeartbeatReward { hotspot_key: gw21.clone(), coverage_object: cov_obj_21, - distances_to_asserted: Some(vec![0]), + distances_to_asserted: vec![0], trust_score_multipliers: vec![dec!(1.0)], }, HeartbeatReward { hotspot_key: gw30.clone(), coverage_object: cov_obj_30, - distances_to_asserted: Some(vec![0]), + distances_to_asserted: vec![0], trust_score_multipliers: vec![dec!(1.0)], }, ] @@ -1487,14 +1487,14 @@ mod test { HeartbeatReward { hotspot_key: gw1.clone(), coverage_object: g1_cov_obj, - distances_to_asserted: Some(vec![0]), + distances_to_asserted: vec![0], trust_score_multipliers: vec![dec!(1.0)], }, // add unqualified wifi indoor HB HeartbeatReward { hotspot_key: gw2.clone(), coverage_object: g2_cov_obj, - distances_to_asserted: None, + distances_to_asserted: vec![0], trust_score_multipliers: vec![dec!(1.0)], }, ] diff --git a/mobile_verifier/tests/integrations/heartbeats.rs b/mobile_verifier/tests/integrations/heartbeats.rs index a336c1a64..9b48acd55 100644 --- a/mobile_verifier/tests/integrations/heartbeats.rs +++ b/mobile_verifier/tests/integrations/heartbeats.rs @@ -102,7 +102,7 @@ VALUES vec![HeartbeatReward { hotspot_key: hotspot_2, trust_score_multipliers: vec![Decimal::ONE; 13], - distances_to_asserted: Some(vec![0; 13]), + distances_to_asserted: vec![0; 13], coverage_object, }] ); @@ -190,7 +190,7 @@ VALUES vec![HeartbeatReward { hotspot_key: hotspot, trust_score_multipliers: vec![Decimal::ONE; 12], - distances_to_asserted: Some(vec![0; 12]), + distances_to_asserted: vec![0; 12], coverage_object: latest_coverage_object, }] ); @@ -252,7 +252,7 @@ VALUES dec!(0.25), dec!(0.25) ], - distances_to_asserted: Some(vec![0; 12]), + distances_to_asserted: vec![0; 12], coverage_object: latest_coverage_object, }] ); From bb0b09747af51d0dae285995561fb205ca072bdb Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 1 Apr 2025 16:22:53 +0300 Subject: [PATCH 16/23] Add migration: DROP TABLE old_hex_coverage --- mobile_verifier/migrations/45_drop_old_hex_coverage.sql | 1 + 1 file changed, 1 insertion(+) create mode 100644 mobile_verifier/migrations/45_drop_old_hex_coverage.sql diff --git a/mobile_verifier/migrations/45_drop_old_hex_coverage.sql b/mobile_verifier/migrations/45_drop_old_hex_coverage.sql new file mode 100644 index 000000000..c40b8493d --- /dev/null +++ b/mobile_verifier/migrations/45_drop_old_hex_coverage.sql @@ -0,0 +1 @@ +DROP TABLE old_hex_coverage; From 5d4e992beaccdcd2e5885fb8b6c249a06399b178 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 1 Apr 2025 18:30:54 +0300 Subject: [PATCH 17/23] Add migration. DELETE FROM seniority WHERE radio_type = 'cbrs' --- mobile_verifier/migrations/46_delete_cbrs_from_seniority.sql | 1 + 1 file changed, 1 insertion(+) create mode 100644 mobile_verifier/migrations/46_delete_cbrs_from_seniority.sql diff --git a/mobile_verifier/migrations/46_delete_cbrs_from_seniority.sql b/mobile_verifier/migrations/46_delete_cbrs_from_seniority.sql new file mode 100644 index 000000000..c93a25ec9 --- /dev/null +++ b/mobile_verifier/migrations/46_delete_cbrs_from_seniority.sql @@ -0,0 +1 @@ +DELETE FROM seniority WHERE radio_type = 'cbrs'; From 4136a9da466a2ed517259a547a0bbad4a6f737a4 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 1 Apr 2025 18:38:34 +0300 Subject: [PATCH 18/23] Add migration. DELETE FROM coverage_objects WHERE radio_type = 'cbrs' --- .../migrations/47_delete_cbrs_from_coverage_objects.sql | 1 + 1 file changed, 1 insertion(+) create mode 100644 mobile_verifier/migrations/47_delete_cbrs_from_coverage_objects.sql diff --git a/mobile_verifier/migrations/47_delete_cbrs_from_coverage_objects.sql b/mobile_verifier/migrations/47_delete_cbrs_from_coverage_objects.sql new file mode 100644 index 000000000..ddf295ddb --- /dev/null +++ b/mobile_verifier/migrations/47_delete_cbrs_from_coverage_objects.sql @@ -0,0 +1 @@ +DELETE FROM coverage_objects WHERE radio_type = 'cbrs'; From d35b68d1ebf3c27c5142c3f89ee655578d082e39 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Thu, 3 Apr 2025 12:06:33 +0300 Subject: [PATCH 19/23] verify_report can no longer fail --- mobile_verifier/src/radio_threshold.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/mobile_verifier/src/radio_threshold.rs b/mobile_verifier/src/radio_threshold.rs index 0bdbb1247..bc4a2cab1 100644 --- a/mobile_verifier/src/radio_threshold.rs +++ b/mobile_verifier/src/radio_threshold.rs @@ -175,7 +175,7 @@ where .map(anyhow::Ok) .try_fold(transaction, |mut transaction, ingest_report| async move { // verify the report - let verified_report_status = self.verify_report(&ingest_report.report).await?; + let verified_report_status = self.verify_report(&ingest_report.report).await; // if the report is valid then save to the db // and thus available to the rewarder @@ -254,9 +254,8 @@ where async fn verify_report( &self, report: &RadioThresholdReportReq, - ) -> anyhow::Result { - let report_validity = self.do_report_verifications(report).await; - Ok(report_validity) + ) -> RadioThresholdReportVerificationStatus { + self.do_report_verifications(report).await } async fn verify_invalid_report( From 9101622d27c46ac4d34c4a499b8ad4624c7d471f Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Thu, 3 Apr 2025 12:53:18 +0300 Subject: [PATCH 20/23] Refactor fn verified_radio_thresholds. Get rid of RadioThreshold --- mobile_verifier/src/radio_threshold.rs | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/mobile_verifier/src/radio_threshold.rs b/mobile_verifier/src/radio_threshold.rs index bc4a2cab1..3277127f2 100644 --- a/mobile_verifier/src/radio_threshold.rs +++ b/mobile_verifier/src/radio_threshold.rs @@ -25,7 +25,7 @@ use helium_proto::services::{ }, }; use mobile_config::client::authorization_client::AuthorizationVerifier; -use sqlx::{FromRow, PgPool, Pool, Postgres, Transaction}; +use sqlx::{PgPool, Pool, Postgres, Transaction}; use std::{collections::HashSet, ops::Range}; use task_manager::{ManagedTask, TaskManager}; use tokio::sync::mpsc::Receiver; @@ -319,11 +319,6 @@ pub async fn save( Ok(()) } -#[derive(FromRow, Debug)] -pub struct RadioThreshold { - hotspot_pubkey: PublicKeyBinary, -} - #[derive(Debug, Clone, Default)] pub struct VerifiedRadioThresholds { gateways: HashSet, @@ -343,16 +338,16 @@ pub async fn verified_radio_thresholds( pool: &sqlx::Pool, reward_period: &Range>, ) -> Result { - let mut rows = sqlx::query_as::<_, RadioThreshold>( + let gateways = sqlx::query_scalar::<_, PublicKeyBinary>( "SELECT hotspot_pubkey FROM radio_threshold WHERE threshold_timestamp < $1", ) .bind(reward_period.end) - .fetch(pool); - let mut map = VerifiedRadioThresholds::default(); - while let Some(row) = rows.try_next().await? { - map.insert(row.hotspot_pubkey); - } - Ok(map) + .fetch_all(pool) + .await? + .into_iter() + .collect::>(); + + Ok(VerifiedRadioThresholds { gateways }) } pub async fn delete( From 1ab0b25596fc4330c0c4e1c0888965a20949b504 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Thu, 3 Apr 2025 13:02:48 +0300 Subject: [PATCH 21/23] up migrations --- ...er_mapping_activity.sql => 43_subscriber_mapping_activity.sql} | 0 ...rk_unique_idx.sql => 44_radio_threshold_rework_unique_idx.sql} | 0 .../{44_drop_cbrs_heartbeats.sql => 45_drop_cbrs_heartbeats.sql} | 0 ...{45_drop_old_hex_coverage.sql => 46_drop_old_hex_coverage.sql} | 0 ..._cbrs_from_seniority.sql => 47_delete_cbrs_from_seniority.sql} | 0 ...erage_objects.sql => 48_delete_cbrs_from_coverage_objects.sql} | 0 6 files changed, 0 insertions(+), 0 deletions(-) rename mobile_verifier/migrations/{42_subscriber_mapping_activity.sql => 43_subscriber_mapping_activity.sql} (100%) rename mobile_verifier/migrations/{43_radio_threshold_rework_unique_idx.sql => 44_radio_threshold_rework_unique_idx.sql} (100%) rename mobile_verifier/migrations/{44_drop_cbrs_heartbeats.sql => 45_drop_cbrs_heartbeats.sql} (100%) rename mobile_verifier/migrations/{45_drop_old_hex_coverage.sql => 46_drop_old_hex_coverage.sql} (100%) rename mobile_verifier/migrations/{46_delete_cbrs_from_seniority.sql => 47_delete_cbrs_from_seniority.sql} (100%) rename mobile_verifier/migrations/{47_delete_cbrs_from_coverage_objects.sql => 48_delete_cbrs_from_coverage_objects.sql} (100%) diff --git a/mobile_verifier/migrations/42_subscriber_mapping_activity.sql b/mobile_verifier/migrations/43_subscriber_mapping_activity.sql similarity index 100% rename from mobile_verifier/migrations/42_subscriber_mapping_activity.sql rename to mobile_verifier/migrations/43_subscriber_mapping_activity.sql diff --git a/mobile_verifier/migrations/43_radio_threshold_rework_unique_idx.sql b/mobile_verifier/migrations/44_radio_threshold_rework_unique_idx.sql similarity index 100% rename from mobile_verifier/migrations/43_radio_threshold_rework_unique_idx.sql rename to mobile_verifier/migrations/44_radio_threshold_rework_unique_idx.sql diff --git a/mobile_verifier/migrations/44_drop_cbrs_heartbeats.sql b/mobile_verifier/migrations/45_drop_cbrs_heartbeats.sql similarity index 100% rename from mobile_verifier/migrations/44_drop_cbrs_heartbeats.sql rename to mobile_verifier/migrations/45_drop_cbrs_heartbeats.sql diff --git a/mobile_verifier/migrations/45_drop_old_hex_coverage.sql b/mobile_verifier/migrations/46_drop_old_hex_coverage.sql similarity index 100% rename from mobile_verifier/migrations/45_drop_old_hex_coverage.sql rename to mobile_verifier/migrations/46_drop_old_hex_coverage.sql diff --git a/mobile_verifier/migrations/46_delete_cbrs_from_seniority.sql b/mobile_verifier/migrations/47_delete_cbrs_from_seniority.sql similarity index 100% rename from mobile_verifier/migrations/46_delete_cbrs_from_seniority.sql rename to mobile_verifier/migrations/47_delete_cbrs_from_seniority.sql diff --git a/mobile_verifier/migrations/47_delete_cbrs_from_coverage_objects.sql b/mobile_verifier/migrations/48_delete_cbrs_from_coverage_objects.sql similarity index 100% rename from mobile_verifier/migrations/47_delete_cbrs_from_coverage_objects.sql rename to mobile_verifier/migrations/48_delete_cbrs_from_coverage_objects.sql From f6f9fb89b76460242722efa3fd09d473ba6941a9 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Thu, 3 Apr 2025 14:27:46 +0300 Subject: [PATCH 22/23] Refactor tests --- mobile_verifier/src/radio_threshold.rs | 87 ++++++++++++++++---------- 1 file changed, 54 insertions(+), 33 deletions(-) diff --git a/mobile_verifier/src/radio_threshold.rs b/mobile_verifier/src/radio_threshold.rs index 3277127f2..ef5d62d34 100644 --- a/mobile_verifier/src/radio_threshold.rs +++ b/mobile_verifier/src/radio_threshold.rs @@ -372,7 +372,7 @@ mod tests { use file_store::mobile_radio_threshold::{RadioThresholdIngestReport, RadioThresholdReportReq}; use helium_crypto::{KeyTag, Keypair}; use rand::rngs::OsRng; - use sqlx::Row; + use sqlx::{prelude::FromRow, Row}; fn generate_keypair() -> Keypair { Keypair::generate(KeyTag::default(), &mut OsRng) @@ -403,11 +403,20 @@ mod tests { save(&ingest_report, &mut transaction).await.unwrap(); transaction.commit().await.unwrap(); - let result = sqlx::query( + #[derive(Debug, FromRow, PartialEq)] + struct TestRadioThreshold { + hotspot_pubkey: PublicKeyBinary, + bytes_threshold: i64, + subscriber_threshold: i32, + threshold_met: bool, + threshold_timestamp: DateTime, + } + + let result = sqlx::query_as::<_, TestRadioThreshold>( r#" - SELECT - hotspot_pubkey, - bytes_threshold, + SELECT + hotspot_pubkey, + bytes_threshold, subscriber_threshold, threshold_timestamp, threshold_met @@ -415,23 +424,25 @@ mod tests { WHERE hotspot_pubkey = $1 "#, ) - .bind(hotspot_pubkey.to_string()) + .bind(&hotspot_pubkey) .fetch_one(&pool) .await .unwrap(); - assert_eq!( - result.get::("hotspot_pubkey"), - hotspot_pubkey.to_string() - ); - assert_eq!(result.get::("bytes_threshold"), 1000); - assert_eq!(result.get::("subscriber_threshold"), 50); - assert!(result.get::("threshold_met")); + pub fn nanos_trunc(ts: DateTime) -> DateTime { + use chrono::{Duration, DurationRound}; + ts.duration_trunc(Duration::nanoseconds(1000)).unwrap() + } - let stored_threshold_timestamp = result.get::, _>("threshold_timestamp"); assert_eq!( - stored_threshold_timestamp.timestamp_millis(), - now.timestamp_millis() + result, + TestRadioThreshold { + hotspot_pubkey, + bytes_threshold: 1000, + subscriber_threshold: 50, + threshold_met: true, + threshold_timestamp: nanos_trunc(now) + } ); } @@ -485,12 +496,20 @@ mod tests { .unwrap(); transaction.commit().await.unwrap(); - // Verify - Query the database to confirm the record was updated - let result = sqlx::query( + #[derive(Debug, FromRow, PartialEq)] + struct TestRadioThreshold { + hotspot_pubkey: PublicKeyBinary, + bytes_threshold: i64, + subscriber_threshold: i32, + threshold_met: bool, + threshold_timestamp: DateTime, + } + + let result = sqlx::query_as::<_, TestRadioThreshold>( r#" - SELECT - hotspot_pubkey, - bytes_threshold, + SELECT + hotspot_pubkey, + bytes_threshold, subscriber_threshold, threshold_timestamp, threshold_met @@ -498,23 +517,25 @@ mod tests { WHERE hotspot_pubkey = $1 "#, ) - .bind(hotspot_pubkey.to_string()) + .bind(&hotspot_pubkey) .fetch_one(&pool) .await .unwrap(); - // Assertions - should have the updated values - assert_eq!( - result.get::("hotspot_pubkey"), - hotspot_pubkey.to_string() - ); - assert_eq!(result.get::("bytes_threshold"), 2000); // Updated value - assert_eq!(result.get::("subscriber_threshold"), 100); // Updated value - assert!(result.get::("threshold_met")); - let stored_threshold_timestamp = result.get::, _>("threshold_timestamp"); + pub fn nanos_trunc(ts: DateTime) -> DateTime { + use chrono::{Duration, DurationRound}; + ts.duration_trunc(Duration::nanoseconds(1000)).unwrap() + } + assert_eq!( - stored_threshold_timestamp.timestamp_millis(), - updated_now.timestamp_millis() + result, + TestRadioThreshold { + hotspot_pubkey, + bytes_threshold: 2000, + subscriber_threshold: 100, + threshold_met: true, + threshold_timestamp: nanos_trunc(updated_now) + } ); } From e6ae2fc1a925f4135e2eab2cfaf157869754f596 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Mon, 21 Apr 2025 18:26:56 +0300 Subject: [PATCH 23/23] Rework migrations --- ...er_mapping_activity.sql => 42_subscriber_mapping_activity.sql} | 0 ...io_threshold.sql => 45_drop_grandfathered_radio_threshold.sql} | 0 ...rk_unique_idx.sql => 46_radio_threshold_rework_unique_idx.sql} | 0 .../{45_drop_cbrs_heartbeats.sql => 47_drop_cbrs_heartbeats.sql} | 0 ...{46_drop_old_hex_coverage.sql => 48_drop_old_hex_coverage.sql} | 0 ..._cbrs_from_seniority.sql => 49_delete_cbrs_from_seniority.sql} | 0 ...erage_objects.sql => 50_delete_cbrs_from_coverage_objects.sql} | 0 7 files changed, 0 insertions(+), 0 deletions(-) rename mobile_verifier/migrations/{43_subscriber_mapping_activity.sql => 42_subscriber_mapping_activity.sql} (100%) rename mobile_verifier/migrations/{42_drop_grandfathered_radio_threshold.sql => 45_drop_grandfathered_radio_threshold.sql} (100%) rename mobile_verifier/migrations/{44_radio_threshold_rework_unique_idx.sql => 46_radio_threshold_rework_unique_idx.sql} (100%) rename mobile_verifier/migrations/{45_drop_cbrs_heartbeats.sql => 47_drop_cbrs_heartbeats.sql} (100%) rename mobile_verifier/migrations/{46_drop_old_hex_coverage.sql => 48_drop_old_hex_coverage.sql} (100%) rename mobile_verifier/migrations/{47_delete_cbrs_from_seniority.sql => 49_delete_cbrs_from_seniority.sql} (100%) rename mobile_verifier/migrations/{48_delete_cbrs_from_coverage_objects.sql => 50_delete_cbrs_from_coverage_objects.sql} (100%) diff --git a/mobile_verifier/migrations/43_subscriber_mapping_activity.sql b/mobile_verifier/migrations/42_subscriber_mapping_activity.sql similarity index 100% rename from mobile_verifier/migrations/43_subscriber_mapping_activity.sql rename to mobile_verifier/migrations/42_subscriber_mapping_activity.sql diff --git a/mobile_verifier/migrations/42_drop_grandfathered_radio_threshold.sql b/mobile_verifier/migrations/45_drop_grandfathered_radio_threshold.sql similarity index 100% rename from mobile_verifier/migrations/42_drop_grandfathered_radio_threshold.sql rename to mobile_verifier/migrations/45_drop_grandfathered_radio_threshold.sql diff --git a/mobile_verifier/migrations/44_radio_threshold_rework_unique_idx.sql b/mobile_verifier/migrations/46_radio_threshold_rework_unique_idx.sql similarity index 100% rename from mobile_verifier/migrations/44_radio_threshold_rework_unique_idx.sql rename to mobile_verifier/migrations/46_radio_threshold_rework_unique_idx.sql diff --git a/mobile_verifier/migrations/45_drop_cbrs_heartbeats.sql b/mobile_verifier/migrations/47_drop_cbrs_heartbeats.sql similarity index 100% rename from mobile_verifier/migrations/45_drop_cbrs_heartbeats.sql rename to mobile_verifier/migrations/47_drop_cbrs_heartbeats.sql diff --git a/mobile_verifier/migrations/46_drop_old_hex_coverage.sql b/mobile_verifier/migrations/48_drop_old_hex_coverage.sql similarity index 100% rename from mobile_verifier/migrations/46_drop_old_hex_coverage.sql rename to mobile_verifier/migrations/48_drop_old_hex_coverage.sql diff --git a/mobile_verifier/migrations/47_delete_cbrs_from_seniority.sql b/mobile_verifier/migrations/49_delete_cbrs_from_seniority.sql similarity index 100% rename from mobile_verifier/migrations/47_delete_cbrs_from_seniority.sql rename to mobile_verifier/migrations/49_delete_cbrs_from_seniority.sql diff --git a/mobile_verifier/migrations/48_delete_cbrs_from_coverage_objects.sql b/mobile_verifier/migrations/50_delete_cbrs_from_coverage_objects.sql similarity index 100% rename from mobile_verifier/migrations/48_delete_cbrs_from_coverage_objects.sql rename to mobile_verifier/migrations/50_delete_cbrs_from_coverage_objects.sql