diff --git a/Cargo.lock b/Cargo.lock index 1bb1d3bee..289fb27b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3446,7 +3446,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#2ebbdb1772ffed9fcde2b6a7de5aa7b2afeca1a6" +source = "git+https://www.github.com/helium/proto.git?branch=mobile-config-loc-assert#fb692c19ea4a2d10b205e7a2ae511f098e7c11ca" dependencies = [ "bytes", "prost", @@ -11221,7 +11221,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 79cc88793..691ff1c4f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -131,5 +131,5 @@ anchor-lang = { git = "https://github.com/madninja/anchor.git", branch = "madnin # helium-proto = { path = "../proto" } # beacon = { path = "../proto/beacon" } -# [patch.'https://github.com/helium/proto'] -# helium-proto = { git = "https://www.github.com/helium/proto.git", branch = "bbalser/deprecate-radio-reward-v1" } +[patch.'https://github.com/helium/proto'] +helium-proto = { git = "https://www.github.com/helium/proto.git", branch = "mobile-config-loc-assert" } diff --git a/file_store/src/traits/msg_verify.rs b/file_store/src/traits/msg_verify.rs index 64990365f..42e04b0a2 100644 --- a/file_store/src/traits/msg_verify.rs +++ b/file_store/src/traits/msg_verify.rs @@ -92,11 +92,13 @@ impl_msg_verify!(mobile_config::CarrierKeyToEntityResV1, signature); impl_msg_verify!(mobile_config::GatewayInfoReqV1, signature); impl_msg_verify!(mobile_config::GatewayInfoStreamReqV1, signature); impl_msg_verify!(mobile_config::GatewayInfoStreamReqV2, signature); +impl_msg_verify!(mobile_config::GatewayInfoStreamReqV3, signature); impl_msg_verify!(mobile_config::GatewayInfoResV1, signature); impl_msg_verify!(mobile_config::GatewayInfoResV2, signature); impl_msg_verify!(mobile_config::GatewayInfoBatchReqV1, signature); impl_msg_verify!(mobile_config::GatewayInfoStreamResV1, signature); impl_msg_verify!(mobile_config::GatewayInfoStreamResV2, signature); +impl_msg_verify!(mobile_config::GatewayInfoStreamResV3, signature); impl_msg_verify!(mobile_config::BoostedHexInfoStreamReqV1, signature); impl_msg_verify!(mobile_config::BoostedHexModifiedInfoStreamReqV1, signature); impl_msg_verify!(mobile_config::BoostedHexInfoStreamResV1, signature); diff --git a/mobile_config/src/gateway_info_v3.rs b/mobile_config/src/gateway_info_v3.rs new file mode 100644 index 000000000..2211c902f --- /dev/null +++ b/mobile_config/src/gateway_info_v3.rs @@ -0,0 +1,280 @@ +use chrono::{DateTime, Utc}; +use helium_crypto::PublicKeyBinary; +use helium_proto::services::mobile_config::{ + DeploymentInfo as DeploymentInfoProto, DeviceTypeV2 as DeviceTypeProtoV2, + GatewayInfoV3 as GatewayInfoProtoV3, GatewayMetadataV3 as GatewayMetadataProtoV3, + LocationInfo as LocationInfoProto, +}; + +use crate::gateway_info::DeviceTypeParseError; + +#[derive(Clone, Debug)] +pub struct LocationInfo { + pub location: u64, + pub location_changed_at: DateTime, +} + +#[derive(Clone, Debug)] +pub struct GatewayMetadataV3 { + pub location_info: LocationInfo, + pub deployment_info: Option, +} + +#[derive(Clone, Debug)] +pub enum DeviceTypeV2 { + Indoor, + Outdoor, + DataOnly, +} + +impl std::str::FromStr for DeviceTypeV2 { + type Err = DeviceTypeParseError; + + fn from_str(s: &str) -> Result { + let result = match s { + "wifiIndoor" => Self::Indoor, + "wifiOutdoor" => Self::Outdoor, + "wifiDataOnly" => Self::DataOnly, + _ => return Err(DeviceTypeParseError), + }; + Ok(result) + } +} + +impl DeviceTypeV2 { + fn to_sql_param(&self) -> &'static str { + match self { + DeviceTypeV2::Indoor => "wifiIndoor", + DeviceTypeV2::Outdoor => "wifiOutdoor", + DeviceTypeV2::DataOnly => "wifiDataOnly", + } + } +} + +#[derive(Clone, Debug)] +pub struct GatewayInfoV3 { + pub address: PublicKeyBinary, + pub metadata: Option, + pub device_type: DeviceTypeV2, + pub created_at: DateTime, + // updated_at refers to the last time the data was actually changed. + pub updated_at: DateTime, + // refreshed_at indicates the last time the chain was consulted, regardless of data changes. + pub refreshed_at: DateTime, +} + +impl From for DeviceTypeV2 { + fn from(value: DeviceTypeProtoV2) -> Self { + match value { + DeviceTypeProtoV2::Indoor => DeviceTypeV2::Indoor, + DeviceTypeProtoV2::Outdoor => DeviceTypeV2::Outdoor, + DeviceTypeProtoV2::DataOnly => DeviceTypeV2::DataOnly, + } + } +} + +impl TryFrom for GatewayInfoProtoV3 { + type Error = hextree::Error; + + fn try_from(info: GatewayInfoV3) -> Result { + let metadata = if let Some(metadata) = info.metadata { + let location_info = LocationInfoProto { + location: hextree::Cell::from_raw(metadata.location_info.location)?.to_string(), + location_changed_at: metadata.location_info.location_changed_at.timestamp() as u64, + }; + let deployment_info = metadata.deployment_info.map(|di| DeploymentInfoProto { + antenna: di.antenna, + elevation: di.elevation, + azimuth: di.azimuth, + }); + + Some(GatewayMetadataProtoV3 { + location_info: Some(location_info), + deployment_info, + }) + } else { + None + }; + Ok(Self { + address: info.address.into(), + metadata, + device_type: info.device_type as i32, + created_at: info.created_at.timestamp() as u64, + updated_at: info.updated_at.timestamp() as u64, + }) + } +} + +pub(crate) mod db { + use chrono::{DateTime, Utc}; + use futures::{ + stream::{Stream, StreamExt}, + TryStreamExt, + }; + use helium_crypto::PublicKeyBinary; + use helium_proto::services::mobile_config::DeploymentInfo as DeploymentInfoProto; + use sqlx::Row; + use sqlx::{types::Json, PgExecutor}; + use std::{collections::HashMap, str::FromStr, sync::LazyLock}; + + use crate::gateway_info::DeploymentInfo; + + use super::{DeviceTypeV2, GatewayInfoV3, GatewayMetadataV3}; + + pub struct MobileTrackerInfo { + location: Option, + last_changed_at: DateTime, + asserted_location_changed_at: Option>, + } + pub type MobileTrackerInfoMap = HashMap; + + const GET_UPDATED_RADIOS: &str = + "SELECT entity_key, last_changed_at, asserted_location, asserted_location_changed_at + FROM mobile_radio_tracker WHERE last_changed_at >= $1"; + + static GET_UPDATED_RADIOS_WITH_LOCATION: LazyLock = LazyLock::new(|| { + format!("{GET_UPDATED_RADIOS} AND asserted_location IS NOT NULL AND asserted_location_changed_at >= $2") + }); + + const GET_MOBILE_HOTSPOT_INFO: &str = r#" + SELECT kta.entity_key, infos.device_type, infos.refreshed_at, infos.created_at, infos.deployment_info + FROM mobile_hotspot_infos infos + JOIN key_to_assets kta ON infos.asset = kta.asset + WHERE device_type != '"cbrs"' + "#; + const DEVICE_TYPES_WHERE_SNIPPET: &str = " AND device_type::text = any($1) "; + static DEVICE_TYPES_METADATA_SQL: LazyLock = + LazyLock::new(|| format!("{GET_MOBILE_HOTSPOT_INFO} {DEVICE_TYPES_WHERE_SNIPPET}")); + + pub async fn get_mobile_tracker_gateways_info( + db: impl PgExecutor<'_>, + min_updated_at: DateTime, + min_location_changed_at: Option>, + ) -> anyhow::Result { + let query = if let Some(min_loc) = min_location_changed_at { + sqlx::query(&GET_UPDATED_RADIOS_WITH_LOCATION) + .bind(min_updated_at) + .bind(min_loc) + } else { + sqlx::query(GET_UPDATED_RADIOS).bind(min_updated_at) + }; + + query + .fetch(db) + .map_err(anyhow::Error::from) + .try_fold( + MobileTrackerInfoMap::new(), + |mut map: MobileTrackerInfoMap, row| async move { + let entity_key_b = row.get::<&[u8], &str>("entity_key"); + let entity_key = bs58::encode(entity_key_b).into_string(); + let last_changed_at = row.get::, &str>("last_changed_at"); + let asserted_location_changed_at = + row.get::>, &str>("asserted_location_changed_at"); + let asserted_location = row.get::, &str>("asserted_location"); + + map.insert( + PublicKeyBinary::from_str(&entity_key)?, + MobileTrackerInfo { + location: asserted_location.map(|v| v as u64), + last_changed_at, + asserted_location_changed_at, + }, + ); + Ok(map) + }, + ) + .await + } + + // Streams all gateway info records, optionally filtering by device types. + pub fn all_info_stream_v3<'a>( + db: impl PgExecutor<'a> + 'a, + device_types: &'a [DeviceTypeV2], + mtim: &'a MobileTrackerInfoMap, + ) -> impl Stream + 'a { + // Choose base query depending on whether filtering is needed. + let query = if device_types.is_empty() { + sqlx::query(GET_MOBILE_HOTSPOT_INFO) + } else { + sqlx::query(&DEVICE_TYPES_METADATA_SQL).bind( + device_types + .iter() + // The device_types field has a jsonb type but is being used as a string, + // which forces us to add quotes. + .map(|v| format!("\"{}\"", v.to_sql_param())) + .collect::>(), + ) + }; + + query + .fetch(db) + .filter_map(move |result| async move { + match result { + Ok(row) => process_row(row, mtim).await, + Err(e) => { + tracing::error!("SQLx fetch error: {e:?}"); + None + } + } + }) + .boxed() + } + + // Processes a single database row into a GatewayInfoV3, returning None if any step fails. + async fn process_row( + row: sqlx::postgres::PgRow, + mtim: &MobileTrackerInfoMap, + ) -> Option { + let device_type = DeviceTypeV2::from_str( + row.get::, &str>("device_type") + .to_string() + .as_ref(), + ) + .ok()?; + + let address = PublicKeyBinary::from_str( + &bs58::encode(row.get::<&[u8], &str>("entity_key")).into_string(), + ) + .ok()?; + + let mti = mtim.get(&address)?; + + let updated_at = mti.last_changed_at; + + let metadata = mti.location.and_then(|loc| { + let location_changed_at = mti.asserted_location_changed_at?; + let deployment_info = row + .try_get::>, _>("deployment_info") + .ok() + .flatten() + .and_then(|json| match json.0 { + DeploymentInfo::WifiDeploymentInfo(wdi) => Some(DeploymentInfoProto { + antenna: wdi.antenna, + elevation: wdi.elevation, + azimuth: wdi.azimuth, + }), + _ => None, + }); + + Some(GatewayMetadataV3 { + location_info: super::LocationInfo { + location: loc, + location_changed_at, + }, + deployment_info, + }) + }); + + let created_at: DateTime = row.get("created_at"); + let refreshed_at: DateTime = row.get("refreshed_at"); + + Some(GatewayInfoV3 { + address, + metadata, + device_type, + created_at, + refreshed_at, + updated_at, + }) + } +} diff --git a/mobile_config/src/gateway_service.rs b/mobile_config/src/gateway_service.rs index 7dfd5fe9a..e285823eb 100644 --- a/mobile_config/src/gateway_service.rs +++ b/mobile_config/src/gateway_service.rs @@ -4,6 +4,7 @@ use crate::{ db::{get_batch_tracked_radios, get_updated_radios}, DeviceType, GatewayInfo, }, + gateway_info_v3::{self, db::get_mobile_tracker_gateways_info, DeviceTypeV2}, key_cache::KeyCache, telemetry, verify_public_key, GrpcResult, GrpcStreamResult, }; @@ -18,8 +19,8 @@ use helium_crypto::{Keypair, PublicKey, PublicKeyBinary, Sign}; use helium_proto::{ services::mobile_config::{ self, GatewayInfoBatchReqV1, GatewayInfoReqV1, GatewayInfoResV1, GatewayInfoResV2, - GatewayInfoStreamReqV1, GatewayInfoStreamReqV2, GatewayInfoStreamResV1, - GatewayInfoStreamResV2, GatewayInfoV2, + GatewayInfoStreamReqV1, GatewayInfoStreamReqV2, GatewayInfoStreamReqV3, + GatewayInfoStreamResV1, GatewayInfoStreamResV2, GatewayInfoStreamResV3, GatewayInfoV2, }, Message, }; @@ -349,6 +350,64 @@ impl mobile_config::Gateway for GatewayService { Ok(Response::new(GrpcStreamResult::new(rx))) } + + type info_stream_v3Stream = GrpcStreamResult; + async fn info_stream_v3( + &self, + request: Request, + ) -> GrpcResult { + let request = request.into_inner(); + telemetry::count_request("gateway", "info-stream-v3"); + custom_tracing::record_b58("signer", &request.signer); + + let signer = verify_public_key(&request.signer)?; + self.verify_request_signature(&signer, &request)?; + + let metadata_db_pool = self.metadata_pool.clone(); + let mobile_config_db_pool = self.mobile_config_db_pool.clone(); + let signing_key = self.signing_key.clone(); + let batch_size = request.batch_size; + + let (tx, rx) = tokio::sync::mpsc::channel(100); + + let device_types: Vec = request.device_types().map(|v| v.into()).collect(); + + tokio::spawn(async move { + let min_updated_at = Utc + .timestamp_opt(request.min_updated_at as i64, 0) + .single() + .ok_or(Status::invalid_argument( + "Invalid min_refreshed_at argument", + ))?; + + let min_location_changed_at = if request.min_location_changed_at == 0 { + None + } else { + Some( + Utc.timestamp_opt(request.min_location_changed_at as i64, 0) + .single() + .ok_or(Status::invalid_argument( + "Invalid min_location_changed_at argument", + ))?, + ) + }; + + let mobile_tracker_gateways_info = get_mobile_tracker_gateways_info( + &mobile_config_db_pool, + min_updated_at, + min_location_changed_at, + ) + .await?; + let stream = gateway_info_v3::db::all_info_stream_v3( + &metadata_db_pool, + &device_types, + &mobile_tracker_gateways_info, + ); + stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await + }); + + Ok(Response::new(GrpcStreamResult::new(rx))) + } } fn handle_updated_at( @@ -426,20 +485,37 @@ impl GatewayInfoStreamRes for GatewayInfoStreamResV2 { } } -async fn stream_multi_gateways_info( - stream: impl Stream, +impl GatewayInfoStreamRes for GatewayInfoStreamResV3 { + type GatewayInfoType = mobile_config::GatewayInfoV3; + + fn new(gateways: Vec, timestamp: u64, signer: Vec) -> Self { + GatewayInfoStreamResV3 { + gateways, + timestamp, + signer, + signature: vec![], + } + } + + fn set_signature(&mut self, signature: Vec) { + self.signature = signature; + } +} + +async fn stream_multi_gateways_info( + stream: impl Stream, tx: tokio::sync::mpsc::Sender>, signing_key: Arc, batch_size: u32, ) -> anyhow::Result<()> where T: GatewayInfoStreamRes + Send + Sync + 'static + helium_proto::Message, - T::GatewayInfoType: TryFrom + Send + 'static, + T::GatewayInfoType: TryFrom + Send + 'static, { let timestamp = Utc::now().encode_timestamp(); let signer: Vec = signing_key.public_key().into(); Ok(stream - .map(Ok::) + .map(Ok::) .try_filter_map(|info| async move { let result: Option = info.try_into().ok(); Ok(result) diff --git a/mobile_config/src/lib.rs b/mobile_config/src/lib.rs index 567c6d968..9495a07ac 100644 --- a/mobile_config/src/lib.rs +++ b/mobile_config/src/lib.rs @@ -13,6 +13,7 @@ pub mod carrier_service; pub mod client; pub mod entity_service; pub mod gateway_info; +pub mod gateway_info_v3; pub mod gateway_service; pub mod hex_boosting_service; pub mod sub_dao_epoch_reward_info; diff --git a/mobile_config/src/mobile_radio_tracker.rs b/mobile_config/src/mobile_radio_tracker.rs index 042040f4c..9cd5c6021 100644 --- a/mobile_config/src/mobile_radio_tracker.rs +++ b/mobile_config/src/mobile_radio_tracker.rs @@ -227,7 +227,7 @@ fn get_all_mobile_radios(metadata: &Pool) -> impl Stream( r#" SELECT - DISTINCT ON (kta.entity_key, mhi.asset) + DISTINCT ON (kta.entity_key) kta.entity_key, mhi.asset, mhi.refreshed_at, @@ -244,7 +244,7 @@ fn get_all_mobile_radios(metadata: &Pool) -> impl Stream, + asserted_location: Option, + asserted_location_changed_at: Option>, ) { let b58 = bs58::decode(key.to_string()).into_vec().unwrap(); sqlx::query( r#" INSERT INTO -"mobile_radio_tracker" ("entity_key", "hash", "last_changed_at", "last_checked_at") +"mobile_radio_tracker" ("entity_key", "hash", "last_changed_at", "last_checked_at", "asserted_location", "asserted_location_changed_at") VALUES -($1, $2, $3, $4); +($1, $2, $3, $4, $5, $6); "#, ) .bind(b58) .bind("hash") .bind(last_changed_at) .bind(last_changed_at + Duration::hours(1)) + .bind(asserted_location) + .bind(asserted_location_changed_at) .execute(pool) .await .unwrap(); @@ -133,3 +137,37 @@ pub async fn create_db_tables(pool: &PgPool) { pub fn make_keypair() -> Keypair { Keypair::generate(KeyTag::default(), &mut rand::rngs::OsRng) } + +use helium_crypto::PublicKey; +use helium_proto::services::mobile_config::{self as proto}; +use mobile_config::{ + gateway_service::GatewayService, + key_cache::{CacheKeys, KeyCache}, + KeyRole, +}; +use tokio::net::TcpListener; +use tonic::transport; + +pub async fn spawn_gateway_service( + pool: PgPool, + admin_pub_key: PublicKey, +) -> ( + String, + tokio::task::JoinHandle>, +) { + let server_key = make_keypair(); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + // Start the gateway server + let keys = CacheKeys::from_iter([(admin_pub_key.to_owned(), KeyRole::Administrator)]); + let (_key_cache_tx, key_cache) = KeyCache::new(keys); + let gws = GatewayService::new(key_cache, pool.clone(), server_key, pool.clone()); + let handle = tokio::spawn( + transport::Server::builder() + .add_service(proto::GatewayServer::new(gws)) + .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)), + ); + + (format!("http://{addr}"), handle) +} diff --git a/mobile_config/tests/gateway_service.rs b/mobile_config/tests/gateway_service.rs index 1db74af0c..d5cfcb4ae 100644 --- a/mobile_config/tests/gateway_service.rs +++ b/mobile_config/tests/gateway_service.rs @@ -1,7 +1,6 @@ -use std::vec; - use chrono::{Duration, Utc}; use futures::stream::StreamExt; +use std::vec; use helium_crypto::{Keypair, PublicKey, Sign}; use helium_proto::services::mobile_config::{ @@ -89,30 +88,6 @@ async fn gateway_info_authorization_errors(pool: PgPool) -> anyhow::Result<()> { Ok(()) } -async fn spawn_gateway_service( - pool: PgPool, - admin_pub_key: PublicKey, -) -> ( - String, - tokio::task::JoinHandle>, -) { - let server_key = make_keypair(); - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); - - // Start the gateway server - let keys = CacheKeys::from_iter([(admin_pub_key.to_owned(), KeyRole::Administrator)]); - let (_key_cache_tx, key_cache) = KeyCache::new(keys); - let gws = GatewayService::new(key_cache, pool.clone(), server_key, pool.clone()); - let handle = tokio::spawn( - transport::Server::builder() - .add_service(proto::GatewayServer::new(gws)) - .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)), - ); - - (format!("http://{addr}"), handle) -} - #[sqlx::test] async fn gateway_stream_info_v1(pool: PgPool) { let admin_key = make_keypair(); @@ -243,7 +218,7 @@ async fn gateway_stream_info_v2_updated_at(pool: PgPool) { None, ) .await; - add_mobile_tracker_record(&pool, asset1_pubkey.clone().into(), updated_at).await; + add_mobile_tracker_record(&pool, asset1_pubkey.clone().into(), updated_at, None, None).await; // Shouldn't be returned add_db_record( @@ -257,7 +232,7 @@ async fn gateway_stream_info_v2_updated_at(pool: PgPool) { None, ) .await; - add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), created_at).await; + add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), created_at, None, None).await; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); @@ -315,7 +290,7 @@ async fn gateway_info_batch_v2(pool: PgPool) { None, ) .await; - add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), created_at).await; + add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), created_at, None, None).await; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); @@ -408,7 +383,7 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) { None, ) .await; - add_mobile_tracker_record(&pool, asset3_pubkey.clone().into(), updated_at).await; + add_mobile_tracker_record(&pool, asset3_pubkey.clone().into(), updated_at, None, None).await; // Must be ignored since not included in req add_db_record( @@ -546,7 +521,7 @@ async fn gateway_info_v2(pool: PgPool) { Some(r#"{"wifiInfoV0": {"antenna": 18, "azimuth": 161, "elevation": 2, "electricalDownTilt": 3, "mechanicalDownTilt": 4}}"#) ) .await; - add_mobile_tracker_record(&pool, asset1_pubkey.clone().into(), updated_at).await; + add_mobile_tracker_record(&pool, asset1_pubkey.clone().into(), updated_at, None, None).await; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); @@ -640,7 +615,7 @@ async fn gateway_info_stream_v2_updated_at_check(pool: PgPool) { None, ) .await; - add_mobile_tracker_record(&pool, asset3_pubkey.clone().into(), updated_at).await; + add_mobile_tracker_record(&pool, asset3_pubkey.clone().into(), updated_at, None, None).await; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); diff --git a/mobile_config/tests/gateway_service_v3.rs b/mobile_config/tests/gateway_service_v3.rs new file mode 100644 index 000000000..920d6b9b9 --- /dev/null +++ b/mobile_config/tests/gateway_service_v3.rs @@ -0,0 +1,460 @@ +use chrono::{Duration, Utc}; +use futures::stream::StreamExt; +use std::vec; + +use helium_crypto::{Keypair, PublicKey, Sign}; +use helium_proto::services::mobile_config::{ + self as proto, DeploymentInfo, DeviceTypeV2, GatewayClient, GatewayInfoStreamReqV3, + GatewayInfoV3, LocationInfo, +}; +use prost::Message; +use sqlx::PgPool; + +pub mod common; +use common::*; + +#[sqlx::test] +async fn gateway_stream_info_v3_basic(pool: PgPool) { + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let asset2_hex_idx = 631711286145955327_i64; + let asset2_pubkey = make_keypair().public_key().clone(); + let now = Utc::now(); + let now_plus_10 = now + chrono::Duration::seconds(10); + let now_plus_5 = now + chrono::Duration::seconds(5); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + Some(asset1_hex_idx), + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + now, + Some(now), + Some(r#"{"wifiInfoV0": {"antenna": 18, "azimuth": 161, "elevation": 2, "electricalDownTilt": 3, "mechanicalDownTilt": 4}}"#) + ) + .await; + add_mobile_tracker_record( + &pool, + asset1_pubkey.clone().into(), + now_plus_10, + Some(asset1_hex_idx), + Some(now_plus_5), + ) + .await; + + add_db_record( + &pool, + "asset2", + Some(asset2_hex_idx), + "\"wifiDataOnly\"", + asset2_pubkey.clone().into(), + now_plus_10, + Some(now_plus_10), + None, + ) + .await; + add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), now_plus_10, None, None).await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + // Select all devices + let req = make_gateway_stream_signed_req_v3(&admin_key, &[], 0, 0); + let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 2); + + // Filter by device type + let req = make_gateway_stream_signed_req_v3(&admin_key, &[DeviceTypeV2::Indoor], 0, 0); + let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 1); + let gateway: &GatewayInfoV3 = resp.gateways.first().unwrap(); + assert_eq!(gateway.device_type(), DeviceTypeV2::Indoor); + assert_eq!(gateway.address, asset1_pubkey.to_vec()); + assert_eq!(gateway.created_at, now.timestamp() as u64); + assert_eq!(gateway.updated_at, now_plus_10.timestamp() as u64); + assert_eq!( + gateway.metadata.clone().unwrap().location_info.unwrap(), + LocationInfo { + location: format!("{:x}", asset1_hex_idx), + location_changed_at: now_plus_5.timestamp() as u64 + } + ); + assert_eq!( + gateway.metadata.clone().unwrap().deployment_info.unwrap(), + DeploymentInfo { + antenna: 18, + elevation: 2, + azimuth: 161, + } + ); +} + +#[sqlx::test] +async fn gateway_stream_info_v3_no_metadata(pool: PgPool) { + // There is location info but no deployment info + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let now = Utc::now(); + let now_plus_10 = now + chrono::Duration::seconds(10); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + None, + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + now, + Some(now), + None, + ) + .await; + add_mobile_tracker_record(&pool, asset1_pubkey.clone().into(), now_plus_10, None, None).await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + let req = make_gateway_stream_signed_req_v3(&admin_key, &[], 0, 0); + let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 1); + let gateway: &GatewayInfoV3 = resp.gateways.first().unwrap(); + assert_eq!(gateway.device_type(), DeviceTypeV2::Indoor); + assert_eq!(gateway.address, asset1_pubkey.to_vec()); + assert_eq!(gateway.created_at, now.timestamp() as u64); + assert_eq!(gateway.updated_at, now_plus_10.timestamp() as u64); + assert!(gateway.metadata.is_none()); +} + +#[sqlx::test] +async fn gateway_stream_info_v3_no_deployment_info(pool: PgPool) { + // There is location info but no deployment info + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let now = Utc::now(); + let now_plus_10 = now + chrono::Duration::seconds(10); + let now_plus_5 = now + chrono::Duration::seconds(5); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + Some(asset1_hex_idx), + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + now, + Some(now), + None, + ) + .await; + add_mobile_tracker_record( + &pool, + asset1_pubkey.clone().into(), + now_plus_10, + Some(asset1_hex_idx), + Some(now_plus_5), + ) + .await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + let req = make_gateway_stream_signed_req_v3(&admin_key, &[], 0, 0); + let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 1); + let gateway: &GatewayInfoV3 = resp.gateways.first().unwrap(); + assert_eq!(gateway.device_type(), DeviceTypeV2::Indoor); + assert_eq!(gateway.address, asset1_pubkey.to_vec()); + assert_eq!(gateway.created_at, now.timestamp() as u64); + assert_eq!(gateway.updated_at, now_plus_10.timestamp() as u64); + assert_eq!( + gateway.metadata.clone().unwrap().location_info.unwrap(), + LocationInfo { + location: format!("{:x}", asset1_hex_idx), + location_changed_at: now_plus_5.timestamp() as u64 + } + ); + assert!(gateway.metadata.clone().unwrap().deployment_info.is_none(),); +} + +#[sqlx::test] +async fn gateway_stream_info_v3_updated_at(pool: PgPool) { + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let asset2_hex_idx = 631711286145955327_i64; + let asset2_pubkey = make_keypair().public_key().clone(); + let created_at = Utc::now() - Duration::hours(5); + let updated_at = Utc::now() - Duration::hours(3); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + Some(asset1_hex_idx), + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + created_at, + Some(updated_at), + None, + ) + .await; + add_mobile_tracker_record( + &pool, + asset1_pubkey.clone().into(), + updated_at, + Some(asset1_hex_idx), + Some(updated_at), + ) + .await; + + // Shouldn't be returned + add_db_record( + &pool, + "asset2", + Some(asset2_hex_idx), + "\"wifiDataOnly\"", + asset2_pubkey.clone().into(), + created_at, + None, + None, + ) + .await; + add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), created_at, None, None).await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + let req = make_gateway_stream_signed_req_v3(&admin_key, &[], updated_at.timestamp() as u64, 0); + let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 1); + + let gw_info = resp.gateways.first().unwrap(); + let pub_key = PublicKey::from_bytes(gw_info.address.clone()).unwrap(); + assert_eq!(pub_key, asset1_pubkey.clone()); + assert_eq!( + DeviceTypeV2::try_from(gw_info.device_type).unwrap(), + DeviceTypeV2::Indoor + ); + assert_eq!( + i64::from_str_radix( + &gw_info + .metadata + .clone() + .unwrap() + .location_info + .unwrap() + .location, + 16 + ) + .unwrap(), + asset1_hex_idx + ); + assert!(stream.next().await.is_none()); +} + +#[sqlx::test] +async fn gateway_stream_info_v3_min_location_changed_at_zero(pool: PgPool) { + // asset_1 has no location + // asset_2 has location + // Make sure if min_location_changed_at == 0 then returned both radios + // if min_location_changed_at >= 1 then radios with null location filtered out + + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset2_hex_idx = 631711286145955327_i64; + let asset2_pubkey = make_keypair().public_key().clone(); + let now = Utc::now(); + let now_minus_six = now - Duration::hours(6); + let now_minus_three = now - Duration::hours(3); + let now_minus_four = now - Duration::hours(4); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + None, + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + now_minus_six, + Some(now), + None, + ) + .await; + add_mobile_tracker_record( + &pool, + asset1_pubkey.clone().into(), + now_minus_three, + None, + None, + ) + .await; + + add_db_record( + &pool, + "asset2", + Some(asset2_hex_idx), + "\"wifiDataOnly\"", + asset2_pubkey.clone().into(), + now_minus_six, + Some(now), + None, + ) + .await; + add_mobile_tracker_record( + &pool, + asset2_pubkey.clone().into(), + now_minus_three, + Some(asset2_hex_idx), + Some(now_minus_four), + ) + .await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + let req = make_gateway_stream_signed_req_v3(&admin_key, &[], 0, 0); + let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 2); + + // min_location_changed_at = 1 + let req = make_gateway_stream_signed_req_v3(&admin_key, &[], 0, 1); + let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 1); + + let gw_info = resp.gateways.first().unwrap(); + let pub_key = PublicKey::from_bytes(gw_info.address.clone()).unwrap(); + assert_eq!(pub_key, asset2_pubkey.clone()); +} + +#[sqlx::test] +async fn gateway_stream_info_v3_location_changed_at(pool: PgPool) { + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let asset2_hex_idx = 631711286145955327_i64; + let asset2_pubkey = make_keypair().public_key().clone(); + let now = Utc::now(); + let now_minus_six = now - Duration::hours(6); + let now_minus_three = now - Duration::hours(3); + let now_minus_four = now - Duration::hours(4); + let now_minus_five = now - Duration::hours(5); + + // Scenario: + // asset_1 location changed at now - 6 hours + // asset_2 location changed at now - 4 hours + // request min_location_changed_at location changed at now - 5 hours + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + Some(asset1_hex_idx), + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + now_minus_six, + Some(now), + None, + ) + .await; + add_mobile_tracker_record( + &pool, + asset1_pubkey.clone().into(), + now_minus_three, + Some(asset1_hex_idx), + Some(now_minus_six), + ) + .await; + + // Shouldn't be returned + add_db_record( + &pool, + "asset2", + Some(asset2_hex_idx), + "\"wifiDataOnly\"", + asset2_pubkey.clone().into(), + now_minus_six, + Some(now), + None, + ) + .await; + add_mobile_tracker_record( + &pool, + asset2_pubkey.clone().into(), + now_minus_three, + Some(asset2_hex_idx), + Some(now_minus_four), + ) + .await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + let req = + make_gateway_stream_signed_req_v3(&admin_key, &[], 0, now_minus_five.timestamp() as u64); + let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 1); + + let gw_info = resp.gateways.first().unwrap(); + let pub_key = PublicKey::from_bytes(gw_info.address.clone()).unwrap(); + assert_eq!(pub_key, asset2_pubkey.clone()); + assert_eq!( + DeviceTypeV2::try_from(gw_info.device_type).unwrap(), + DeviceTypeV2::DataOnly + ); + assert_eq!( + i64::from_str_radix( + &gw_info + .metadata + .clone() + .unwrap() + .location_info + .unwrap() + .location, + 16 + ) + .unwrap(), + asset2_hex_idx + ); + assert!(stream.next().await.is_none()); + + // Change min_location_changed_at parameter, now two radios should be returned + let req = + make_gateway_stream_signed_req_v3(&admin_key, &[], 0, now_minus_six.timestamp() as u64); + let mut stream = client.info_stream_v3(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 2); +} + +fn make_gateway_stream_signed_req_v3( + signer: &Keypair, + device_types: &[DeviceTypeV2], + min_updated_at: u64, + min_location_changed_at: u64, +) -> proto::GatewayInfoStreamReqV3 { + let mut req = GatewayInfoStreamReqV3 { + batch_size: 10000, + signer: signer.public_key().to_vec(), + signature: vec![], + device_types: device_types + .iter() + .map(|v| DeviceTypeV2::into(*v)) + .collect(), + min_updated_at, + min_location_changed_at, + }; + + req.signature = signer.sign(&req.encode_to_vec()).unwrap(); + req +}