diff --git a/Cargo.lock b/Cargo.lock index 9feaa530c..db789bda0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1323,15 +1323,15 @@ name = "beacon" version = "0.1.0" source = "git+https://github.com/helium/proto?branch=master#60e0047b6e0e38b8119a055b37a7043cb02dbc82" dependencies = [ - "base64 0.22.1", + "base64 0.21.7", "byteorder", "helium-proto", "prost", - "rand 0.8.5", - "rand_chacha 0.3.1", + "rand 0.7.3", + "rand_chacha 0.2.2", "rust_decimal", "serde", - "sha2 0.10.9", + "sha2 0.9.9", "thiserror 1.0.69", ] @@ -1362,7 +1362,7 @@ dependencies = [ "bitflags 2.9.4", "cexpr", "clang-sys", - "itertools 0.12.1", + "itertools 0.10.5", "log", "prettyplease", "proc-macro2", @@ -2349,7 +2349,7 @@ dependencies = [ name = "denylist" version = "0.1.0" dependencies = [ - "base64 0.22.1", + "base64 0.21.7", "bincode", "bytes", "chrono", @@ -2722,7 +2722,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -2823,7 +2823,7 @@ dependencies = [ "aws-config", "aws-sdk-s3", "aws-smithy-types-convert", - "base64 0.22.1", + "base64 0.21.7", "bs58", "bytes", "chrono", @@ -2863,7 +2863,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", - "base64 0.22.1", + "base64 0.21.7", "beacon", "blake3", "bs58", @@ -3451,7 +3451,7 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3ae6081be123db88474f4c5f12f9b0d3d90b9f5ce15171dee64fe92e3cae2dd" dependencies = [ - "base64 0.22.1", + "base64 0.21.7", "bs58", "byteorder", "ed25519-compact", @@ -3478,7 +3478,7 @@ dependencies = [ "anchor-spl", "angry-purple-tiger", "async-trait", - "base64 0.22.1", + "base64 0.21.7", "bincode", "bytemuck", "chrono", @@ -3856,7 +3856,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.6.0", + "socket2 0.5.10", "tokio", "tower-service", "tracing", @@ -4072,7 +4072,7 @@ version = "0.1.0" dependencies = [ "anyhow", "backon 1.5.2", - "base64 0.22.1", + "base64 0.21.7", "bs58", "chrono", "clap", @@ -4154,7 +4154,7 @@ dependencies = [ "anyhow", "async-trait", "backon 0.5.0", - "base64 0.22.1", + "base64 0.21.7", "bs58", "chrono", "clap", @@ -4239,7 +4239,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", - "base64 0.22.1", + "base64 0.21.7", "beacon", "blake3", "chrono", @@ -4593,7 +4593,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667" dependencies = [ "cfg-if", - "windows-targets 0.53.5", + "windows-targets 0.48.5", ] [[package]] @@ -4910,7 +4910,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", - "base64 0.22.1", + "base64 0.21.7", "blake3", "bs58", "chrono", @@ -4966,7 +4966,7 @@ version = "0.1.0" dependencies = [ "angry-purple-tiger", "anyhow", - "base64 0.22.1", + "base64 0.21.7", "clap", "custom-tracing", "dialoguer", @@ -5034,7 +5034,7 @@ dependencies = [ "async-compression", "async-trait", "aws-local", - "base64 0.22.1", + "base64 0.21.7", "chrono", "clap", "config", @@ -5667,7 +5667,7 @@ name = "poc-entropy" version = "0.1.0" dependencies = [ "anyhow", - "base64 0.22.1", + "base64 0.21.7", "blake3", "bs58", "chrono", @@ -5923,7 +5923,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1" dependencies = [ "heck 0.5.0", - "itertools 0.12.1", + "itertools 0.10.5", "log", "multimap", "once_cell", @@ -5945,7 +5945,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" dependencies = [ "anyhow", - "itertools 0.12.1", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.106", @@ -6043,7 +6043,7 @@ dependencies = [ "quinn-udp", "rustc-hash 2.1.1", "rustls 0.23.32", - "socket2 0.6.0", + "socket2 0.5.10", "thiserror 2.0.17", "tokio", "tracing", @@ -6082,7 +6082,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.6.0", + "socket2 0.5.10", "tracing", "windows-sys 0.60.2", ] @@ -6435,7 +6435,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", - "base64 0.22.1", + "base64 0.21.7", "bs58", "chrono", "clap", @@ -6705,7 +6705,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -10476,7 +10476,7 @@ dependencies = [ "getrandom 0.3.3", "once_cell", "rustix", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -11108,7 +11108,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if", - "rand 0.8.5", + "rand 0.7.3", "static_assertions", ] @@ -11962,7 +11962,7 @@ version = "0.7.0" source = "git+https://github.com/helium/xorf-generator?branch=main#0d23ba2dcac2e2823d842566436df406f5ce923c" dependencies = [ "anyhow", - "base64 0.22.1", + "base64 0.21.7", "bincode", "bytes", "clap", @@ -11975,7 +11975,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "sha2 0.10.9", + "sha2 0.9.9", "thiserror 1.0.69", "twox-hash", "xorf", diff --git a/mobile_config/src/gateway/client.rs b/mobile_config/src/gateway/client.rs index f1ef0181c..8d177a8a6 100644 --- a/mobile_config/src/gateway/client.rs +++ b/mobile_config/src/gateway/client.rs @@ -1,5 +1,6 @@ use crate::client::{call_with_retry, ClientError, Settings}; use crate::gateway::service::info::{GatewayInfo, GatewayInfoStream}; +use chrono::{DateTime, Utc}; use file_store_oracles::traits::MsgVerify; use futures::stream::{self, StreamExt}; use helium_crypto::{Keypair, PublicKey, PublicKeyBinary, Sign}; @@ -49,6 +50,7 @@ pub trait GatewayInfoResolver: Clone + Send + Sync + 'static { async fn resolve_gateway_info( &self, address: &PublicKeyBinary, + gateway_query_timestamp: &DateTime, ) -> Result, ClientError>; async fn stream_gateways_info( @@ -62,27 +64,30 @@ impl GatewayInfoResolver for GatewayClient { async fn resolve_gateway_info( &self, address: &PublicKeyBinary, + gateway_query_timestamp: &DateTime, ) -> Result, ClientError> { if let Some(cached_response) = self.cache.get(address).await { return Ok(cached_response.value().clone()); } - let mut request = mobile_config::GatewayInfoReqV1 { + let mut request = mobile_config::GatewayInfoAtTimestampReqV1 { address: address.clone().into(), signer: self.signing_key.public_key().into(), signature: vec![], + query_time: gateway_query_timestamp.clone().timestamp() as u64, }; request.signature = self.signing_key.sign(&request.encode_to_vec())?; tracing::debug!(pubkey = address.to_string(), "fetching gateway info"); - let response = match call_with_retry!(self.client.clone().info_v2(request.clone())) { - Ok(info_res) => { - let response = info_res.into_inner(); - response.verify(&self.config_pubkey)?; - response.info.map(GatewayInfo::try_from).transpose()? - } - Err(status) if status.code() == tonic::Code::NotFound => None, - Err(status) => Err(status)?, - }; + let response = + match call_with_retry!(self.client.clone().info_at_timestamp(request.clone())) { + Ok(info_res) => { + let response = info_res.into_inner(); + response.verify(&self.config_pubkey)?; + response.info.map(GatewayInfo::try_from).transpose()? + } + Err(status) if status.code() == tonic::Code::NotFound => None, + Err(status) => Err(status)?, + }; self.cache .insert(address.clone(), response.clone(), self.cache_ttl) diff --git a/mobile_packet_verifier/src/accumulate.rs b/mobile_packet_verifier/src/accumulate.rs index 82adffeb1..12b66d844 100644 --- a/mobile_packet_verifier/src/accumulate.rs +++ b/mobile_packet_verifier/src/accumulate.rs @@ -79,7 +79,10 @@ async fn verify_report( return Ok(ReportStatus::Banned); } - if !mobile_config.is_gateway_known(gw_pub_key).await { + if !mobile_config + .is_gateway_known(gw_pub_key, &report.received_timestamp) + .await + { return Ok(ReportStatus::InvalidGatewayKey); } diff --git a/mobile_packet_verifier/src/banning/mod.rs b/mobile_packet_verifier/src/banning/mod.rs index b97aebe03..cae17fb19 100644 --- a/mobile_packet_verifier/src/banning/mod.rs +++ b/mobile_packet_verifier/src/banning/mod.rs @@ -21,7 +21,7 @@ pub const BAN_CLEANUP_DAYS: i64 = 7; #[derive(Debug, Deserialize, Serialize)] pub struct BanSettings { /// Where do we look in s3 for ban files - pub input_bucket: String, + pub input_bucket: file_store::BucketSettings, /// How often to purge expired bans #[serde(with = "humantime_serde", default = "default_purge_interval")] pub purge_interval: Duration, @@ -40,12 +40,11 @@ fn default_ingest_start_after() -> DateTime { pub async fn create_managed_task( pool: PgPool, - client: file_store::Client, settings: &BanSettings, ) -> anyhow::Result { let (ban_report_rx, ban_report_server) = file_source::continuous_source() .state(pool.clone()) - .file_store(client, settings.input_bucket.clone()) + .bucket_client(settings.input_bucket.connect().await) .lookback_start_after(settings.start_after) .prefix(FileType::VerifiedMobileBanReport.to_string()) .create() diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs index 515a78c95..47e05cb96 100644 --- a/mobile_packet_verifier/src/daemon.rs +++ b/mobile_packet_verifier/src/daemon.rs @@ -156,9 +156,8 @@ impl Cmd { None }; - let file_store_client = settings.file_store.connect().await; let (file_upload, file_upload_server) = - file_upload::FileUpload::new(file_store_client.clone(), settings.output_bucket.clone()) + file_upload::FileUpload::from_bucket_client(settings.output_bucket.connect().await) .await; let (valid_sessions, valid_sessions_server) = ValidDataTransferSession::file_sink( @@ -189,7 +188,7 @@ impl Cmd { let (reports, reports_server) = file_source::continuous_source() .state(pool.clone()) - .file_store(file_store_client.clone(), settings.ingest_bucket.clone()) + .bucket_client(settings.ingest_bucket.connect().await) .prefix(FileType::DataTransferSessionIngestReport.to_string()) .lookback_start_after(settings.start_after) .create() @@ -207,8 +206,7 @@ impl Cmd { ); let event_id_purger = EventIdPurger::from_settings(pool.clone(), settings); - let banning = - banning::create_managed_task(pool, file_store_client, &settings.banning).await?; + let banning = banning::create_managed_task(pool, &settings.banning).await?; TaskManager::builder() .add_task(file_upload_server) diff --git a/mobile_packet_verifier/src/lib.rs b/mobile_packet_verifier/src/lib.rs index 0a02cd8b9..6c0bddb14 100644 --- a/mobile_packet_verifier/src/lib.rs +++ b/mobile_packet_verifier/src/lib.rs @@ -1,5 +1,6 @@ extern crate tls_init; +use chrono::{DateTime, Utc}; use helium_crypto::PublicKeyBinary; use helium_proto::services::mobile_config::NetworkKeyRole; use mobile_config::{ @@ -43,14 +44,26 @@ impl MobileConfigClients { #[async_trait::async_trait] pub trait MobileConfigResolverExt { - async fn is_gateway_known(&self, public_key: &PublicKeyBinary) -> bool; + async fn is_gateway_known( + &self, + public_key: &PublicKeyBinary, + gateway_query_time: &DateTime, + ) -> bool; async fn is_routing_key_known(&self, public_key: &PublicKeyBinary) -> bool; } #[async_trait::async_trait] impl MobileConfigResolverExt for MobileConfigClients { - async fn is_gateway_known(&self, public_key: &PublicKeyBinary) -> bool { - match self.gateway_client.resolve_gateway_info(public_key).await { + async fn is_gateway_known( + &self, + public_key: &PublicKeyBinary, + gateway_query_time: &DateTime, + ) -> bool { + match self + .gateway_client + .resolve_gateway_info(public_key, gateway_query_time) + .await + { Ok(res) => res.is_some(), Err(_err) => false, } diff --git a/mobile_packet_verifier/src/settings.rs b/mobile_packet_verifier/src/settings.rs index 3d4336707..72b54f42c 100644 --- a/mobile_packet_verifier/src/settings.rs +++ b/mobile_packet_verifier/src/settings.rs @@ -11,7 +11,7 @@ use crate::banning; #[derive(Debug, Deserialize, Serialize)] pub struct Settings { - /// RUST_LOG compatible settings string. Defsault to + /// RUST_LOG compatible settings string. Default to /// "mobile_verifier=debug,poc_store=info" #[serde(default = "default_log")] pub log: String, @@ -28,11 +28,9 @@ pub struct Settings { pub min_burn_period: Duration, pub database: db_store::Settings, #[serde(default)] - pub file_store: file_store::Settings, - pub ingest_bucket: String, - pub output_bucket: String, - #[serde(default)] pub metrics: poc_metrics::Settings, + pub ingest_bucket: file_store::BucketSettings, + pub output_bucket: file_store::BucketSettings, #[serde(default)] pub enable_solana_integration: bool, pub solana: Option, diff --git a/mobile_packet_verifier/tests/integrations/common/mod.rs b/mobile_packet_verifier/tests/integrations/common/mod.rs index 0f6a48946..6bf7d2288 100644 --- a/mobile_packet_verifier/tests/integrations/common/mod.rs +++ b/mobile_packet_verifier/tests/integrations/common/mod.rs @@ -1,3 +1,4 @@ +use chrono::{DateTime, Utc}; use helium_crypto::PublicKeyBinary; use mobile_packet_verifier::MobileConfigResolverExt; @@ -22,7 +23,11 @@ pub struct TestMobileConfig { #[async_trait::async_trait] impl MobileConfigResolverExt for TestMobileConfig { - async fn is_gateway_known(&self, public_key: &PublicKeyBinary) -> bool { + async fn is_gateway_known( + &self, + public_key: &PublicKeyBinary, + _gateway_query_time: &DateTime, + ) -> bool { self.valid_gateways.is_valid(public_key) } diff --git a/mobile_verifier/src/heartbeats/mod.rs b/mobile_verifier/src/heartbeats/mod.rs index d0b09f223..4e0eccda7 100644 --- a/mobile_verifier/src/heartbeats/mod.rs +++ b/mobile_verifier/src/heartbeats/mod.rs @@ -378,7 +378,7 @@ impl ValidatedHeartbeat { } match gateway_info_resolver - .resolve_gateway(&heartbeat.hotspot_key) + .resolve_gateway(&heartbeat.hotspot_key, &heartbeat.timestamp) .await? { GatewayResolution::DataOnly => Ok(Self::new( diff --git a/mobile_verifier/src/lib.rs b/mobile_verifier/src/lib.rs index 4a04e2942..b62f5296a 100644 --- a/mobile_verifier/src/lib.rs +++ b/mobile_verifier/src/lib.rs @@ -21,6 +21,7 @@ pub mod unique_connections; pub use settings::Settings; use async_trait::async_trait; +use chrono::{DateTime, Utc}; use mobile_config::client::ClientError; use rust_decimal::Decimal; use solana::SolPubkey; @@ -43,6 +44,7 @@ pub trait GatewayResolver: Clone + Send + Sync + 'static { async fn resolve_gateway( &self, address: &helium_crypto::PublicKeyBinary, + gateway_query_timestamp: &DateTime, ) -> Result; } @@ -51,11 +53,15 @@ impl GatewayResolver for mobile_config::GatewayClient { async fn resolve_gateway( &self, address: &helium_crypto::PublicKeyBinary, + gateway_query_timestamp: &DateTime, ) -> Result { use mobile_config::gateway::client::GatewayInfoResolver; use mobile_config::gateway::service::info::{DeviceType, GatewayInfo}; - match self.resolve_gateway_info(address).await? { + match self + .resolve_gateway_info(address, gateway_query_timestamp) + .await? + { None => Ok(GatewayResolution::GatewayNotFound), Some(GatewayInfo { device_type: DeviceType::WifiDataOnly, diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index dbd47c926..44b702237 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -151,6 +151,7 @@ where tracing::info!("Processing speedtest file {}", file.file_info.key); let mut transaction = self.pool.begin().await?; let mut speedtests = file.into_stream(&mut transaction).await?; + while let Some(speedtest_report) = speedtests.next().await { let result = self.validate_speedtest(&speedtest_report).await?; if result == SpeedtestResult::SpeedtestValid { @@ -186,7 +187,7 @@ where match self .gateway_info_resolver - .resolve_gateway_info(&speedtest.report.pubkey) + .resolve_gateway_info(&speedtest.report.pubkey, &speedtest.received_timestamp) .await? { Some(gw_info) if gw_info.is_data_only() => { diff --git a/mobile_verifier/tests/integrations/common/mod.rs b/mobile_verifier/tests/integrations/common/mod.rs index 88ad3478f..8da3c5011 100644 --- a/mobile_verifier/tests/integrations/common/mod.rs +++ b/mobile_verifier/tests/integrations/common/mod.rs @@ -214,6 +214,7 @@ impl GatewayResolver for GatewayClientAllOwnersValid { async fn resolve_gateway( &self, _address: &PublicKeyBinary, + _gateway_query_timestamp: &DateTime, ) -> Result { Ok(GatewayResolution::AssertedLocation(0x8c2681a3064d9ff)) } diff --git a/mobile_verifier/tests/integrations/speedtests.rs b/mobile_verifier/tests/integrations/speedtests.rs index c2c50aded..34de9564d 100644 --- a/mobile_verifier/tests/integrations/speedtests.rs +++ b/mobile_verifier/tests/integrations/speedtests.rs @@ -26,6 +26,7 @@ impl GatewayInfoResolver for MockGatewayInfoResolver { async fn resolve_gateway_info( &self, address: &PublicKeyBinary, + _gateway_query_timestamp: &DateTime, ) -> Result, ClientError> { Ok(Some(GatewayInfo { address: address.clone(),