Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions mobile_config/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use mobile_config::{
gateway_service::GatewayService,
hex_boosting_service::HexBoostingService,
key_cache::KeyCache,
mobile_radio_tracker::{migrate_mobile_tracker_locations, MobileRadioTracker},
mobile_radio_tracker::{post_migrate_mobile_tracker_locations, MobileRadioTracker},
settings::Settings,
sub_dao_service::SubDaoService,
};
Expand Down Expand Up @@ -45,13 +45,12 @@ impl Cli {

match self.cmd {
Cmd::Server(daemon) => daemon.run(&settings).await,
Cmd::MigrateMobileTracker(csv_file) => {
Cmd::PostMigrateMobileTracker => {
custom_tracing::init(settings.log.clone(), settings.custom_tracing.clone()).await?;
let mobile_config_pool = settings.database.connect("mobile-config-store").await?;
let metadata_pool = settings.metadata.connect("mobile-config-metadata").await?;
sqlx::migrate!().run(&mobile_config_pool).await?;
migrate_mobile_tracker_locations(mobile_config_pool, metadata_pool, &csv_file.path)
.await?;
post_migrate_mobile_tracker_locations(mobile_config_pool, metadata_pool).await?;
Ok(())
}
}
Expand All @@ -61,8 +60,8 @@ impl Cli {
#[derive(Debug, clap::Subcommand)]
pub enum Cmd {
Server(Daemon),
// Oneshot command to migrate location data for mobile tracker
MigrateMobileTracker(CsvFile),
// Fill missed "asserted_location_changed_at" in mobile_radio_tracker table
PostMigrateMobileTracker,
}

#[derive(Debug, clap::Args)]
Expand Down
162 changes: 43 additions & 119 deletions mobile_config/src/mobile_radio_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{collections::HashMap, time::Duration};

use chrono::{DateTime, Utc};
use csv::Reader;
use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
use helium_crypto::PublicKeyBinary;
use sqlx::{Pool, Postgres, QueryBuilder};
Expand Down Expand Up @@ -86,13 +85,19 @@ pub struct TrackedMobileRadio {

impl TrackedMobileRadio {
/// Builds a fresh tracker row from an on-chain radio record.
///
/// A radio that already has an asserted location is recorded as having
/// "changed" location at its `refreshed_at` timestamp, so downstream
/// consumers always see a non-null `asserted_location_changed_at` when a
/// location exists; radios without a location keep `None`.
fn new(radio: &MobileRadio) -> Self {
    Self {
        entity_key: radio.entity_key.clone(),
        hash: radio.hash(),
        last_changed_at: radio.refreshed_at,
        // NOTE(review): `last_checked_at` uses wall-clock time, so two rows
        // built from the same radio are not byte-identical — presumably
        // intentional for tracking freshness; confirm against callers.
        last_checked_at: Utc::now(),
        asserted_location: radio.location,
        // Idiomatic form of `if location.is_some() { Some(refreshed_at) } else { None }`
        // (clippy: if_then_some_else_none).
        asserted_location_changed_at: radio.location.map(|_| radio.refreshed_at),
    }
}

Expand Down Expand Up @@ -292,126 +297,56 @@ async fn update_tracked_radios(
Ok(())
}

// This function can be removed after migration is done.
// Expected CSV example:
// "public_key","time","h3"
// "1trSus...srQqM1P",2024-09-20 15:12:55.000 +0000,"8c2a10705a4cbff"
// 1. Fill mobile_radio_tracker asserted_location from mobile_hotspot_infos
// 2. Read data from csv report. Fill mobile_radio_tracker.asserted_location_changed_at if location from csv and in mobile_hotspot_infos table matches
// 3. Set `asserted_location_changed_at = created_at` for others (num_location_asserts > 0)
pub async fn migrate_mobile_tracker_locations(
// 1. Fill missed `asserted_location_changed_at` in mobile_radio_tracker table
pub async fn post_migrate_mobile_tracker_locations(
mobile_config_pool: Pool<Postgres>,
metadata_pool: Pool<Postgres>,
csv_file_path: &str,
) -> anyhow::Result<()> {
// 1. Fill mobile_radio_tracker asserted_location from mobile_hotspot_infos
let mut txn = mobile_config_pool.begin().await?;

// get_all_mobile_radios
tracing::info!("Exporting data from mobile_hotspot_infos");
let mobile_infos = get_all_mobile_radios(&metadata_pool)
.filter(|v| futures::future::ready(v.location.is_some()))
.filter(|v| futures::future::ready(v.num_location_asserts.is_some_and(|num| num > 0)))
.filter(|v| futures::future::ready(v.num_location_asserts.is_some_and(|num| num >= 1)))
.collect::<Vec<_>>()
.await;

let mut txn = mobile_config_pool.begin().await?;

const BATCH_SIZE: usize = (u16::MAX / 3) as usize;

// Set asserted_location in mobile_radio_tracker from metadata_pool
for chunk in mobile_infos.chunks(BATCH_SIZE) {
let mut query_builder = QueryBuilder::new(
"UPDATE mobile_radio_tracker AS mrt SET asserted_location = data.location
FROM ( ",
);

query_builder.push_values(chunk, |mut builder, mob_info| {
builder
.push_bind(mob_info.location)
.push_bind(&mob_info.entity_key);
});

query_builder.push(
") AS data(location, entity_key)
WHERE mrt.entity_key = data.entity_key",
);

let built = query_builder.build();
built.execute(&mut *txn).await?;
}

// 2. Read data from csv report. Fill mobile_radio_tracker if and only if location from csv and in mobile_hotspot_infos table matches
let mobile_infos_map: HashMap<_, _> = mobile_infos
.iter()
.map(|v| (bs58::encode(v.entity_key.clone()).into_string(), v.location))
.collect();
tracing::info!("Exporting data from CSV");
let mut rdr = Reader::from_path(csv_file_path)?;

#[derive(Debug, serde::Deserialize)]
struct Record {
public_key: PublicKeyBinary,
h3: String,
time: DateTime<Utc>,
}

let mut mobile_infos_to_update_map: HashMap<EntityKey, DateTime<Utc>> = HashMap::new();

let mut csv_migrated_counter = 0;
for record in rdr.deserialize() {
let record: Record = record?;
let pub_key: &str = &record.public_key.to_string();

if let Some(v) = mobile_infos_map.get(pub_key) {
let loc = i64::from_str_radix(&record.h3, 16).unwrap();
if v.unwrap() == loc {
let date_time = record.time;
let entity_key = bs58::decode(pub_key).into_vec()?;

mobile_infos_to_update_map.insert(entity_key, date_time);
csv_migrated_counter += 1;
}
} else {
tracing::warn!(
"Pubkey: {} exist in csv but not found in metadata database",
pub_key
// Get inconsistent mobile_radio_tracker rows
let mut tracked_radios = get_tracked_radios(&mobile_config_pool).await?;
tracked_radios
.retain(|_k, v| v.asserted_location.is_some() && v.asserted_location_changed_at.is_none());
tracing::info!(
"Count of tracked radios with inconsistent info before migration: {}",
tracked_radios.len()
);

for (k, _v) in tracked_radios {
if let Some(mobile_info) = mobile_infos.iter().find(|v| v.entity_key == k) {
Copy link

Copilot AI Jul 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid O(n²) lookups, build a HashMap from mobile_infos keyed by entity_key before this loop so you can do O(1) lookups instead of .iter().find() on each iteration.

Suggested change
if let Some(mobile_info) = mobile_infos.iter().find(|v| v.entity_key == k) {
if let Some(mobile_info) = mobile_infos_map.get(&k) {

Copilot uses AI. Check for mistakes.
Copy link
Member Author

@kurotych kurotych Jul 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A small number of elements (<200) is expected

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another option is swap around the iteration.

for mobile_radio in mobile_radios {
    if tracked_radios.contains_key(&mobile_radio.entity_key) {
      // ... Do the work
    }
}

sqlx::query(
r#"
UPDATE
"mobile_radio_tracker" SET asserted_location_changed_at = $1 WHERE entity_key = $2
"#,
)
.bind(mobile_info.created_at)
.bind(&mobile_info.entity_key)
.execute(&mut *txn)
.await
.unwrap();
} else {
tracing::error!("Radio {} is not found", PublicKeyBinary::from(k))
}
}
tracing::info!("Count radios migrated from CSV: {csv_migrated_counter}");

// 3. Set `asserted_location_changed_at = created_at` for others (num_location_asserts > 0)
for mi in mobile_infos.into_iter() {
mobile_infos_to_update_map
.entry(mi.entity_key)
.or_insert(mi.created_at);
}

let mobile_infos_to_update: Vec<(EntityKey, DateTime<Utc>)> =
mobile_infos_to_update_map.into_iter().collect();

tracing::info!("Updating asserted_location_changed_at in db");
for chunk in mobile_infos_to_update.chunks(BATCH_SIZE) {
let mut query_builder = QueryBuilder::new(
"UPDATE mobile_radio_tracker AS mrt SET asserted_location_changed_at = data.asserted_location_changed_at
FROM ( ",
);

query_builder.push_values(chunk, |mut builder, mob_info| {
builder.push_bind(&mob_info.0).push_bind(mob_info.1);
});

query_builder.push(
") AS data(entity_key, asserted_location_changed_at)
WHERE mrt.entity_key = data.entity_key",
);

let built = query_builder.build();
built.execute(&mut *txn).await?;
}

txn.commit().await?;

let mut tracked_radios = get_tracked_radios(&mobile_config_pool).await?;
tracked_radios
.retain(|_k, v| v.asserted_location.is_some() && v.asserted_location_changed_at.is_none());
tracing::info!(
"Count of tracked radios with inconsistent info after migration: {}",
tracked_radios.len()
);

Ok(())
}

Expand Down Expand Up @@ -466,7 +401,7 @@ mod tests {
}

#[tokio::test]
async fn last_asserted_location_will_not_updated_if_nothing_changes() {
async fn asserted_location_changed_at_is_none_if_location_none() {
// location None
let mut radio = mobile_radio(vec![1, 2, 3]);
radio.location = None;
Expand All @@ -478,17 +413,6 @@ mod tests {

assert!(result[0].asserted_location_changed_at.is_none());
assert!(result[0].asserted_location.is_none());

// location is 1
let mut radio = mobile_radio(vec![1, 2, 3]);
radio.location = Some(1);
let tracked_radio = TrackedMobileRadio::new(&radio);
let mut tracked_radios = HashMap::new();
tracked_radios.insert(tracked_radio.entity_key.clone(), tracked_radio);

let result = identify_changes(stream::iter(vec![radio.clone()]), tracked_radios).await;
assert!(result[0].asserted_location_changed_at.is_none());
assert_eq!(result[0].asserted_location, Some(1));
}

#[tokio::test]
Expand Down
10 changes: 6 additions & 4 deletions mobile_config/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub async fn add_mobile_tracker_record(
pub async fn add_db_record(
pool: &PgPool,
asset: &str,
location: i64,
location: Option<i64>,
device_type: &str,
key: PublicKeyBinary,
created_at: DateTime<Utc>,
Expand All @@ -55,18 +55,19 @@ pub async fn add_db_record(
pub async fn add_mobile_hotspot_infos(
pool: &PgPool,
asset: &str,
location: i64,
location: Option<i64>,
device_type: &str,
created_at: DateTime<Utc>,
refreshed_at: Option<DateTime<Utc>>,
deployment_info: Option<&str>,
) {
let num_locations = if location.is_some() { Some(1) } else { Some(0) };
sqlx::query(
r#"
INSERT INTO
"mobile_hotspot_infos" ("asset", "location", "device_type", "created_at", "refreshed_at", "deployment_info")
"mobile_hotspot_infos" ("asset", "location", "device_type", "created_at", "refreshed_at", "deployment_info", "num_location_asserts")
VALUES
($1, $2, $3::jsonb, $4, $5, $6::jsonb);
($1, $2, $3::jsonb, $4, $5, $6::jsonb, $7);
"#,
)
.bind(asset)
Expand All @@ -75,6 +76,7 @@ pub async fn add_mobile_hotspot_infos(
.bind(created_at)
.bind(refreshed_at)
.bind(deployment_info)
.bind(num_locations)
.execute(pool)
.await
.unwrap();
Expand Down
Loading