diff --git a/mobile_config/src/main.rs b/mobile_config/src/main.rs index 2886da2c2..62b275243 100644 --- a/mobile_config/src/main.rs +++ b/mobile_config/src/main.rs @@ -17,7 +17,7 @@ use mobile_config::{ gateway_service::GatewayService, hex_boosting_service::HexBoostingService, key_cache::KeyCache, - mobile_radio_tracker::{migrate_mobile_tracker_locations, MobileRadioTracker}, + mobile_radio_tracker::{post_migrate_mobile_tracker_locations, MobileRadioTracker}, settings::Settings, sub_dao_service::SubDaoService, }; @@ -45,13 +45,12 @@ impl Cli { match self.cmd { Cmd::Server(daemon) => daemon.run(&settings).await, - Cmd::MigrateMobileTracker(csv_file) => { + Cmd::PostMigrateMobileTracker => { custom_tracing::init(settings.log.clone(), settings.custom_tracing.clone()).await?; let mobile_config_pool = settings.database.connect("mobile-config-store").await?; let metadata_pool = settings.metadata.connect("mobile-config-metadata").await?; sqlx::migrate!().run(&mobile_config_pool).await?; - migrate_mobile_tracker_locations(mobile_config_pool, metadata_pool, &csv_file.path) - .await?; + post_migrate_mobile_tracker_locations(mobile_config_pool, metadata_pool).await?; Ok(()) } } @@ -61,8 +60,8 @@ impl Cli { #[derive(Debug, clap::Subcommand)] pub enum Cmd { Server(Daemon), - // Oneshot command to migrate location data for mobile tracker - MigrateMobileTracker(CsvFile), + // Fill missed "asserted_location_changed_at" in mobile_radio_tracker table + PostMigrateMobileTracker, } #[derive(Debug, clap::Args)] diff --git a/mobile_config/src/mobile_radio_tracker.rs b/mobile_config/src/mobile_radio_tracker.rs index 3e7fa0980..042040f4c 100644 --- a/mobile_config/src/mobile_radio_tracker.rs +++ b/mobile_config/src/mobile_radio_tracker.rs @@ -1,7 +1,6 @@ use std::{collections::HashMap, time::Duration}; use chrono::{DateTime, Utc}; -use csv::Reader; use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt}; use helium_crypto::PublicKeyBinary; use sqlx::{Pool, Postgres, QueryBuilder}; @@ -86,13 +85,19 @@ pub struct TrackedMobileRadio { impl TrackedMobileRadio { fn new(radio: &MobileRadio) -> Self { + let asserted_location_changed_at = if radio.location.is_some() { + Some(radio.refreshed_at) + } else { + None + }; + Self { entity_key: radio.entity_key.clone(), hash: radio.hash(), last_changed_at: radio.refreshed_at, last_checked_at: Utc::now(), asserted_location: radio.location, - asserted_location_changed_at: None, + asserted_location_changed_at, } } @@ -292,126 +297,56 @@ async fn update_tracked_radios( Ok(()) } -// This function can be removed after migration is done. -// Expected CSV example: -// "public_key","time","h3" -// "1trSus...srQqM1P",2024-09-20 15:12:55.000 +0000,"8c2a10705a4cbff" -// 1. Fill mobile_radio_tracker asserted_location from mobile_hotspot_infos -// 2. Read data from csv report. Fill mobile_radio_tracker.asserted_location_changed_at if location from csv and in mobile_hotspot_infos table matches -// 3. Set `asserted_location_changed_at = created_at` for others (num_location_asserts > 0) -pub async fn migrate_mobile_tracker_locations( +// 1. Fill missed `asserted_location_changed_at` in mobile_radio_tracker table +pub async fn post_migrate_mobile_tracker_locations( mobile_config_pool: Pool, metadata_pool: Pool, - csv_file_path: &str, ) -> anyhow::Result<()> { - // 1. Fill mobile_radio_tracker asserted_location from mobile_hotspot_infos + let mut txn = mobile_config_pool.begin().await?; - // get_all_mobile_radios - tracing::info!("Exporting data from mobile_hotspot_infos"); let mobile_infos = get_all_mobile_radios(&metadata_pool) .filter(|v| futures::future::ready(v.location.is_some())) - .filter(|v| futures::future::ready(v.num_location_asserts.is_some_and(|num| num > 0))) + .filter(|v| futures::future::ready(v.num_location_asserts.is_some_and(|num| num >= 1))) .collect::>() .await; - let mut txn = mobile_config_pool.begin().await?; - - const BATCH_SIZE: usize = (u16::MAX / 3) as usize; - - // Set asserted_location in mobile_radio_tracker from metadata_pool - for chunk in mobile_infos.chunks(BATCH_SIZE) { - let mut query_builder = QueryBuilder::new( - "UPDATE mobile_radio_tracker AS mrt SET asserted_location = data.location - FROM ( ", - ); - - query_builder.push_values(chunk, |mut builder, mob_info| { - builder - .push_bind(mob_info.location) - .push_bind(&mob_info.entity_key); - }); - - query_builder.push( - ") AS data(location, entity_key) - WHERE mrt.entity_key = data.entity_key", - ); - - let built = query_builder.build(); - built.execute(&mut *txn).await?; - } - - // 2. Read data from csv report. Fill mobile_radio_tracker if and only if location from csv and in mobile_hotspot_infos table matches - let mobile_infos_map: HashMap<_, _> = mobile_infos - .iter() - .map(|v| (bs58::encode(v.entity_key.clone()).into_string(), v.location)) - .collect(); - tracing::info!("Exporting data from CSV"); - let mut rdr = Reader::from_path(csv_file_path)?; - - #[derive(Debug, serde::Deserialize)] - struct Record { - public_key: PublicKeyBinary, - h3: String, - time: DateTime, - } - - let mut mobile_infos_to_update_map: HashMap> = HashMap::new(); - - let mut csv_migrated_counter = 0; - for record in rdr.deserialize() { - let record: Record = record?; - let pub_key: &str = &record.public_key.to_string(); - - if let Some(v) = mobile_infos_map.get(pub_key) { - let loc = i64::from_str_radix(&record.h3, 16).unwrap(); - if v.unwrap() == loc { - let date_time = record.time; - let entity_key = bs58::decode(pub_key).into_vec()?; - - mobile_infos_to_update_map.insert(entity_key, date_time); - csv_migrated_counter += 1; - } - } else { - tracing::warn!( - "Pubkey: {} exist in csv but not found in metadata database", - pub_key + // Get inconsistent mobile_radio_tracker rows + let mut tracked_radios = get_tracked_radios(&mobile_config_pool).await?; + tracked_radios + .retain(|_k, v| v.asserted_location.is_some() && v.asserted_location_changed_at.is_none()); + tracing::info!( + "Count of tracked radios with inconsistent info before migration: {}", + tracked_radios.len() + ); + + for (k, _v) in tracked_radios { + if let Some(mobile_info) = mobile_infos.iter().find(|v| v.entity_key == k) { + sqlx::query( + r#" + UPDATE +"mobile_radio_tracker" SET asserted_location_changed_at = $1 WHERE entity_key = $2 + "#, ) + .bind(mobile_info.created_at) + .bind(&mobile_info.entity_key) + .execute(&mut *txn) + .await + .unwrap(); + } else { + tracing::error!("Radio {} is not found", PublicKeyBinary::from(k)) } } - tracing::info!("Count radios migrated from CSV: {csv_migrated_counter}"); - - // 3. Set `asserted_location_changed_at = created_at` for others (num_location_asserts > 0) - for mi in mobile_infos.into_iter() { - mobile_infos_to_update_map - .entry(mi.entity_key) - .or_insert(mi.created_at); - } - - let mobile_infos_to_update: Vec<(EntityKey, DateTime)> = - mobile_infos_to_update_map.into_iter().collect(); - - tracing::info!("Updating asserted_location_changed_at in db"); - for chunk in mobile_infos_to_update.chunks(BATCH_SIZE) { - let mut query_builder = QueryBuilder::new( - "UPDATE mobile_radio_tracker AS mrt SET asserted_location_changed_at = data.asserted_location_changed_at - FROM ( ", - ); - - query_builder.push_values(chunk, |mut builder, mob_info| { - builder.push_bind(&mob_info.0).push_bind(mob_info.1); - }); - - query_builder.push( - ") AS data(entity_key, asserted_location_changed_at) - WHERE mrt.entity_key = data.entity_key", - ); - - let built = query_builder.build(); - built.execute(&mut *txn).await?; - } txn.commit().await?; + let mut tracked_radios = get_tracked_radios(&mobile_config_pool).await?; + tracked_radios + .retain(|_k, v| v.asserted_location.is_some() && v.asserted_location_changed_at.is_none()); + tracing::info!( + "Count of tracked radios with inconsistent info after migration: {}", + tracked_radios.len() + ); + Ok(()) } @@ -466,7 +401,7 @@ mod tests { } #[tokio::test] - async fn last_asserted_location_will_not_updated_if_nothing_changes() { + async fn asserted_location_changed_at_is_none_if_location_none() { // location None let mut radio = mobile_radio(vec![1, 2, 3]); radio.location = None; @@ -478,17 +413,6 @@ mod tests { assert!(result[0].asserted_location_changed_at.is_none()); assert!(result[0].asserted_location.is_none()); - - // location is 1 - let mut radio = mobile_radio(vec![1, 2, 3]); - radio.location = Some(1); - let tracked_radio = TrackedMobileRadio::new(&radio); - let mut tracked_radios = HashMap::new(); - tracked_radios.insert(tracked_radio.entity_key.clone(), tracked_radio); - - let result = identify_changes(stream::iter(vec![radio.clone()]), tracked_radios).await; - assert!(result[0].asserted_location_changed_at.is_none()); - assert_eq!(result[0].asserted_location, Some(1)); } #[tokio::test] diff --git a/mobile_config/tests/common/mod.rs b/mobile_config/tests/common/mod.rs index 489c68fa5..69217de7b 100644 --- a/mobile_config/tests/common/mod.rs +++ b/mobile_config/tests/common/mod.rs @@ -32,7 +32,7 @@ pub async fn add_mobile_tracker_record( pub async fn add_db_record( pool: &PgPool, asset: &str, - location: i64, + location: Option, device_type: &str, key: PublicKeyBinary, created_at: DateTime, @@ -55,18 +55,19 @@ pub async fn add_db_record( pub async fn add_mobile_hotspot_infos( pool: &PgPool, asset: &str, - location: i64, + location: Option, device_type: &str, created_at: DateTime, refreshed_at: Option>, deployment_info: Option<&str>, ) { + let num_locations = if location.is_some() { Some(1) } else { Some(0) }; sqlx::query( r#" INSERT INTO -"mobile_hotspot_infos" ("asset", "location", "device_type", "created_at", "refreshed_at", "deployment_info") +"mobile_hotspot_infos" ("asset", "location", "device_type", "created_at", "refreshed_at", "deployment_info", "num_location_asserts") VALUES -($1, $2, $3::jsonb, $4, $5, $6::jsonb); +($1, $2, $3::jsonb, $4, $5, $6::jsonb, $7); "#, ) .bind(asset) @@ -75,6 +76,7 @@ pub async fn add_mobile_hotspot_infos( .bind(created_at) .bind(refreshed_at) .bind(deployment_info) + .bind(num_locations) .execute(pool) .await .unwrap(); diff --git a/mobile_config/tests/gateway_service.rs b/mobile_config/tests/gateway_service.rs index c34026abe..1db74af0c 100644 --- a/mobile_config/tests/gateway_service.rs +++ b/mobile_config/tests/gateway_service.rs @@ -127,7 +127,7 @@ async fn gateway_stream_info_v1(pool: PgPool) { add_db_record( &pool, "asset1", - asset1_hex_idx, + Some(asset1_hex_idx), "\"wifiIndoor\"", asset1_pubkey.clone().into(), now, @@ -138,7 +138,7 @@ async fn gateway_stream_info_v1(pool: PgPool) { add_db_record( &pool, "asset2", - asset2_hex_idx, + Some(asset2_hex_idx), "\"wifiDataOnly\"", asset2_pubkey.clone().into(), now_plus_10, @@ -181,7 +181,7 @@ async fn gateway_stream_info_v2(pool: PgPool) { add_db_record( &pool, "asset1", - asset1_hex_idx, + Some(asset1_hex_idx), "\"wifiIndoor\"", asset1_pubkey.clone().into(), now, @@ -192,7 +192,7 @@ async fn gateway_stream_info_v2(pool: PgPool) { add_db_record( &pool, "asset2", - asset2_hex_idx, + Some(asset2_hex_idx), "\"wifiDataOnly\"", asset2_pubkey.clone().into(), now_plus_10, @@ -235,7 +235,7 @@ async fn gateway_stream_info_v2_updated_at(pool: PgPool) { add_db_record( &pool, "asset1", - asset1_hex_idx, + Some(asset1_hex_idx), "\"wifiIndoor\"", asset1_pubkey.clone().into(), created_at, @@ -249,7 +249,7 @@ async fn gateway_stream_info_v2_updated_at(pool: PgPool) { add_db_record( &pool, "asset2", - asset2_hex_idx, + Some(asset2_hex_idx), "\"wifiDataOnly\"", asset2_pubkey.clone().into(), created_at, @@ -295,7 +295,7 @@ async fn gateway_info_batch_v2(pool: PgPool) { add_db_record( &pool, "asset1", - asset1_hex_idx, + Some(asset1_hex_idx), "\"wifiIndoor\"", asset1_pubkey.clone().into(), created_at, @@ -307,7 +307,7 @@ async fn gateway_info_batch_v2(pool: PgPool) { add_db_record( &pool, "asset2", - asset2_hex_idx, + Some(asset2_hex_idx), "\"wifiDataOnly\"", asset2_pubkey.clone().into(), created_at, @@ -376,7 +376,7 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) { add_db_record( &pool, "asset1", - asset1_hex_idx, + Some(asset1_hex_idx), "\"wifiIndoor\"", asset1_pubkey.clone().into(), created_at, @@ -388,7 +388,7 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) { add_db_record( &pool, "asset2", - asset2_hex_idx, + Some(asset2_hex_idx), "\"wifiIndoor\"", asset2_pubkey.clone().into(), created_at, @@ -400,7 +400,7 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) { add_db_record( &pool, "asset3", - asset3_hex_idx, + Some(asset3_hex_idx), "\"wifiDataOnly\"", asset3_pubkey.clone().into(), created_at, @@ -414,7 +414,7 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) { add_db_record( &pool, "asset4", - asset4_hex_idx, + Some(asset4_hex_idx), "\"wifiIndoor\"", asset4_pubkey.clone().into(), created_at, @@ -484,7 +484,7 @@ async fn gateway_info_v2_no_mobile_tracker_record(pool: PgPool) { add_db_record( &pool, "asset1", - asset1_hex_idx, + Some(asset1_hex_idx), "\"wifiIndoor\"", asset1_pubkey.clone().into(), created_at, @@ -496,7 +496,7 @@ async fn gateway_info_v2_no_mobile_tracker_record(pool: PgPool) { add_db_record( &pool, "asset2", - asset2_hex_idx, + Some(asset2_hex_idx), "\"wifiIndoor\"", asset2_pubkey.clone().into(), created_at, @@ -538,7 +538,7 @@ async fn gateway_info_v2(pool: PgPool) { add_db_record( &pool, "asset1", - asset1_hex_idx, + Some(asset1_hex_idx), "\"wifiIndoor\"", asset1_pubkey.clone().into(), created_at, @@ -608,7 +608,7 @@ async fn gateway_info_stream_v2_updated_at_check(pool: PgPool) { add_db_record( &pool, "asset1", - asset1_hex_idx, + Some(asset1_hex_idx), "\"wifiIndoor\"", asset1_pubkey.clone().into(), created_at, @@ -620,7 +620,7 @@ async fn gateway_info_stream_v2_updated_at_check(pool: PgPool) { add_db_record( &pool, "asset2", - asset2_hex_idx, + Some(asset2_hex_idx), "\"wifiIndoor\"", asset2_pubkey.clone().into(), created_at, @@ -632,7 +632,7 @@ async fn gateway_info_stream_v2_updated_at_check(pool: PgPool) { add_db_record( &pool, "asset3", - asset3_hex_idx, + Some(asset3_hex_idx), "\"wifiDataOnly\"", asset3_pubkey.clone().into(), created_at, @@ -696,7 +696,7 @@ async fn gateway_stream_info_v2_deployment_info(pool: PgPool) { add_db_record( &pool, "asset1", - asset1_hex_idx, + Some(asset1_hex_idx), "\"wifiIndoor\"", asset1_pubkey.clone().into(), now, @@ -707,7 +707,7 @@ async fn gateway_stream_info_v2_deployment_info(pool: PgPool) { add_db_record( &pool, "asset2", - asset2_hex_idx, + Some(asset2_hex_idx), "\"wifiDataOnly\"", asset2_pubkey.clone().into(), now, @@ -719,7 +719,7 @@ async fn gateway_stream_info_v2_deployment_info(pool: PgPool) { add_db_record( &pool, "asset3", - asset3_hex_idx, + Some(asset3_hex_idx), "\"wifiDataOnly\"", asset3_pubkey.clone().into(), now, diff --git a/mobile_config/tests/mobile_radio_tracker.rs b/mobile_config/tests/mobile_radio_tracker.rs index d0a4783d4..3f0f20cd0 100644 --- a/mobile_config/tests/mobile_radio_tracker.rs +++ b/mobile_config/tests/mobile_radio_tracker.rs @@ -7,7 +7,7 @@ pub mod common; use common::*; #[sqlx::test] -async fn mobile_tracker_handle_entity_duplicates(pool: PgPool) { +async fn mt_handle_entity_duplicates(pool: PgPool) { // In case of duplications mobile tracker must use newer (refreshed_at) let asset1_pubkey = make_keypair().public_key().clone(); let asset1_hex_idx = 631711281837647359_i64; @@ -19,7 +19,7 @@ async fn mobile_tracker_handle_entity_duplicates(pool: PgPool) { add_db_record( &pool, "asset1", - asset1_hex_idx, + Some(asset1_hex_idx), "\"wifiIndoor\"", asset1_pubkey.clone().into(), now_minus_hour, @@ -31,7 +31,7 @@ async fn mobile_tracker_handle_entity_duplicates(pool: PgPool) { add_db_record( &pool, "asset1", - asset1_hex_idx, + Some(asset1_hex_idx), "\"wifiIndoor\"", asset1_pubkey.clone().into(), now, @@ -43,7 +43,7 @@ async fn mobile_tracker_handle_entity_duplicates(pool: PgPool) { add_db_record( &pool, "asset1", - asset1_hex_idx, + Some(asset1_hex_idx), "\"wifiIndoor\"", asset1_pubkey.clone().into(), now, @@ -62,3 +62,127 @@ async fn mobile_tracker_handle_entity_duplicates(pool: PgPool) { now.timestamp_millis() ); } + +#[sqlx::test] +async fn mt_update_radio_location_none(pool: PgPool) { + // 1. Add a new radio without location + // 2. Update radio, set location + let asset2_pubkey = make_keypair().public_key().clone(); + create_db_tables(&pool).await; + let now = Utc::now(); + let now_minus_hour = now - chrono::Duration::hours(1); + let pubkey2_binary = PublicKeyBinary::from(asset2_pubkey.clone()); + + add_db_record( + &pool, + "asset2", + None, + "\"wifiIndoor\"", + asset2_pubkey.clone().into(), + now_minus_hour, + Some(now_minus_hour), + Some(r#"{"wifiInfoV0": {"antenna": 18, "azimuth": 160, "elevation": 5, "electricalDownTilt": 1, "mechanicalDownTilt": 2}}"#) + ) + .await; + + track_changes(&pool, &pool).await.unwrap(); + let tracked_radios = get_tracked_radios(&pool).await.unwrap(); + + // Check radio with location none + let b58 = bs58::decode(pubkey2_binary.to_string()).into_vec().unwrap(); + let tracked_radio = tracked_radios.get::>(&b58).unwrap(); + assert!(tracked_radio.asserted_location.is_none()); + assert!(tracked_radio.asserted_location_changed_at.is_none()); + + // Update radio, set new location + sqlx::query( + r#" + UPDATE +"mobile_hotspot_infos" SET location = $1, refreshed_at = $2, num_location_asserts = 1 WHERE asset = 'asset2' + "#, + ) + .bind(12) + .bind(now) + .execute(&pool) + .await + .unwrap(); + track_changes(&pool, &pool).await.unwrap(); + let tracked_radios = get_tracked_radios(&pool).await.unwrap(); + + let b58 = bs58::decode(pubkey2_binary.to_string()).into_vec().unwrap(); + let tracked_radio = tracked_radios.get::>(&b58).unwrap(); + assert_eq!(tracked_radio.asserted_location, Some(12)); + assert_eq!( + tracked_radio + .asserted_location_changed_at + .unwrap() + .timestamp_millis(), + now.timestamp_millis() + ); +} + +#[sqlx::test] +async fn mt_update_radio_location_exist(pool: PgPool) { + // 1. Add a new radio with location + // 2. Update radio, set a new location + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + create_db_tables(&pool).await; + let now = Utc::now(); + let now_minus_hour = now - chrono::Duration::hours(1); + let pubkey_binary = PublicKeyBinary::from(asset1_pubkey.clone()); + + add_db_record( + &pool, + "asset1", + Some(asset1_hex_idx), + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + now_minus_hour, + Some(now_minus_hour), + Some(r#"{"wifiInfoV0": {"antenna": 18, "azimuth": 160, "elevation": 5, "electricalDownTilt": 1, "mechanicalDownTilt": 2}}"#) + ) + .await; + + let b58 = bs58::decode(pubkey_binary.to_string()).into_vec().unwrap(); + track_changes(&pool, &pool).await.unwrap(); + let tracked_radios = get_tracked_radios(&pool).await.unwrap(); + assert_eq!(tracked_radios.len(), 1); + // Check radio with location is not None + let tracked_radio = tracked_radios.get::>(&b58).unwrap(); + assert_eq!( + tracked_radio.last_changed_at.timestamp_millis(), + now_minus_hour.timestamp_millis() + ); + assert_eq!( + tracked_radio + .asserted_location_changed_at + .unwrap() + .timestamp_millis(), + now_minus_hour.timestamp_millis() + ); + + // Update radio with location none + sqlx::query( + r#" + UPDATE +"mobile_hotspot_infos" SET location = $1, refreshed_at = $2, num_location_asserts = 2 WHERE asset = 'asset1' + "#, + ) + .bind(12) + .bind(now) + .execute(&pool) + .await + .unwrap(); + track_changes(&pool, &pool).await.unwrap(); + let tracked_radios = get_tracked_radios(&pool).await.unwrap(); + let tracked_radio = tracked_radios.get::>(&b58).unwrap(); + assert_eq!(tracked_radio.asserted_location, Some(12)); + assert_eq!( + tracked_radio + .asserted_location_changed_at + .unwrap() + .timestamp_millis(), + now.timestamp_millis() + ); +}