From 109a82becb5e6c5c778b69e3c322ba453a8e8c15 Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Mon, 25 Nov 2024 13:50:09 -0500 Subject: [PATCH 1/4] Track when data for mobile radio changes --- Cargo.lock | 2 + mobile_config/Cargo.toml | 3 + .../migrations/7_mobile_radio_tracker.sql | 7 + mobile_config/src/lib.rs | 1 + mobile_config/src/main.rs | 7 +- mobile_config/src/mobile_radio_tracker.rs | 253 ++++++++++++++++++ mobile_config/src/settings.rs | 9 + 7 files changed, 281 insertions(+), 1 deletion(-) create mode 100644 mobile_config/migrations/7_mobile_radio_tracker.sql create mode 100644 mobile_config/src/mobile_radio_tracker.rs diff --git a/Cargo.lock b/Cargo.lock index e12c77c65..536f350ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5022,6 +5022,7 @@ dependencies = [ "anyhow", "async-trait", "base64 0.21.7", + "blake3", "bs58 0.4.0", "chrono", "clap 4.4.8", @@ -5037,6 +5038,7 @@ dependencies = [ "hextree", "http 0.2.11", "http-serde", + "humantime", "humantime-serde", "lazy_static", "metrics", diff --git a/mobile_config/Cargo.toml b/mobile_config/Cargo.toml index f413f4cba..54aea61d1 100644 --- a/mobile_config/Cargo.toml +++ b/mobile_config/Cargo.toml @@ -10,6 +10,7 @@ license.workspace = true anyhow = { workspace = true } async-trait = { workspace = true } base64 = { workspace = true } +blake3 = { workspace = true } bs58 = { workspace = true } chrono = { workspace = true } clap = { workspace = true } @@ -20,6 +21,8 @@ futures = { workspace = true } futures-util = { workspace = true } helium-crypto = { workspace = true, features = ["sqlx-postgres"] } helium-proto = { workspace = true } +humantime = { workspace = true } +humantime-serde = { workspace = true } hextree = { workspace = true } http = { workspace = true } http-serde = { workspace = true } diff --git a/mobile_config/migrations/7_mobile_radio_tracker.sql b/mobile_config/migrations/7_mobile_radio_tracker.sql new file mode 100644 index 000000000..d2560347e --- /dev/null +++ b/mobile_config/migrations/7_mobile_radio_tracker.sql @@ -0,0 +1,7 @@ +CREATE TABLE IF NOT EXISTS mobile_radio_tracker ( + entity_key BYTEA NOT NULL, + hash TEXT NOT NULL, + last_changed_at TIMESTAMPTZ NOT NULL, + last_checked_at TIMESTAMPTZ NOT NULL, + PRIMARY KEY (entity_key) +); diff --git a/mobile_config/src/lib.rs b/mobile_config/src/lib.rs index cac265fd5..964086a14 100644 --- a/mobile_config/src/lib.rs +++ b/mobile_config/src/lib.rs @@ -15,6 +15,7 @@ pub mod gateway_service; pub mod hex_boosting_service; pub mod key_cache; +pub mod mobile_radio_tracker; pub mod settings; pub mod telemetry; diff --git a/mobile_config/src/main.rs b/mobile_config/src/main.rs index e81127ab0..2128851a3 100644 --- a/mobile_config/src/main.rs +++ b/mobile_config/src/main.rs @@ -10,7 +10,7 @@ use mobile_config::{ admin_service::AdminService, authorization_service::AuthorizationService, carrier_service::CarrierService, entity_service::EntityService, gateway_service::GatewayService, hex_boosting_service::HexBoostingService, key_cache::KeyCache, - settings::Settings, + mobile_radio_tracker::MobileRadioTracker, settings::Settings, }; use std::{net::SocketAddr, path::PathBuf, time::Duration}; use task_manager::{ManagedTask, TaskManager}; @@ -108,6 +108,11 @@ impl Daemon { TaskManager::builder() .add_task(grpc_server) + .add_task(MobileRadioTracker::new( + pool.clone(), + metadata_pool.clone(), + settings.mobile_radio_tracker_interval, + )) .build() .start() .await diff --git a/mobile_config/src/mobile_radio_tracker.rs b/mobile_config/src/mobile_radio_tracker.rs new file mode 100644 index 000000000..2d7544f4d --- /dev/null +++ b/mobile_config/src/mobile_radio_tracker.rs @@ -0,0 +1,253 @@ +use std::{collections::HashMap, time::Duration}; + +use chrono::{DateTime, Utc}; +use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt}; +use sqlx::{Pool, Postgres, QueryBuilder}; +use task_manager::ManagedTask; + +type EntityKey = Vec; + +#[derive(Debug, sqlx::FromRow)] +struct MobileRadio { + entity_key: EntityKey, + refreshed_at: DateTime, + location: Option, + is_full_hotspot: Option, + num_location_asserts: Option, + is_active: Option, + dc_onboarding_fee_paid: Option, + device_type: String, + deployment_info: Option, +} + +impl MobileRadio { + fn hash(&self) -> String { + let mut hasher = blake3::Hasher::new(); + hasher.update( + self.location + .map(|l| l.to_le_bytes()) + .unwrap_or([0_u8; 8]) + .as_ref(), + ); + + hasher.update( + self.is_full_hotspot + .map(|l| l.to_le_bytes()) + .unwrap_or([0_u8; 4]) + .as_ref(), + ); + + hasher.update( + self.num_location_asserts + .map(|l| l.to_le_bytes()) + .unwrap_or([0_u8; 4]) + .as_ref(), + ); + + hasher.update( + self.is_active + .map(|l| l.to_le_bytes()) + .unwrap_or([0_u8; 4]) + .as_ref(), + ); + + hasher.update( + self.dc_onboarding_fee_paid + .map(|l| l.to_le_bytes()) + .unwrap_or([0_u8; 8]) + .as_ref(), + ); + + hasher.update(self.device_type.as_ref()); + + hasher.update( + self.deployment_info + .clone() + .unwrap_or("".to_string()) + .as_ref(), + ); + + hasher.finalize().to_string() + } +} + +#[derive(Debug, sqlx::FromRow)] +struct TrackedMobileRadio { + entity_key: EntityKey, + hash: String, + last_changed_at: DateTime, + last_checked_at: DateTime, +} + +impl TrackedMobileRadio { + fn new(radio: &MobileRadio) -> Self { + Self { + entity_key: radio.entity_key.clone(), + hash: radio.hash(), + last_changed_at: radio.refreshed_at, + last_checked_at: Utc::now(), + } + } + + fn update_from_radio(mut self, radio: &MobileRadio) -> Self { + let new_hash = radio.hash(); + if self.hash != new_hash { + self.hash = new_hash; + self.last_changed_at = radio.refreshed_at; + } + + self.last_checked_at = Utc::now(); + self + } +} + +pub struct MobileRadioTracker { + pool: Pool, + metadata: Pool, + interval: Duration, +} + +impl ManagedTask for MobileRadioTracker { + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> futures::future::LocalBoxFuture<'static, anyhow::Result<()>> { + let handle = tokio::spawn(self.run(shutdown)); + Box::pin( + handle + .map_err(anyhow::Error::from) + .and_then(|result| async move { result.map_err(anyhow::Error::from) }), + ) + } +} + +impl MobileRadioTracker { + pub fn new(pool: Pool, metadata: Pool, interval: Duration) -> Self { + Self { + pool, + metadata, + interval, + } + } + + async fn run(self, mut shutdown: triggered::Listener) -> anyhow::Result<()> { + tracing::info!("starting MobileRadioTracker"); + let mut interval = tokio::time::interval(self.interval); + + loop { + tokio::select! { + biased; + _ = &mut shutdown => break, + _ = interval.tick() => { + //TODO probably shouldn't crash api when this fails + track_changes(&self.pool, &self.metadata).await?; + } + } + } + + Ok(()) + } +} + +async fn track_changes(pool: &Pool, metadata: &Pool) -> anyhow::Result<()> { + let tracked_radios = get_tracked_radios(pool).await?; + + let updates: Vec = get_all_mobile_radios(metadata) + .scan(tracked_radios, |tracked, radio| { + let tracked_radio_opt = tracked.remove(&radio.entity_key); + async move { Some((radio, tracked_radio_opt)) } + }) + .map(|(radio, tracked_radio_opt)| match tracked_radio_opt { + Some(tracked_radio) => tracked_radio.update_from_radio(&radio), + None => TrackedMobileRadio::new(&radio), + }) + .collect() + .await; + + update_tracked_radios(pool, updates).await?; + + Ok(()) +} + +async fn get_tracked_radios( + pool: &Pool, +) -> anyhow::Result> { + sqlx::query_as::<_, TrackedMobileRadio>( + r#" + SELECT + entity_key, + hash, + last_changed_at, + last_checked_at + FROM mobile_radio_tracker + "#, + ) + .fetch(pool) + .try_fold(HashMap::new(), |mut map, tracked_radio| async move { + map.insert(tracked_radio.entity_key.clone(), tracked_radio); + Ok(map) + }) + .map_err(anyhow::Error::from) + .await +} + +fn get_all_mobile_radios<'a>(metadata: &'a Pool) -> impl Stream + 'a { + sqlx::query_as::<_, MobileRadio>( + r#" + SELECT + kta.entity_key, + kta.refreshed_at, + mhi.location::bigint, + mhi.is_full_hotspot, + mhi.num_location_asserts, + mhi.is_active, + mhi.dc_onboarding_fee_paid::bigint, + mhi.device_type, + mhi.deployment_info + FROM key_to_assets kta + INNER JOIN mobile_hotspot_infos mhi ON + kta.asset = mhi.asset + WHERE kta.entity_key IS NOT NULL + AND mhi.refreshed_at IS NOT NULL + "#, + ) + .fetch(metadata) + .filter_map(|result| async move { result.ok() }) + .boxed() +} + +async fn update_tracked_radios( + pool: &Pool, + tracked_radios: Vec, +) -> anyhow::Result<()> { + let mut txn = pool.begin().await?; + + const BATCH_SIZE: usize = (u16::MAX / 4) as usize; + + for chunk in tracked_radios.chunks(BATCH_SIZE) { + QueryBuilder::new( + "INSERT INTO mobile_radio_tracker(entity_key, hash, last_changed_at, last_checked_at)", + ) + .push_values(chunk, |mut b, tracked_radio| { + b.push_bind(&tracked_radio.entity_key) + .push_bind(&tracked_radio.hash) + .push_bind(tracked_radio.last_changed_at) + .push_bind(tracked_radio.last_checked_at); + }) + .push( + r#" + ON CONFLICT (entity_key) DO UPDATE SET + hash = EXCLUDED.hash, + last_changed_at = EXCLUDED.last_changed_at, + last_checked_at = EXCLUDED.last_checked_at + "#, + ) + .build() + .execute(&mut txn) + .await?; + } + + txn.commit().await?; + + Ok(()) +} diff --git a/mobile_config/src/settings.rs b/mobile_config/src/settings.rs index 10a20f479..e90e68a3d 100644 --- a/mobile_config/src/settings.rs +++ b/mobile_config/src/settings.rs @@ -24,9 +24,18 @@ pub struct Settings { /// Settings passed to the db_store crate for connecting to /// the database for Solana on-chain data pub metadata: db_store::Settings, + #[serde( + with = "humantime_serde", + default = "default_mobile_radtio_tracker_interval" + )] + pub mobile_radio_tracker_interval: std::time::Duration, pub metrics: poc_metrics::Settings, } +fn default_mobile_radtio_tracker_interval() -> std::time::Duration { + humantime::parse_duration("1 hour").unwrap() +} + fn default_log() -> String { "mobile_config=debug".to_string() } From 57ef3d0548234a67ba337a317b2c79b5f948d7af Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Mon, 25 Nov 2024 15:33:46 -0500 Subject: [PATCH 2/4] Add some logging and handle errors better --- mobile_config/Cargo.toml | 1 - mobile_config/src/mobile_radio_tracker.rs | 28 +++++++++++++++-------- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/mobile_config/Cargo.toml b/mobile_config/Cargo.toml index 54aea61d1..74479694b 100644 --- a/mobile_config/Cargo.toml +++ b/mobile_config/Cargo.toml @@ -47,7 +47,6 @@ triggered = { workspace = true } task-manager = { path = "../task_manager" } solana-sdk = { workspace = true } custom-tracing = { path = "../custom_tracing", features = ["grpc"] } -humantime-serde = { workspace = true } coverage-map = { path = "../coverage_map" } [dev-dependencies] diff --git a/mobile_config/src/mobile_radio_tracker.rs b/mobile_config/src/mobile_radio_tracker.rs index 2d7544f4d..109b4e068 100644 --- a/mobile_config/src/mobile_radio_tracker.rs +++ b/mobile_config/src/mobile_radio_tracker.rs @@ -139,8 +139,9 @@ impl MobileRadioTracker { biased; _ = &mut shutdown => break, _ = interval.tick() => { - //TODO probably shouldn't crash api when this fails - track_changes(&self.pool, &self.metadata).await?; + if let Err(err) = track_changes(&self.pool, &self.metadata).await { + tracing::error!(?err, "error in tracking changes to mobile radios"); + } } } } @@ -150,6 +151,7 @@ impl MobileRadioTracker { } async fn track_changes(pool: &Pool, metadata: &Pool) -> anyhow::Result<()> { + tracing::info!("looking for changes to radios"); let tracked_radios = get_tracked_radios(pool).await?; let updates: Vec = get_all_mobile_radios(metadata) @@ -164,7 +166,10 @@ async fn track_changes(pool: &Pool, metadata: &Pool) -> anyh .collect() .await; + tracing::info!("updating in db: {}", updates.len()); + update_tracked_radios(pool, updates).await?; + tracing::info!("done"); Ok(()) } @@ -191,19 +196,19 @@ async fn get_tracked_radios( .await } -fn get_all_mobile_radios<'a>(metadata: &'a Pool) -> impl Stream + 'a { +fn get_all_mobile_radios(metadata: &Pool) -> impl Stream + '_ { sqlx::query_as::<_, MobileRadio>( r#" SELECT kta.entity_key, - kta.refreshed_at, + mhi.refreshed_at, mhi.location::bigint, - mhi.is_full_hotspot, + mhi.is_full_hotspot::int, mhi.num_location_asserts, - mhi.is_active, + mhi.is_active::int, mhi.dc_onboarding_fee_paid::bigint, - mhi.device_type, - mhi.deployment_info + mhi.device_type::text, + mhi.deployment_info::text FROM key_to_assets kta INNER JOIN mobile_hotspot_infos mhi ON kta.asset = mhi.asset @@ -212,7 +217,12 @@ fn get_all_mobile_radios<'a>(metadata: &'a Pool) -> impl Stream Date: Tue, 26 Nov 2024 10:07:39 -0500 Subject: [PATCH 3/4] refacotring and adding tests --- mobile_config/src/mobile_radio_tracker.rs | 100 +++++++++++++++++++--- 1 file changed, 87 insertions(+), 13 deletions(-) diff --git a/mobile_config/src/mobile_radio_tracker.rs b/mobile_config/src/mobile_radio_tracker.rs index 109b4e068..0dc03a8c3 100644 --- a/mobile_config/src/mobile_radio_tracker.rs +++ b/mobile_config/src/mobile_radio_tracker.rs @@ -7,7 +7,7 @@ use task_manager::ManagedTask; type EntityKey = Vec; -#[derive(Debug, sqlx::FromRow)] +#[derive(Debug, Clone, sqlx::FromRow)] struct MobileRadio { entity_key: EntityKey, refreshed_at: DateTime, @@ -131,7 +131,7 @@ impl MobileRadioTracker { } async fn run(self, mut shutdown: triggered::Listener) -> anyhow::Result<()> { - tracing::info!("starting MobileRadioTracker"); + tracing::info!("starting"); let mut interval = tokio::time::interval(self.interval); loop { @@ -146,6 +146,8 @@ impl MobileRadioTracker { } } + tracing::info!("stopping"); + Ok(()) } } @@ -153,25 +155,32 @@ impl MobileRadioTracker { async fn track_changes(pool: &Pool, metadata: &Pool) -> anyhow::Result<()> { tracing::info!("looking for changes to radios"); let tracked_radios = get_tracked_radios(pool).await?; + let all_mobile_radios = get_all_mobile_radios(metadata); + + let updates = identify_changes(all_mobile_radios, tracked_radios).await; + tracing::info!("updating in db: {}", updates.len()); + + update_tracked_radios(pool, updates).await?; + tracing::info!("done"); + + Ok(()) +} - let updates: Vec = get_all_mobile_radios(metadata) +async fn identify_changes( + all_mobile_radios: impl Stream, + tracked_radios: HashMap, +) -> Vec { + all_mobile_radios .scan(tracked_radios, |tracked, radio| { let tracked_radio_opt = tracked.remove(&radio.entity_key); - async move { Some((radio, tracked_radio_opt)) } + async { Some((radio, tracked_radio_opt)) } }) .map(|(radio, tracked_radio_opt)| match tracked_radio_opt { Some(tracked_radio) => tracked_radio.update_from_radio(&radio), None => TrackedMobileRadio::new(&radio), }) .collect() - .await; - - tracing::info!("updating in db: {}", updates.len()); - - update_tracked_radios(pool, updates).await?; - tracing::info!("done"); - - Ok(()) + .await } async fn get_tracked_radios( @@ -219,7 +228,7 @@ fn get_all_mobile_radios(metadata: &Pool) -> impl Stream MobileRadio { + MobileRadio { + entity_key, + refreshed_at: Utc::now() - chrono::Duration::hours(1), + location: Some(1), + is_full_hotspot: Some(1), + num_location_asserts: Some(1), + is_active: Some(1), + dc_onboarding_fee_paid: Some(10), + device_type: "wifi".to_string(), + deployment_info: Some("deployment_info".to_string()), + } + } +} From e16a46bfbc691e387be7b23a8712e541a1525eb2 Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Wed, 27 Nov 2024 07:37:39 -0500 Subject: [PATCH 4/4] fix typo --- mobile_config/src/settings.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mobile_config/src/settings.rs b/mobile_config/src/settings.rs index e90e68a3d..0e615a348 100644 --- a/mobile_config/src/settings.rs +++ b/mobile_config/src/settings.rs @@ -26,13 +26,13 @@ pub struct Settings { pub metadata: db_store::Settings, #[serde( with = "humantime_serde", - default = "default_mobile_radtio_tracker_interval" + default = "default_mobile_radio_tracker_interval" )] pub mobile_radio_tracker_interval: std::time::Duration, pub metrics: poc_metrics::Settings, } -fn default_mobile_radtio_tracker_interval() -> std::time::Duration { +fn default_mobile_radio_tracker_interval() -> std::time::Duration { humantime::parse_duration("1 hour").unwrap() }