From 9f2382d2033724e7b6f3975c847382cc891cde33 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Mon, 9 Dec 2024 14:25:19 +0200 Subject: [PATCH 01/10] Implement min_updated_at --- Cargo.lock | 52 +++++++++++------ Cargo.toml | 4 +- mobile_config/src/gateway_info.rs | 29 ++++++---- mobile_config/src/gateway_service.rs | 46 ++++++++++----- mobile_config/src/main.rs | 1 + mobile_config/tests/gateway_service.rs | 80 +++++++++++++------------- 6 files changed, 130 insertions(+), 82 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 596a95200..a71052324 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1615,11 +1615,11 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#8e3edc2053a16ec98421d83211399338836f91e4" +source = "git+https://github.com/helium/proto?branch=min_update_at#ffcd17dc3e2d3763234081c1e7be006f4fb906d7" dependencies = [ "base64 0.21.7", "byteorder", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", "prost", "rand 0.8.5", "rand_chacha 0.3.0", @@ -1788,7 +1788,7 @@ dependencies = [ "file-store", "futures", "futures-util", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", "http 0.2.11", "http-serde", "humantime-serde", @@ -2630,7 +2630,7 @@ dependencies = [ "axum 0.7.4", "bs58 0.4.0", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", "http 0.2.11", "notify", "serde", @@ -3212,7 +3212,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", "hex-literal", "http 0.2.11", "lazy_static", @@ -3794,7 +3794,7 @@ dependencies = [ "h3o", "helium-anchor-gen", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=master)", "hex", "hex-literal", "itertools", @@ -3834,6 +3834,22 @@ dependencies = [ "tonic-build", ] +[[package]] +name = "helium-proto" +version = "0.1.0" +source = "git+https://github.com/helium/proto?branch=min_update_at#ffcd17dc3e2d3763234081c1e7be006f4fb906d7" +dependencies = [ + "bytes", + "prost", + "prost-build", + "serde", + "serde_json", + "strum", + "strum_macros", + "tonic", + "tonic-build", +] + [[package]] name = "helium-sub-daos" version = "0.1.8" @@ -3875,7 +3891,7 @@ dependencies = [ "async-trait", "chrono", "derive_builder", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", "hextree", "rust_decimal", "rust_decimal_macros", @@ -4291,7 +4307,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", "http 0.2.11", "humantime-serde", "metrics", @@ -4361,7 +4377,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", "hextree", "http 0.2.11", "http-serde", @@ -4403,7 +4419,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", "http 0.2.11", "http-serde", "humantime-serde", @@ -4445,7 +4461,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", "http-serde", "humantime-serde", "iot-config", @@ -5034,7 +5050,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", "hextree", "http 0.2.11", "http-serde", @@ -5075,7 +5091,7 @@ dependencies = [ "futures", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", "mobile-config", "prost", "rand 0.8.5", @@ -5111,7 +5127,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", "http 0.2.11", "http-serde", "humantime-serde", @@ -5155,7 +5171,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", "hex-assignments", "hextree", "http-serde", @@ -5839,7 +5855,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", "http 0.2.11", "hyper 0.14.28", "jsonrpsee", @@ -5922,7 +5938,7 @@ dependencies = [ "futures-util", "helium-anchor-gen", "helium-lib", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", "humantime-serde", "metrics", "metrics-exporter-prometheus", @@ -6561,7 +6577,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", "humantime-serde", "lazy_static", "metrics", diff --git a/Cargo.toml b/Cargo.toml index 7388ff0ad..750807edb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,10 +70,10 @@ helium-lib = { git = "https://github.com/helium/helium-wallet-rs.git", branch = hextree = { git = "https://github.com/jaykickliter/HexTree", branch = "main", features = [ "disktree", ] } -helium-proto = { git = "https://github.com/helium/proto", branch = "master", features = [ +helium-proto = { git = "https://github.com/helium/proto", branch = "min_update_at", features = [ "services", ] } -beacon = { git = "https://github.com/helium/proto", branch = "master" } +beacon = { git = "https://github.com/helium/proto", branch = "min_update_at" } solana-client = "1.18" solana-sdk = "1.18" solana-program = "1.18" diff --git a/mobile_config/src/gateway_info.rs b/mobile_config/src/gateway_info.rs index 5ff690564..a94f862dc 100644 --- a/mobile_config/src/gateway_info.rs +++ b/mobile_config/src/gateway_info.rs @@ -358,9 +358,10 @@ pub(crate) mod db { use crate::gateway_info::DeploymentInfo; use chrono::{DateTime, Utc}; use futures::stream::{Stream, StreamExt}; + use futures::TryStreamExt; use helium_crypto::PublicKeyBinary; use sqlx::{types::Json, PgExecutor, Row}; - use std::str::FromStr; + use std::{collections::HashSet, str::FromStr}; const GET_METADATA_SQL: &str = r#" select kta.entity_key, infos.location::bigint, infos.device_type, @@ -369,15 +370,26 @@ pub(crate) mod db { join key_to_assets kta on infos.asset = kta.asset "#; const BATCH_SQL_WHERE_SNIPPET: &str = " where kta.entity_key = any($1::bytea[]) "; - const DEVICE_TYPES_AND_SNIPPET: &str = " and device_type::text = any($2) "; + const DEVICE_TYPES_WHERE_SNIPPET: &str = " where device_type::text = any($1) "; + + const GET_UPDATED_RADIOS: &str = + "SELECT entity_key FROM mobile_radio_tracker WHERE last_changed_at >= $1"; lazy_static::lazy_static! { static ref BATCH_METADATA_SQL: String = format!("{GET_METADATA_SQL} {BATCH_SQL_WHERE_SNIPPET}"); - static ref GET_METADATA_SQL_REFRESHED_AT: String = format!(r#"{GET_METADATA_SQL} - where ( infos.refreshed_at >= $1 OR (infos.refreshed_at IS NULL AND infos.created_at >= $1) ) "#); - - static ref DEVICE_TYPES_METADATA_SQL: String = format!("{} {}", *GET_METADATA_SQL_REFRESHED_AT, DEVICE_TYPES_AND_SNIPPET); + static ref DEVICE_TYPES_METADATA_SQL: String = format!("{GET_METADATA_SQL} {DEVICE_TYPES_WHERE_SNIPPET}"); + } + pub async fn get_updated_radios( + db: impl PgExecutor<'_>, + min_updated_at: DateTime, + ) -> anyhow::Result> { + sqlx::query_scalar(GET_UPDATED_RADIOS) + .bind(min_updated_at) + .fetch(db) + .map_err(anyhow::Error::from) + .try_collect::>() // Collect results into a HashSet + .await } pub async fn get_info( @@ -413,16 +425,13 @@ pub(crate) mod db { pub fn all_info_stream<'a>( db: impl PgExecutor<'a> + 'a, device_types: &'a [DeviceType], - min_refreshed_at: DateTime, ) -> impl Stream + 'a { match device_types.is_empty() { - true => sqlx::query_as::<_, GatewayInfo>(&GET_METADATA_SQL_REFRESHED_AT) - .bind(min_refreshed_at) + true => sqlx::query_as::<_, GatewayInfo>(&GET_METADATA_SQL) .fetch(db) .filter_map(|metadata| async move { metadata.ok() }) .boxed(), false => sqlx::query_as::<_, GatewayInfo>(&DEVICE_TYPES_METADATA_SQL) - .bind(min_refreshed_at) .bind( device_types .iter() diff --git a/mobile_config/src/gateway_service.rs b/mobile_config/src/gateway_service.rs index 13398d287..46be3ab75 100644 --- a/mobile_config/src/gateway_service.rs +++ b/mobile_config/src/gateway_service.rs @@ -1,11 +1,12 @@ use crate::{ - gateway_info::{self, DeviceType, GatewayInfo}, + gateway_info::{self, db::get_updated_radios, DeviceType, GatewayInfo}, key_cache::KeyCache, telemetry, verify_public_key, GrpcResult, GrpcStreamResult, }; use chrono::{DateTime, TimeZone, Utc}; use file_store::traits::{MsgVerify, TimestampEncode}; use futures::{ + future, stream::{Stream, StreamExt, TryStreamExt}, TryFutureExt, }; @@ -17,22 +18,30 @@ use helium_proto::{ }, Message, }; +use solana_sdk::signer::Signer; use sqlx::{Pool, Postgres}; -use std::sync::Arc; +use std::{collections::HashSet, sync::Arc}; use tonic::{Request, Response, Status}; pub struct GatewayService { key_cache: KeyCache, + mobile_config_db_pool: Pool, metadata_pool: Pool, signing_key: Arc, } impl GatewayService { - pub fn new(key_cache: KeyCache, metadata_pool: Pool, signing_key: Keypair) -> Self { + pub fn new( + key_cache: KeyCache, + metadata_pool: Pool, + signing_key: Keypair, + mobile_config_db_pool: Pool, + ) -> Self { Self { key_cache, metadata_pool, signing_key: Arc::new(signing_key), + mobile_config_db_pool, } } @@ -170,8 +179,7 @@ impl mobile_config::Gateway for GatewayService { ); tokio::spawn(async move { - let stream = - gateway_info::db::all_info_stream(&pool, &device_types, DateTime::UNIX_EPOCH); + let stream = gateway_info::db::all_info_stream(&pool, &device_types); stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await }); @@ -191,18 +199,13 @@ impl mobile_config::Gateway for GatewayService { self.verify_request_signature(&signer, &request)?; let pool = self.metadata_pool.clone(); + let mc_pool = self.mobile_config_db_pool.clone(); let signing_key = self.signing_key.clone(); let batch_size = request.batch_size; let (tx, rx) = tokio::sync::mpsc::channel(100); let device_types: Vec = request.device_types().map(|v| v.into()).collect(); - let min_refreshed_at = Utc - .timestamp_opt(request.min_refreshed_at as i64, 0) - .single() - .ok_or(Status::invalid_argument( - "Invalid min_refreshed_at argument", - ))?; tracing::debug!( "fetching all gateways' info (v2). Device types: {:?} ", @@ -210,8 +213,25 @@ impl mobile_config::Gateway for GatewayService { ); tokio::spawn(async move { - let stream = gateway_info::db::all_info_stream(&pool, &device_types, min_refreshed_at); - stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await + let stream = gateway_info::db::all_info_stream(&pool, &device_types); + if request.min_updated_at > 0 { + // It needs filtering only updated radios + let min_updated_at = Utc + .timestamp_opt(request.min_updated_at as i64, 0) + .single() + .ok_or(Status::invalid_argument( + "Invalid min_refreshed_at argument", + ))?; + + let updated_redios = get_updated_radios(&mc_pool, min_updated_at).await?; + let s = stream + .filter(|v| future::ready(updated_redios.contains(&v.address.to_string()))) + .boxed(); + stream_multi_gateways_info(s, tx.clone(), signing_key.clone(), batch_size).await + } else { + stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size) + .await + } }); Ok(Response::new(GrpcStreamResult::new(rx))) diff --git a/mobile_config/src/main.rs b/mobile_config/src/main.rs index 2128851a3..b869bef2c 100644 --- a/mobile_config/src/main.rs +++ b/mobile_config/src/main.rs @@ -75,6 +75,7 @@ impl Daemon { key_cache.clone(), metadata_pool.clone(), settings.signing_keypair()?, + pool.clone(), ); let auth_svc = AuthorizationService::new(key_cache.clone(), settings.signing_keypair()?); let entity_svc = EntityService::new( diff --git a/mobile_config/tests/gateway_service.rs b/mobile_config/tests/gateway_service.rs index 49101ca3b..06a622952 100644 --- a/mobile_config/tests/gateway_service.rs +++ b/mobile_config/tests/gateway_service.rs @@ -33,7 +33,8 @@ async fn gateway_info_authorization_errors(pool: PgPool) -> anyhow::Result<()> { // Start the gateway server let keys = CacheKeys::from_iter([(admin_key.public_key().to_owned(), KeyRole::Administrator)]); let (_key_cache_tx, key_cache) = KeyCache::new(keys); - let gws = GatewayService::new(key_cache, pool.clone(), server_key); + // TODO + let gws = GatewayService::new(key_cache, pool.clone(), server_key, pool.clone()); let _handle = tokio::spawn( transport::Server::builder() .add_service(proto::GatewayServer::new(gws)) @@ -98,7 +99,8 @@ async fn spawn_gateway_service( // Start the gateway server let keys = CacheKeys::from_iter([(admin_pub_key.to_owned(), KeyRole::Administrator)]); let (_key_cache_tx, key_cache) = KeyCache::new(keys); - let gws = GatewayService::new(key_cache, pool, server_key); + // TODO + let gws = GatewayService::new(key_cache, pool.clone(), server_key, pool.clone()); let handle = tokio::spawn( transport::Server::builder() .add_service(proto::GatewayServer::new(gws)) @@ -216,41 +218,41 @@ async fn gateway_stream_info_v2(pool: PgPool) { ); } -#[sqlx::test] -async fn gateway_stream_info_v2_refreshed_at_is_null(pool: PgPool) { - let admin_key = make_keypair(); - let asset1_pubkey = make_keypair().public_key().clone(); - let asset1_hex_idx = 631711281837647359_i64; - let now = Utc::now(); - - create_db_tables(&pool).await; - add_db_record( - &pool, - "asset1", - asset1_hex_idx, - "\"wifiIndoor\"", - asset1_pubkey.clone().into(), - now, - None, - None, - ) - .await; - - let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; - let mut client = GatewayClient::connect(addr).await.unwrap(); - - let req = make_gateway_stream_signed_req_v2(&admin_key, &[], now.timestamp() as u64); - let mut stream = client.info_stream_v2(req).await.unwrap().into_inner(); - - // Make sure the gateway was returned - let resp = stream.next().await.unwrap().unwrap(); - assert_eq!(resp.gateways.len(), 1); - - let req = make_gateway_stream_signed_req_v2(&admin_key, &[], (now.timestamp() + 1) as u64); - let mut stream = client.info_stream_v2(req).await.unwrap().into_inner(); - // Response is empty - assert!(stream.next().await.is_none()); -} +// #[sqlx::test] +// async fn gateway_stream_info_v2_refreshed_at_is_null(pool: PgPool) { +// let admin_key = make_keypair(); +// let asset1_pubkey = make_keypair().public_key().clone(); +// let asset1_hex_idx = 631711281837647359_i64; +// let now = Utc::now(); +// +// create_db_tables(&pool).await; +// add_db_record( +// &pool, +// "asset1", +// asset1_hex_idx, +// "\"wifiIndoor\"", +// asset1_pubkey.clone().into(), +// now, +// None, +// None, +// ) +// .await; +// +// let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; +// let mut client = GatewayClient::connect(addr).await.unwrap(); +// +// let req = make_gateway_stream_signed_req_v2(&admin_key, &[], now.timestamp() as u64); +// let mut stream = client.info_stream_v2(req).await.unwrap().into_inner(); +// +// // Make sure the gateway was returned +// let resp = stream.next().await.unwrap().unwrap(); +// assert_eq!(resp.gateways.len(), 1); +// +// let req = make_gateway_stream_signed_req_v2(&admin_key, &[], (now.timestamp() + 1) as u64); +// let mut stream = client.info_stream_v2(req).await.unwrap().into_inner(); +// // Response is empty +// assert!(stream.next().await.is_none()); +// } #[sqlx::test] async fn gateway_stream_info_v2_deployment_info(pool: PgPool) { @@ -477,14 +479,14 @@ fn make_keypair() -> Keypair { fn make_gateway_stream_signed_req_v2( signer: &Keypair, device_types: &[DeviceType], - min_refreshed_at: u64, + min_updated_at: u64, ) -> proto::GatewayInfoStreamReqV2 { let mut req = GatewayInfoStreamReqV2 { batch_size: 10000, signer: signer.public_key().to_vec(), signature: vec![], device_types: device_types.iter().map(|v| DeviceType::into(*v)).collect(), - min_refreshed_at, + min_updated_at, }; req.signature = signer.sign(&req.encode_to_vec()).unwrap(); From 23f78577903dc6089e05e5a0e8a718f13b47895c Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Mon, 9 Dec 2024 14:52:52 +0200 Subject: [PATCH 02/10] Fix clippy --- mobile_config/src/gateway_info.rs | 4 ++-- mobile_config/src/gateway_service.rs | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/mobile_config/src/gateway_info.rs b/mobile_config/src/gateway_info.rs index a94f862dc..1d423f9f1 100644 --- a/mobile_config/src/gateway_info.rs +++ b/mobile_config/src/gateway_info.rs @@ -388,7 +388,7 @@ pub(crate) mod db { .bind(min_updated_at) .fetch(db) .map_err(anyhow::Error::from) - .try_collect::>() // Collect results into a HashSet + .try_collect::>() .await } @@ -427,7 +427,7 @@ pub(crate) mod db { device_types: &'a [DeviceType], ) -> impl Stream + 'a { match device_types.is_empty() { - true => sqlx::query_as::<_, GatewayInfo>(&GET_METADATA_SQL) + true => sqlx::query_as::<_, GatewayInfo>(GET_METADATA_SQL) .fetch(db) .filter_map(|metadata| async move { metadata.ok() }) .boxed(), diff --git a/mobile_config/src/gateway_service.rs b/mobile_config/src/gateway_service.rs index 46be3ab75..a5329265a 100644 --- a/mobile_config/src/gateway_service.rs +++ b/mobile_config/src/gateway_service.rs @@ -3,7 +3,7 @@ use crate::{ key_cache::KeyCache, telemetry, verify_public_key, GrpcResult, GrpcStreamResult, }; -use chrono::{DateTime, TimeZone, Utc}; +use chrono::{TimeZone, Utc}; use file_store::traits::{MsgVerify, TimestampEncode}; use futures::{ future, @@ -18,9 +18,8 @@ use helium_proto::{ }, Message, }; -use solana_sdk::signer::Signer; use sqlx::{Pool, Postgres}; -use std::{collections::HashSet, sync::Arc}; +use std::sync::Arc; use tonic::{Request, Response, Status}; pub struct GatewayService { @@ -224,10 +223,11 @@ impl mobile_config::Gateway for GatewayService { ))?; let updated_redios = get_updated_radios(&mc_pool, min_updated_at).await?; - let s = stream + let stream = stream .filter(|v| future::ready(updated_redios.contains(&v.address.to_string()))) .boxed(); - stream_multi_gateways_info(s, tx.clone(), signing_key.clone(), batch_size).await + stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size) + .await } else { stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size) .await From 78c519f7aba331caaf8d4714728c07cb858d3687 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Mon, 9 Dec 2024 17:13:07 +0200 Subject: [PATCH 03/10] Fix updated_at filtering. Add gateway_stream_info_v2_updated_at test --- mobile_config/src/gateway_info.rs | 18 ++-- mobile_config/tests/gateway_service.rs | 120 +++++++++++++++++-------- 2 files changed, 96 insertions(+), 42 deletions(-) diff --git a/mobile_config/src/gateway_info.rs b/mobile_config/src/gateway_info.rs index 1d423f9f1..430ea49b8 100644 --- a/mobile_config/src/gateway_info.rs +++ b/mobile_config/src/gateway_info.rs @@ -358,7 +358,6 @@ pub(crate) mod db { use crate::gateway_info::DeploymentInfo; use chrono::{DateTime, Utc}; use futures::stream::{Stream, StreamExt}; - use futures::TryStreamExt; use helium_crypto::PublicKeyBinary; use sqlx::{types::Json, PgExecutor, Row}; use std::{collections::HashSet, str::FromStr}; @@ -384,12 +383,19 @@ pub(crate) mod db { db: impl PgExecutor<'_>, min_updated_at: DateTime, ) -> anyhow::Result> { - sqlx::query_scalar(GET_UPDATED_RADIOS) + let rows: Vec> = sqlx::query_scalar(GET_UPDATED_RADIOS) .bind(min_updated_at) - .fetch(db) - .map_err(anyhow::Error::from) - .try_collect::>() - .await + .fetch_all(db) + .await?; + let mut radios = HashSet::new(); + + for row in rows { + let entity_key_b: &[u8] = &row; + let entity_key = bs58::encode(entity_key_b).into_string(); + radios.insert(entity_key); + } + + Ok(radios) } pub async fn get_info( diff --git a/mobile_config/tests/gateway_service.rs b/mobile_config/tests/gateway_service.rs index 06a622952..8b34fc55a 100644 --- a/mobile_config/tests/gateway_service.rs +++ b/mobile_config/tests/gateway_service.rs @@ -1,4 +1,4 @@ -use chrono::{DateTime, Utc}; +use chrono::{DateTime, Duration, Utc}; use futures::stream::StreamExt; use helium_crypto::{KeyTag, Keypair, PublicKey, PublicKeyBinary, Sign}; @@ -218,41 +218,65 @@ async fn gateway_stream_info_v2(pool: PgPool) { ); } -// #[sqlx::test] -// async fn gateway_stream_info_v2_refreshed_at_is_null(pool: PgPool) { -// let admin_key = make_keypair(); -// let asset1_pubkey = make_keypair().public_key().clone(); -// let asset1_hex_idx = 631711281837647359_i64; -// let now = Utc::now(); -// -// create_db_tables(&pool).await; -// add_db_record( -// &pool, -// "asset1", -// asset1_hex_idx, -// "\"wifiIndoor\"", -// asset1_pubkey.clone().into(), -// now, -// None, -// None, -// ) -// .await; -// -// let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; -// let mut client = GatewayClient::connect(addr).await.unwrap(); -// -// let req = make_gateway_stream_signed_req_v2(&admin_key, &[], now.timestamp() as u64); -// let mut stream = client.info_stream_v2(req).await.unwrap().into_inner(); -// -// // Make sure the gateway was returned -// let resp = stream.next().await.unwrap().unwrap(); -// assert_eq!(resp.gateways.len(), 1); -// -// let req = make_gateway_stream_signed_req_v2(&admin_key, &[], (now.timestamp() + 1) as u64); -// let mut stream = client.info_stream_v2(req).await.unwrap().into_inner(); -// // Response is empty -// assert!(stream.next().await.is_none()); -// } +#[sqlx::test] +async fn gateway_stream_info_v2_updated_at(pool: PgPool) { + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let asset2_hex_idx = 631711286145955327_i64; + let asset2_pubkey = make_keypair().public_key().clone(); + let created_at = Utc::now() - Duration::hours(5); + let updated_at = Utc::now() - Duration::hours(3); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + asset1_hex_idx, + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + created_at, + Some(updated_at), + None, + ) + .await; + add_mobile_tracker_record(&pool, asset1_pubkey.clone().into(), updated_at).await; + + // Shouldn't be returned + add_db_record( + &pool, + "asset2", + asset2_hex_idx, + "\"wifiDataOnly\"", + asset2_pubkey.clone().into(), + created_at, + None, + None, + ) + .await; + add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), created_at).await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + let req = make_gateway_stream_signed_req_v2(&admin_key, &[], updated_at.timestamp() as u64); + let mut stream = client.info_stream_v2(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 1); + + let gw_info = resp.gateways.first().unwrap(); + let pub_key = PublicKey::from_bytes(gw_info.address.clone()).unwrap(); + assert_eq!(pub_key, asset1_pubkey.clone()); + assert_eq!( + DeviceType::try_from(gw_info.device_type).unwrap(), + DeviceType::WifiIndoor + ); + assert_eq!( + i64::from_str_radix(&gw_info.metadata.clone().unwrap().location, 16).unwrap(), + asset1_hex_idx + ); + assert!(stream.next().await.is_none()); +} #[sqlx::test] async fn gateway_stream_info_v2_deployment_info(pool: PgPool) { @@ -376,6 +400,30 @@ async fn gateway_stream_info_v2_deployment_info(pool: PgPool) { } } +async fn add_mobile_tracker_record( + pool: &PgPool, + key: PublicKeyBinary, + last_changed_at: DateTime, +) { + let b58 = bs58::decode(key.to_string()).into_vec().unwrap(); + + sqlx::query( + r#" + INSERT INTO +"mobile_radio_tracker" ("entity_key", "hash", "last_changed_at", "last_checked_at") + VALUES +($1, $2, $3, $4); + "#, + ) + .bind(b58) + .bind("hash") + .bind(last_changed_at) + .bind(last_changed_at + Duration::hours(1)) + .execute(pool) + .await + .unwrap(); +} + #[allow(clippy::too_many_arguments)] async fn add_db_record( pool: &PgPool, From 4f4cc9bb8c7d64116602c7fe50518221c0f45b8d Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Mon, 9 Dec 2024 17:20:10 +0200 Subject: [PATCH 04/10] Optimization --- mobile_config/src/gateway_info.rs | 5 +++-- mobile_config/src/gateway_service.rs | 3 +-- mobile_config/tests/gateway_service.rs | 2 -- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/mobile_config/src/gateway_info.rs b/mobile_config/src/gateway_info.rs index 430ea49b8..0a758fae7 100644 --- a/mobile_config/src/gateway_info.rs +++ b/mobile_config/src/gateway_info.rs @@ -382,7 +382,7 @@ pub(crate) mod db { pub async fn get_updated_radios( db: impl PgExecutor<'_>, min_updated_at: DateTime, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let rows: Vec> = sqlx::query_scalar(GET_UPDATED_RADIOS) .bind(min_updated_at) .fetch_all(db) @@ -392,7 +392,8 @@ pub(crate) mod db { for row in rows { let entity_key_b: &[u8] = &row; let entity_key = bs58::encode(entity_key_b).into_string(); - radios.insert(entity_key); + let pk = PublicKeyBinary::from_str(&entity_key)?; + radios.insert(pk); } Ok(radios) diff --git a/mobile_config/src/gateway_service.rs b/mobile_config/src/gateway_service.rs index a5329265a..b269c2c08 100644 --- a/mobile_config/src/gateway_service.rs +++ b/mobile_config/src/gateway_service.rs @@ -214,7 +214,6 @@ impl mobile_config::Gateway for GatewayService { tokio::spawn(async move { let stream = gateway_info::db::all_info_stream(&pool, &device_types); if request.min_updated_at > 0 { - // It needs filtering only updated radios let min_updated_at = Utc .timestamp_opt(request.min_updated_at as i64, 0) .single() @@ -224,7 +223,7 @@ impl mobile_config::Gateway for GatewayService { let updated_redios = get_updated_radios(&mc_pool, min_updated_at).await?; let stream = stream - .filter(|v| future::ready(updated_redios.contains(&v.address.to_string()))) + .filter(|v| future::ready(updated_redios.contains(&v.address))) .boxed(); stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size) .await diff --git a/mobile_config/tests/gateway_service.rs b/mobile_config/tests/gateway_service.rs index 8b34fc55a..cc8d2ce4a 100644 --- a/mobile_config/tests/gateway_service.rs +++ b/mobile_config/tests/gateway_service.rs @@ -33,7 +33,6 @@ async fn gateway_info_authorization_errors(pool: PgPool) -> anyhow::Result<()> { // Start the gateway server let keys = CacheKeys::from_iter([(admin_key.public_key().to_owned(), KeyRole::Administrator)]); let (_key_cache_tx, key_cache) = KeyCache::new(keys); - // TODO let gws = GatewayService::new(key_cache, pool.clone(), server_key, pool.clone()); let _handle = tokio::spawn( transport::Server::builder() @@ -99,7 +98,6 @@ async fn spawn_gateway_service( // Start the gateway server let keys = CacheKeys::from_iter([(admin_pub_key.to_owned(), KeyRole::Administrator)]); let (_key_cache_tx, key_cache) = KeyCache::new(keys); - // TODO let gws = GatewayService::new(key_cache, pool.clone(), server_key, pool.clone()); let handle = tokio::spawn( transport::Server::builder() From c2343a821e5eb8710233c3c281398405f4527360 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 10 Dec 2024 09:03:09 +0200 Subject: [PATCH 05/10] Remove refreshed_at --- Cargo.lock | 4 ++-- mobile_config/src/gateway_info.rs | 18 ------------------ .../tests/integrations/speedtests.rs | 1 - 3 files changed, 2 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a71052324..068732d09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1615,7 +1615,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=min_update_at#ffcd17dc3e2d3763234081c1e7be006f4fb906d7" +source = "git+https://github.com/helium/proto?branch=min_update_at#ae8d826060acf308cb4d040d6aae92635eca1387" dependencies = [ "base64 0.21.7", "byteorder", @@ -3837,7 +3837,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=min_update_at#ffcd17dc3e2d3763234081c1e7be006f4fb906d7" +source = "git+https://github.com/helium/proto?branch=min_update_at#ae8d826060acf308cb4d040d6aae92635eca1387" dependencies = [ "bytes", "prost", diff --git a/mobile_config/src/gateway_info.rs b/mobile_config/src/gateway_info.rs index 0a758fae7..5501880fb 100644 --- a/mobile_config/src/gateway_info.rs +++ b/mobile_config/src/gateway_info.rs @@ -104,7 +104,6 @@ pub struct GatewayInfo { pub metadata: Option, pub device_type: DeviceType, // None for V1 - pub refreshed_at: Option>, pub created_at: Option>, } @@ -135,7 +134,6 @@ impl TryFrom for GatewayInfo { metadata, device_type: _, created_at, - refreshed_at, } = info; let metadata = if let Some(metadata) = metadata { @@ -154,16 +152,11 @@ impl TryFrom for GatewayInfo { .single() .ok_or(GatewayInfoProtoParseError::InvalidCreatedAt(created_at))?; - let refreshed_at = Utc.timestamp_opt(refreshed_at as i64, 0).single().ok_or( - GatewayInfoProtoParseError::InvalidRefreshedAt(info.refreshed_at), - )?; - Ok(Self { address: address.into(), metadata, device_type: device_type_, created_at: Some(created_at), - refreshed_at: Some(refreshed_at), }) } } @@ -196,7 +189,6 @@ impl TryFrom for GatewayInfo { metadata, device_type: device_type_, created_at: None, - refreshed_at: None, }) } } @@ -297,10 +289,6 @@ impl TryFrom for GatewayInfoProtoV2 { .created_at .ok_or(GatewayInfoToProtoError::CreatedAtIsNone)? .timestamp() as u64, - refreshed_at: info - .refreshed_at - .ok_or(GatewayInfoToProtoError::RefreshedAtIsNone)? - .timestamp() as u64, }) } } @@ -480,11 +468,6 @@ pub(crate) mod db { ) .map_err(|err| sqlx::Error::Decode(Box::new(err)))?; let created_at = row.get::, &str>("created_at"); - // `refreshed_at` can be NULL in the database schema. - // If so, fallback to using `created_at` as the default value of `refreshed_at`. - let refreshed_at = row - .get::>, &str>("refreshed_at") - .unwrap_or(created_at); Ok(Self { address: PublicKeyBinary::from_str( @@ -493,7 +476,6 @@ pub(crate) mod db { .map_err(|err| sqlx::Error::Decode(Box::new(err)))?, metadata, device_type, - refreshed_at: Some(refreshed_at), created_at: Some(created_at), }) } diff --git a/mobile_verifier/tests/integrations/speedtests.rs b/mobile_verifier/tests/integrations/speedtests.rs index d63fd0a54..791480b45 100644 --- a/mobile_verifier/tests/integrations/speedtests.rs +++ b/mobile_verifier/tests/integrations/speedtests.rs @@ -35,7 +35,6 @@ impl GatewayInfoResolver for MockGatewayInfoResolver { metadata: None, device_type: DeviceType::Cbrs, created_at: None, - refreshed_at: None, })) } From c3f30d55d5bd742a7f806ee76171a29c1c515692 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 10 Dec 2024 09:53:22 +0200 Subject: [PATCH 06/10] Fix PR comments --- mobile_config/src/gateway_info.rs | 31 +++++++++++++++------------- mobile_config/src/gateway_service.rs | 9 ++++---- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/mobile_config/src/gateway_info.rs b/mobile_config/src/gateway_info.rs index 5501880fb..044cfd6c9 100644 --- a/mobile_config/src/gateway_info.rs +++ b/mobile_config/src/gateway_info.rs @@ -345,7 +345,10 @@ pub(crate) mod db { use super::{DeviceType, GatewayInfo, GatewayMetadata}; use crate::gateway_info::DeploymentInfo; use chrono::{DateTime, Utc}; - use futures::stream::{Stream, StreamExt}; + use futures::{ + stream::{Stream, StreamExt}, + TryStreamExt, + }; use helium_crypto::PublicKeyBinary; use sqlx::{types::Json, PgExecutor, Row}; use std::{collections::HashSet, str::FromStr}; @@ -371,20 +374,20 @@ pub(crate) mod db { db: impl PgExecutor<'_>, min_updated_at: DateTime, ) -> anyhow::Result> { - let rows: Vec> = sqlx::query_scalar(GET_UPDATED_RADIOS) + sqlx::query(GET_UPDATED_RADIOS) .bind(min_updated_at) - .fetch_all(db) - .await?; - let mut radios = HashSet::new(); - - for row in rows { - let entity_key_b: &[u8] = &row; - let entity_key = bs58::encode(entity_key_b).into_string(); - let pk = PublicKeyBinary::from_str(&entity_key)?; - radios.insert(pk); - } - - Ok(radios) + .fetch(db) + .map_err(anyhow::Error::from) + .try_fold( + HashSet::new(), + |mut set: HashSet, row| async move { + let entity_key_b = row.get::<&[u8], &str>("entity_key"); + let entity_key = bs58::encode(entity_key_b).into_string(); + set.insert(PublicKeyBinary::from_str(&entity_key)?); + Ok(set) + }, + ) + .await } pub async fn get_info( diff --git a/mobile_config/src/gateway_service.rs b/mobile_config/src/gateway_service.rs index b269c2c08..e23abb94e 100644 --- a/mobile_config/src/gateway_service.rs +++ b/mobile_config/src/gateway_service.rs @@ -197,8 +197,8 @@ impl mobile_config::Gateway for GatewayService { let signer = verify_public_key(&request.signer)?; self.verify_request_signature(&signer, &request)?; - let pool = self.metadata_pool.clone(); - let mc_pool = self.mobile_config_db_pool.clone(); + let metadata_db_pool = self.metadata_pool.clone(); + let mobile_config_db_pool = self.mobile_config_db_pool.clone(); let signing_key = self.signing_key.clone(); let batch_size = request.batch_size; @@ -212,7 +212,7 @@ impl mobile_config::Gateway for GatewayService { ); tokio::spawn(async move { - let stream = gateway_info::db::all_info_stream(&pool, &device_types); + let stream = gateway_info::db::all_info_stream(&metadata_db_pool, &device_types); if request.min_updated_at > 0 { let min_updated_at = Utc .timestamp_opt(request.min_updated_at as i64, 0) @@ -221,7 +221,8 @@ impl mobile_config::Gateway for GatewayService { "Invalid min_refreshed_at argument", ))?; - let updated_redios = get_updated_radios(&mc_pool, min_updated_at).await?; + let updated_redios = + get_updated_radios(&mobile_config_db_pool, min_updated_at).await?; let stream = stream .filter(|v| future::ready(updated_redios.contains(&v.address))) .boxed(); From 119ad63e3d55967922f7f6f9a1b281f1e9ecb299 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 11 Dec 2024 13:04:56 +0200 Subject: [PATCH 07/10] Add info_v2 and info_batch_v2 --- Cargo.lock | 10 ++-- mobile_config/src/gateway_service.rs | 83 +++++++++++++++++++++++++++- 2 files changed, 86 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 068732d09..fb4473158 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1615,7 +1615,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=min_update_at#ae8d826060acf308cb4d040d6aae92635eca1387" +source = "git+https://github.com/helium/proto?branch=min_update_at#fd89db36403e74ffbdb325924fbbb8cfb8757084" dependencies = [ "base64 0.21.7", "byteorder", @@ -1625,7 +1625,7 @@ dependencies = [ "rand_chacha 0.3.0", "rust_decimal", "serde", - "sha2 0.9.9", + "sha2 0.10.8", "thiserror", ] @@ -3837,7 +3837,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=min_update_at#ae8d826060acf308cb4d040d6aae92635eca1387" +source = "git+https://github.com/helium/proto?branch=min_update_at#fd89db36403e74ffbdb325924fbbb8cfb8757084" dependencies = [ "bytes", "prost", @@ -6079,7 +6079,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80b776a1b2dc779f5ee0641f8ade0125bc1298dd41a9a0c16d8bd57b42d222b1" dependencies = [ "bytes", - "heck 0.4.0", + "heck 0.5.0", "itertools", "log", "multimap", @@ -10004,7 +10004,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "sha2 0.9.9", + "sha2 0.10.8", "thiserror", "twox-hash", "xorf", diff --git a/mobile_config/src/gateway_service.rs b/mobile_config/src/gateway_service.rs index e23abb94e..9a3395a29 100644 --- a/mobile_config/src/gateway_service.rs +++ b/mobile_config/src/gateway_service.rs @@ -13,8 +13,9 @@ use futures::{ use helium_crypto::{Keypair, PublicKey, PublicKeyBinary, Sign}; use helium_proto::{ services::mobile_config::{ - self, GatewayInfoBatchReqV1, GatewayInfoReqV1, GatewayInfoResV1, GatewayInfoStreamReqV1, - GatewayInfoStreamReqV2, GatewayInfoStreamResV1, GatewayInfoStreamResV2, + self, GatewayInfoBatchReqV1, GatewayInfoReqV1, GatewayInfoResV1, GatewayInfoResV2, + GatewayInfoStreamReqV1, GatewayInfoStreamReqV2, GatewayInfoStreamResV1, + GatewayInfoStreamResV2, }, Message, }; @@ -76,6 +77,7 @@ impl GatewayService { #[tonic::async_trait] impl mobile_config::Gateway for GatewayService { + // Deprecated async fn info(&self, request: Request) -> GrpcResult { let request = request.into_inner(); telemetry::count_request("gateway", "info"); @@ -116,6 +118,47 @@ impl mobile_config::Gateway for GatewayService { ) } + async fn info_v2(&self, request: Request) -> GrpcResult { + let request = request.into_inner(); + telemetry::count_request("gateway", "info-v2"); + custom_tracing::record_b58("pub_key", &request.address); + custom_tracing::record_b58("signer", &request.signer); + + self.verify_request_signature_for_info(&request)?; + + let pubkey: PublicKeyBinary = request.address.into(); + tracing::debug!(pubkey = pubkey.to_string(), "fetching gateway info (v2)"); + + gateway_info::db::get_info(&self.metadata_pool, &pubkey) + .await + .map_err(|_| Status::internal("error fetching gateway info (v2)"))? + .map_or_else( + || { + telemetry::count_gateway_chain_lookup("not-found"); + Err(Status::not_found(pubkey.to_string())) + }, + |info| { + if info.metadata.is_some() { + telemetry::count_gateway_chain_lookup("asserted"); + } else { + telemetry::count_gateway_chain_lookup("not-asserted"); + }; + let info = info + .try_into() + .map_err(|_| Status::internal("error serializing gateway info (v2)"))?; + let mut res = GatewayInfoResV2 { + info: Some(info), + timestamp: Utc::now().encode_timestamp(), + signer: self.signing_key.public_key().into(), + signature: vec![], + }; + res.signature = self.sign_response(&res.encode_to_vec())?; + Ok(Response::new(res)) + }, + ) + } + + // Deprecated type info_batchStream = GrpcStreamResult; async fn info_batch( &self, @@ -152,6 +195,42 @@ impl mobile_config::Gateway for GatewayService { Ok(Response::new(GrpcStreamResult::new(rx))) } + type info_batch_v2Stream = GrpcStreamResult; + async fn info_batch_v2( + &self, + request: Request, + ) -> GrpcResult { + let request = request.into_inner(); + telemetry::count_request("gateway", "info-batch-v2"); + custom_tracing::record_b58("signer", &request.signer); + + let signer = verify_public_key(&request.signer)?; + self.verify_request_signature(&signer, &request)?; + + tracing::debug!( + batch = request.addresses.len(), + "fetching gateways' info batch" + ); + + let pool = self.metadata_pool.clone(); + let signing_key = self.signing_key.clone(); + let batch_size = request.batch_size; + let addresses = request + .addresses + .into_iter() + .map(|key| key.into()) + .collect::>(); + + let (tx, rx) = tokio::sync::mpsc::channel(100); + + tokio::spawn(async move { + let stream = gateway_info::db::batch_info_stream(&pool, &addresses)?; + stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await + }); + + Ok(Response::new(GrpcStreamResult::new(rx))) + } + // Deprecated type info_streamStream = GrpcStreamResult; async fn info_stream( From df16f904bc76305d828891c7352a21706102ce9e Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 11 Dec 2024 14:23:56 +0200 Subject: [PATCH 08/10] Add tests --- mobile_config/tests/gateway_service.rs | 154 +++++++++++++++++++++++++ 1 file changed, 154 insertions(+) diff --git a/mobile_config/tests/gateway_service.rs b/mobile_config/tests/gateway_service.rs index cc8d2ce4a..609cb43cb 100644 --- a/mobile_config/tests/gateway_service.rs +++ b/mobile_config/tests/gateway_service.rs @@ -1,3 +1,5 @@ +use std::vec; + use chrono::{DateTime, Duration, Utc}; use futures::stream::StreamExt; @@ -276,6 +278,144 @@ async fn gateway_stream_info_v2_updated_at(pool: PgPool) { assert!(stream.next().await.is_none()); } +#[sqlx::test] +async fn gateway_info_batch_v2(pool: PgPool) { + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let asset2_hex_idx = 631711286145955327_i64; + let asset2_pubkey = make_keypair().public_key().clone(); + let created_at = Utc::now() - Duration::hours(5); + let updated_at = Utc::now() - Duration::hours(3); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + asset1_hex_idx, + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + created_at, + Some(updated_at), + Some(r#"{"wifiInfoV0": {"antenna": 18, "azimuth": 161, "elevation": 2, "electricalDownTilt": 3, "mechanicalDownTilt": 4}}"#) + ) + .await; + + add_db_record( + &pool, + "asset2", + asset2_hex_idx, + "\"wifiDataOnly\"", + asset2_pubkey.clone().into(), + created_at, + None, + None, + ) + .await; + add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), created_at).await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + let req = make_signed_info_batch_request( + &vec![asset1_pubkey.clone(), asset2_pubkey.clone()], + &admin_key, + ); + let stream = client.info_batch_v2(req).await.unwrap().into_inner(); + let resp = stream + .filter_map(|result| async { result.ok() }) + .collect::>() + .await; + + let gateways = resp.first().unwrap().gateways.clone(); + let gw1 = gateways + .iter() + .find(|v| v.address == asset1_pubkey.to_vec()) + .unwrap(); + + let deployment_info = gw1.metadata.clone().unwrap().deployment_info.unwrap(); + + match deployment_info { + DeploymentInfo::WifiDeploymentInfo(v) => { + assert_eq!(v.antenna, 18); + assert_eq!(v.azimuth, 161); + assert_eq!(v.elevation, 2); + assert_eq!(v.electrical_down_tilt, 3); + assert_eq!(v.mechanical_down_tilt, 4); + } + DeploymentInfo::CbrsDeploymentInfo(_) => panic!(), + }; + + let gw2 = gateways + .iter() + .find(|v| v.address == asset2_pubkey.to_vec()) + .unwrap(); + assert!(gw2.metadata.clone().unwrap().deployment_info.is_none()); +} + +#[sqlx::test] +async fn gateway_info_v2(pool: PgPool) { + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let asset2_pubkey = make_keypair().public_key().clone(); + let created_at = Utc::now() - Duration::hours(5); + let updated_at = Utc::now() - Duration::hours(3); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + asset1_hex_idx, + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + created_at, + Some(updated_at), + Some(r#"{"wifiInfoV0": {"antenna": 18, "azimuth": 161, "elevation": 2, "electricalDownTilt": 3, "mechanicalDownTilt": 4}}"#) + ) + .await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + let req = make_signed_info_request(&asset1_pubkey, &admin_key); + let resp = client.info_v2(req).await.unwrap().into_inner(); + + let gw_info = resp.info.unwrap(); + let pub_key = PublicKey::from_bytes(gw_info.address.clone()).unwrap(); + assert_eq!(pub_key, asset1_pubkey.clone()); + assert_eq!( + DeviceType::try_from(gw_info.device_type).unwrap(), + DeviceType::WifiIndoor + ); + assert_eq!( + i64::from_str_radix(&gw_info.metadata.clone().unwrap().location, 16).unwrap(), + asset1_hex_idx + ); + + let deployment_info = gw_info.metadata.clone().unwrap().deployment_info.unwrap(); + + match deployment_info { + DeploymentInfo::WifiDeploymentInfo(v) => { + assert_eq!(v.antenna, 18); + assert_eq!(v.azimuth, 161); + assert_eq!(v.elevation, 2); + assert_eq!(v.electrical_down_tilt, 3); + assert_eq!(v.mechanical_down_tilt, 4); + } + DeploymentInfo::CbrsDeploymentInfo(_) => panic!(), + }; + + // Non-existent + let req = make_signed_info_request(&asset2_pubkey, &admin_key); + let resp_err = client + .info_v2(req) + .await + .expect_err("testing expects error"); + + assert_eq!(resp_err.code(), Code::NotFound); +} + #[sqlx::test] async fn gateway_stream_info_v2_deployment_info(pool: PgPool) { let admin_key = make_keypair(); @@ -563,3 +703,17 @@ fn make_signed_info_request(address: &PublicKey, signer: &Keypair) -> proto::Gat req.signature = signer.sign(&req.encode_to_vec()).unwrap(); req } + +fn make_signed_info_batch_request( + addresses: &[PublicKey], + signer: &Keypair, +) -> proto::GatewayInfoBatchReqV1 { + let mut req = proto::GatewayInfoBatchReqV1 { + addresses: addresses.iter().map(|v| v.to_vec()).collect(), + batch_size: 42, + signer: signer.public_key().to_vec(), + signature: vec![], + }; + req.signature = signer.sign(&req.encode_to_vec()).unwrap(); + req +} From 4c9688304502145a9870e5df0c4528330903c6a2 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Thu, 12 Dec 2024 14:03:00 +0200 Subject: [PATCH 09/10] Fix typo --- mobile_config/src/gateway_service.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mobile_config/src/gateway_service.rs b/mobile_config/src/gateway_service.rs index 9a3395a29..7b66405c8 100644 --- a/mobile_config/src/gateway_service.rs +++ b/mobile_config/src/gateway_service.rs @@ -300,10 +300,10 @@ impl mobile_config::Gateway for GatewayService { "Invalid min_refreshed_at argument", ))?; - let updated_redios = + let updated_radios = get_updated_radios(&mobile_config_db_pool, min_updated_at).await?; let stream = stream - .filter(|v| future::ready(updated_redios.contains(&v.address))) + .filter(|v| future::ready(updated_radios.contains(&v.address))) .boxed(); stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size) .await From 36205bf5e6b2e7e90eb3870cb7b4092b987ce14a Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 17 Dec 2024 18:34:18 +0200 Subject: [PATCH 10/10] Use helium-proto master branch --- Cargo.lock | 54 +++++++++++++++++++----------------------------------- Cargo.toml | 4 ++-- 2 files changed, 21 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6dc251894..fc459c706 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1615,11 +1615,11 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=min_update_at#60f88b67faef0a6e4e0209ca0a2f9b1051b9aad3" +source = "git+https://github.com/helium/proto?branch=master#b4c8c8f47dfff38a2ff1b7fe14e1b2a1beea651c" dependencies = [ "base64 0.21.7", "byteorder", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", + "helium-proto", "prost", "rand 0.8.5", "rand_chacha 0.3.0", @@ -1788,7 +1788,7 @@ dependencies = [ "file-store", "futures", "futures-util", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", + "helium-proto", "http 0.2.11", "http-serde", "humantime-serde", @@ -2630,7 +2630,7 @@ dependencies = [ "axum 0.7.4", "bs58 0.4.0", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", + "helium-proto", "http 0.2.11", "notify", "serde", @@ -3212,7 +3212,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", + "helium-proto", "hex-literal", "http 0.2.11", "lazy_static", @@ -3794,7 +3794,7 @@ dependencies = [ "h3o", "helium-anchor-gen", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=master)", + "helium-proto", "hex", "hex-literal", "itertools", @@ -3821,23 +3821,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#16d6838b88a35ac88797d0c7eb14a932b214a856" -dependencies = [ - "bytes", - "prost", - "prost-build", - "serde", - "serde_json", - "strum", - "strum_macros", - "tonic", - "tonic-build", -] - -[[package]] -name = "helium-proto" -version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=min_update_at#60f88b67faef0a6e4e0209ca0a2f9b1051b9aad3" +source = "git+https://github.com/helium/proto?branch=master#b4c8c8f47dfff38a2ff1b7fe14e1b2a1beea651c" dependencies = [ "bytes", "prost", @@ -3891,7 +3875,7 @@ dependencies = [ "async-trait", "chrono", "derive_builder", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", + "helium-proto", "hextree", "rust_decimal", "rust_decimal_macros", @@ -4307,7 +4291,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", + "helium-proto", "http 0.2.11", "humantime-serde", "metrics", @@ -4377,7 +4361,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", + "helium-proto", "hextree", "http 0.2.11", "http-serde", @@ -4419,7 +4403,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", + "helium-proto", "http 0.2.11", "http-serde", "humantime-serde", @@ -4461,7 +4445,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", + "helium-proto", "http-serde", "humantime-serde", "iot-config", @@ -5050,7 +5034,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", + "helium-proto", "hextree", "http 0.2.11", "http-serde", @@ -5091,7 +5075,7 @@ dependencies = [ "futures", "h3o", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", + "helium-proto", "mobile-config", "prost", "rand 0.8.5", @@ -5127,7 +5111,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", + "helium-proto", "http 0.2.11", "http-serde", "humantime-serde", @@ -5171,7 +5155,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", + "helium-proto", "hex-assignments", "hextree", "http-serde", @@ -5855,7 +5839,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", + "helium-proto", "http 0.2.11", "hyper 0.14.28", "jsonrpsee", @@ -5938,7 +5922,7 @@ dependencies = [ "futures-util", "helium-anchor-gen", "helium-lib", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", + "helium-proto", "humantime-serde", "metrics", "metrics-exporter-prometheus", @@ -6577,7 +6561,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=min_update_at)", + "helium-proto", "humantime-serde", "lazy_static", "metrics", diff --git a/Cargo.toml b/Cargo.toml index 750807edb..7388ff0ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,10 +70,10 @@ helium-lib = { git = "https://github.com/helium/helium-wallet-rs.git", branch = hextree = { git = "https://github.com/jaykickliter/HexTree", branch = "main", features = [ "disktree", ] } -helium-proto = { git = "https://github.com/helium/proto", branch = "min_update_at", features = [ +helium-proto = { git = "https://github.com/helium/proto", branch = "master", features = [ "services", ] } -beacon = { git = "https://github.com/helium/proto", branch = "min_update_at" } +beacon = { git = "https://github.com/helium/proto", branch = "master" } solana-client = "1.18" solana-sdk = "1.18" solana-program = "1.18"