From ac1d1f720375f0f6f3c8cf71a6aeceda81b7d1dd Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Thu, 10 Jul 2025 14:29:28 +0300 Subject: [PATCH 01/18] Make stream_multi_gateways_info func more general. Add gateway_info_v3 skeleton --- Cargo.lock | 8 +-- Cargo.toml | 4 +- file_store/src/traits/msg_verify.rs | 2 + mobile_config/src/gateway_info_v3.rs | 55 ++++++++++++++++++ mobile_config/src/gateway_service.rs | 86 ++++++++++++++++++++++++++-- mobile_config/src/lib.rs | 1 + 6 files changed, 144 insertions(+), 12 deletions(-) create mode 100644 mobile_config/src/gateway_info_v3.rs diff --git a/Cargo.lock b/Cargo.lock index 1bb1d3bee..bb0d95e87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3446,7 +3446,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#2ebbdb1772ffed9fcde2b6a7de5aa7b2afeca1a6" +source = "git+https://www.github.com/helium/proto.git?branch=mobile-config-loc-assert#7a564041f35e3d1b55f59754c50063d0f111744e" dependencies = [ "bytes", "prost", @@ -5859,7 +5859,7 @@ checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes", "heck 0.5.0", - "itertools 0.12.1", + "itertools 0.10.5", "log", "multimap", "once_cell", @@ -5879,7 +5879,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools 0.12.1", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.101", @@ -11221,7 +11221,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 79cc88793..691ff1c4f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -131,5 +131,5 @@ anchor-lang = { git = "https://github.com/madninja/anchor.git", branch = "madnin # helium-proto = { path = "../proto" } # beacon = { path = "../proto/beacon" } -# [patch.'https://github.com/helium/proto'] -# helium-proto = { git = "https://www.github.com/helium/proto.git", branch = "bbalser/deprecate-radio-reward-v1" } +[patch.'https://github.com/helium/proto'] +helium-proto = { git = "https://www.github.com/helium/proto.git", branch = "mobile-config-loc-assert" } diff --git a/file_store/src/traits/msg_verify.rs b/file_store/src/traits/msg_verify.rs index 64990365f..42e04b0a2 100644 --- a/file_store/src/traits/msg_verify.rs +++ b/file_store/src/traits/msg_verify.rs @@ -92,11 +92,13 @@ impl_msg_verify!(mobile_config::CarrierKeyToEntityResV1, signature); impl_msg_verify!(mobile_config::GatewayInfoReqV1, signature); impl_msg_verify!(mobile_config::GatewayInfoStreamReqV1, signature); impl_msg_verify!(mobile_config::GatewayInfoStreamReqV2, signature); +impl_msg_verify!(mobile_config::GatewayInfoStreamReqV3, signature); impl_msg_verify!(mobile_config::GatewayInfoResV1, signature); impl_msg_verify!(mobile_config::GatewayInfoResV2, signature); impl_msg_verify!(mobile_config::GatewayInfoBatchReqV1, signature); impl_msg_verify!(mobile_config::GatewayInfoStreamResV1, signature); impl_msg_verify!(mobile_config::GatewayInfoStreamResV2, signature); +impl_msg_verify!(mobile_config::GatewayInfoStreamResV3, signature); impl_msg_verify!(mobile_config::BoostedHexInfoStreamReqV1, signature); impl_msg_verify!(mobile_config::BoostedHexModifiedInfoStreamReqV1, signature); impl_msg_verify!(mobile_config::BoostedHexInfoStreamResV1, signature); diff --git a/mobile_config/src/gateway_info_v3.rs b/mobile_config/src/gateway_info_v3.rs new file mode 100644 index 000000000..4f7548c3b --- /dev/null +++ b/mobile_config/src/gateway_info_v3.rs @@ -0,0 +1,55 @@ +use chrono::{DateTime, Utc}; +use helium_crypto::PublicKeyBinary; +use helium_proto::services::mobile_config::{ + // gateway_metadata_v2::DeploymentInfo as DeploymentInfoProto, + // CbrsDeploymentInfo as CbrsDeploymentInfoProto, + // CbrsRadioDeploymentInfo as CbrsRadioDeploymentInfoProto, DeviceType as DeviceTypeProto, + // GatewayInfo as GatewayInfoProto, GatewayInfoV2 as GatewayInfoProtoV2, + // GatewayMetadata as GatewayMetadataProto, GatewayMetadataV2 as GatewayMetadataProtoV2, + // WifiDeploymentInfo as WifiDeploymentInfoProto, + GatewayInfoV3 as GatewayInfoProtoV3, +}; +use sqlx::PgExecutor; + +use futures::stream::Stream; + +#[derive(Clone, Debug)] +pub struct GatewayInfoV3 { + pub address: PublicKeyBinary, + // pub metadata: Option, + // pub device_type: DeviceType, + // Optional fields are None for GatewayInfoProto (V1) + pub created_at: DateTime, + // updated_at refers to the last time the data was actually changed. + pub updated_at: DateTime, + // refreshed_at indicates the last time the chain was consulted, regardless of data changes. + pub refreshed_at: DateTime, +} + +impl TryFrom for GatewayInfoProtoV3 { + type Error = hextree::Error; + + fn try_from(_info: GatewayInfoV3) -> Result { + todo!() + // let metadata = if let Some(metadata) = info.metadata { + // Some(GatewayMetadataProto { + // location: hextree::Cell::from_raw(metadata.location)?.to_string(), + // }) + // } else { + // None + // }; + // Ok(Self { + // address: info.address.into(), + // metadata, + // device_type: info.device_type as i32, + // }) + } +} + +pub fn all_info_stream_v3<'a>( + _db: impl PgExecutor<'a> + 'a, + // device_types: &'a [DeviceType], +) -> impl Stream + 'a { + // TODO + futures::stream::empty::() +} diff --git a/mobile_config/src/gateway_service.rs b/mobile_config/src/gateway_service.rs index 7dfd5fe9a..d31eee9d9 100644 --- a/mobile_config/src/gateway_service.rs +++ b/mobile_config/src/gateway_service.rs @@ -4,6 +4,7 @@ use crate::{ db::{get_batch_tracked_radios, get_updated_radios}, DeviceType, GatewayInfo, }, + gateway_info_v3::{self}, key_cache::KeyCache, telemetry, verify_public_key, GrpcResult, GrpcStreamResult, }; @@ -18,8 +19,8 @@ use helium_crypto::{Keypair, PublicKey, PublicKeyBinary, Sign}; use helium_proto::{ services::mobile_config::{ self, GatewayInfoBatchReqV1, GatewayInfoReqV1, GatewayInfoResV1, GatewayInfoResV2, - GatewayInfoStreamReqV1, GatewayInfoStreamReqV2, GatewayInfoStreamResV1, - GatewayInfoStreamResV2, GatewayInfoV2, + GatewayInfoStreamReqV1, GatewayInfoStreamReqV2, GatewayInfoStreamReqV3, + GatewayInfoStreamResV1, GatewayInfoStreamResV2, GatewayInfoStreamResV3, GatewayInfoV2, }, Message, }; @@ -349,6 +350,62 @@ impl mobile_config::Gateway for GatewayService { Ok(Response::new(GrpcStreamResult::new(rx))) } + + type info_stream_v3Stream = GrpcStreamResult; + async fn info_stream_v3( + &self, + request: Request, + ) -> GrpcResult { + let request = request.into_inner(); + telemetry::count_request("gateway", "info-stream-v3"); + custom_tracing::record_b58("signer", &request.signer); + + let signer = verify_public_key(&request.signer)?; + self.verify_request_signature(&signer, &request)?; + + let metadata_db_pool = self.metadata_pool.clone(); + let mobile_config_db_pool = self.mobile_config_db_pool.clone(); + let signing_key = self.signing_key.clone(); + let batch_size = request.batch_size; + + let (tx, rx) = tokio::sync::mpsc::channel(100); + + // let device_types: Vec = request.device_types().map(|v| v.into()).collect(); + + tokio::spawn(async move { + let min_updated_at = Utc + .timestamp_opt(request.min_updated_at as i64, 0) + .single() + .ok_or(Status::invalid_argument( + "Invalid min_refreshed_at argument", + )) + .unwrap(); // TODO + + let _min_location_changed_at = Utc + .timestamp_opt(request.min_location_changed_at as i64, 0) + .single() + .ok_or(Status::invalid_argument( + "Invalid min_location_changed_at argument", + )) + .unwrap(); // TODO; + + let _updated_radios = get_updated_radios(&mobile_config_db_pool, min_updated_at).await?; + let stream = gateway_info_v3::all_info_stream_v3(&metadata_db_pool); + // let stream = stream + // .filter_map(|gateway_info| { + // // todo set location and location_changed_at here? + // future::ready(handle_updated_at( + // gateway_info, + // &updated_radios, + // min_updated_at, + // )) + // }) + // .boxed(); + stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await + }); + + Ok(Response::new(GrpcStreamResult::new(rx))) + } } fn handle_updated_at( @@ -426,20 +483,37 @@ impl GatewayInfoStreamRes for GatewayInfoStreamResV2 { } } -async fn stream_multi_gateways_info( - stream: impl Stream, +impl GatewayInfoStreamRes for GatewayInfoStreamResV3 { + type GatewayInfoType = mobile_config::GatewayInfoV3; + + fn new(gateways: Vec, timestamp: u64, signer: Vec) -> Self { + GatewayInfoStreamResV3 { + gateways, + timestamp, + signer, + signature: vec![], + } + } + + fn set_signature(&mut self, signature: Vec) { + self.signature = signature; + } +} + +async fn stream_multi_gateways_info( + stream: impl Stream, tx: tokio::sync::mpsc::Sender>, signing_key: Arc, batch_size: u32, ) -> anyhow::Result<()> where T: GatewayInfoStreamRes + Send + Sync + 'static + helium_proto::Message, - T::GatewayInfoType: TryFrom + Send + 'static, + T::GatewayInfoType: TryFrom + Send + 'static, { let timestamp = Utc::now().encode_timestamp(); let signer: Vec = signing_key.public_key().into(); Ok(stream - .map(Ok::) + .map(Ok::) .try_filter_map(|info| async move { let result: Option = info.try_into().ok(); Ok(result) diff --git a/mobile_config/src/lib.rs b/mobile_config/src/lib.rs index 567c6d968..9495a07ac 100644 --- a/mobile_config/src/lib.rs +++ b/mobile_config/src/lib.rs @@ -13,6 +13,7 @@ pub mod carrier_service; pub mod client; pub mod entity_service; pub mod gateway_info; +pub mod gateway_info_v3; pub mod gateway_service; pub mod hex_boosting_service; pub mod sub_dao_epoch_reward_info; From 79d21bbd06e0b16866a6dbf8706d120fa20ceefd Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Thu, 10 Jul 2025 17:54:17 +0300 Subject: [PATCH 02/18] save progress --- mobile_config/src/gateway_info_v3.rs | 104 ++++++++++++++++++++++++--- mobile_config/src/gateway_service.rs | 9 +-- 2 files changed, 98 insertions(+), 15 deletions(-) diff --git a/mobile_config/src/gateway_info_v3.rs b/mobile_config/src/gateway_info_v3.rs index 4f7548c3b..a9bbb6934 100644 --- a/mobile_config/src/gateway_info_v3.rs +++ b/mobile_config/src/gateway_info_v3.rs @@ -7,18 +7,31 @@ use helium_proto::services::mobile_config::{ // GatewayInfo as GatewayInfoProto, GatewayInfoV2 as GatewayInfoProtoV2, // GatewayMetadata as GatewayMetadataProto, GatewayMetadataV2 as GatewayMetadataProtoV2, // WifiDeploymentInfo as WifiDeploymentInfoProto, + DeviceTypeV2 as DeviceTypeProtoV2, GatewayInfoV3 as GatewayInfoProtoV3, }; -use sqlx::PgExecutor; -use futures::stream::Stream; +#[derive(Clone, Debug)] +pub struct GatewayMetadataV3 { + pub location: u64, + pub location_changed_at: DateTime, + pub antenna: u32, + pub elevation: u32, + pub azimuth: u32, +} + +#[derive(Clone, Debug)] +pub enum DeviceTypeV2 { + Indoor, + Outdoor, + DataOnly, +} #[derive(Clone, Debug)] pub struct GatewayInfoV3 { pub address: PublicKeyBinary, - // pub metadata: Option, - // pub device_type: DeviceType, - // Optional fields are None for GatewayInfoProto (V1) + pub metadata: Option, + pub device_type: DeviceTypeV2, pub created_at: DateTime, // updated_at refers to the last time the data was actually changed. pub updated_at: DateTime, @@ -26,6 +39,16 @@ pub struct GatewayInfoV3 { pub refreshed_at: DateTime, } +impl From for DeviceTypeV2 { + fn from(value: DeviceTypeProtoV2) -> Self { + match value { + DeviceTypeProtoV2::Indoor => DeviceTypeV2::Indoor, + DeviceTypeProtoV2::Outdoor => DeviceTypeV2::Outdoor, + DeviceTypeProtoV2::DataOnly => DeviceTypeV2::DataOnly, + } + } +} + impl TryFrom for GatewayInfoProtoV3 { type Error = hextree::Error; @@ -46,10 +69,69 @@ impl TryFrom for GatewayInfoProtoV3 { } } -pub fn all_info_stream_v3<'a>( - _db: impl PgExecutor<'a> + 'a, - // device_types: &'a [DeviceType], -) -> impl Stream + 'a { - // TODO - futures::stream::empty::() +pub(crate) mod db { + use chrono::{DateTime, Utc}; + use futures::stream::Stream; + use futures::TryStreamExt; + use helium_crypto::PublicKeyBinary; + use sqlx::PgExecutor; + use sqlx::Row; + use std::{collections::HashMap, str::FromStr, sync::LazyLock}; + + use super::{DeviceTypeV2, GatewayInfoV3}; + + pub struct MobileTrackerInfo { + location: Option, + last_changed_at: DateTime, + asserted_location_changed_at: Option>, + } + pub type MobileTrackerInfoMap = HashMap; + + const GET_UPDATED_RADIOS: &str = + "SELECT entity_key, last_changed_at, asserted_location, asserted_location_changed_at + FROM mobile_radio_tracker + WHERE last_changed_at >= $1 and asserted_location_changed_at >= $2"; + + // TODO think how to handle asserted_location and asserted_location_changed_at NULLs + pub async fn get_mobile_tracker_gateways_info( + db: impl PgExecutor<'_>, + min_updated_at: DateTime, + min_location_changed_at: DateTime, + ) -> anyhow::Result { + sqlx::query(GET_UPDATED_RADIOS) + .bind(min_updated_at) + .bind(min_location_changed_at) + .fetch(db) + .map_err(anyhow::Error::from) + .try_fold( + MobileTrackerInfoMap::new(), + |mut map: MobileTrackerInfoMap, row| async move { + let entity_key_b = row.get::<&[u8], &str>("entity_key"); + let entity_key = bs58::encode(entity_key_b).into_string(); + let last_changed_at = row.get::, &str>("last_changed_at"); + let asserted_location_changed_at = + row.get::>, &str>("asserted_location_changed_at "); + let location = row.get::, &str>("location"); + + map.insert( + PublicKeyBinary::from_str(&entity_key)?, + MobileTrackerInfo { + location: location.map(|v| v as u64), + last_changed_at, + asserted_location_changed_at, + }, + ); + Ok(map) + }, + ) + .await + } + + pub fn all_info_stream_v3<'a>( + _db: impl PgExecutor<'a> + 'a, + device_types: &'a [DeviceTypeV2], + ) -> impl Stream + 'a { + // TODO + futures::stream::empty::() + } } diff --git a/mobile_config/src/gateway_service.rs b/mobile_config/src/gateway_service.rs index d31eee9d9..79eade5ec 100644 --- a/mobile_config/src/gateway_service.rs +++ b/mobile_config/src/gateway_service.rs @@ -4,7 +4,7 @@ use crate::{ db::{get_batch_tracked_radios, get_updated_radios}, DeviceType, GatewayInfo, }, - gateway_info_v3::{self}, + gateway_info_v3::{self, DeviceTypeV2}, key_cache::KeyCache, telemetry, verify_public_key, GrpcResult, GrpcStreamResult, }; @@ -370,7 +370,7 @@ impl mobile_config::Gateway for GatewayService { let (tx, rx) = tokio::sync::mpsc::channel(100); - // let device_types: Vec = request.device_types().map(|v| v.into()).collect(); + let device_types: Vec = request.device_types().map(|v| v.into()).collect(); tokio::spawn(async move { let min_updated_at = Utc @@ -389,8 +389,9 @@ impl mobile_config::Gateway for GatewayService { )) .unwrap(); // TODO; - let _updated_radios = get_updated_radios(&mobile_config_db_pool, min_updated_at).await?; - let stream = gateway_info_v3::all_info_stream_v3(&metadata_db_pool); + let _updated_radios = + get_updated_radios(&mobile_config_db_pool, min_updated_at).await?; + let stream = gateway_info_v3::db::all_info_stream_v3(&metadata_db_pool, &device_types); // let stream = stream // .filter_map(|gateway_info| { // // todo set location and location_changed_at here? From 580ade7ee8adfd50722bd4383abab07d378927f0 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 15 Jul 2025 13:07:36 +0300 Subject: [PATCH 03/18] It is compilable --- Cargo.lock | 6 +- mobile_config/src/gateway_info_v3.rs | 261 ++++++++++++++++++++++----- mobile_config/src/gateway_service.rs | 37 ++-- 3 files changed, 246 insertions(+), 58 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bb0d95e87..b8c034272 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3446,7 +3446,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://www.github.com/helium/proto.git?branch=mobile-config-loc-assert#7a564041f35e3d1b55f59754c50063d0f111744e" +source = "git+https://www.github.com/helium/proto.git?branch=mobile-config-loc-assert#89c549665af77ac1fabe4ae1f2b54f1acac2ef73" dependencies = [ "bytes", "prost", @@ -5859,7 +5859,7 @@ checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes", "heck 0.5.0", - "itertools 0.10.5", + "itertools 0.12.1", "log", "multimap", "once_cell", @@ -5879,7 +5879,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.12.1", "proc-macro2", "quote", "syn 2.0.101", diff --git a/mobile_config/src/gateway_info_v3.rs b/mobile_config/src/gateway_info_v3.rs index a9bbb6934..c6c3cd8be 100644 --- a/mobile_config/src/gateway_info_v3.rs +++ b/mobile_config/src/gateway_info_v3.rs @@ -7,17 +7,24 @@ use helium_proto::services::mobile_config::{ // GatewayInfo as GatewayInfoProto, GatewayInfoV2 as GatewayInfoProtoV2, // GatewayMetadata as GatewayMetadataProto, GatewayMetadataV2 as GatewayMetadataProtoV2, // WifiDeploymentInfo as WifiDeploymentInfoProto, + DeploymentInfo as DeploymentInfoProto, DeviceTypeV2 as DeviceTypeProtoV2, GatewayInfoV3 as GatewayInfoProtoV3, + LocationInfo as LocationInfoProto, }; +use crate::gateway_info::DeviceTypeParseError; + #[derive(Clone, Debug)] -pub struct GatewayMetadataV3 { +pub struct LocationInfo { pub location: u64, pub location_changed_at: DateTime, - pub antenna: u32, - pub elevation: u32, - pub azimuth: u32, +} + +#[derive(Clone, Debug)] +pub struct GatewayMetadataV3 { + pub location_info: LocationInfo, + pub deployment_info: Option, } #[derive(Clone, Debug)] @@ -27,6 +34,30 @@ pub enum DeviceTypeV2 { DataOnly, } +impl std::str::FromStr for DeviceTypeV2 { + type Err = DeviceTypeParseError; + + fn from_str(s: &str) -> Result { + let result = match s { + "wifiIndoor" => Self::Indoor, + "wifiOutdoor" => Self::Outdoor, + "wifiDataOnly" => Self::DataOnly, + _ => return Err(DeviceTypeParseError), + }; + Ok(result) + } +} + +impl DeviceTypeV2 { + fn to_sql_param(&self) -> &'static str { + match self { + DeviceTypeV2::Indoor => "wifiIndoor", + DeviceTypeV2::Outdoor => "wifiOutdoor", + DeviceTypeV2::DataOnly => "wifiDataOnly", + } + } +} + #[derive(Clone, Debug)] pub struct GatewayInfoV3 { pub address: PublicKeyBinary, @@ -71,14 +102,17 @@ impl TryFrom for GatewayInfoProtoV3 { pub(crate) mod db { use chrono::{DateTime, Utc}; - use futures::stream::Stream; - use futures::TryStreamExt; + use futures::{ + stream::{Stream, StreamExt}, + TryStreamExt, + }; use helium_crypto::PublicKeyBinary; - use sqlx::PgExecutor; + use helium_proto::services::mobile_config::DeploymentInfo as DeploymentInfoProto; use sqlx::Row; + use sqlx::{types::Json, PgExecutor}; use std::{collections::HashMap, str::FromStr, sync::LazyLock}; - use super::{DeviceTypeV2, GatewayInfoV3}; + use super::{DeviceTypeV2, GatewayInfoV3, GatewayMetadataV3}; pub struct MobileTrackerInfo { location: Option, @@ -87,51 +121,194 @@ pub(crate) mod db { } pub type MobileTrackerInfoMap = HashMap; + // TODO test and add indexes if needed const GET_UPDATED_RADIOS: &str = "SELECT entity_key, last_changed_at, asserted_location, asserted_location_changed_at - FROM mobile_radio_tracker - WHERE last_changed_at >= $1 and asserted_location_changed_at >= $2"; + FROM mobile_radio_tracker WHERE last_changed_at >= $1"; + + static GET_UPDATED_RADIOS_WITH_LOCATION: LazyLock = LazyLock::new(|| { + format!("{GET_UPDATED_RADIOS} AND asserted_location IS NOT NULL AND asserted_location_changed_at >= $2") + }); + + const GET_MOBILE_HOTSPOT_INFO: &str = r#" + SELECT kta.entity_key, infos.device_type, infos.refreshed_at, infos.created_at, infos.deployment_info + FROM mobile_hotspot_infos infos + JOIN key_to_assets kta ON infos.asset = kta.asset + WHERE device_type != '"cbrs"' + "#; + const DEVICE_TYPES_WHERE_SNIPPET: &str = " where device_type::text = any($1) "; + static DEVICE_TYPES_METADATA_SQL: LazyLock = + LazyLock::new(|| format!("{GET_MOBILE_HOTSPOT_INFO} {DEVICE_TYPES_WHERE_SNIPPET}")); - // TODO think how to handle asserted_location and asserted_location_changed_at NULLs pub async fn get_mobile_tracker_gateways_info( db: impl PgExecutor<'_>, min_updated_at: DateTime, - min_location_changed_at: DateTime, + min_location_changed_at: Option>, ) -> anyhow::Result { - sqlx::query(GET_UPDATED_RADIOS) - .bind(min_updated_at) - .bind(min_location_changed_at) - .fetch(db) - .map_err(anyhow::Error::from) - .try_fold( - MobileTrackerInfoMap::new(), - |mut map: MobileTrackerInfoMap, row| async move { - let entity_key_b = row.get::<&[u8], &str>("entity_key"); - let entity_key = bs58::encode(entity_key_b).into_string(); - let last_changed_at = row.get::, &str>("last_changed_at"); - let asserted_location_changed_at = - row.get::>, &str>("asserted_location_changed_at "); - let location = row.get::, &str>("location"); - - map.insert( - PublicKeyBinary::from_str(&entity_key)?, - MobileTrackerInfo { - location: location.map(|v| v as u64), - last_changed_at, - asserted_location_changed_at, - }, - ); - Ok(map) - }, - ) - .await + // TODO refactor + if let Some(min_loc_changed_at) = min_location_changed_at { + sqlx::query(&GET_UPDATED_RADIOS_WITH_LOCATION) + .bind(min_updated_at) + .bind(min_loc_changed_at) + .fetch(db) + .map_err(anyhow::Error::from) + .try_fold( + MobileTrackerInfoMap::new(), + |mut map: MobileTrackerInfoMap, row| async move { + let entity_key_b = row.get::<&[u8], &str>("entity_key"); + let entity_key = bs58::encode(entity_key_b).into_string(); + let last_changed_at = row.get::, &str>("last_changed_at"); + let asserted_location_changed_at = + row.get::>, &str>("asserted_location_changed_at "); + let asserted_location = row.get::("asserted_location"); + + map.insert( + PublicKeyBinary::from_str(&entity_key)?, + MobileTrackerInfo { + location: Some(asserted_location as u64), + last_changed_at, + asserted_location_changed_at, + }, + ); + Ok(map) + }, + ) + .await + } else { + sqlx::query(GET_UPDATED_RADIOS) + .bind(min_updated_at) + .fetch(db) + .map_err(anyhow::Error::from) + .try_fold( + MobileTrackerInfoMap::new(), + |mut map: MobileTrackerInfoMap, row| async move { + let entity_key_b = row.get::<&[u8], &str>("entity_key"); + let entity_key = bs58::encode(entity_key_b).into_string(); + let last_changed_at = row.get::, &str>("last_changed_at"); + let asserted_location_changed_at = + row.get::>, &str>("asserted_location_changed_at "); + let asserted_location = row.get::, &str>("asserted_location"); + + map.insert( + PublicKeyBinary::from_str(&entity_key)?, + MobileTrackerInfo { + location: asserted_location.map(|v| v as u64), + last_changed_at, + asserted_location_changed_at, + }, + ); + Ok(map) + }, + ) + .await + } } pub fn all_info_stream_v3<'a>( - _db: impl PgExecutor<'a> + 'a, + db: impl PgExecutor<'a> + 'a, device_types: &'a [DeviceTypeV2], + mtim: &'a MobileTrackerInfoMap, ) -> impl Stream + 'a { - // TODO - futures::stream::empty::() + match device_types.is_empty() { + true => sqlx::query(GET_MOBILE_HOTSPOT_INFO) + .fetch(db) + .filter_map(move |hs_info| async move { + match hs_info { + Ok(info_row) => { + let address = PublicKeyBinary::from_str( + &bs58::encode(info_row.get::<&[u8], &str>("entity_key")) + .into_string(), + ) + .map_err(|err| sqlx::Error::Decode(Box::new(err))) + .unwrap(); // TODO remove unwrap() + + match mtim.get(&address) { + Some(mti) => { + // TODO test location is Some but asserted_location_changed_at + // is None + let location = mti.location; + let metadata = if let Some(loc) = location { + // If location is Some, asserted_location_changed_at must + // also be some. Otherwise, data is corrupted + let asserted_location_changed_at = + mti.asserted_location_changed_at?; + + // TODO function getWifiDeploymentInfo + let deployment_info = match info_row.try_get::, + >, &str>( + "deployment_info" + ) { + Ok(di) => di.map(|v| v.0), + // We shouldn't fail if an error occurs in this case. + // This is because the data in this column could be inconsistent, + // and we don't want to break + Err(_e) => None, + }; + let deployment_info = match deployment_info { + Some(di) => match di { + crate::gateway_info::DeploymentInfo::WifiDeploymentInfo(wdi) => { + Some(DeploymentInfoProto { + antenna: wdi.antenna, + elevation: wdi.elevation, + azimuth: wdi.azimuth, + }) + } + crate::gateway_info::DeploymentInfo::CbrsDeploymentInfo(_cdi) => None, + }, + None => None, + }; + + Some(GatewayMetadataV3 { + location_info: super::LocationInfo { + location: loc, + location_changed_at: asserted_location_changed_at, + }, + deployment_info, + }) + } else { + None + }; + let device_type = DeviceTypeV2::from_str( + info_row.get::, &str>("device_type") + .to_string() + .as_ref(), + ) + .map_err(|err| sqlx::Error::Decode(Box::new(err))).unwrap();// todo remove unwrap + let created_at = info_row.get::, &str>("created_at"); + let refreshed_at = info_row.get::, &str>("refreshed_at"); + + Some(GatewayInfoV3 { + address, + metadata, + device_type, + created_at, + refreshed_at, + updated_at: mti.last_changed_at + }) + } + None => None, + } + } + Err(e) => { + tracing::error!("SLQX error during fetching mobile hotspot infos. {e}"); + None + } + } + }) + .boxed(), + false => todo!(), // false => sqlx::query_as::<_, GatewayInfoV3>(&DEVICE_TYPES_METADATA_SQL) + // .bind( + // device_types + // .iter() + // // The device_types field has a jsonb type but is being used as a string, + // // which forces us to add quotes. + // .map(|v| format!("\"{}\"", v.to_sql_param())) + // .collect::>(), + // ) + // .fetch(db) + // .filter_map(|gwinfo| async move { gwinfo.ok() }) + // .boxed(), + } } } diff --git a/mobile_config/src/gateway_service.rs b/mobile_config/src/gateway_service.rs index 79eade5ec..20ed7091a 100644 --- a/mobile_config/src/gateway_service.rs +++ b/mobile_config/src/gateway_service.rs @@ -4,7 +4,7 @@ use crate::{ db::{get_batch_tracked_radios, get_updated_radios}, DeviceType, GatewayInfo, }, - gateway_info_v3::{self, DeviceTypeV2}, + gateway_info_v3::{self, db::get_mobile_tracker_gateways_info, DeviceTypeV2}, key_cache::KeyCache, telemetry, verify_public_key, GrpcResult, GrpcStreamResult, }; @@ -378,20 +378,31 @@ impl mobile_config::Gateway for GatewayService { .single() .ok_or(Status::invalid_argument( "Invalid min_refreshed_at argument", - )) - .unwrap(); // TODO + ))?; - let _min_location_changed_at = Utc - .timestamp_opt(request.min_location_changed_at as i64, 0) - .single() - .ok_or(Status::invalid_argument( - "Invalid min_location_changed_at argument", - )) - .unwrap(); // TODO; + let min_location_changed_at = if request.min_location_changed_at == 0 { + None + } else { + Some( + Utc.timestamp_opt(request.min_location_changed_at as i64, 0) + .single() + .ok_or(Status::invalid_argument( + "Invalid min_location_changed_at argument", + ))?, + ) + }; - let _updated_radios = - get_updated_radios(&mobile_config_db_pool, min_updated_at).await?; - let stream = gateway_info_v3::db::all_info_stream_v3(&metadata_db_pool, &device_types); + let mobile_tracker_gateways_info = get_mobile_tracker_gateways_info( + &mobile_config_db_pool, + min_updated_at, + min_location_changed_at, + ) + .await?; + let stream = gateway_info_v3::db::all_info_stream_v3( + &metadata_db_pool, + &device_types, + &mobile_tracker_gateways_info, + ); // let stream = stream // .filter_map(|gateway_info| { // // todo set location and location_changed_at here? From 1cdec8ef830a7c20ff788af983660fedcff23cec Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 15 Jul 2025 14:46:59 +0300 Subject: [PATCH 04/18] partial refactor all_info_stream_v3 --- Cargo.lock | 6 +- mobile_config/src/gateway_info_v3.rs | 241 ++++++++++++++------------- 2 files changed, 125 insertions(+), 122 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b8c034272..33f44212f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3446,7 +3446,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://www.github.com/helium/proto.git?branch=mobile-config-loc-assert#89c549665af77ac1fabe4ae1f2b54f1acac2ef73" +source = "git+https://www.github.com/helium/proto.git?branch=mobile-config-loc-assert#bbabbf5cce29d033553a745dd8a7290625877030" dependencies = [ "bytes", "prost", @@ -5859,7 +5859,7 @@ checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes", "heck 0.5.0", - "itertools 0.12.1", + "itertools 0.10.5", "log", "multimap", "once_cell", @@ -5879,7 +5879,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools 0.12.1", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.101", diff --git a/mobile_config/src/gateway_info_v3.rs b/mobile_config/src/gateway_info_v3.rs index c6c3cd8be..d2b0eaebd 100644 --- a/mobile_config/src/gateway_info_v3.rs +++ b/mobile_config/src/gateway_info_v3.rs @@ -1,15 +1,15 @@ use chrono::{DateTime, Utc}; use helium_crypto::PublicKeyBinary; use helium_proto::services::mobile_config::{ - // gateway_metadata_v2::DeploymentInfo as DeploymentInfoProto, - // CbrsDeploymentInfo as CbrsDeploymentInfoProto, - // CbrsRadioDeploymentInfo as CbrsRadioDeploymentInfoProto, DeviceType as DeviceTypeProto, - // GatewayInfo as GatewayInfoProto, GatewayInfoV2 as GatewayInfoProtoV2, - // GatewayMetadata as GatewayMetadataProto, GatewayMetadataV2 as GatewayMetadataProtoV2, // WifiDeploymentInfo as WifiDeploymentInfoProto, DeploymentInfo as DeploymentInfoProto, DeviceTypeV2 as DeviceTypeProtoV2, GatewayInfoV3 as GatewayInfoProtoV3, + // gateway_metadata_v2::DeploymentInfo as DeploymentInfoProto, + // CbrsDeploymentInfo as CbrsDeploymentInfoProto, + // CbrsRadioDeploymentInfo as CbrsRadioDeploymentInfoProto, DeviceType as DeviceTypeProto, + // GatewayInfo as GatewayInfoProto, GatewayInfoV2 as GatewayInfoProtoV2, + GatewayMetadataV3 as GatewayMetadataProtoV3, LocationInfo as LocationInfoProto, }; @@ -23,7 +23,9 @@ pub struct LocationInfo { #[derive(Clone, Debug)] pub struct GatewayMetadataV3 { + // TODO: Should it be optional? pub location_info: LocationInfo, + // TODO: Can it be Proto? pub deployment_info: Option, } @@ -83,20 +85,32 @@ impl From for DeviceTypeV2 { impl TryFrom for GatewayInfoProtoV3 { type Error = hextree::Error; - fn try_from(_info: GatewayInfoV3) -> Result { - todo!() - // let metadata = if let Some(metadata) = info.metadata { - // Some(GatewayMetadataProto { - // location: hextree::Cell::from_raw(metadata.location)?.to_string(), - // }) - // } else { - // None - // }; - // Ok(Self { - // address: info.address.into(), - // metadata, - // device_type: info.device_type as i32, - // }) + fn try_from(info: GatewayInfoV3) -> Result { + let metadata = if let Some(metadata) = info.metadata { + let location_info = LocationInfoProto { + location: hextree::Cell::from_raw(metadata.location_info.location)?.to_string(), + location_changed_at: metadata.location_info.location_changed_at.timestamp() as u64, + }; + let deployment_info = metadata.deployment_info.map(|di| DeploymentInfoProto { + antenna: di.antenna, + elevation: di.elevation, + azimuth: di.azimuth, + }); + + Some(GatewayMetadataProtoV3 { + location_info: Some(location_info), + deployment_info, + }) + } else { + None + }; + Ok(Self { + address: info.address.into(), + metadata, + device_type: info.device_type as i32, + created_at: info.created_at.timestamp() as u64, + updated_at: info.updated_at.timestamp() as u64, + }) } } @@ -112,6 +126,8 @@ pub(crate) mod db { use sqlx::{types::Json, PgExecutor}; use std::{collections::HashMap, str::FromStr, sync::LazyLock}; + use crate::gateway_info::DeploymentInfo; + use super::{DeviceTypeV2, GatewayInfoV3, GatewayMetadataV3}; pub struct MobileTrackerInfo { @@ -204,111 +220,98 @@ pub(crate) mod db { } } + /// Streams all gateway info records, optionally filtering by device types. pub fn all_info_stream_v3<'a>( db: impl PgExecutor<'a> + 'a, device_types: &'a [DeviceTypeV2], mtim: &'a MobileTrackerInfoMap, ) -> impl Stream + 'a { - match device_types.is_empty() { - true => sqlx::query(GET_MOBILE_HOTSPOT_INFO) - .fetch(db) - .filter_map(move |hs_info| async move { - match hs_info { - Ok(info_row) => { - let address = PublicKeyBinary::from_str( - &bs58::encode(info_row.get::<&[u8], &str>("entity_key")) - .into_string(), - ) - .map_err(|err| sqlx::Error::Decode(Box::new(err))) - .unwrap(); // TODO remove unwrap() - - match mtim.get(&address) { - Some(mti) => { - // TODO test location is Some but asserted_location_changed_at - // is None - let location = mti.location; - let metadata = if let Some(loc) = location { - // If location is Some, asserted_location_changed_at must - // also be some. Otherwise, data is corrupted - let asserted_location_changed_at = - mti.asserted_location_changed_at?; - - // TODO function getWifiDeploymentInfo - let deployment_info = match info_row.try_get::, - >, &str>( - "deployment_info" - ) { - Ok(di) => di.map(|v| v.0), - // We shouldn't fail if an error occurs in this case. - // This is because the data in this column could be inconsistent, - // and we don't want to break - Err(_e) => None, - }; - let deployment_info = match deployment_info { - Some(di) => match di { - crate::gateway_info::DeploymentInfo::WifiDeploymentInfo(wdi) => { - Some(DeploymentInfoProto { - antenna: wdi.antenna, - elevation: wdi.elevation, - azimuth: wdi.azimuth, - }) - } - crate::gateway_info::DeploymentInfo::CbrsDeploymentInfo(_cdi) => None, - }, - None => None, - }; - - Some(GatewayMetadataV3 { - location_info: super::LocationInfo { - location: loc, - location_changed_at: asserted_location_changed_at, - }, - deployment_info, - }) - } else { - None - }; - let device_type = DeviceTypeV2::from_str( - info_row.get::, &str>("device_type") - .to_string() - .as_ref(), - ) - .map_err(|err| sqlx::Error::Decode(Box::new(err))).unwrap();// todo remove unwrap - let created_at = info_row.get::, &str>("created_at"); - let refreshed_at = info_row.get::, &str>("refreshed_at"); - - Some(GatewayInfoV3 { - address, - metadata, - device_type, - created_at, - refreshed_at, - updated_at: mti.last_changed_at - }) - } - None => None, - } - } - Err(e) => { - tracing::error!("SLQX error during fetching mobile hotspot infos. {e}"); - None - } + // Choose base query depending on whether filtering is needed. + let query = if device_types.is_empty() { + sqlx::query(GET_MOBILE_HOTSPOT_INFO) + } else { + sqlx::query(&DEVICE_TYPES_METADATA_SQL).bind( + device_types + .iter() + // The device_types field has a jsonb type but is being used as a string, + // which forces us to add quotes. + .map(|v| format!("\"{}\"", v.to_sql_param())) + .collect::>(), + ) + }; + + query + .fetch(db) + .filter_map(move |result| async move { + match result { + Ok(row) => process_row(row, mtim).await, + Err(e) => { + tracing::error!("SQLx fetch error: {e:?}"); + None } - }) - .boxed(), - false => todo!(), // false => sqlx::query_as::<_, GatewayInfoV3>(&DEVICE_TYPES_METADATA_SQL) - // .bind( - // device_types - // .iter() - // // The device_types field has a jsonb type but is being used as a string, - // // which forces us to add quotes. - // .map(|v| format!("\"{}\"", v.to_sql_param())) - // .collect::>(), - // ) - // .fetch(db) - // .filter_map(|gwinfo| async move { gwinfo.ok() }) - // .boxed(), - } + } + }) + .boxed() + } + + /// Processes a single database row into a GatewayInfoV3, returning None if any step fails. + async fn process_row( + row: sqlx::postgres::PgRow, + mtim: &MobileTrackerInfoMap, + ) -> Option { + let device_type = DeviceTypeV2::from_str( + row.get::, &str>("device_type") + .to_string() + .as_ref(), + ) + .map_err(|err| sqlx::Error::Decode(Box::new(err))) + .unwrap(); // TODO REMOVE + + let address = PublicKeyBinary::from_str( + &bs58::encode(row.get::<&[u8], &str>("entity_key")).into_string(), + ) + .map_err(|err| sqlx::Error::Decode(Box::new(err))) + .unwrap(); // TODO REMOVE + + let mti = mtim.get(&address)?; + + let updated_at = mti.last_changed_at; + + let metadata = mti.location.and_then(|loc| { + let location_changed_at = mti.asserted_location_changed_at?; + // Safely parse deployment_info JSON + let deployment_info = row + .try_get::>, _>("deployment_info") + .ok() + .flatten() + .and_then(|json| match json.0 { + DeploymentInfo::WifiDeploymentInfo(wdi) => Some(DeploymentInfoProto { + antenna: wdi.antenna, + elevation: wdi.elevation, + azimuth: wdi.azimuth, + }), + _ => None, + }); + + Some(GatewayMetadataV3 { + location_info: super::LocationInfo { + location: loc, + location_changed_at, + }, + deployment_info, + }) + }); + + let created_at: DateTime = row.get("created_at"); + let refreshed_at: DateTime = row.get("refreshed_at"); + + Some(GatewayInfoV3 { + address, + metadata, + device_type, + created_at, + refreshed_at, + updated_at, + }) } } From 3a718499ed993b38df35cd14360c690af360d3db Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 15 Jul 2025 15:50:30 +0300 Subject: [PATCH 05/18] Fix bugs, add gateway_stream_info_v3 test --- mobile_config/src/gateway_info_v3.rs | 6 +- mobile_config/tests/gateway_service.rs | 85 ++++++++++++++++++++++++-- 2 files changed, 84 insertions(+), 7 deletions(-) diff --git a/mobile_config/src/gateway_info_v3.rs b/mobile_config/src/gateway_info_v3.rs index d2b0eaebd..34933bb3d 100644 --- a/mobile_config/src/gateway_info_v3.rs +++ b/mobile_config/src/gateway_info_v3.rs @@ -152,7 +152,7 @@ pub(crate) mod db { JOIN key_to_assets kta ON infos.asset = kta.asset WHERE device_type != '"cbrs"' "#; - const DEVICE_TYPES_WHERE_SNIPPET: &str = " where device_type::text = any($1) "; + const DEVICE_TYPES_WHERE_SNIPPET: &str = " AND device_type::text = any($1) "; static DEVICE_TYPES_METADATA_SQL: LazyLock = LazyLock::new(|| format!("{GET_MOBILE_HOTSPOT_INFO} {DEVICE_TYPES_WHERE_SNIPPET}")); @@ -175,7 +175,7 @@ pub(crate) mod db { let entity_key = bs58::encode(entity_key_b).into_string(); let last_changed_at = row.get::, &str>("last_changed_at"); let asserted_location_changed_at = - row.get::>, &str>("asserted_location_changed_at "); + row.get::>, &str>("asserted_location_changed_at"); let asserted_location = row.get::("asserted_location"); map.insert( @@ -202,7 +202,7 @@ pub(crate) mod db { let entity_key = bs58::encode(entity_key_b).into_string(); let last_changed_at = row.get::, &str>("last_changed_at"); let asserted_location_changed_at = - row.get::>, &str>("asserted_location_changed_at "); + row.get::>, &str>("asserted_location_changed_at"); let asserted_location = row.get::, &str>("asserted_location"); map.insert( diff --git a/mobile_config/tests/gateway_service.rs b/mobile_config/tests/gateway_service.rs index 1db74af0c..c2e108538 100644 --- a/mobile_config/tests/gateway_service.rs +++ b/mobile_config/tests/gateway_service.rs @@ -1,12 +1,11 @@ -use std::vec; - use chrono::{Duration, Utc}; use futures::stream::StreamExt; +use std::vec; use helium_crypto::{Keypair, PublicKey, Sign}; use helium_proto::services::mobile_config::{ - self as proto, gateway_metadata_v2::DeploymentInfo, DeviceType, GatewayClient, - GatewayInfoStreamReqV1, GatewayInfoStreamReqV2, GatewayInfoStreamResV2, + self as proto, gateway_metadata_v2::DeploymentInfo, DeviceType, DeviceTypeV2, GatewayClient, + GatewayInfoStreamReqV1, GatewayInfoStreamReqV2, GatewayInfoStreamReqV3, GatewayInfoStreamResV2, }; use mobile_config::{ gateway_service::GatewayService, @@ -221,6 +220,63 @@ async fn gateway_stream_info_v2(pool: PgPool) { ); } +#[sqlx::test] +async fn gateway_stream_info_v3(pool: PgPool) { + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let asset2_hex_idx = 631711286145955327_i64; + let asset2_pubkey = make_keypair().public_key().clone(); + let now = Utc::now(); + let now_plus_10 = now + chrono::Duration::seconds(10); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + Some(asset1_hex_idx), + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + now, + Some(now), + None, + ) + .await; + add_mobile_tracker_record(&pool, asset1_pubkey.clone().into(), now).await; + + add_db_record( + &pool, + "asset2", + Some(asset2_hex_idx), + "\"wifiDataOnly\"", + asset2_pubkey.clone().into(), + now_plus_10, + Some(now_plus_10), + None, + ) + .await; + add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), now_plus_10).await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + // Select all devices + let req = make_gateway_stream_signed_req_v3(&admin_key, &[], 0); + let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 2); + + // Filter by device type + let req = make_gateway_stream_signed_req_v3(&admin_key, &[DeviceTypeV2::Indoor], 0); + let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 1); + assert_eq!( + resp.gateways.first().unwrap().device_type, + Into::::into(DeviceTypeV2::Indoor) + ); +} + #[sqlx::test] async fn gateway_stream_info_v2_updated_at(pool: PgPool) { let admin_key = make_keypair(); @@ -803,6 +859,27 @@ async fn gateway_stream_info_v2_deployment_info(pool: PgPool) { } } +fn make_gateway_stream_signed_req_v3( + signer: &Keypair, + device_types: &[DeviceTypeV2], + min_updated_at: u64, +) -> proto::GatewayInfoStreamReqV3 { + let mut req = GatewayInfoStreamReqV3 { + batch_size: 10000, + signer: signer.public_key().to_vec(), + signature: vec![], + device_types: device_types + .iter() + .map(|v| DeviceTypeV2::into(*v)) + .collect(), + min_updated_at, + min_location_changed_at: 0, // TODO testme + }; + + req.signature = signer.sign(&req.encode_to_vec()).unwrap(); + req +} + fn make_gateway_stream_signed_req_v2( signer: &Keypair, device_types: &[DeviceType], From 98107c5e376348faa4f728e43f5c96277cb5749b Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 16 Jul 2025 13:47:47 +0300 Subject: [PATCH 06/18] Add updated_at and location_changed_at tests --- mobile_config/tests/common/mod.rs | 8 +- mobile_config/tests/gateway_service.rs | 200 +++++++++++++++++++++++-- 2 files changed, 195 insertions(+), 13 deletions(-) diff --git a/mobile_config/tests/common/mod.rs b/mobile_config/tests/common/mod.rs index 69217de7b..27397d043 100644 --- a/mobile_config/tests/common/mod.rs +++ b/mobile_config/tests/common/mod.rs @@ -8,21 +8,25 @@ pub async fn add_mobile_tracker_record( pool: &PgPool, key: PublicKeyBinary, last_changed_at: DateTime, + asserted_location: Option, + asserted_location_changed_at: Option>, ) { let b58 = bs58::decode(key.to_string()).into_vec().unwrap(); sqlx::query( r#" INSERT INTO -"mobile_radio_tracker" ("entity_key", "hash", "last_changed_at", "last_checked_at") +"mobile_radio_tracker" ("entity_key", "hash", "last_changed_at", "last_checked_at", "asserted_location", "asserted_location_changed_at") VALUES -($1, $2, $3, $4); +($1, $2, $3, $4, $5, $6); "#, ) .bind(b58) .bind("hash") .bind(last_changed_at) .bind(last_changed_at + Duration::hours(1)) + .bind(asserted_location) + .bind(asserted_location_changed_at) .execute(pool) .await .unwrap(); diff --git a/mobile_config/tests/gateway_service.rs b/mobile_config/tests/gateway_service.rs index c2e108538..b36d39217 100644 --- a/mobile_config/tests/gateway_service.rs +++ b/mobile_config/tests/gateway_service.rs @@ -242,7 +242,7 @@ async fn gateway_stream_info_v3(pool: PgPool) { None, ) .await; - add_mobile_tracker_record(&pool, asset1_pubkey.clone().into(), now).await; + add_mobile_tracker_record(&pool, asset1_pubkey.clone().into(), now, None, None).await; add_db_record( &pool, @@ -255,19 +255,19 @@ async fn gateway_stream_info_v3(pool: PgPool) { None, ) .await; - add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), now_plus_10).await; + add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), now_plus_10, None, None).await; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); // Select all devices - let req = make_gateway_stream_signed_req_v3(&admin_key, &[], 0); + let req = make_gateway_stream_signed_req_v3(&admin_key, &[], 0, 0); let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); let resp = stream.next().await.unwrap().unwrap(); assert_eq!(resp.gateways.len(), 2); // Filter by device type - let req = make_gateway_stream_signed_req_v3(&admin_key, &[DeviceTypeV2::Indoor], 0); + let req = make_gateway_stream_signed_req_v3(&admin_key, &[DeviceTypeV2::Indoor], 0, 0); let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); let resp = stream.next().await.unwrap().unwrap(); assert_eq!(resp.gateways.len(), 1); @@ -299,7 +299,7 @@ async fn gateway_stream_info_v2_updated_at(pool: PgPool) { None, ) .await; - add_mobile_tracker_record(&pool, asset1_pubkey.clone().into(), updated_at).await; + add_mobile_tracker_record(&pool, asset1_pubkey.clone().into(), updated_at, None, None).await; // Shouldn't be returned add_db_record( @@ -313,7 +313,7 @@ async fn gateway_stream_info_v2_updated_at(pool: PgPool) { None, ) .await; - add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), created_at).await; + add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), created_at, None, None).await; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); @@ -337,6 +337,183 @@ async fn gateway_stream_info_v2_updated_at(pool: PgPool) { assert!(stream.next().await.is_none()); } +#[sqlx::test] +async fn gateway_stream_info_v3_updated_at(pool: PgPool) { + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let asset2_hex_idx = 631711286145955327_i64; + let asset2_pubkey = make_keypair().public_key().clone(); + let created_at = Utc::now() - Duration::hours(5); + let updated_at = Utc::now() - Duration::hours(3); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + Some(asset1_hex_idx), + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + created_at, + Some(updated_at), + None, + ) + .await; + add_mobile_tracker_record( + &pool, + asset1_pubkey.clone().into(), + updated_at, + Some(asset1_hex_idx), + Some(updated_at), + ) + .await; + + // Shouldn't be returned + add_db_record( + &pool, + "asset2", + Some(asset2_hex_idx), + "\"wifiDataOnly\"", + asset2_pubkey.clone().into(), + created_at, + None, + None, + ) + .await; + add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), created_at, None, None).await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + let req = make_gateway_stream_signed_req_v3(&admin_key, &[], updated_at.timestamp() as u64, 0); + let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 1); + + let gw_info = resp.gateways.first().unwrap(); + let pub_key = PublicKey::from_bytes(gw_info.address.clone()).unwrap(); + assert_eq!(pub_key, asset1_pubkey.clone()); + assert_eq!( + DeviceTypeV2::try_from(gw_info.device_type).unwrap(), + DeviceTypeV2::Indoor + ); + assert_eq!( + i64::from_str_radix( + &gw_info + .metadata + .clone() + .unwrap() + .location_info + .unwrap() + .location, + 16 + ) + .unwrap(), + asset1_hex_idx + ); + assert!(stream.next().await.is_none()); +} + +#[sqlx::test] +async fn gateway_stream_info_v3_location_changed_at(pool: PgPool) { + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let asset2_hex_idx = 631711286145955327_i64; + let asset2_pubkey = make_keypair().public_key().clone(); + let now = Utc::now(); + let now_minus_six = now - Duration::hours(6); + let now_minus_three = now - Duration::hours(3); + let now_minus_four = now - Duration::hours(4); + let now_minus_five = now - Duration::hours(5); + + // Scenario: + // asset_1 location changed at now - 6 hours + // asset_2 location changed at now - 4 hours + // request min_location_changed_at location changed at now - 5 hours + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + Some(asset1_hex_idx), + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + now_minus_six, + Some(now), + None, + ) + .await; + add_mobile_tracker_record( + &pool, + asset1_pubkey.clone().into(), + now_minus_three, + Some(asset1_hex_idx), + Some(now_minus_six), + ) + .await; + + // Shouldn't be returned + add_db_record( + &pool, + "asset2", + Some(asset2_hex_idx), + "\"wifiDataOnly\"", + asset2_pubkey.clone().into(), + now_minus_six, + Some(now), + None, + ) + .await; + add_mobile_tracker_record( + &pool, + asset2_pubkey.clone().into(), + now_minus_three, + Some(asset2_hex_idx), + Some(now_minus_four), + ) + .await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + let req = + make_gateway_stream_signed_req_v3(&admin_key, &[], 0, now_minus_five.timestamp() as u64); + let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 1); + + let gw_info = resp.gateways.first().unwrap(); + let pub_key = PublicKey::from_bytes(gw_info.address.clone()).unwrap(); + assert_eq!(pub_key, asset2_pubkey.clone()); + assert_eq!( + DeviceTypeV2::try_from(gw_info.device_type).unwrap(), + DeviceTypeV2::DataOnly + ); + assert_eq!( + i64::from_str_radix( + &gw_info + .metadata + .clone() + .unwrap() + .location_info + .unwrap() + .location, + 16 + ) + .unwrap(), + asset2_hex_idx + ); + assert!(stream.next().await.is_none()); + + // Change min_location_changed_at parameter, now two radios should be returned + let req = + make_gateway_stream_signed_req_v3(&admin_key, &[], 0, now_minus_six.timestamp() as u64); + let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 2); +} + #[sqlx::test] async fn gateway_info_batch_v2(pool: PgPool) { let admin_key = make_keypair(); @@ -371,7 +548,7 @@ async fn gateway_info_batch_v2(pool: PgPool) { None, ) .await; - add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), created_at).await; + add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), created_at, None, None).await; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); @@ -464,7 +641,7 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) { None, ) .await; - add_mobile_tracker_record(&pool, asset3_pubkey.clone().into(), updated_at).await; + add_mobile_tracker_record(&pool, asset3_pubkey.clone().into(), updated_at, None, None).await; // Must be ignored since not included in req add_db_record( @@ -602,7 +779,7 @@ async fn gateway_info_v2(pool: PgPool) { Some(r#"{"wifiInfoV0": {"antenna": 18, "azimuth": 161, "elevation": 2, "electricalDownTilt": 3, "mechanicalDownTilt": 4}}"#) ) .await; - add_mobile_tracker_record(&pool, asset1_pubkey.clone().into(), updated_at).await; + add_mobile_tracker_record(&pool, asset1_pubkey.clone().into(), updated_at, None, None).await; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); @@ -696,7 +873,7 @@ async fn gateway_info_stream_v2_updated_at_check(pool: PgPool) { None, ) .await; - add_mobile_tracker_record(&pool, asset3_pubkey.clone().into(), updated_at).await; + add_mobile_tracker_record(&pool, asset3_pubkey.clone().into(), updated_at, None, None).await; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); @@ -863,6 +1040,7 @@ fn make_gateway_stream_signed_req_v3( signer: &Keypair, device_types: &[DeviceTypeV2], min_updated_at: u64, + min_location_changed_at: u64, ) -> proto::GatewayInfoStreamReqV3 { let mut req = GatewayInfoStreamReqV3 { batch_size: 10000, @@ -873,7 +1051,7 @@ fn make_gateway_stream_signed_req_v3( .map(|v| DeviceTypeV2::into(*v)) .collect(), min_updated_at, - min_location_changed_at: 0, // TODO testme + min_location_changed_at, }; req.signature = signer.sign(&req.encode_to_vec()).unwrap(); From 68559bb9d21ef2398736dc58ef7396a6722a8014 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 16 Jul 2025 14:19:57 +0300 Subject: [PATCH 07/18] Move tests to gateway_service_v3 --- mobile_config/src/gateway_info_v3.rs | 1 - mobile_config/tests/common/mod.rs | 34 +++ mobile_config/tests/gateway_service.rs | 284 +--------------------- mobile_config/tests/gateway_service_v3.rs | 269 ++++++++++++++++++++ 4 files changed, 305 insertions(+), 283 deletions(-) create mode 100644 mobile_config/tests/gateway_service_v3.rs diff --git a/mobile_config/src/gateway_info_v3.rs b/mobile_config/src/gateway_info_v3.rs index 34933bb3d..f3754f60e 100644 --- a/mobile_config/src/gateway_info_v3.rs +++ b/mobile_config/src/gateway_info_v3.rs @@ -23,7 +23,6 @@ pub struct LocationInfo { #[derive(Clone, Debug)] pub struct GatewayMetadataV3 { - // TODO: Should it be optional? pub location_info: LocationInfo, // TODO: Can it be Proto? pub deployment_info: Option, diff --git a/mobile_config/tests/common/mod.rs b/mobile_config/tests/common/mod.rs index 27397d043..198e2808f 100644 --- a/mobile_config/tests/common/mod.rs +++ b/mobile_config/tests/common/mod.rs @@ -137,3 +137,37 @@ pub async fn create_db_tables(pool: &PgPool) { pub fn make_keypair() -> Keypair { Keypair::generate(KeyTag::default(), &mut rand::rngs::OsRng) } + +use helium_crypto::PublicKey; +use helium_proto::services::mobile_config::{self as proto}; +use mobile_config::{ + gateway_service::GatewayService, + key_cache::{CacheKeys, KeyCache}, + KeyRole, +}; +use tokio::net::TcpListener; +use tonic::transport; + +pub async fn spawn_gateway_service( + pool: PgPool, + admin_pub_key: PublicKey, +) -> ( + String, + tokio::task::JoinHandle>, +) { + let server_key = make_keypair(); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + // Start the gateway server + let keys = CacheKeys::from_iter([(admin_pub_key.to_owned(), KeyRole::Administrator)]); + let (_key_cache_tx, key_cache) = KeyCache::new(keys); + let gws = GatewayService::new(key_cache, pool.clone(), server_key, pool.clone()); + let handle = tokio::spawn( + transport::Server::builder() + .add_service(proto::GatewayServer::new(gws)) + .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)), + ); + + (format!("http://{addr}"), handle) +} diff --git a/mobile_config/tests/gateway_service.rs b/mobile_config/tests/gateway_service.rs index b36d39217..d5cfcb4ae 100644 --- a/mobile_config/tests/gateway_service.rs +++ b/mobile_config/tests/gateway_service.rs @@ -4,8 +4,8 @@ use std::vec; use helium_crypto::{Keypair, PublicKey, Sign}; use helium_proto::services::mobile_config::{ - self as proto, gateway_metadata_v2::DeploymentInfo, DeviceType, DeviceTypeV2, GatewayClient, - GatewayInfoStreamReqV1, GatewayInfoStreamReqV2, GatewayInfoStreamReqV3, GatewayInfoStreamResV2, + self as proto, gateway_metadata_v2::DeploymentInfo, DeviceType, GatewayClient, + GatewayInfoStreamReqV1, GatewayInfoStreamReqV2, GatewayInfoStreamResV2, }; use mobile_config::{ gateway_service::GatewayService, @@ -88,30 +88,6 @@ async fn gateway_info_authorization_errors(pool: PgPool) -> anyhow::Result<()> { Ok(()) } -async fn spawn_gateway_service( - pool: PgPool, - admin_pub_key: PublicKey, -) -> ( - String, - tokio::task::JoinHandle>, -) { - let server_key = make_keypair(); - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); - - // Start the gateway server - let keys = CacheKeys::from_iter([(admin_pub_key.to_owned(), KeyRole::Administrator)]); - let (_key_cache_tx, key_cache) = KeyCache::new(keys); - let gws = GatewayService::new(key_cache, pool.clone(), server_key, pool.clone()); - let handle = tokio::spawn( - transport::Server::builder() - .add_service(proto::GatewayServer::new(gws)) - .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)), - ); - - (format!("http://{addr}"), handle) -} - #[sqlx::test] async fn gateway_stream_info_v1(pool: PgPool) { let admin_key = make_keypair(); @@ -220,63 +196,6 @@ async fn gateway_stream_info_v2(pool: PgPool) { ); } -#[sqlx::test] -async fn gateway_stream_info_v3(pool: PgPool) { - let admin_key = make_keypair(); - let asset1_pubkey = make_keypair().public_key().clone(); - let asset1_hex_idx = 631711281837647359_i64; - let asset2_hex_idx = 631711286145955327_i64; - let asset2_pubkey = make_keypair().public_key().clone(); - let now = Utc::now(); - let now_plus_10 = now + chrono::Duration::seconds(10); - - create_db_tables(&pool).await; - add_db_record( - &pool, - "asset1", - Some(asset1_hex_idx), - "\"wifiIndoor\"", - asset1_pubkey.clone().into(), - now, - Some(now), - None, - ) - .await; - add_mobile_tracker_record(&pool, asset1_pubkey.clone().into(), now, None, None).await; - - add_db_record( - &pool, - "asset2", - Some(asset2_hex_idx), - "\"wifiDataOnly\"", - asset2_pubkey.clone().into(), - now_plus_10, - Some(now_plus_10), - None, - ) - .await; - add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), now_plus_10, None, None).await; - - let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; - let mut client = GatewayClient::connect(addr).await.unwrap(); - - // Select all devices - let req = make_gateway_stream_signed_req_v3(&admin_key, &[], 0, 0); - let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); - let resp = stream.next().await.unwrap().unwrap(); - assert_eq!(resp.gateways.len(), 2); - - // Filter by device type - let req = make_gateway_stream_signed_req_v3(&admin_key, &[DeviceTypeV2::Indoor], 0, 0); - let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); - let resp = stream.next().await.unwrap().unwrap(); - assert_eq!(resp.gateways.len(), 1); - assert_eq!( - resp.gateways.first().unwrap().device_type, - Into::::into(DeviceTypeV2::Indoor) - ); -} - #[sqlx::test] async fn gateway_stream_info_v2_updated_at(pool: PgPool) { let admin_key = make_keypair(); @@ -337,183 +256,6 @@ async fn gateway_stream_info_v2_updated_at(pool: PgPool) { assert!(stream.next().await.is_none()); } -#[sqlx::test] -async fn gateway_stream_info_v3_updated_at(pool: PgPool) { - let admin_key = make_keypair(); - let asset1_pubkey = make_keypair().public_key().clone(); - let asset1_hex_idx = 631711281837647359_i64; - let asset2_hex_idx = 631711286145955327_i64; - let asset2_pubkey = make_keypair().public_key().clone(); - let created_at = Utc::now() - Duration::hours(5); - let updated_at = Utc::now() - Duration::hours(3); - - create_db_tables(&pool).await; - add_db_record( - &pool, - "asset1", - Some(asset1_hex_idx), - "\"wifiIndoor\"", - asset1_pubkey.clone().into(), - created_at, - Some(updated_at), - None, - ) - .await; - add_mobile_tracker_record( - &pool, - asset1_pubkey.clone().into(), - updated_at, - Some(asset1_hex_idx), - Some(updated_at), - ) - .await; - - // Shouldn't be returned - add_db_record( - &pool, - "asset2", - Some(asset2_hex_idx), - "\"wifiDataOnly\"", - asset2_pubkey.clone().into(), - created_at, - None, - None, - ) - .await; - add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), created_at, None, None).await; - - let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; - let mut client = GatewayClient::connect(addr).await.unwrap(); - - let req = make_gateway_stream_signed_req_v3(&admin_key, &[], updated_at.timestamp() as u64, 0); - let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); - let resp = stream.next().await.unwrap().unwrap(); - assert_eq!(resp.gateways.len(), 1); - - let gw_info = resp.gateways.first().unwrap(); - let pub_key = PublicKey::from_bytes(gw_info.address.clone()).unwrap(); - assert_eq!(pub_key, asset1_pubkey.clone()); - assert_eq!( - DeviceTypeV2::try_from(gw_info.device_type).unwrap(), - DeviceTypeV2::Indoor - ); - assert_eq!( - i64::from_str_radix( - &gw_info - .metadata - .clone() - .unwrap() - .location_info - .unwrap() - .location, - 16 - ) - .unwrap(), - asset1_hex_idx - ); - assert!(stream.next().await.is_none()); -} - -#[sqlx::test] -async fn gateway_stream_info_v3_location_changed_at(pool: PgPool) { - let admin_key = make_keypair(); - let asset1_pubkey = make_keypair().public_key().clone(); - let asset1_hex_idx = 631711281837647359_i64; - let asset2_hex_idx = 631711286145955327_i64; - let asset2_pubkey = make_keypair().public_key().clone(); - let now = Utc::now(); - let now_minus_six = now - Duration::hours(6); - let now_minus_three = now - Duration::hours(3); - let now_minus_four = now - Duration::hours(4); - let now_minus_five = now - Duration::hours(5); - - // Scenario: - // asset_1 location changed at now - 6 hours - // asset_2 location changed at now - 4 hours - // request min_location_changed_at location changed at now - 5 hours - - create_db_tables(&pool).await; - add_db_record( - &pool, - "asset1", - Some(asset1_hex_idx), - "\"wifiIndoor\"", - asset1_pubkey.clone().into(), - now_minus_six, - Some(now), - None, - ) - .await; - add_mobile_tracker_record( - &pool, - asset1_pubkey.clone().into(), - now_minus_three, - Some(asset1_hex_idx), - Some(now_minus_six), - ) - .await; - - // Shouldn't be returned - add_db_record( - &pool, - "asset2", - Some(asset2_hex_idx), - "\"wifiDataOnly\"", - asset2_pubkey.clone().into(), - now_minus_six, - Some(now), - None, - ) - .await; - add_mobile_tracker_record( - &pool, - asset2_pubkey.clone().into(), - now_minus_three, - Some(asset2_hex_idx), - Some(now_minus_four), - ) - .await; - - let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; - let mut client = GatewayClient::connect(addr).await.unwrap(); - - let req = - make_gateway_stream_signed_req_v3(&admin_key, &[], 0, now_minus_five.timestamp() as u64); - let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); - let resp = stream.next().await.unwrap().unwrap(); - assert_eq!(resp.gateways.len(), 1); - - let gw_info = resp.gateways.first().unwrap(); - let pub_key = PublicKey::from_bytes(gw_info.address.clone()).unwrap(); - assert_eq!(pub_key, asset2_pubkey.clone()); - assert_eq!( - DeviceTypeV2::try_from(gw_info.device_type).unwrap(), - DeviceTypeV2::DataOnly - ); - assert_eq!( - i64::from_str_radix( - &gw_info - .metadata - .clone() - .unwrap() - .location_info - .unwrap() - .location, - 16 - ) - .unwrap(), - asset2_hex_idx - ); - assert!(stream.next().await.is_none()); - - // Change min_location_changed_at parameter, now two radios should be returned - let req = - make_gateway_stream_signed_req_v3(&admin_key, &[], 0, now_minus_six.timestamp() as u64); - let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); - let resp = stream.next().await.unwrap().unwrap(); - assert_eq!(resp.gateways.len(), 2); -} - #[sqlx::test] async fn gateway_info_batch_v2(pool: PgPool) { let admin_key = make_keypair(); @@ -1036,28 +778,6 @@ async fn gateway_stream_info_v2_deployment_info(pool: PgPool) { } } -fn make_gateway_stream_signed_req_v3( - signer: &Keypair, - device_types: &[DeviceTypeV2], - min_updated_at: u64, - min_location_changed_at: u64, -) -> proto::GatewayInfoStreamReqV3 { - let mut req = GatewayInfoStreamReqV3 { - batch_size: 10000, - signer: signer.public_key().to_vec(), - signature: vec![], - device_types: device_types - .iter() - .map(|v| DeviceTypeV2::into(*v)) - .collect(), - min_updated_at, - min_location_changed_at, - }; - - req.signature = signer.sign(&req.encode_to_vec()).unwrap(); - req -} - fn make_gateway_stream_signed_req_v2( signer: &Keypair, device_types: &[DeviceType], diff --git a/mobile_config/tests/gateway_service_v3.rs b/mobile_config/tests/gateway_service_v3.rs new file mode 100644 index 000000000..653cd9b02 --- /dev/null +++ b/mobile_config/tests/gateway_service_v3.rs @@ -0,0 +1,269 @@ +use chrono::{Duration, Utc}; +use futures::stream::StreamExt; +use std::vec; + +use helium_crypto::{Keypair, PublicKey, Sign}; +use helium_proto::services::mobile_config::{ + self as proto, DeviceTypeV2, GatewayClient, GatewayInfoStreamReqV3, +}; +use prost::Message; +use sqlx::PgPool; + +pub mod common; +use common::*; + +#[sqlx::test] +async fn gateway_stream_info_v3(pool: PgPool) { + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let asset2_hex_idx = 631711286145955327_i64; + let asset2_pubkey = make_keypair().public_key().clone(); + let now = Utc::now(); + let now_plus_10 = now + chrono::Duration::seconds(10); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + Some(asset1_hex_idx), + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + now, + Some(now), + None, + ) + .await; + add_mobile_tracker_record(&pool, asset1_pubkey.clone().into(), now, None, None).await; + + add_db_record( + &pool, + "asset2", + Some(asset2_hex_idx), + "\"wifiDataOnly\"", + asset2_pubkey.clone().into(), + now_plus_10, + Some(now_plus_10), + None, + ) + .await; + add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), now_plus_10, None, None).await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + // Select all devices + let req = make_gateway_stream_signed_req_v3(&admin_key, &[], 0, 0); + let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 2); + + // Filter by device type + let req = make_gateway_stream_signed_req_v3(&admin_key, &[DeviceTypeV2::Indoor], 0, 0); + let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 1); + assert_eq!( + resp.gateways.first().unwrap().device_type, + Into::::into(DeviceTypeV2::Indoor) + ); +} + +#[sqlx::test] +async fn gateway_stream_info_v3_updated_at(pool: PgPool) { + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let asset2_hex_idx = 631711286145955327_i64; + let asset2_pubkey = make_keypair().public_key().clone(); + let created_at = Utc::now() - Duration::hours(5); + let updated_at = Utc::now() - Duration::hours(3); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + Some(asset1_hex_idx), + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + created_at, + Some(updated_at), + None, + ) + .await; + add_mobile_tracker_record( + &pool, + asset1_pubkey.clone().into(), + updated_at, + Some(asset1_hex_idx), + Some(updated_at), + ) + .await; + + // Shouldn't be returned + add_db_record( + &pool, + "asset2", + Some(asset2_hex_idx), + "\"wifiDataOnly\"", + asset2_pubkey.clone().into(), + created_at, + None, + None, + ) + .await; + add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), created_at, None, None).await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + let req = make_gateway_stream_signed_req_v3(&admin_key, &[], updated_at.timestamp() as u64, 0); + let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 1); + + let gw_info = resp.gateways.first().unwrap(); + let pub_key = PublicKey::from_bytes(gw_info.address.clone()).unwrap(); + assert_eq!(pub_key, asset1_pubkey.clone()); + assert_eq!( + DeviceTypeV2::try_from(gw_info.device_type).unwrap(), + DeviceTypeV2::Indoor + ); + assert_eq!( + i64::from_str_radix( + &gw_info + .metadata + .clone() + .unwrap() + .location_info + .unwrap() + .location, + 16 + ) + .unwrap(), + asset1_hex_idx + ); + assert!(stream.next().await.is_none()); +} + +#[sqlx::test] +async fn gateway_stream_info_v3_location_changed_at(pool: PgPool) { + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let asset2_hex_idx = 631711286145955327_i64; + let asset2_pubkey = make_keypair().public_key().clone(); + let now = Utc::now(); + let now_minus_six = now - Duration::hours(6); + let now_minus_three = now - Duration::hours(3); + let now_minus_four = now - Duration::hours(4); + let now_minus_five = now - Duration::hours(5); + + // Scenario: + // asset_1 location changed at now - 6 hours + // asset_2 location changed at now - 4 hours + // request min_location_changed_at location changed at now - 5 hours + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + Some(asset1_hex_idx), + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + now_minus_six, + Some(now), + None, + ) + .await; + add_mobile_tracker_record( + &pool, + asset1_pubkey.clone().into(), + now_minus_three, + Some(asset1_hex_idx), + Some(now_minus_six), + ) + .await; + + // Shouldn't be returned + add_db_record( + &pool, + "asset2", + Some(asset2_hex_idx), + "\"wifiDataOnly\"", + asset2_pubkey.clone().into(), + now_minus_six, + Some(now), + None, + ) + .await; + add_mobile_tracker_record( + &pool, + asset2_pubkey.clone().into(), + now_minus_three, + Some(asset2_hex_idx), + Some(now_minus_four), + ) + .await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + let req = + make_gateway_stream_signed_req_v3(&admin_key, &[], 0, now_minus_five.timestamp() as u64); + let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 1); + + let gw_info = resp.gateways.first().unwrap(); + let pub_key = PublicKey::from_bytes(gw_info.address.clone()).unwrap(); + assert_eq!(pub_key, asset2_pubkey.clone()); + assert_eq!( + DeviceTypeV2::try_from(gw_info.device_type).unwrap(), + DeviceTypeV2::DataOnly + ); + assert_eq!( + i64::from_str_radix( + &gw_info + .metadata + .clone() + .unwrap() + .location_info + .unwrap() + .location, + 16 + ) + .unwrap(), + asset2_hex_idx + ); + assert!(stream.next().await.is_none()); + + // Change min_location_changed_at parameter, now two radios should be returned + let req = + make_gateway_stream_signed_req_v3(&admin_key, &[], 0, now_minus_six.timestamp() as u64); + let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 2); +} + +fn make_gateway_stream_signed_req_v3( + signer: &Keypair, + device_types: &[DeviceTypeV2], + min_updated_at: u64, + min_location_changed_at: u64, +) -> proto::GatewayInfoStreamReqV3 { + let mut req = GatewayInfoStreamReqV3 { + batch_size: 10000, + signer: signer.public_key().to_vec(), + signature: vec![], + device_types: device_types + .iter() + .map(|v| DeviceTypeV2::into(*v)) + .collect(), + min_updated_at, + min_location_changed_at, + }; + + req.signature = signer.sign(&req.encode_to_vec()).unwrap(); + req +} From 1eab6177a5b055562c60972b3c8b9769f074a892 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 16 Jul 2025 14:59:29 +0300 Subject: [PATCH 08/18] Check LocationInfo and DeploymentInfo in gateway_stream_info_v3_basic --- mobile_config/tests/gateway_service_v3.rs | 37 +++++++++++++++++++---- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/mobile_config/tests/gateway_service_v3.rs b/mobile_config/tests/gateway_service_v3.rs index 653cd9b02..fdd6acae1 100644 --- a/mobile_config/tests/gateway_service_v3.rs +++ b/mobile_config/tests/gateway_service_v3.rs @@ -4,7 +4,8 @@ use std::vec; use helium_crypto::{Keypair, PublicKey, Sign}; use helium_proto::services::mobile_config::{ - self as proto, DeviceTypeV2, GatewayClient, GatewayInfoStreamReqV3, + self as proto, DeploymentInfo, DeviceTypeV2, GatewayClient, GatewayInfoStreamReqV3, + GatewayInfoV3, LocationInfo, }; use prost::Message; use sqlx::PgPool; @@ -13,7 +14,7 @@ pub mod common; use common::*; #[sqlx::test] -async fn gateway_stream_info_v3(pool: PgPool) { +async fn gateway_stream_info_v3_basic(pool: PgPool) { let admin_key = make_keypair(); let asset1_pubkey = make_keypair().public_key().clone(); let asset1_hex_idx = 631711281837647359_i64; @@ -21,6 +22,7 @@ async fn gateway_stream_info_v3(pool: PgPool) { let asset2_pubkey = make_keypair().public_key().clone(); let now = Utc::now(); let now_plus_10 = now + chrono::Duration::seconds(10); + let now_plus_5 = now + chrono::Duration::seconds(5); create_db_tables(&pool).await; add_db_record( @@ -31,10 +33,17 @@ async fn gateway_stream_info_v3(pool: PgPool) { asset1_pubkey.clone().into(), now, Some(now), - None, + Some(r#"{"wifiInfoV0": {"antenna": 18, "azimuth": 161, "elevation": 2, "electricalDownTilt": 3, "mechanicalDownTilt": 4}}"#) + ) + .await; + add_mobile_tracker_record( + &pool, + asset1_pubkey.clone().into(), + now_plus_10, + Some(asset1_hex_idx), + Some(now_plus_5), ) .await; - add_mobile_tracker_record(&pool, asset1_pubkey.clone().into(), now, None, None).await; add_db_record( &pool, @@ -63,9 +72,25 @@ async fn gateway_stream_info_v3(pool: PgPool) { let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); let resp = stream.next().await.unwrap().unwrap(); assert_eq!(resp.gateways.len(), 1); + let gateway: &GatewayInfoV3 = resp.gateways.first().unwrap(); + assert_eq!(gateway.device_type, Into::::into(DeviceTypeV2::Indoor)); + assert_eq!(gateway.address, asset1_pubkey.to_vec()); + assert_eq!(gateway.created_at, now.timestamp() as u64); + assert_eq!(gateway.updated_at, now_plus_10.timestamp() as u64); + assert_eq!( + gateway.metadata.clone().unwrap().location_info.unwrap(), + LocationInfo { + location: format!("{:x}", asset1_hex_idx), + location_changed_at: now_plus_5.timestamp() as u64 + } + ); assert_eq!( - resp.gateways.first().unwrap().device_type, - Into::::into(DeviceTypeV2::Indoor) + gateway.metadata.clone().unwrap().deployment_info.unwrap(), + DeploymentInfo { + antenna: 18, + elevation: 2, + azimuth: 161, + } ); } From 14054d1146c7ee97f1879baa9488053a801c5984 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 16 Jul 2025 15:12:32 +0300 Subject: [PATCH 09/18] Add gateway_stream_info_v3_no_deployment_info testcase --- mobile_config/src/gateway_info_v3.rs | 1 - mobile_config/tests/gateway_service_v3.rs | 53 +++++++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/mobile_config/src/gateway_info_v3.rs b/mobile_config/src/gateway_info_v3.rs index f3754f60e..3a076fa13 100644 --- a/mobile_config/src/gateway_info_v3.rs +++ b/mobile_config/src/gateway_info_v3.rs @@ -24,7 +24,6 @@ pub struct LocationInfo { #[derive(Clone, Debug)] pub struct GatewayMetadataV3 { pub location_info: LocationInfo, - // TODO: Can it be Proto? pub deployment_info: Option, } diff --git a/mobile_config/tests/gateway_service_v3.rs b/mobile_config/tests/gateway_service_v3.rs index fdd6acae1..0abd2c007 100644 --- a/mobile_config/tests/gateway_service_v3.rs +++ b/mobile_config/tests/gateway_service_v3.rs @@ -94,6 +94,59 @@ async fn gateway_stream_info_v3_basic(pool: PgPool) { ); } +#[sqlx::test] +async fn gateway_stream_info_v3_no_deployment_info(pool: PgPool) { + // There is location info but no deployment info + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let now = Utc::now(); + let now_plus_10 = now + chrono::Duration::seconds(10); + let now_plus_5 = now + chrono::Duration::seconds(5); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + Some(asset1_hex_idx), + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + now, + Some(now), + None, + ) + .await; + add_mobile_tracker_record( + &pool, + asset1_pubkey.clone().into(), + now_plus_10, + Some(asset1_hex_idx), + Some(now_plus_5), + ) + .await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + let req = make_gateway_stream_signed_req_v3(&admin_key, &[], 0, 0); + let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 1); + let gateway: &GatewayInfoV3 = resp.gateways.first().unwrap(); + assert_eq!(gateway.device_type, Into::::into(DeviceTypeV2::Indoor)); + assert_eq!(gateway.address, asset1_pubkey.to_vec()); + assert_eq!(gateway.created_at, now.timestamp() as u64); + assert_eq!(gateway.updated_at, now_plus_10.timestamp() as u64); + assert_eq!( + gateway.metadata.clone().unwrap().location_info.unwrap(), + LocationInfo { + location: format!("{:x}", asset1_hex_idx), + location_changed_at: now_plus_5.timestamp() as u64 + } + ); + assert!(gateway.metadata.clone().unwrap().deployment_info.is_none(),); +} + #[sqlx::test] async fn gateway_stream_info_v3_updated_at(pool: PgPool) { let admin_key = make_keypair(); From 134f4ea0d050eeb0fd65788c8beb82bd379ec6bc Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 16 Jul 2025 16:18:14 +0300 Subject: [PATCH 10/18] Add gateway_stream_info_v3_min_location_changed_at_zero test --- mobile_config/tests/gateway_service_v3.rs | 76 +++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/mobile_config/tests/gateway_service_v3.rs b/mobile_config/tests/gateway_service_v3.rs index 0abd2c007..f6d4abe92 100644 --- a/mobile_config/tests/gateway_service_v3.rs +++ b/mobile_config/tests/gateway_service_v3.rs @@ -224,6 +224,82 @@ async fn gateway_stream_info_v3_updated_at(pool: PgPool) { assert!(stream.next().await.is_none()); } +#[sqlx::test] +async fn gateway_stream_info_v3_min_location_changed_at_zero(pool: PgPool) { + // asset_1 has no location + // asset_2 has location + // Make sure if min_location_changed_at == 0 then returned both radios + // if min_location_changed_at >= 1 then radios with null location filtered out + + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset2_hex_idx = 631711286145955327_i64; + let asset2_pubkey = make_keypair().public_key().clone(); + let now = Utc::now(); + let now_minus_six = now - Duration::hours(6); + let now_minus_three = now - Duration::hours(3); + let now_minus_four = now - Duration::hours(4); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + None, + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + now_minus_six, + Some(now), + None, + ) + .await; + add_mobile_tracker_record( + &pool, + asset1_pubkey.clone().into(), + now_minus_three, + None, + None, + ) + .await; + + add_db_record( + &pool, + "asset2", + Some(asset2_hex_idx), + "\"wifiDataOnly\"", + asset2_pubkey.clone().into(), + now_minus_six, + Some(now), + None, + ) + .await; + add_mobile_tracker_record( + &pool, + asset2_pubkey.clone().into(), + now_minus_three, + Some(asset2_hex_idx), + Some(now_minus_four), + ) + .await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + let req = make_gateway_stream_signed_req_v3(&admin_key, &[], 0, 0); + let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 2); + + // min_location_changed_at = 1 + let req = make_gateway_stream_signed_req_v3(&admin_key, &[], 0, 1); + let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 1); + + let gw_info = resp.gateways.first().unwrap(); + let pub_key = PublicKey::from_bytes(gw_info.address.clone()).unwrap(); + assert_eq!(pub_key, asset2_pubkey.clone()); +} + #[sqlx::test] async fn gateway_stream_info_v3_location_changed_at(pool: PgPool) { let admin_key = make_keypair(); From 925247395dc2cb401794a65390c0044a3c13a8f6 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 16 Jul 2025 16:20:20 +0300 Subject: [PATCH 11/18] refactor get_mobile_tracker_gateways_info --- mobile_config/src/gateway_info_v3.rs | 84 ++++++++++------------------ 1 file changed, 30 insertions(+), 54 deletions(-) diff --git a/mobile_config/src/gateway_info_v3.rs b/mobile_config/src/gateway_info_v3.rs index 3a076fa13..fc22c9b70 100644 --- a/mobile_config/src/gateway_info_v3.rs +++ b/mobile_config/src/gateway_info_v3.rs @@ -159,63 +159,39 @@ pub(crate) mod db { min_updated_at: DateTime, min_location_changed_at: Option>, ) -> anyhow::Result { - // TODO refactor - if let Some(min_loc_changed_at) = min_location_changed_at { + let query = if let Some(min_loc) = min_location_changed_at { sqlx::query(&GET_UPDATED_RADIOS_WITH_LOCATION) .bind(min_updated_at) - .bind(min_loc_changed_at) - .fetch(db) - .map_err(anyhow::Error::from) - .try_fold( - MobileTrackerInfoMap::new(), - |mut map: MobileTrackerInfoMap, row| async move { - let entity_key_b = row.get::<&[u8], &str>("entity_key"); - let entity_key = bs58::encode(entity_key_b).into_string(); - let last_changed_at = row.get::, &str>("last_changed_at"); - let asserted_location_changed_at = - row.get::>, &str>("asserted_location_changed_at"); - let asserted_location = row.get::("asserted_location"); - - map.insert( - PublicKeyBinary::from_str(&entity_key)?, - MobileTrackerInfo { - location: Some(asserted_location as u64), - last_changed_at, - asserted_location_changed_at, - }, - ); - Ok(map) - }, - ) - .await + .bind(min_loc) } else { - sqlx::query(GET_UPDATED_RADIOS) - .bind(min_updated_at) - .fetch(db) - .map_err(anyhow::Error::from) - .try_fold( - MobileTrackerInfoMap::new(), - |mut map: MobileTrackerInfoMap, row| async move { - let entity_key_b = row.get::<&[u8], &str>("entity_key"); - let entity_key = bs58::encode(entity_key_b).into_string(); - let last_changed_at = row.get::, &str>("last_changed_at"); - let asserted_location_changed_at = - row.get::>, &str>("asserted_location_changed_at"); - let asserted_location = row.get::, &str>("asserted_location"); - - map.insert( - PublicKeyBinary::from_str(&entity_key)?, - MobileTrackerInfo { - location: asserted_location.map(|v| v as u64), - last_changed_at, - asserted_location_changed_at, - }, - ); - Ok(map) - }, - ) - .await - } + sqlx::query(GET_UPDATED_RADIOS).bind(min_updated_at) + }; + + query + .fetch(db) + .map_err(anyhow::Error::from) + .try_fold( + MobileTrackerInfoMap::new(), + |mut map: MobileTrackerInfoMap, row| async move { + let entity_key_b = row.get::<&[u8], &str>("entity_key"); + let entity_key = bs58::encode(entity_key_b).into_string(); + let last_changed_at = row.get::, &str>("last_changed_at"); + let asserted_location_changed_at = + row.get::>, &str>("asserted_location_changed_at"); + let asserted_location = row.get::, &str>("asserted_location"); + + map.insert( + PublicKeyBinary::from_str(&entity_key)?, + MobileTrackerInfo { + location: asserted_location.map(|v| v as u64), + last_changed_at, + asserted_location_changed_at, + }, + ); + Ok(map) + }, + ) + .await } /// Streams all gateway info records, optionally filtering by device types. From 7a61a7d47926e112887f91c0d72d03ef8f3946a1 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 16 Jul 2025 16:25:07 +0300 Subject: [PATCH 12/18] Refactor --- mobile_config/src/gateway_info_v3.rs | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/mobile_config/src/gateway_info_v3.rs b/mobile_config/src/gateway_info_v3.rs index fc22c9b70..c6aa0025f 100644 --- a/mobile_config/src/gateway_info_v3.rs +++ b/mobile_config/src/gateway_info_v3.rs @@ -1,15 +1,8 @@ use chrono::{DateTime, Utc}; use helium_crypto::PublicKeyBinary; use helium_proto::services::mobile_config::{ - // WifiDeploymentInfo as WifiDeploymentInfoProto, - DeploymentInfo as DeploymentInfoProto, - DeviceTypeV2 as DeviceTypeProtoV2, - GatewayInfoV3 as GatewayInfoProtoV3, - // gateway_metadata_v2::DeploymentInfo as DeploymentInfoProto, - // CbrsDeploymentInfo as CbrsDeploymentInfoProto, - // CbrsRadioDeploymentInfo as CbrsRadioDeploymentInfoProto, DeviceType as DeviceTypeProto, - // GatewayInfo as GatewayInfoProto, GatewayInfoV2 as GatewayInfoProtoV2, - GatewayMetadataV3 as GatewayMetadataProtoV3, + DeploymentInfo as DeploymentInfoProto, DeviceTypeV2 as DeviceTypeProtoV2, + GatewayInfoV3 as GatewayInfoProtoV3, GatewayMetadataV3 as GatewayMetadataProtoV3, LocationInfo as LocationInfoProto, }; @@ -194,7 +187,7 @@ pub(crate) mod db { .await } - /// Streams all gateway info records, optionally filtering by device types. + // Streams all gateway info records, optionally filtering by device types. pub fn all_info_stream_v3<'a>( db: impl PgExecutor<'a> + 'a, device_types: &'a [DeviceTypeV2], @@ -228,7 +221,7 @@ pub(crate) mod db { .boxed() } - /// Processes a single database row into a GatewayInfoV3, returning None if any step fails. + // Processes a single database row into a GatewayInfoV3, returning None if any step fails. async fn process_row( row: sqlx::postgres::PgRow, mtim: &MobileTrackerInfoMap, @@ -239,13 +232,13 @@ pub(crate) mod db { .as_ref(), ) .map_err(|err| sqlx::Error::Decode(Box::new(err))) - .unwrap(); // TODO REMOVE + .ok()?; let address = PublicKeyBinary::from_str( &bs58::encode(row.get::<&[u8], &str>("entity_key")).into_string(), ) .map_err(|err| sqlx::Error::Decode(Box::new(err))) - .unwrap(); // TODO REMOVE + .ok()?; let mti = mtim.get(&address)?; @@ -253,7 +246,6 @@ pub(crate) mod db { let metadata = mti.location.and_then(|loc| { let location_changed_at = mti.asserted_location_changed_at?; - // Safely parse deployment_info JSON let deployment_info = row .try_get::>, _>("deployment_info") .ok() From e6334dab893bb75626421a61a1ec8c64a4d9cf52 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 16 Jul 2025 16:36:56 +0300 Subject: [PATCH 13/18] Remove comented code --- mobile_config/src/gateway_service.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/mobile_config/src/gateway_service.rs b/mobile_config/src/gateway_service.rs index 20ed7091a..e285823eb 100644 --- a/mobile_config/src/gateway_service.rs +++ b/mobile_config/src/gateway_service.rs @@ -403,16 +403,6 @@ impl mobile_config::Gateway for GatewayService { &device_types, &mobile_tracker_gateways_info, ); - // let stream = stream - // .filter_map(|gateway_info| { - // // todo set location and location_changed_at here? - // future::ready(handle_updated_at( - // gateway_info, - // &updated_radios, - // min_updated_at, - // )) - // }) - // .boxed(); stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await }); From 7793af1fa9eeaa549dd380ff402c2c105ec59285 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 16 Jul 2025 21:48:14 +0300 Subject: [PATCH 14/18] Remove TODO (tested) --- mobile_config/src/gateway_info_v3.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/mobile_config/src/gateway_info_v3.rs b/mobile_config/src/gateway_info_v3.rs index c6aa0025f..a6c1721fc 100644 --- a/mobile_config/src/gateway_info_v3.rs +++ b/mobile_config/src/gateway_info_v3.rs @@ -128,7 +128,6 @@ pub(crate) mod db { } pub type MobileTrackerInfoMap = HashMap; - // TODO test and add indexes if needed const GET_UPDATED_RADIOS: &str = "SELECT entity_key, last_changed_at, asserted_location, asserted_location_changed_at FROM mobile_radio_tracker WHERE last_changed_at >= $1"; From 1b8fde076b76663e27bc8ded2065dd658f9b4b64 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Fri, 18 Jul 2025 18:49:56 +0300 Subject: [PATCH 15/18] Add gateway_stream_info_v3_no_metadata test --- Cargo.lock | 6 ++-- mobile_config/tests/gateway_service_v3.rs | 37 +++++++++++++++++++++++ 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 33f44212f..289fb27b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3446,7 +3446,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://www.github.com/helium/proto.git?branch=mobile-config-loc-assert#bbabbf5cce29d033553a745dd8a7290625877030" +source = "git+https://www.github.com/helium/proto.git?branch=mobile-config-loc-assert#fb692c19ea4a2d10b205e7a2ae511f098e7c11ca" dependencies = [ "bytes", "prost", @@ -5859,7 +5859,7 @@ checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes", "heck 0.5.0", - "itertools 0.10.5", + "itertools 0.12.1", "log", "multimap", "once_cell", @@ -5879,7 +5879,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.12.1", "proc-macro2", "quote", "syn 2.0.101", diff --git a/mobile_config/tests/gateway_service_v3.rs b/mobile_config/tests/gateway_service_v3.rs index f6d4abe92..ce14c8552 100644 --- a/mobile_config/tests/gateway_service_v3.rs +++ b/mobile_config/tests/gateway_service_v3.rs @@ -94,6 +94,43 @@ async fn gateway_stream_info_v3_basic(pool: PgPool) { ); } +#[sqlx::test] +async fn gateway_stream_info_v3_no_metadata(pool: PgPool) { + // There is location info but no deployment info + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let now = Utc::now(); + let now_plus_10 = now + chrono::Duration::seconds(10); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + None, + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + now, + Some(now), + None, + ) + .await; + add_mobile_tracker_record(&pool, asset1_pubkey.clone().into(), now_plus_10, None, None).await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + let req = make_gateway_stream_signed_req_v3(&admin_key, &[], 0, 0); + let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 1); + let gateway: &GatewayInfoV3 = resp.gateways.first().unwrap(); + assert_eq!(gateway.device_type, Into::::into(DeviceTypeV2::Indoor)); + assert_eq!(gateway.address, asset1_pubkey.to_vec()); + assert_eq!(gateway.created_at, now.timestamp() as u64); + assert_eq!(gateway.updated_at, now_plus_10.timestamp() as u64); + assert!(gateway.metadata.is_none()); +} + #[sqlx::test] async fn gateway_stream_info_v3_no_deployment_info(pool: PgPool) { // There is location info but no deployment info From fdbae2939237d7c73d49604e3798641fe12a431c Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Fri, 18 Jul 2025 19:04:34 +0300 Subject: [PATCH 16/18] Remove useless map_err --- mobile_config/src/gateway_info_v3.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/mobile_config/src/gateway_info_v3.rs b/mobile_config/src/gateway_info_v3.rs index a6c1721fc..2211c902f 100644 --- a/mobile_config/src/gateway_info_v3.rs +++ b/mobile_config/src/gateway_info_v3.rs @@ -230,13 +230,11 @@ pub(crate) mod db { .to_string() .as_ref(), ) - .map_err(|err| sqlx::Error::Decode(Box::new(err))) .ok()?; let address = PublicKeyBinary::from_str( &bs58::encode(row.get::<&[u8], &str>("entity_key")).into_string(), ) - .map_err(|err| sqlx::Error::Decode(Box::new(err))) .ok()?; let mti = mtim.get(&address)?; From 01b5ca0e507a5155ecc62c7ce07b4b7ac17c8bc1 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Fri, 18 Jul 2025 20:04:32 +0300 Subject: [PATCH 17/18] use device_type method --- mobile_config/tests/gateway_service_v3.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mobile_config/tests/gateway_service_v3.rs b/mobile_config/tests/gateway_service_v3.rs index ce14c8552..920d6b9b9 100644 --- a/mobile_config/tests/gateway_service_v3.rs +++ b/mobile_config/tests/gateway_service_v3.rs @@ -73,7 +73,7 @@ async fn gateway_stream_info_v3_basic(pool: PgPool) { let resp = stream.next().await.unwrap().unwrap(); assert_eq!(resp.gateways.len(), 1); let gateway: &GatewayInfoV3 = resp.gateways.first().unwrap(); - assert_eq!(gateway.device_type, Into::::into(DeviceTypeV2::Indoor)); + assert_eq!(gateway.device_type(), DeviceTypeV2::Indoor); assert_eq!(gateway.address, asset1_pubkey.to_vec()); assert_eq!(gateway.created_at, now.timestamp() as u64); assert_eq!(gateway.updated_at, now_plus_10.timestamp() as u64); @@ -124,7 +124,7 @@ async fn gateway_stream_info_v3_no_metadata(pool: PgPool) { let resp = stream.next().await.unwrap().unwrap(); assert_eq!(resp.gateways.len(), 1); let gateway: &GatewayInfoV3 = resp.gateways.first().unwrap(); - assert_eq!(gateway.device_type, Into::::into(DeviceTypeV2::Indoor)); + assert_eq!(gateway.device_type(), DeviceTypeV2::Indoor); assert_eq!(gateway.address, asset1_pubkey.to_vec()); assert_eq!(gateway.created_at, now.timestamp() as u64); assert_eq!(gateway.updated_at, now_plus_10.timestamp() as u64); @@ -170,7 +170,7 @@ async fn gateway_stream_info_v3_no_deployment_info(pool: PgPool) { let resp = stream.next().await.unwrap().unwrap(); assert_eq!(resp.gateways.len(), 1); let gateway: &GatewayInfoV3 = resp.gateways.first().unwrap(); - assert_eq!(gateway.device_type, Into::::into(DeviceTypeV2::Indoor)); + assert_eq!(gateway.device_type(), DeviceTypeV2::Indoor); assert_eq!(gateway.address, asset1_pubkey.to_vec()); assert_eq!(gateway.created_at, now.timestamp() as u64); assert_eq!(gateway.updated_at, now_plus_10.timestamp() as u64); From 9fd034e1316e0ecd5c229f100aec5290f3ba56e7 Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Fri, 25 Jul 2025 09:37:34 -0400 Subject: [PATCH 18/18] Update query to only pull one record per entity_key in mobile-config (#1021) --- mobile_config/src/mobile_radio_tracker.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mobile_config/src/mobile_radio_tracker.rs b/mobile_config/src/mobile_radio_tracker.rs index 042040f4c..9cd5c6021 100644 --- a/mobile_config/src/mobile_radio_tracker.rs +++ b/mobile_config/src/mobile_radio_tracker.rs @@ -227,7 +227,7 @@ fn get_all_mobile_radios(metadata: &Pool) -> impl Stream( r#" SELECT - DISTINCT ON (kta.entity_key, mhi.asset) + DISTINCT ON (kta.entity_key) kta.entity_key, mhi.asset, mhi.refreshed_at, @@ -244,7 +244,7 @@ fn get_all_mobile_radios(metadata: &Pool) -> impl Stream