From 0e3f70ec3bbb2deb9202b626ecb71da5488ca93f Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Fri, 28 Mar 2025 10:16:14 -0400 Subject: [PATCH 01/13] Add receiving subscriber mapper activity requests to mobile ingestor --- Cargo.lock | 2 +- Cargo.toml | 4 +++ file_store/src/file_info.rs | 9 +++++ file_store/src/traits/file_sink_write.rs | 5 +++ file_store/src/traits/msg_verify.rs | 1 + ingest/src/server_mobile.rs | 45 +++++++++++++++++++++++- ingest/tests/common/mod.rs | 3 ++ 7 files changed, 67 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bf4640121..e88c0221b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3816,7 +3816,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#8f84e2a1f4229354520bf380dd3e96590ae31c06" +source = "git+https://www.github.com/helium/proto.git?branch=jg%2Fdisco-shares-v2#f8a26f6727ec310f2f03a0e208267d4775e9203d" dependencies = [ "bytes", "prost", diff --git a/Cargo.toml b/Cargo.toml index 2a963bbee..61a855f59 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -135,3 +135,7 @@ sqlx = { git = "https://github.com/launchbadge/sqlx.git", rev = "42dd78fe931df65 # [patch.'https://github.com/helium/proto'] # helium-proto = { path = "../../proto" } # beacon = { path = "../../proto" } + +[patch.'https://github.com/helium/proto'] +helium-proto = { git = "https://www.github.com/helium/proto.git", branch = "jg/disco-shares-v2" } + diff --git a/file_store/src/file_info.rs b/file_store/src/file_info.rs index 67a553a10..ad9e72d83 100644 --- a/file_store/src/file_info.rs +++ b/file_store/src/file_info.rs @@ -175,6 +175,8 @@ pub const VERIFIED_PROMOTION_REWARD: &str = "verified_promotion_reward"; pub const SERVICE_PROVIDER_PROMOTION_FUND: &str = "service_provider_promotion_fund"; pub const UNIQUE_CONNECTIONS_REPORT: &str = "unique_connections_report"; pub const VERIFIED_UNIQUE_CONNECTIONS_REPORT: &str = "verified_unique_connections_report"; +pub const SUBSCRIBER_MAPPING_ACTIVITY_INGEST_REPORT: &str = + "subscriber_mapping_activity_ingest_report"; #[derive(Debug, PartialEq, Eq, Clone, Serialize, Copy, strum::EnumCount)] #[serde(rename_all = "snake_case")] @@ -241,6 +243,7 @@ pub enum FileType { RadioUsageStatsReq, UniqueConnectionsReport, VerifiedUniqueConnectionsReport, + SubscriberMappingActivityIngestReport, } impl fmt::Display for FileType { @@ -328,6 +331,9 @@ impl FileType { Self::RadioUsageStatsReq => RADIO_USAGE_STATS_REQ, Self::UniqueConnectionsReport => UNIQUE_CONNECTIONS_REPORT, Self::VerifiedUniqueConnectionsReport => VERIFIED_UNIQUE_CONNECTIONS_REPORT, + Self::SubscriberMappingActivityIngestReport => { + SUBSCRIBER_MAPPING_ACTIVITY_INGEST_REPORT + } } } } @@ -411,6 +417,9 @@ impl FromStr for FileType { RADIO_USAGE_STATS_REQ => Self::RadioUsageStatsReq, UNIQUE_CONNECTIONS_REPORT => Self::UniqueConnectionsReport, VERIFIED_UNIQUE_CONNECTIONS_REPORT => Self::VerifiedUniqueConnectionsReport, + SUBSCRIBER_MAPPING_ACTIVITY_INGEST_REPORT => { + Self::SubscriberMappingActivityIngestReport + } _ => return Err(Error::from(io::Error::from(io::ErrorKind::InvalidInput))), }; Ok(result) diff --git a/file_store/src/traits/file_sink_write.rs b/file_store/src/traits/file_sink_write.rs index bc6a6d468..1fd434d5b 100644 --- a/file_store/src/traits/file_sink_write.rs +++ b/file_store/src/traits/file_sink_write.rs @@ -293,3 +293,8 @@ impl_file_sink!( FileType::RewardManifest.to_str(), "reward_manifest" ); +impl_file_sink!( + poc_mobile::SubscriberMappingActivityIngestReportV1, + FileType::SubscriberMappingActivityIngestReport.to_str(), + "subscriber_mapping_activity_ingest_report" +); diff --git a/file_store/src/traits/msg_verify.rs b/file_store/src/traits/msg_verify.rs index 3c946919a..b3d04e990 100644 --- a/file_store/src/traits/msg_verify.rs +++ b/file_store/src/traits/msg_verify.rs @@ -106,6 +106,7 @@ impl_msg_verify!(poc_mobile::SubscriberVerifiedMappingEventReqV1, signature); impl_msg_verify!(poc_mobile::HexUsageStatsReqV1, signature); impl_msg_verify!(poc_mobile::RadioUsageStatsReqV1, signature); impl_msg_verify!(poc_mobile::UniqueConnectionsReqV1, signature); +impl_msg_verify!(poc_mobile::SubscriberMappingActivityReqV1, signature); #[cfg(test)] mod test { diff --git a/ingest/src/server_mobile.rs b/ingest/src/server_mobile.rs index dd771035f..d04d3e40c 100644 --- a/ingest/src/server_mobile.rs +++ b/ingest/src/server_mobile.rs @@ -20,7 +20,8 @@ use helium_proto::services::poc_mobile::{ RadioUsageStatsResV1, ServiceProviderBoostedRewardsBannedRadioIngestReportV1, ServiceProviderBoostedRewardsBannedRadioReqV1, ServiceProviderBoostedRewardsBannedRadioRespV1, SpeedtestIngestReportV1, SpeedtestReqV1, SpeedtestRespV1, SubscriberLocationIngestReportV1, - SubscriberLocationReqV1, SubscriberLocationRespV1, + SubscriberLocationReqV1, SubscriberLocationRespV1, SubscriberMappingActivityIngestReportV1, + SubscriberMappingActivityReqV1, SubscriberMappingActivityResV1, SubscriberVerifiedMappingEventIngestReportV1, SubscriberVerifiedMappingEventReqV1, SubscriberVerifiedMappingEventResV1, UniqueConnectionsIngestReportV1, WifiHeartbeatIngestReportV1, WifiHeartbeatReqV1, WifiHeartbeatRespV1, @@ -55,6 +56,7 @@ pub struct GrpcServer { hex_usage_stats_event_sink: FileSinkClient, radio_usage_stats_event_sink: FileSinkClient, unique_connections_sink: FileSinkClient, + subscriber_mapping_activity_sink: FileSinkClient, required_network: Network, address: SocketAddr, api_token: MetadataValue, @@ -103,6 +105,7 @@ where hex_usage_stats_event_sink: FileSinkClient, radio_usage_stats_event_sink: FileSinkClient, unique_connections_sink: FileSinkClient, + subscriber_mapping_activity_sink: FileSinkClient, required_network: Network, address: SocketAddr, api_token: MetadataValue, @@ -121,6 +124,7 @@ where hex_usage_stats_event_sink, radio_usage_stats_event_sink, unique_connections_sink, + subscriber_mapping_activity_sink, required_network, address, api_token, @@ -478,6 +482,33 @@ where Ok(Response::new(SubscriberVerifiedMappingEventResV1 { id })) } + async fn submit_subscriber_mapping_activity( + &self, + request: Request, + ) -> GrpcResult { + let timestamp = Utc::now().timestamp_millis() as u64; + let event = request.into_inner(); + + custom_tracing::record_b58("subscriber_id", &event.subscriber_id); + custom_tracing::record_b58("pub_key", &event.carrier_pub_key); + + let report = self + .verify_public_key(&event.carrier_pub_key) + .and_then(|public_key| self.verify_network(public_key)) + .and_then(|public_key| self.verify_signature(public_key, event)) + .map(|(_, event)| SubscriberMappingActivityIngestReportV1 { + received_timestamp: timestamp, + report: Some(event), + })?; + + _ = self + .subscriber_mapping_activity_sink + .write(report, []) + .await; + + Ok(Response::new(SubscriberMappingActivityResV1 { timestamp })) + } + async fn submit_hex_usage_stats_report( &self, request: Request, @@ -690,6 +721,16 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { ) .await?; + let (subscriber_mapping_activity_sink, subscriber_mapping_activity_server) = + SubscriberMappingActivityIngestReportV1::file_sink( + store_base_path, + file_upload.clone(), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), + env!("CARGO_PKG_NAME"), + ) + .await?; + let Some(api_token) = settings .token .as_ref() @@ -715,6 +756,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { hex_usage_stats_event_sink, radio_usage_stats_event_sink, unique_connections_sink, + subscriber_mapping_activity_sink, settings.network, settings.listen_addr, api_token, @@ -741,6 +783,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { .add_task(hex_usage_stats_event_server) .add_task(radio_usage_stats_event_server) .add_task(unique_connections_server) + .add_task(subscriber_mapping_activity_server) .add_task(grpc_server) .build() .start() diff --git a/ingest/tests/common/mod.rs b/ingest/tests/common/mod.rs index ac5208736..066de7f22 100644 --- a/ingest/tests/common/mod.rs +++ b/ingest/tests/common/mod.rs @@ -80,6 +80,8 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> { let (hex_usage_stat_tx, hex_usage_stat_rx) = tokio::sync::mpsc::channel(10); let (radio_usage_stat_tx, radio_usage_stat_rx) = tokio::sync::mpsc::channel(10); let (unique_connections_tx, unique_connections_rx) = tokio::sync::mpsc::channel(10); + let (subscriber_mapping_activity_tx, _subscriber_mapping_activity_rx) = + tokio::sync::mpsc::channel(10); let auth_client = MockAuthorizationClient::new(); @@ -97,6 +99,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> { FileSinkClient::new(hex_usage_stat_tx, "hex_usage_test_file_sink"), FileSinkClient::new(radio_usage_stat_tx, "radio_usage_test_file_sink"), FileSinkClient::new(unique_connections_tx, "noop"), + FileSinkClient::new(subscriber_mapping_activity_tx, "noop"), Network::MainNet, socket_addr, api_token, From 3fff4f7400a4bf4da1b130b8ef6dc2e2875d425d Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Sun, 30 Mar 2025 08:58:25 -0400 Subject: [PATCH 02/13] update mobile verifier to process subscriber mapper activity messages --- file_store/src/file_info.rs | 9 + file_store/src/traits/file_sink_write.rs | 6 + ingest/tests/common/mod.rs | 7 +- .../src/client/authorization_client.rs | 2 +- mobile_config/src/client/entity_client.rs | 2 +- .../42_subscriber_mapping_activity.sql | 18 ++ mobile_verifier/src/lib.rs | 1 + .../src/subscriber_mapping_activity.rs | 262 ++++++++++++++++++ .../src/subscriber_mapping_activity/db.rs | 35 +++ 9 files changed, 338 insertions(+), 4 deletions(-) create mode 100644 mobile_verifier/migrations/42_subscriber_mapping_activity.sql create mode 100644 mobile_verifier/src/subscriber_mapping_activity.rs create mode 100644 mobile_verifier/src/subscriber_mapping_activity/db.rs diff --git a/file_store/src/file_info.rs b/file_store/src/file_info.rs index ad9e72d83..a62802058 100644 --- a/file_store/src/file_info.rs +++ b/file_store/src/file_info.rs @@ -177,6 +177,8 @@ pub const UNIQUE_CONNECTIONS_REPORT: &str = "unique_connections_report"; pub const VERIFIED_UNIQUE_CONNECTIONS_REPORT: &str = "verified_unique_connections_report"; pub const SUBSCRIBER_MAPPING_ACTIVITY_INGEST_REPORT: &str = "subscriber_mapping_activity_ingest_report"; +pub const VERIFIED_SUBSCRIBER_MAPPING_ACTIVITY_REPORT: &str = + "verified_subscriber_mapping_activity_report"; #[derive(Debug, PartialEq, Eq, Clone, Serialize, Copy, strum::EnumCount)] #[serde(rename_all = "snake_case")] @@ -244,6 +246,7 @@ pub enum FileType { UniqueConnectionsReport, VerifiedUniqueConnectionsReport, SubscriberMappingActivityIngestReport, + VerifiedSubscriberMappingActivityReport, } impl fmt::Display for FileType { @@ -334,6 +337,9 @@ impl FileType { Self::SubscriberMappingActivityIngestReport => { SUBSCRIBER_MAPPING_ACTIVITY_INGEST_REPORT } + Self::VerifiedSubscriberMappingActivityReport => { + VERIFIED_SUBSCRIBER_MAPPING_ACTIVITY_REPORT + } } } } @@ -420,6 +426,9 @@ impl FromStr for FileType { SUBSCRIBER_MAPPING_ACTIVITY_INGEST_REPORT => { Self::SubscriberMappingActivityIngestReport } + VERIFIED_SUBSCRIBER_MAPPING_ACTIVITY_REPORT => { + Self::VerifiedSubscriberMappingActivityReport + } _ => return Err(Error::from(io::Error::from(io::ErrorKind::InvalidInput))), }; Ok(result) diff --git a/file_store/src/traits/file_sink_write.rs b/file_store/src/traits/file_sink_write.rs index 1fd434d5b..fa0347314 100644 --- a/file_store/src/traits/file_sink_write.rs +++ b/file_store/src/traits/file_sink_write.rs @@ -298,3 +298,9 @@ impl_file_sink!( FileType::SubscriberMappingActivityIngestReport.to_str(), "subscriber_mapping_activity_ingest_report" ); + +impl_file_sink!( + poc_mobile::VerifiedSubscriberMappingActivityReportV1, + FileType::VerifiedSubscriberMappingActivityReport.to_str(), + "verified_subscriber_mapping_activity_report" +); diff --git a/ingest/tests/common/mod.rs b/ingest/tests/common/mod.rs index 066de7f22..210bf6399 100644 --- a/ingest/tests/common/mod.rs +++ b/ingest/tests/common/mod.rs @@ -40,15 +40,18 @@ impl MockAuthorizationClient { } } +#[derive(thiserror::Error, Debug)] +pub enum MockError {} + #[async_trait] impl AuthorizationVerifier for MockAuthorizationClient { - type Error = anyhow::Error; + type Error = MockError; async fn verify_authorized_key( &self, _pubkey: &PublicKeyBinary, _role: NetworkKeyRole, - ) -> anyhow::Result { + ) -> Result { Ok(true) } } diff --git a/mobile_config/src/client/authorization_client.rs b/mobile_config/src/client/authorization_client.rs index 0cccafddd..cfc1472ee 100644 --- a/mobile_config/src/client/authorization_client.rs +++ b/mobile_config/src/client/authorization_client.rs @@ -11,7 +11,7 @@ use std::{sync::Arc, time::Duration}; #[async_trait] pub trait AuthorizationVerifier: Send + Sync + 'static { - type Error; + type Error: std::error::Error + Send + Sync + 'static; async fn verify_authorized_key( &self, diff --git a/mobile_config/src/client/entity_client.rs b/mobile_config/src/client/entity_client.rs index 510a882c2..129b1d6af 100644 --- a/mobile_config/src/client/entity_client.rs +++ b/mobile_config/src/client/entity_client.rs @@ -11,7 +11,7 @@ use std::{sync::Arc, time::Duration}; #[async_trait] pub trait EntityVerifier { - type Error; + type Error: std::error::Error + Send + Sync + 'static; async fn verify_rewardable_entity(&self, entity_id: &[u8]) -> Result; } diff --git a/mobile_verifier/migrations/42_subscriber_mapping_activity.sql b/mobile_verifier/migrations/42_subscriber_mapping_activity.sql new file mode 100644 index 000000000..83a6dddf4 --- /dev/null +++ b/mobile_verifier/migrations/42_subscriber_mapping_activity.sql @@ -0,0 +1,18 @@ +CREATE TABLE IF NOT EXISTS subscriber_mapping_activity ( + subscriber_id BYTEA NOT NULL, + discovery_reward_shares BIGINT NOT NULL, + verification_reward_shares BIGINT NOT NULL, + received_timestamp TIMESTAMPTZ NOT NULL, + inserted_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (subscriber_id, received_timestamp) +); + +INSERT INTO subscriber_mapping_activity(subscriber_id, discovery_reward_shares, verification_reward_shares, received_timestamp, inserted_at) +SELECT subscriber_id, 30, 0, received_timestamp, created_at AS inserted_at +FROM subscriber_loc_verified; + +UPDATE subscriber_mapping_activity sma +SET verification_reward_shares = svme.total_reward_points +FROM subscriber_verified_mapping_event svme +WHERE sma.subscriber_id = svme.subscriber_id + AND sma.received_timestamp::date = svme.received_timestamp::date; diff --git a/mobile_verifier/src/lib.rs b/mobile_verifier/src/lib.rs index cd326d3a7..c50d92854 100644 --- a/mobile_verifier/src/lib.rs +++ b/mobile_verifier/src/lib.rs @@ -15,6 +15,7 @@ pub mod sp_boosted_rewards_bans; pub mod speedtests; pub mod speedtests_average; pub mod subscriber_location; +pub mod subscriber_mapping_activity; pub mod subscriber_verified_mapping_event; pub mod telemetry; pub mod unique_connections; diff --git a/mobile_verifier/src/subscriber_mapping_activity.rs b/mobile_verifier/src/subscriber_mapping_activity.rs new file mode 100644 index 000000000..20c1d4322 --- /dev/null +++ b/mobile_verifier/src/subscriber_mapping_activity.rs @@ -0,0 +1,262 @@ +use std::time::Instant; + +use chrono::{DateTime, Utc}; +use file_store::{ + file_info_poller::{FileInfoStream, LookbackBehavior}, + file_sink::FileSinkClient, + file_source, + file_upload::FileUpload, + traits::{ + FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, TimestampDecode, + TimestampEncode, + }, + FileStore, FileType, +}; +use futures::{StreamExt, TryStreamExt}; +use helium_crypto::PublicKeyBinary; +use helium_proto::services::{ + mobile_config::NetworkKeyRole, + poc_mobile::{ + SubscriberMappingActivityIngestReportV1, SubscriberReportVerificationStatus, + VerifiedSubscriberMappingActivityReportV1, + }, +}; +use mobile_config::client::{ + authorization_client::AuthorizationVerifier, entity_client::EntityVerifier, +}; +use sqlx::{Pool, Postgres}; +use task_manager::{ManagedTask, TaskManager}; +use tokio::sync::mpsc::Receiver; + +use crate::Settings; + +pub mod db; + +pub struct SubscriberMappingActivityDaemon { + pool: Pool, + authorization_verifier: AV, + entity_verifier: EV, + stream_receiver: Receiver>, + verified_sink: FileSinkClient, +} + +impl SubscriberMappingActivityDaemon +where + AV: AuthorizationVerifier, + EV: EntityVerifier + Send + Sync + 'static, +{ + pub fn new( + pool: Pool, + authorization_verifier: AV, + entity_verifier: EV, + stream_receiver: Receiver>, + verified_sink: FileSinkClient, + ) -> Self { + Self { + pool, + authorization_verifier, + entity_verifier, + stream_receiver, + verified_sink, + } + } + + pub async fn create_managed_task( + pool: Pool, + settings: &Settings, + authorization_verifier: AV, + entity_verifier: EV, + file_store: FileStore, + file_upload: FileUpload, + ) -> anyhow::Result { + let (stream_reciever, stream_server) = file_source::Continuous::prost_source() + .state(pool.clone()) + .store(file_store) + .lookback(LookbackBehavior::StartAfter(settings.start_after)) + .prefix(FileType::SubscriberMappingActivityIngestReport.to_string()) + .create() + .await?; + + let (verified_sink, verified_sink_server) = + VerifiedSubscriberMappingActivityReportV1::file_sink( + settings.store_base_path(), + file_upload.clone(), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, + env!("CARGO_PKG_NAME"), + ) + .await?; + + let daemon = Self::new( + pool, + authorization_verifier, + entity_verifier, + stream_reciever, + verified_sink, + ); + + Ok(TaskManager::builder() + .add_task(stream_server) + .add_task(verified_sink_server) + .add_task(daemon) + .build()) + } + + pub async fn run(mut self, mut shutdown: triggered::Listener) -> anyhow::Result<()> { + tracing::info!("starting"); + + loop { + tokio::select! { + biased; + _ = &mut shutdown => break, + Some(file) = self.stream_receiver.recv() => { + let start = Instant::now(); + self.process_file(file).await?; + metrics::histogram!("subscriber_mapping_activity_processing_time") + .record(start.elapsed()); + } + } + } + + tracing::info!("stopping"); + Ok(()) + } + + async fn process_file( + &self, + file_info_stream: FileInfoStream, + ) -> anyhow::Result<()> { + let mut transaction = self.pool.begin().await?; + let stream = file_info_stream.into_stream(&mut transaction).await?; + + let activity_stream = stream + .map(SubscriberMappingActivity::try_from) + .and_then(|sma| async move { + let status = self.verify_activity(&sma).await?; + Ok((sma, status)) + }) + .and_then(|(sma, status)| self.write_verified_report(sma, status)) + .try_filter_map(|(sma, status)| is_valid(sma, status)); + + db::save(&mut transaction, activity_stream).await?; + self.verified_sink.commit().await?; + transaction.commit().await?; + Ok(()) + } + + async fn write_verified_report( + &self, + mut activity: SubscriberMappingActivity, + status: SubscriberReportVerificationStatus, + ) -> anyhow::Result<( + SubscriberMappingActivity, + SubscriberReportVerificationStatus, + )> { + let verified_proto = VerifiedSubscriberMappingActivityReportV1 { + report: activity.take_original(), + status: status as i32, + timestamp: Utc::now().encode_timestamp_millis(), + }; + + self.verified_sink + .write(verified_proto, &[("status", status.as_str_name())]) + .await?; + + Ok((activity, status)) + } + + async fn verify_activity( + &self, + activity: &SubscriberMappingActivity, + ) -> anyhow::Result { + if !self + .verify_known_carrier_key(&activity.carrier_pub_key) + .await? + { + return Ok(SubscriberReportVerificationStatus::InvalidCarrierKey); + }; + if !self.verify_subscriber_id(&activity.subscriber_id).await? { + return Ok(SubscriberReportVerificationStatus::InvalidSubscriberId); + }; + Ok(SubscriberReportVerificationStatus::Valid) + } + + async fn verify_known_carrier_key(&self, public_key: &PublicKeyBinary) -> anyhow::Result { + self.authorization_verifier + .verify_authorized_key(public_key, NetworkKeyRole::MobileCarrier) + .await + .map_err(anyhow::Error::from) + } + + async fn verify_subscriber_id(&self, subscriber_id: &[u8]) -> anyhow::Result { + self.entity_verifier + .verify_rewardable_entity(subscriber_id) + .await + .map_err(anyhow::Error::from) + } +} + +async fn is_valid( + activity: SubscriberMappingActivity, + status: SubscriberReportVerificationStatus, +) -> anyhow::Result> { + if status == SubscriberReportVerificationStatus::Valid { + Ok(Some(activity)) + } else { + Ok(None) + } +} + +impl ManagedTask for SubscriberMappingActivityDaemon +where + AV: AuthorizationVerifier, + EV: EntityVerifier + Send + Sync + 'static, +{ + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> futures::future::LocalBoxFuture<'static, anyhow::Result<()>> { + Box::pin(self.run(shutdown)) + // let handle = tokio::spawn(async move { self.run(shutdown).await }); + // Box::pin( + // handle + // .map_err(anyhow::Error::from) + // .and_then(|result| async move { result }), + // ) + } +} + +pub struct SubscriberMappingActivity { + subscriber_id: Vec, + discovery_reward_shares: u64, + verification_reward_shares: u64, + received_timestamp: DateTime, + carrier_pub_key: PublicKeyBinary, + original: Option, +} + +impl SubscriberMappingActivity { + fn take_original(&mut self) -> Option { + self.original.take() + } +} + +impl TryFrom for SubscriberMappingActivity { + type Error = anyhow::Error; + + fn try_from(value: SubscriberMappingActivityIngestReportV1) -> Result { + let original = value.clone(); + let report = value + .report + .ok_or_else(|| anyhow::anyhow!("SubscriberMappingActivityReqV1 not found"))?; + + Ok(Self { + subscriber_id: report.subscriber_id, + discovery_reward_shares: report.discovery_reward_shares, + verification_reward_shares: report.verification_reward_shares, + received_timestamp: value.received_timestamp.to_timestamp_millis()?, + carrier_pub_key: PublicKeyBinary::from(report.carrier_pub_key), + original: Some(original), + }) + } +} diff --git a/mobile_verifier/src/subscriber_mapping_activity/db.rs b/mobile_verifier/src/subscriber_mapping_activity/db.rs new file mode 100644 index 000000000..90ae09f4b --- /dev/null +++ b/mobile_verifier/src/subscriber_mapping_activity/db.rs @@ -0,0 +1,35 @@ +use chrono::Utc; +use futures::{Stream, TryStreamExt}; +use sqlx::{Postgres, QueryBuilder, Transaction}; + +use crate::subscriber_mapping_activity::SubscriberMappingActivity; + +pub async fn save( + transaction: &mut Transaction<'_, Postgres>, + ingest_reports: impl Stream>, +) -> anyhow::Result<()> { + const NUM_IN_BATCH: usize = (u16::MAX / 5) as usize; + + ingest_reports + .try_chunks(NUM_IN_BATCH) + .err_into::() + .try_fold(transaction, |txn, chunk| async move { + QueryBuilder::new("INSERT INTO subscriber_mapping_activity(subscriber_id, discovery_reward_shares, verification_reward_shares, received_timestamp, inserted_at)") + .push_values(chunk, |mut b, activity| { + + b.push_bind(activity.subscriber_id) + .push_bind(activity.discovery_reward_shares as i64) + .push_bind(activity.verification_reward_shares as i64) + .push_bind(activity.received_timestamp) + .push_bind(Utc::now()); + }) + .build() + .execute(&mut *txn) + .await?; + + Ok(txn) + }) + .await?; + + Ok(()) +} From 1cd0c07eca0a30c2cfdc8cb0c343e2f37d87d221 Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Sun, 30 Mar 2025 10:19:09 -0400 Subject: [PATCH 03/13] update rewarder to use new subscriber mapping activity --- mobile_verifier/src/cli/server.rs | 16 +-- mobile_verifier/src/reward_shares.rs | 106 +++++------------- mobile_verifier/src/rewarder.rs | 20 +--- .../src/subscriber_mapping_activity.rs | 9 ++ .../src/subscriber_mapping_activity/db.rs | 29 ++++- 5 files changed, 69 insertions(+), 111 deletions(-) diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index 3352b8044..9d2642058 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -10,8 +10,7 @@ use crate::{ rewarder::Rewarder, sp_boosted_rewards_bans::ServiceProviderBoostedRewardsBanIngestor, speedtests::SpeedtestDaemon, - subscriber_location::SubscriberLocationIngestor, - subscriber_verified_mapping_event::SubscriberVerifiedMappingEventDaemon, + subscriber_mapping_activity::SubscriberMappingActivityDaemon, telemetry, unique_connections::ingestor::UniqueConnectionsIngestor, Settings, @@ -127,7 +126,7 @@ impl Cmd { .await?, ) .add_task( - SubscriberVerifiedMappingEventDaemon::create_managed_task( + SubscriberMappingActivityDaemon::create_managed_task( pool.clone(), settings, auth_client.clone(), @@ -157,17 +156,6 @@ impl Cmd { ) .await?, ) - .add_task( - SubscriberLocationIngestor::create_managed_task( - pool.clone(), - settings, - file_upload.clone(), - report_ingest.clone(), - auth_client.clone(), - entity_client.clone(), - ) - .await?, - ) .add_task( RadioThresholdIngestor::create_managed_task( pool.clone(), diff --git a/mobile_verifier/src/reward_shares.rs b/mobile_verifier/src/reward_shares.rs index 755fde0b1..dd4bdd11b 100644 --- a/mobile_verifier/src/reward_shares.rs +++ b/mobile_verifier/src/reward_shares.rs @@ -7,6 +7,7 @@ use crate::{ sp_boosted_rewards_bans::BannedRadios, speedtests_average::SpeedtestAverages, subscriber_location::SubscriberValidatedLocations, + subscriber_mapping_activity::SubscriberMappingShares, subscriber_verified_mapping_event::VerifiedSubscriberVerifiedMappingEventShares, unique_connections::{self, UniqueConnectionCounts}, PriceInfo, @@ -183,38 +184,23 @@ impl TransferRewards { #[derive(Default)] pub struct MapperShares { - pub discovery_mapping_shares: SubscriberValidatedLocations, - pub verified_mapping_event_shares: VerifiedSubscriberVerifiedMappingEventShares, + mapping_activity_shares: Vec, } impl MapperShares { - pub fn new( - discovery_mapping_shares: SubscriberValidatedLocations, - verified_mapping_event_shares: VerifiedSubscriberVerifiedMappingEventShares, - ) -> Self { + pub fn new(mapping_activity_shares: Vec) -> Self { Self { - discovery_mapping_shares, - verified_mapping_event_shares, + mapping_activity_shares, } } pub fn rewards_per_share(&self, total_mappers_pool: Decimal) -> anyhow::Result { - let discovery_mappers_count = Decimal::from(self.discovery_mapping_shares.len()); - - // calculate the total eligible mapping shares for the epoch - // this could be simplified as every subscriber is awarded the same share - // however the function is setup to allow the verification mapper shares to be easily - // added without impacting code structure ( the per share value for those will be different ) - let total_mapper_shares = discovery_mappers_count * DISCOVERY_MAPPING_SHARES; - - let total_verified_mapping_event_shares: Decimal = self - .verified_mapping_event_shares + let total_shares = self + .mapping_activity_shares .iter() - .map(|share| Decimal::from(share.total_reward_points)) + .map(|mas| Decimal::from(mas.discovery_reward_shares + mas.verification_reward_shares)) .sum(); - let total_shares = total_mapper_shares + total_verified_mapping_event_shares; - let res = total_mappers_pool .checked_div(total_shares) .unwrap_or(Decimal::ZERO); @@ -224,73 +210,35 @@ impl MapperShares { pub fn into_subscriber_rewards( self, - reward_period: &'_ Range>, + reward_period: &Range>, reward_per_share: Decimal, ) -> impl Iterator + '_ { - let mut subscriber_rewards: HashMap, proto::SubscriberReward> = HashMap::new(); - - let discovery_location_amount = (DISCOVERY_MAPPING_SHARES * reward_per_share) - .round_dp_with_strategy(0, RoundingStrategy::ToZero) - .to_u64() - .unwrap_or_default(); - - if discovery_location_amount > 0 { - // Collect rewards from discovery_mapping_shares - for subscriber_id in self.discovery_mapping_shares { - subscriber_rewards - .entry(subscriber_id.clone()) - .and_modify(|reward| { - reward.discovery_location_amount = discovery_location_amount; - }) - .or_insert_with(|| proto::SubscriberReward { - subscriber_id: subscriber_id.clone(), - discovery_location_amount, - verification_mapping_amount: 0, - }); - } - } + self.mapping_activity_shares.into_iter().map(move |mas| { + let discovery_location_amount = (Decimal::from(mas.discovery_reward_shares) + * reward_per_share) + .round_dp_with_strategy(0, RoundingStrategy::ToZero) + .to_u64() + .unwrap_or_default(); - // Collect rewards from verified_mapping_event_shares - for verified_share in self.verified_mapping_event_shares { - let verification_mapping_amount = (Decimal::from(verified_share.total_reward_points) + let verification_mapping_amount = (Decimal::from(mas.verification_reward_shares) * reward_per_share) .round_dp_with_strategy(0, RoundingStrategy::ToZero) .to_u64() .unwrap_or_default(); - if verification_mapping_amount > 0 { - subscriber_rewards - .entry(verified_share.subscriber_id.clone()) - .and_modify(|reward| { - reward.verification_mapping_amount = verification_mapping_amount; - }) - .or_insert_with(|| proto::SubscriberReward { - subscriber_id: verified_share.subscriber_id.clone(), - discovery_location_amount: 0, + ( + discovery_location_amount + verification_mapping_amount, + proto::MobileRewardShare { + start_period: reward_period.start.encode_timestamp(), + end_period: reward_period.end.encode_timestamp(), + reward: Some(ProtoReward::SubscriberReward(proto::SubscriberReward { + subscriber_id: mas.subscriber_id, + discovery_location_amount, verification_mapping_amount, - }); - } - } - - // Create the MobileRewardShare for each subscriber - subscriber_rewards - .into_values() - .filter(|reward| { - reward.discovery_location_amount > 0 || reward.verification_mapping_amount > 0 - }) - .map(|subscriber_reward| { - let total_reward_amount = subscriber_reward.discovery_location_amount - + subscriber_reward.verification_mapping_amount; - - ( - total_reward_amount, - proto::MobileRewardShare { - start_period: reward_period.start.encode_timestamp(), - end_period: reward_period.end.encode_timestamp(), - reward: Some(ProtoReward::SubscriberReward(subscriber_reward)), - }, - ) - }) + })), + }, + ) + }) } } diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index ee8ec4172..8dc620962 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -10,7 +10,7 @@ use crate::{ service_provider::{self, ServiceProviderDCSessions, ServiceProviderPromotions}, sp_boosted_rewards_bans, speedtests, speedtests_average::SpeedtestAverages, - subscriber_location, subscriber_verified_mapping_event, telemetry, unique_connections, + subscriber_mapping_activity, subscriber_verified_mapping_event, telemetry, unique_connections, PriceInfo, Settings, }; use anyhow::bail; @@ -563,25 +563,13 @@ pub async fn reward_mappers( mobile_rewards: &FileSinkClient, reward_info: &EpochRewardInfo, ) -> anyhow::Result<()> { - // Mapper rewards currently include rewards for discovery mapping only. - // Verification mapping rewards to be added - // get subscriber location shares this epoch - let location_shares = - subscriber_location::aggregate_location_shares(pool, &reward_info.epoch_period).await?; - - // Verification mappers can only earn rewards if they qualified for disco mapping - // rewards during the same reward_period - let vsme_shares = subscriber_verified_mapping_event::aggregate_verified_mapping_events( + let rewardable_mapping_activity = subscriber_mapping_activity::db::rewardable_mapping_activity( pool, &reward_info.epoch_period, ) - .await? - .into_iter() - .filter(|vsme| location_shares.contains(&vsme.subscriber_id)) - .collect(); + .await?; - // determine mapping shares based on location shares and data transferred - let mapping_shares = MapperShares::new(location_shares, vsme_shares); + let mapping_shares = MapperShares::new(rewardable_mapping_activity); let total_mappers_pool = reward_shares::get_scheduled_tokens_for_mappers(reward_info.epoch_emissions); let rewards_per_share = mapping_shares.rewards_per_share(total_mappers_pool)?; diff --git a/mobile_verifier/src/subscriber_mapping_activity.rs b/mobile_verifier/src/subscriber_mapping_activity.rs index 20c1d4322..cc4e9d3eb 100644 --- a/mobile_verifier/src/subscriber_mapping_activity.rs +++ b/mobile_verifier/src/subscriber_mapping_activity.rs @@ -260,3 +260,12 @@ impl TryFrom for SubscriberMappingActiv }) } } + +#[derive(Clone, Debug, sqlx::FromRow)] +pub struct SubscriberMappingShares { + pub subscriber_id: Vec, + #[sqlx(try_from = "i64")] + pub discovery_reward_shares: u64, + #[sqlx(try_from = "i64")] + pub verification_reward_shares: u64, +} diff --git a/mobile_verifier/src/subscriber_mapping_activity/db.rs b/mobile_verifier/src/subscriber_mapping_activity/db.rs index 90ae09f4b..385e3866e 100644 --- a/mobile_verifier/src/subscriber_mapping_activity/db.rs +++ b/mobile_verifier/src/subscriber_mapping_activity/db.rs @@ -1,9 +1,13 @@ -use chrono::Utc; +use std::ops::Range; + +use chrono::{DateTime, Utc}; use futures::{Stream, TryStreamExt}; -use sqlx::{Postgres, QueryBuilder, Transaction}; +use sqlx::{Pool, Postgres, QueryBuilder, Transaction}; use crate::subscriber_mapping_activity::SubscriberMappingActivity; +use super::SubscriberMappingShares; + pub async fn save( transaction: &mut Transaction<'_, Postgres>, ingest_reports: impl Stream>, @@ -33,3 +37,24 @@ pub async fn save( Ok(()) } + +pub async fn rewardable_mapping_activity( + pool: &Pool, + epoch_period: &Range>, +) -> anyhow::Result> { + sqlx::query_as( + r#" + SELECT DISTINCT ON (subscriber_id) subscriber_id, discovery_reward_shares, verification_reward_shares + FROM subscriber_mapping_activity + WHERE received_timestamp >= $1 + AND received_timestamp < $2 + AND (discovery_reward_shares > 0 OR verification_reward_shares > 0) + ORDER BY subscriber_id, received_timestamp DESC + "#, + ) + .bind(epoch_period.start) + .bind(epoch_period.end) + .fetch_all(pool) + .await + .map_err(anyhow::Error::from) +} From 5d28d9a0853e8eeecc6ad465f3fd8d205eb82133 Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Sun, 30 Mar 2025 10:55:59 -0400 Subject: [PATCH 04/13] making saving actvity idempotent and fix tests --- mobile_verifier/src/reward_shares.rs | 25 +---- .../src/subscriber_mapping_activity.rs | 12 +- .../src/subscriber_mapping_activity/db.rs | 1 + .../tests/integrations/rewarder_mappers.rs | 106 +++++------------- 4 files changed, 41 insertions(+), 103 deletions(-) diff --git a/mobile_verifier/src/reward_shares.rs b/mobile_verifier/src/reward_shares.rs index dd4bdd11b..8cc8dafdd 100644 --- a/mobile_verifier/src/reward_shares.rs +++ b/mobile_verifier/src/reward_shares.rs @@ -6,9 +6,7 @@ use crate::{ seniority::Seniority, sp_boosted_rewards_bans::BannedRadios, speedtests_average::SpeedtestAverages, - subscriber_location::SubscriberValidatedLocations, subscriber_mapping_activity::SubscriberMappingShares, - subscriber_verified_mapping_event::VerifiedSubscriberVerifiedMappingEventShares, unique_connections::{self, UniqueConnectionCounts}, PriceInfo, }; @@ -51,9 +49,6 @@ pub const DEFAULT_PREC: u32 = 15; /// Percent of total emissions allocated for mapper rewards const MAPPERS_REWARDS_PERCENT: Decimal = dec!(0.2); -/// shares of the mappers pool allocated per eligible subscriber for discovery mapping -const DISCOVERY_MAPPING_SHARES: Decimal = dec!(30); - // Percent of total emissions allocated for service provider rewards const SERVICE_PROVIDER_PERCENT: Decimal = dec!(0.1); @@ -730,8 +725,6 @@ mod test { }, speedtests::Speedtest, speedtests_average::SpeedtestAverage, - subscriber_location::SubscriberValidatedLocations, - subscriber_verified_mapping_event::VerifiedSubscriberVerifiedMappingEventShare, }; use chrono::{Duration, Utc}; use file_store::speedtest::CellSpeedtest; @@ -853,19 +846,13 @@ mod test { async fn subscriber_rewards() { const NUM_SUBSCRIBERS: u64 = 10_000; - // simulate 10k subscriber location shares - let mut location_shares = SubscriberValidatedLocations::new(); - for n in 0..NUM_SUBSCRIBERS { - location_shares.push(n.encode_to_vec()); - } - - // simulate 10k vsme shares - let mut vsme_shares = VerifiedSubscriberVerifiedMappingEventShares::new(); + let mut mapping_activity_shares = Vec::new(); for n in 0..NUM_SUBSCRIBERS { - vsme_shares.push(VerifiedSubscriberVerifiedMappingEventShare { + mapping_activity_shares.push(SubscriberMappingShares { subscriber_id: n.encode_to_vec(), - total_reward_points: 30, - }); + discovery_reward_shares: 30, + verification_reward_shares: 30, + }) } // set our rewards info @@ -873,7 +860,7 @@ mod test { default_rewards_info(EMISSIONS_POOL_IN_BONES_24_HOURS, Duration::hours(24)); // translate location shares into shares - let shares = MapperShares::new(location_shares, vsme_shares); + let shares = MapperShares::new(mapping_activity_shares); let total_mappers_pool = reward_shares::get_scheduled_tokens_for_mappers(rewards_info.epoch_emissions); let rewards_per_share = shares.rewards_per_share(total_mappers_pool).unwrap(); diff --git a/mobile_verifier/src/subscriber_mapping_activity.rs b/mobile_verifier/src/subscriber_mapping_activity.rs index cc4e9d3eb..0d3076be0 100644 --- a/mobile_verifier/src/subscriber_mapping_activity.rs +++ b/mobile_verifier/src/subscriber_mapping_activity.rs @@ -227,12 +227,12 @@ where } pub struct SubscriberMappingActivity { - subscriber_id: Vec, - discovery_reward_shares: u64, - verification_reward_shares: u64, - received_timestamp: DateTime, - carrier_pub_key: PublicKeyBinary, - original: Option, + pub subscriber_id: Vec, + pub discovery_reward_shares: u64, + pub verification_reward_shares: u64, + pub received_timestamp: DateTime, + pub carrier_pub_key: PublicKeyBinary, + pub original: Option, } impl SubscriberMappingActivity { diff --git a/mobile_verifier/src/subscriber_mapping_activity/db.rs b/mobile_verifier/src/subscriber_mapping_activity/db.rs index 385e3866e..de5e618a1 100644 --- a/mobile_verifier/src/subscriber_mapping_activity/db.rs +++ b/mobile_verifier/src/subscriber_mapping_activity/db.rs @@ -27,6 +27,7 @@ pub async fn save( .push_bind(activity.received_timestamp) .push_bind(Utc::now()); }) + .push("ON CONFLICT (subscriber_id, received_timestamp) DO NOTHING") .build() .execute(&mut *txn) .await?; diff --git a/mobile_verifier/tests/integrations/rewarder_mappers.rs b/mobile_verifier/tests/integrations/rewarder_mappers.rs index 6162fea78..437bb4087 100644 --- a/mobile_verifier/tests/integrations/rewarder_mappers.rs +++ b/mobile_verifier/tests/integrations/rewarder_mappers.rs @@ -2,11 +2,7 @@ use crate::common::{ self, default_rewards_info, MockFileSinkReceiver, EMISSIONS_POOL_IN_BONES_24_HOURS, }; use chrono::{DateTime, Duration as ChronoDuration, Duration, Utc}; -use file_store::{ - mobile_subscriber::{SubscriberLocationIngestReport, SubscriberLocationReq}, - subscriber_verified_mapping_event::SubscriberVerifiedMappingEvent, - subscriber_verified_mapping_event_ingest_report::SubscriberVerifiedMappingEventIngestReport, -}; +use futures::{stream, StreamExt}; use helium_crypto::PublicKeyBinary; use helium_proto::{ services::poc_mobile::{ @@ -15,7 +11,8 @@ use helium_proto::{ Message, }; use mobile_verifier::{ - reward_shares, rewarder, subscriber_location, subscriber_verified_mapping_event, + reward_shares, rewarder, + subscriber_mapping_activity::{self, SubscriberMappingActivity}, }; use rust_decimal::prelude::*; use rust_decimal_macros::dec; @@ -104,54 +101,6 @@ async fn test_mapper_rewards(pool: PgPool) -> anyhow::Result<()> { Ok(()) } -#[sqlx::test] -async fn test_subscriber_can_only_earn_verification_mapping_if_earned_disco_mapping( - pool: PgPool, -) -> anyhow::Result<()> { - let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); - - let reward_info = default_rewards_info(EMISSIONS_POOL_IN_BONES_24_HOURS, Duration::hours(24)); - - let mut txn = pool.begin().await?; - let sub_loc_report = SubscriberLocationIngestReport { - received_timestamp: reward_info.epoch_period.end - ChronoDuration::hours(1), - report: SubscriberLocationReq { - subscriber_id: SUBSCRIBER_1.to_string().encode_to_vec(), - timestamp: reward_info.epoch_period.end - ChronoDuration::hours(1), - carrier_pub_key: PublicKeyBinary::from_str(HOTSPOT_1).unwrap(), - }, - }; - subscriber_location::save(&sub_loc_report, &mut txn).await?; - - let vme_report = SubscriberVerifiedMappingEventIngestReport { - received_timestamp: reward_info.epoch_period.end - ChronoDuration::hours(1), - report: SubscriberVerifiedMappingEvent { - subscriber_id: SUBSCRIBER_2.to_string().encode_to_vec(), - total_reward_points: 50, - timestamp: reward_info.epoch_period.end - ChronoDuration::hours(1), - carrier_mapping_key: PublicKeyBinary::from_str(HOTSPOT_1).unwrap(), - }, - }; - subscriber_verified_mapping_event::save_to_db(&vme_report, &mut txn).await?; - - txn.commit().await?; - - let (_, rewards) = tokio::join!( - rewarder::reward_mappers(&pool, &mobile_rewards_client, &reward_info), - mobile_rewards.receive_subscriber_reward() - ); - - mobile_rewards.assert_no_messages(); - - let total_pool = reward_shares::get_scheduled_tokens_for_mappers(reward_info.epoch_emissions); - assert_eq!( - rewards.discovery_location_amount + rewards.verification_mapping_amount, - total_pool.to_u64().unwrap() - ); - - Ok(()) -} - async fn receive_expected_rewards( mobile_rewards: &mut MockFileSinkReceiver, ) -> anyhow::Result<(Vec, UnallocatedReward)> { @@ -180,42 +129,43 @@ async fn seed_mapping_data( // subscriber 1 has two qualifying mapping criteria reports // subscribers 2 and 3 have a single qualifying mapping criteria report - let report1 = SubscriberLocationIngestReport { - received_timestamp: ts - ChronoDuration::hours(1), - report: SubscriberLocationReq { + let reports = vec![ + SubscriberMappingActivity { + received_timestamp: ts - ChronoDuration::hours(1), subscriber_id: SUBSCRIBER_1.to_string().encode_to_vec(), - timestamp: ts - ChronoDuration::hours(1), + discovery_reward_shares: 30, + verification_reward_shares: 0, carrier_pub_key: PublicKeyBinary::from_str(HOTSPOT_1).unwrap(), + original: None, }, - }; - let report2 = SubscriberLocationIngestReport { - received_timestamp: ts - ChronoDuration::hours(1), - report: SubscriberLocationReq { + SubscriberMappingActivity { + received_timestamp: ts - ChronoDuration::hours(2), subscriber_id: SUBSCRIBER_1.to_string().encode_to_vec(), - timestamp: ts - ChronoDuration::hours(2), + discovery_reward_shares: 30, + verification_reward_shares: 0, carrier_pub_key: PublicKeyBinary::from_str(HOTSPOT_1).unwrap(), + original: None, }, - }; - let report3 = SubscriberLocationIngestReport { - received_timestamp: ts - ChronoDuration::hours(1), - report: SubscriberLocationReq { + SubscriberMappingActivity { + received_timestamp: ts - ChronoDuration::hours(1), subscriber_id: SUBSCRIBER_2.to_string().encode_to_vec(), - timestamp: ts - ChronoDuration::hours(3), + discovery_reward_shares: 30, + verification_reward_shares: 0, carrier_pub_key: PublicKeyBinary::from_str(HOTSPOT_1).unwrap(), + original: None, }, - }; - let report4 = SubscriberLocationIngestReport { - received_timestamp: ts - ChronoDuration::hours(1), - report: SubscriberLocationReq { + SubscriberMappingActivity { + received_timestamp: ts - ChronoDuration::hours(1), subscriber_id: SUBSCRIBER_3.to_string().encode_to_vec(), - timestamp: ts - ChronoDuration::hours(3), + discovery_reward_shares: 30, + verification_reward_shares: 0, carrier_pub_key: PublicKeyBinary::from_str(HOTSPOT_1).unwrap(), + original: None, }, - }; - subscriber_location::save(&report1, txn).await?; - subscriber_location::save(&report2, txn).await?; - subscriber_location::save(&report3, txn).await?; - subscriber_location::save(&report4, txn).await?; + ]; + + subscriber_mapping_activity::db::save(txn, stream::iter(reports.into_iter()).map(anyhow::Ok)) + .await?; Ok(()) } From 2dfe73a5b3bca5f32ee3ec01fd083e4255908a76 Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Mon, 31 Mar 2025 07:58:30 -0400 Subject: [PATCH 05/13] remove unused subscriber_location and subscriber_verified mapping from mobile verifier --- mobile_verifier/src/lib.rs | 2 - mobile_verifier/src/rewarder.rs | 5 +- mobile_verifier/src/subscriber_location.rs | 264 ----------------- .../src/subscriber_mapping_activity/db.rs | 20 +- .../src/subscriber_verified_mapping_event.rs | 278 ------------------ 5 files changed, 19 insertions(+), 550 deletions(-) delete mode 100644 mobile_verifier/src/subscriber_location.rs delete mode 100644 mobile_verifier/src/subscriber_verified_mapping_event.rs diff --git a/mobile_verifier/src/lib.rs b/mobile_verifier/src/lib.rs index c50d92854..55fe923b6 100644 --- a/mobile_verifier/src/lib.rs +++ b/mobile_verifier/src/lib.rs @@ -14,9 +14,7 @@ mod settings; pub mod sp_boosted_rewards_bans; pub mod speedtests; pub mod speedtests_average; -pub mod subscriber_location; pub mod subscriber_mapping_activity; -pub mod subscriber_verified_mapping_event; pub mod telemetry; pub mod unique_connections; diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 8dc620962..c8925a0c0 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -10,8 +10,7 @@ use crate::{ service_provider::{self, ServiceProviderDCSessions, ServiceProviderPromotions}, sp_boosted_rewards_bans, speedtests, speedtests_average::SpeedtestAverages, - subscriber_mapping_activity, subscriber_verified_mapping_event, telemetry, unique_connections, - PriceInfo, Settings, + subscriber_mapping_activity, telemetry, unique_connections, PriceInfo, Settings, }; use anyhow::bail; use chrono::{DateTime, TimeZone, Utc}; @@ -334,7 +333,7 @@ where coverage::clear_coverage_objects(&mut transaction, &reward_info.epoch_period.start).await?; sp_boosted_rewards_bans::clear_bans(&mut transaction, reward_info.epoch_period.start) .await?; - subscriber_verified_mapping_event::clear(&mut transaction, &reward_info.epoch_period.start) + subscriber_mapping_activity::db::clear(&mut transaction, reward_info.epoch_period.start) .await?; unique_connections::db::clear(&mut transaction, &reward_info.epoch_period.start).await?; // subscriber_location::clear_location_shares(&mut transaction, &reward_period.end).await?; diff --git a/mobile_verifier/src/subscriber_location.rs b/mobile_verifier/src/subscriber_location.rs deleted file mode 100644 index 35afd14f8..000000000 --- a/mobile_verifier/src/subscriber_location.rs +++ /dev/null @@ -1,264 +0,0 @@ -use chrono::{DateTime, Duration, Utc}; -use file_store::{ - file_info_poller::{FileInfoStream, LookbackBehavior}, - file_sink::FileSinkClient, - file_source, - file_upload::FileUpload, - mobile_subscriber::{ - SubscriberLocationIngestReport, SubscriberLocationReq, - VerifiedSubscriberLocationIngestReport, - }, - traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, - FileStore, FileType, -}; -use futures::{StreamExt, TryStreamExt}; -use futures_util::TryFutureExt; -use helium_crypto::PublicKeyBinary; -use helium_proto::services::mobile_config::NetworkKeyRole; -use helium_proto::services::poc_mobile::{ - SubscriberReportVerificationStatus, VerifiedSubscriberLocationIngestReportV1, -}; -use mobile_config::client::{ - authorization_client::AuthorizationVerifier, entity_client::EntityVerifier, -}; -use sqlx::{PgPool, Pool, Postgres, Transaction}; -use std::{ops::Range, time::Instant}; -use task_manager::{ManagedTask, TaskManager}; -use tokio::sync::mpsc::Receiver; - -use crate::Settings; - -const SUBSCRIBER_REWARD_PERIOD_IN_DAYS: i64 = 1; - -pub type SubscriberValidatedLocations = Vec>; - -pub struct SubscriberLocationIngestor { - pub pool: PgPool, - authorization_verifier: AV, - entity_verifier: EV, - reports_receiver: Receiver>, - verified_report_sink: FileSinkClient, -} - -impl SubscriberLocationIngestor -where - AV: AuthorizationVerifier, - EV: EntityVerifier + Send + Sync + 'static, -{ - pub async fn create_managed_task( - pool: Pool, - settings: &Settings, - file_upload: FileUpload, - file_store: FileStore, - authorization_verifier: AV, - entity_verifier: EV, - ) -> anyhow::Result { - let (verified_subscriber_location, verified_subscriber_location_server) = - VerifiedSubscriberLocationIngestReportV1::file_sink( - settings.store_base_path(), - file_upload.clone(), - FileSinkCommitStrategy::Manual, - FileSinkRollTime::Default, - env!("CARGO_PKG_NAME"), - ) - .await?; - - let (subscriber_location_ingest, subscriber_location_ingest_server) = - file_source::continuous_source() - .state(pool.clone()) - .store(file_store.clone()) - .lookback(LookbackBehavior::StartAfter(settings.start_after)) - .prefix(FileType::SubscriberLocationIngestReport.to_string()) - .create() - .await?; - - let subscriber_location_ingestor = SubscriberLocationIngestor::new( - pool, - authorization_verifier, - entity_verifier, - subscriber_location_ingest, - verified_subscriber_location, - ); - - Ok(TaskManager::builder() - .add_task(verified_subscriber_location_server) - .add_task(subscriber_location_ingest_server) - .add_task(subscriber_location_ingestor) - .build()) - } - - pub fn new( - pool: sqlx::Pool, - authorization_verifier: AV, - entity_verifier: EV, - reports_receiver: Receiver>, - verified_report_sink: FileSinkClient, - ) -> Self { - Self { - pool, - authorization_verifier, - entity_verifier, - reports_receiver, - verified_report_sink, - } - } - - async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { - loop { - tokio::select! { - biased; - _ = shutdown.clone() => break, - Some(file) = self.reports_receiver.recv() => { - let start = Instant::now(); - self.process_file(file).await?; - metrics::histogram!("subscriber_location_processing_time") - .record(start.elapsed()); - } - } - } - tracing::info!("stopping subscriber location reports handler"); - Ok(()) - } - - async fn process_file( - &self, - file_info_stream: FileInfoStream, - ) -> anyhow::Result<()> { - let mut transaction = self.pool.begin().await?; - file_info_stream - .into_stream(&mut transaction) - .await? - .map(anyhow::Ok) - .try_fold( - transaction, - |mut transaction, loc_ingest_report| async move { - // verifiy the report - let verified_report_status = - self.verify_report(&loc_ingest_report.report).await; - - // if the report is valid then save to the db - // and thus available to be rewarded - if verified_report_status == SubscriberReportVerificationStatus::Valid { - save(&loc_ingest_report, &mut transaction).await?; - } - - // write out paper trail of verified report, valid or invalid - let verified_report_proto: VerifiedSubscriberLocationIngestReportV1 = - VerifiedSubscriberLocationIngestReport { - report: loc_ingest_report, - status: verified_report_status, - timestamp: Utc::now(), - } - .into(); - self.verified_report_sink - .write( - verified_report_proto, - &[("report_status", verified_report_status.as_str_name())], - ) - .await?; - - Ok(transaction) - }, - ) - .await? - .commit() - .await?; - self.verified_report_sink.commit().await?; - Ok(()) - } - - async fn verify_report( - &self, - report: &SubscriberLocationReq, - ) -> SubscriberReportVerificationStatus { - if !self.verify_known_carrier_key(&report.carrier_pub_key).await { - return SubscriberReportVerificationStatus::InvalidCarrierKey; - }; - if !self.verify_subscriber_id(&report.subscriber_id).await { - return SubscriberReportVerificationStatus::InvalidSubscriberId; - }; - SubscriberReportVerificationStatus::Valid - } - - async fn verify_known_carrier_key(&self, public_key: &PublicKeyBinary) -> bool { - self.authorization_verifier - .verify_authorized_key(public_key, NetworkKeyRole::MobileCarrier) - .await - .unwrap_or_default() - } - - async fn verify_subscriber_id(&self, subscriber_id: &[u8]) -> bool { - self.entity_verifier - .verify_rewardable_entity(subscriber_id) - .await - .unwrap_or_default() - } -} - -impl ManagedTask for SubscriberLocationIngestor -where - AV: AuthorizationVerifier, - EV: EntityVerifier + Send + Sync + 'static, -{ - fn start_task( - self: Box, - shutdown: triggered::Listener, - ) -> futures_util::future::LocalBoxFuture<'static, anyhow::Result<()>> { - let handle = tokio::spawn(self.run(shutdown)); - Box::pin( - handle - .map_err(anyhow::Error::from) - .and_then(|result| async move { result }), - ) - } -} - -pub async fn save( - loc_ingest_report: &SubscriberLocationIngestReport, - db: &mut Transaction<'_, Postgres>, -) -> Result<(), sqlx::Error> { - sqlx::query( - r#" - INSERT INTO subscriber_loc_verified (subscriber_id, received_timestamp) - VALUES ($1, $2) - "#, - ) - .bind(loc_ingest_report.report.subscriber_id.clone()) - .bind(loc_ingest_report.received_timestamp) - .execute(&mut *db) - .await?; - Ok(()) -} - -#[derive(sqlx::FromRow)] -pub struct SubscriberLocationShare { - pub subscriber_id: Vec, -} - -pub async fn aggregate_location_shares( - db: impl sqlx::PgExecutor<'_> + Copy, - reward_period: &Range>, -) -> Result { - let mut rows = sqlx::query_as::<_, SubscriberLocationShare>( - "select distinct(subscriber_id) from subscriber_loc_verified where received_timestamp >= $1 and received_timestamp < $2", - ) - .bind(reward_period.end - Duration::days(SUBSCRIBER_REWARD_PERIOD_IN_DAYS)) - .bind(reward_period.end) - .fetch(db); - let mut location_shares = SubscriberValidatedLocations::new(); - while let Some(share) = rows.try_next().await? { - location_shares.push(share.subscriber_id) - } - Ok(location_shares) -} - -pub async fn clear_location_shares( - tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, - timestamp: &DateTime, -) -> Result<(), sqlx::Error> { - sqlx::query("delete from subscriber_loc_verified where received_timestamp < $1") - .bind(timestamp) - .execute(&mut *tx) - .await?; - Ok(()) -} diff --git a/mobile_verifier/src/subscriber_mapping_activity/db.rs b/mobile_verifier/src/subscriber_mapping_activity/db.rs index de5e618a1..6f33e9d45 100644 --- a/mobile_verifier/src/subscriber_mapping_activity/db.rs +++ b/mobile_verifier/src/subscriber_mapping_activity/db.rs @@ -2,7 +2,7 @@ use std::ops::Range; use chrono::{DateTime, Utc}; use futures::{Stream, TryStreamExt}; -use sqlx::{Pool, Postgres, QueryBuilder, Transaction}; +use sqlx::{PgExecutor, Postgres, QueryBuilder, Transaction}; use crate::subscriber_mapping_activity::SubscriberMappingActivity; @@ -40,7 +40,7 @@ pub async fn save( } pub async fn rewardable_mapping_activity( - pool: &Pool, + db: impl PgExecutor<'_>, epoch_period: &Range>, ) -> anyhow::Result> { sqlx::query_as( @@ -55,7 +55,21 @@ pub async fn rewardable_mapping_activity( ) .bind(epoch_period.start) .bind(epoch_period.end) - .fetch_all(pool) + .fetch_all(db) .await .map_err(anyhow::Error::from) } + +pub async fn clear(db: impl PgExecutor<'_>, timestamp: DateTime) -> anyhow::Result<()> { + sqlx::query( + " + DELETE FROM subscriber_mapping_activity + WHERE received_timestamp < $1 + ", + ) + .bind(timestamp) + .execute(db) + .await?; + + Ok(()) +} diff --git a/mobile_verifier/src/subscriber_verified_mapping_event.rs b/mobile_verifier/src/subscriber_verified_mapping_event.rs deleted file mode 100644 index 8bdb4bb1b..000000000 --- a/mobile_verifier/src/subscriber_verified_mapping_event.rs +++ /dev/null @@ -1,278 +0,0 @@ -use crate::Settings; -use chrono::{DateTime, Utc}; -use file_store::{ - file_info_poller::{FileInfoStream, LookbackBehavior}, - file_sink::FileSinkClient, - file_source, - file_upload::FileUpload, - subscriber_verified_mapping_event::SubscriberVerifiedMappingEvent, - subscriber_verified_mapping_event_ingest_report::SubscriberVerifiedMappingEventIngestReport, - traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, - verified_subscriber_verified_mapping_event_ingest_report::VerifiedSubscriberVerifiedMappingEventIngestReport, - FileStore, FileType, -}; -use futures::{stream::StreamExt, TryStreamExt}; -use helium_crypto::PublicKeyBinary; -use helium_proto::services::{ - mobile_config::NetworkKeyRole, - poc_mobile::{ - SubscriberVerifiedMappingEventVerificationStatus, - VerifiedSubscriberVerifiedMappingEventIngestReportV1, - }, -}; -use mobile_config::client::{ - authorization_client::AuthorizationVerifier, entity_client::EntityVerifier, -}; -use sqlx::{Pool, Postgres, Transaction}; -use std::ops::Range; -use task_manager::{ManagedTask, TaskManager}; -use tokio::sync::mpsc::Receiver; - -pub struct SubscriberVerifiedMappingEventDaemon { - pool: Pool, - authorization_verifier: AV, - entity_verifier: EV, - reports_receiver: Receiver>, - verified_report_sink: FileSinkClient, -} - -impl SubscriberVerifiedMappingEventDaemon -where - AV: AuthorizationVerifier, - EV: EntityVerifier + Send + Sync + 'static, -{ - pub fn new( - pool: Pool, - authorization_verifier: AV, - entity_verifier: EV, - reports_receiver: Receiver>, - verified_report_sink: FileSinkClient, - ) -> Self { - Self { - pool, - authorization_verifier, - entity_verifier, - reports_receiver, - verified_report_sink, - } - } - - pub async fn create_managed_task( - pool: Pool, - settings: &Settings, - authorization_verifier: AV, - entity_verifier: EV, - file_store: FileStore, - file_upload: FileUpload, - ) -> anyhow::Result { - let (reports_receiver, reports_receiver_server) = file_source::continuous_source() - .state(pool.clone()) - .store(file_store) - .lookback(LookbackBehavior::StartAfter(settings.start_after)) - .prefix(FileType::SubscriberVerifiedMappingEventIngestReport.to_string()) - .create() - .await?; - - let (verified_report_sink, verified_report_sink_server) = - VerifiedSubscriberVerifiedMappingEventIngestReportV1::file_sink( - settings.store_base_path(), - file_upload.clone(), - FileSinkCommitStrategy::Manual, - FileSinkRollTime::Default, - env!("CARGO_PKG_NAME"), - ) - .await?; - - let task = Self::new( - pool, - authorization_verifier, - entity_verifier, - reports_receiver, - verified_report_sink, - ); - - Ok(TaskManager::builder() - .add_task(reports_receiver_server) - .add_task(verified_report_sink_server) - .add_task(task) - .build()) - } - - pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { - tracing::info!("Starting sme deamon"); - loop { - tokio::select! { - biased; - _ = shutdown.clone() => { - tracing::info!("sme deamon shutting down"); - break; - } - Some(file) = self.reports_receiver.recv() => { - self.process_file(file).await?; - } - } - } - Ok(()) - } - - async fn process_file( - &self, - file_info_stream: FileInfoStream, - ) -> anyhow::Result<()> { - tracing::info!( - "Processing Verified Mapping Event file {}", - file_info_stream.file_info.key - ); - - let mut transaction = self.pool.begin().await?; - - file_info_stream - .into_stream(&mut transaction) - .await? - .map(anyhow::Ok) - .try_fold(transaction, |mut transaction, report: SubscriberVerifiedMappingEventIngestReport| async move { - // verifiy the report - let verified_report_status = self.verify_event(&report.report).await; - - // if the report is valid then save to the db - // and thus available to be rewarded - if verified_report_status - == SubscriberVerifiedMappingEventVerificationStatus::SvmeValid - { - save_to_db(&report, &mut transaction).await?; - } - - // write out paper trail of verified report, valid or invalid - let verified_report_proto: VerifiedSubscriberVerifiedMappingEventIngestReportV1 = - VerifiedSubscriberVerifiedMappingEventIngestReport { - report, - status: verified_report_status, - timestamp: Utc::now(), - } - .into(); - - self.verified_report_sink - .write( - verified_report_proto, - &[("report_status", verified_report_status.as_str_name())], - ) - .await?; - - Ok(transaction) - }) - .await? - .commit() - .await?; - - self.verified_report_sink.commit().await?; - - Ok(()) - } - - async fn verify_event( - &self, - event: &SubscriberVerifiedMappingEvent, - ) -> SubscriberVerifiedMappingEventVerificationStatus { - if !self - .verify_known_carrier_key(&event.carrier_mapping_key) - .await - { - return SubscriberVerifiedMappingEventVerificationStatus::SvmeInvalidCarrierKey; - } - - if !self.verify_subscriber_id(&event.subscriber_id).await { - return SubscriberVerifiedMappingEventVerificationStatus::SvmeInvalidSubscriberId; - } - - SubscriberVerifiedMappingEventVerificationStatus::SvmeValid - } - - async fn verify_known_carrier_key(&self, public_key: &PublicKeyBinary) -> bool { - self.authorization_verifier - .verify_authorized_key(public_key, NetworkKeyRole::MobileCarrier) - .await - .unwrap_or_default() - } - - async fn verify_subscriber_id(&self, subscriber_id: &[u8]) -> bool { - self.entity_verifier - .verify_rewardable_entity(subscriber_id) - .await - .unwrap_or_default() - } -} - -impl ManagedTask for SubscriberVerifiedMappingEventDaemon -where - AV: AuthorizationVerifier, - EV: EntityVerifier + Send + Sync + 'static, -{ - fn start_task( - self: Box, - shutdown: triggered::Listener, - ) -> futures::future::LocalBoxFuture<'static, anyhow::Result<()>> { - Box::pin(self.run(shutdown)) - } -} - -pub async fn save_to_db( - report: &SubscriberVerifiedMappingEventIngestReport, - exec: &mut Transaction<'_, Postgres>, -) -> Result<(), sqlx::Error> { - sqlx::query( - r#" - INSERT INTO subscriber_verified_mapping_event (subscriber_id, total_reward_points, received_timestamp) - VALUES ($1, $2, $3) - ON CONFLICT (subscriber_id, received_timestamp) DO NOTHING - "#, - ) - .bind(&report.report.subscriber_id) - .bind(report.report.total_reward_points as i64) - .bind(report.received_timestamp) - .execute(exec) - .await?; - - Ok(()) -} - -pub type VerifiedSubscriberVerifiedMappingEventShares = - Vec; - -#[derive(sqlx::FromRow, PartialEq, Debug)] -pub struct VerifiedSubscriberVerifiedMappingEventShare { - pub subscriber_id: Vec, - pub total_reward_points: i64, -} - -pub async fn aggregate_verified_mapping_events( - db: impl sqlx::PgExecutor<'_> + Copy, - reward_period: &Range>, -) -> Result { - let vsme_shares = sqlx::query_as::<_, VerifiedSubscriberVerifiedMappingEventShare>( - "SELECT - subscriber_id, - SUM(total_reward_points)::BIGINT AS total_reward_points - FROM - subscriber_verified_mapping_event - WHERE received_timestamp >= $1 AND received_timestamp < $2 - GROUP BY - subscriber_id;", - ) - .bind(reward_period.start) - .bind(reward_period.end) - .fetch_all(db) - .await?; - - Ok(vsme_shares) -} - -pub async fn clear( - tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, - timestamp: &DateTime, -) -> Result<(), sqlx::Error> { - sqlx::query("DELETE FROM subscriber_verified_mapping_event WHERE received_timestamp < $1") - .bind(timestamp) - .execute(&mut *tx) - .await?; - Ok(()) -} From 2d53f9ee14952c8a2a31a9ebf9cae1ad53107e06 Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Mon, 31 Mar 2025 08:15:10 -0400 Subject: [PATCH 06/13] remove unused tests --- .../tests/integrations/common/mod.rs | 57 +--- mobile_verifier/tests/integrations/main.rs | 1 - .../subscriber_verified_mapping_event.rs | 245 ------------------ 3 files changed, 5 insertions(+), 298 deletions(-) delete mode 100644 mobile_verifier/tests/integrations/subscriber_verified_mapping_event.rs diff --git a/mobile_verifier/tests/integrations/common/mod.rs b/mobile_verifier/tests/integrations/common/mod.rs index d4fa4bc7a..14a05e0e4 100644 --- a/mobile_verifier/tests/integrations/common/mod.rs +++ b/mobile_verifier/tests/integrations/common/mod.rs @@ -6,23 +6,16 @@ use file_store::{ use futures::{stream, StreamExt}; use helium_crypto::PublicKeyBinary; use helium_lib::token::Token; -use helium_proto::services::{ - mobile_config::NetworkKeyRole, - poc_mobile::{ - mobile_reward_share::Reward as MobileReward, radio_reward_v2, GatewayReward, - MobileRewardShare, OracleBoostingHexAssignment, OracleBoostingReportV1, PromotionReward, - RadioReward, RadioRewardV2, ServiceProviderReward, SpeedtestAvg, SubscriberReward, - UnallocatedReward, - }, +use helium_proto::services::poc_mobile::{ + mobile_reward_share::Reward as MobileReward, radio_reward_v2, GatewayReward, MobileRewardShare, + OracleBoostingHexAssignment, OracleBoostingReportV1, PromotionReward, RadioReward, + RadioRewardV2, ServiceProviderReward, SpeedtestAvg, SubscriberReward, UnallocatedReward, }; use hex_assignments::{Assignment, HexAssignment, HexBoostData}; use mobile_config::{ boosted_hex_info::{BoostedHexInfo, BoostedHexInfoStream}, client::sub_dao_client::SubDaoEpochRewardInfoResolver, - client::{ - authorization_client::AuthorizationVerifier, entity_client::EntityVerifier, - hex_boosting_client::HexBoostingInfoResolver, ClientError, - }, + client::{hex_boosting_client::HexBoostingInfoResolver, ClientError}, sub_dao_epoch_reward_info::EpochRewardInfo, }; use mobile_verifier::{ @@ -351,46 +344,6 @@ pub async fn set_unassigned_oracle_boosting_assignments( Ok(output) } -#[derive(Clone)] -pub struct MockAuthorizationClient {} - -impl MockAuthorizationClient { - pub fn new() -> MockAuthorizationClient { - Self {} - } -} - -#[async_trait] -impl AuthorizationVerifier for MockAuthorizationClient { - type Error = ClientError; - - async fn verify_authorized_key( - &self, - _pubkey: &PublicKeyBinary, - _role: NetworkKeyRole, - ) -> Result { - Ok(true) - } -} - -#[derive(Clone)] -pub struct MockEntityClient {} - -impl MockEntityClient { - pub fn new() -> MockEntityClient { - Self {} - } -} - -#[async_trait] -impl EntityVerifier for MockEntityClient { - type Error = ClientError; - - async fn verify_rewardable_entity(&self, _entity_id: &[u8]) -> Result { - Ok(true) - } -} - #[derive(Debug, Copy, Clone)] pub struct GatewayClientAllOwnersValid; diff --git a/mobile_verifier/tests/integrations/main.rs b/mobile_verifier/tests/integrations/main.rs index 32824c99c..c6493c9e6 100644 --- a/mobile_verifier/tests/integrations/main.rs +++ b/mobile_verifier/tests/integrations/main.rs @@ -12,4 +12,3 @@ mod rewarder_poc_dc; mod rewarder_sp_rewards; mod seniority; mod speedtests; -mod subscriber_verified_mapping_event; diff --git a/mobile_verifier/tests/integrations/subscriber_verified_mapping_event.rs b/mobile_verifier/tests/integrations/subscriber_verified_mapping_event.rs deleted file mode 100644 index 1b9b77516..000000000 --- a/mobile_verifier/tests/integrations/subscriber_verified_mapping_event.rs +++ /dev/null @@ -1,245 +0,0 @@ -use chrono::{DateTime, Duration, Utc}; -use file_store::{ - file_info_poller::FileInfoStream, file_sink::FileSinkClient, - subscriber_verified_mapping_event::SubscriberVerifiedMappingEvent, - subscriber_verified_mapping_event_ingest_report::SubscriberVerifiedMappingEventIngestReport, - FileInfo, -}; -use helium_crypto::{KeyTag, Keypair, PublicKeyBinary}; -use mobile_verifier::subscriber_verified_mapping_event::{ - aggregate_verified_mapping_events, SubscriberVerifiedMappingEventDaemon, - VerifiedSubscriberVerifiedMappingEventShare, VerifiedSubscriberVerifiedMappingEventShares, -}; -use rand::rngs::OsRng; -use sqlx::{PgPool, Pool, Postgres, Row}; -use std::{collections::HashMap, ops::Range}; -use tokio::time::timeout; - -use crate::common::{MockAuthorizationClient, MockEntityClient}; - -#[sqlx::test] -async fn main_test(pool: PgPool) -> anyhow::Result<()> { - let (reports_tx, reports_rx) = tokio::sync::mpsc::channel(10); - let (sink_tx, mut sink_rx) = tokio::sync::mpsc::channel(10); - let (trigger, listener) = triggered::trigger(); - let task_pool = pool.clone(); - - tokio::spawn(async move { - let deamon = SubscriberVerifiedMappingEventDaemon::new( - task_pool, - MockAuthorizationClient::new(), - MockEntityClient::new(), - reports_rx, - FileSinkClient::new(sink_tx, "metric"), - ); - - deamon.run(listener).await.expect("failed to complete task"); - }); - - // Sending reports as if they are coming from ingestor - let (fis, mut reports, public_key_binary) = file_info_stream(); - reports_tx.send(fis).await?; - - // Testing that each report we sent made it into DB - let mut retry = 0; - const MAX_RETRIES: u32 = 10; - const RETRY_WAIT: std::time::Duration = std::time::Duration::from_secs(1); - while retry <= MAX_RETRIES { - let mut saved_vsmes = select_events(&pool).await?; - - if reports.len() == saved_vsmes.len() { - let now = Utc::now(); - - for vsme in &mut saved_vsmes { - // We have to do this because we do not store carrier_mapping_key in DB - vsme.carrier_mapping_key = public_key_binary.clone(); - // We also update timestamp because github action return a different timestamp - // just few nanoseconds later - vsme.timestamp = now; - println!("vsme {:?}", vsme); - } - - assert!(reports.iter_mut().all(|r| { - r.report.timestamp = now; - println!("report {:?}", r.report); - saved_vsmes.contains(&r.report) - })); - break; - } else { - tracing::debug!("wrong saved_vsmes.len() {}", saved_vsmes.len()); - retry += 1; - tokio::time::sleep(RETRY_WAIT).await; - } - } - - assert!( - retry <= MAX_RETRIES, - "Exceeded maximum retries: {}", - MAX_RETRIES - ); - - // Testing that each report we sent made it on verified report FileSink - for expected_report in reports.clone() { - match timeout(std::time::Duration::from_secs(2), sink_rx.recv()).await { - Ok(Some(msg)) => match msg { - file_store::file_sink::Message::Commit(_) => panic!("got Commit"), - file_store::file_sink::Message::Rollback(_) => panic!("got Rollback"), - file_store::file_sink::Message::Data(_, proto_verified_report) => { - let rcv_report: SubscriberVerifiedMappingEventIngestReport = - proto_verified_report.report.unwrap().try_into()?; - - assert!(timestamp_match( - expected_report.received_timestamp, - rcv_report.received_timestamp - )); - assert_eq!( - expected_report.report.subscriber_id, - rcv_report.report.subscriber_id - ); - assert_eq!( - expected_report.report.total_reward_points, - rcv_report.report.total_reward_points - ); - assert_eq!( - expected_report.report.carrier_mapping_key, - rcv_report.report.carrier_mapping_key - ); - - assert!(timestamp_match( - expected_report.received_timestamp, - rcv_report.report.timestamp - )); - } - }, - Ok(None) => panic!("got none"), - Err(reason) => panic!("got error {reason}"), - } - } - - // Testing aggregate_verified_mapping_events now - let reward_period = Range { - start: Utc::now() - Duration::days(1), - end: Utc::now(), - }; - let mut shares_from_reports = reports_to_shares(reports.clone()); - shares_from_reports.sort_by(|a, b| a.subscriber_id.cmp(&b.subscriber_id)); - - let mut shares = aggregate_verified_mapping_events(&pool, &reward_period).await?; - shares.sort_by(|a, b| a.subscriber_id.cmp(&b.subscriber_id)); - - assert_eq!(shares_from_reports, shares); - - trigger.trigger(); - - Ok(()) -} - -fn timestamp_match(dt1: DateTime, dt2: DateTime) -> bool { - let difference = dt1.signed_duration_since(dt2); - difference.num_seconds().abs() < 1 -} - -fn file_info_stream() -> ( - FileInfoStream, - Vec, - PublicKeyBinary, -) { - let file_info = FileInfo { - key: "test_file_info".to_string(), - prefix: "verified_mapping_event".to_string(), - timestamp: Utc::now(), - size: 0, - }; - - let key_pair = generate_keypair(); - let public_key_binary: PublicKeyBinary = key_pair.public_key().to_owned().into(); - - let reports = vec![ - SubscriberVerifiedMappingEventIngestReport { - received_timestamp: Utc::now(), - report: SubscriberVerifiedMappingEvent { - subscriber_id: vec![0], - total_reward_points: 100, - timestamp: Utc::now(), - carrier_mapping_key: public_key_binary.clone(), - }, - }, - SubscriberVerifiedMappingEventIngestReport { - received_timestamp: Utc::now() - Duration::seconds(10), - report: SubscriberVerifiedMappingEvent { - subscriber_id: vec![1], - total_reward_points: 101, - timestamp: Utc::now() - Duration::seconds(10), - carrier_mapping_key: public_key_binary.clone(), - }, - }, - SubscriberVerifiedMappingEventIngestReport { - received_timestamp: Utc::now(), - report: SubscriberVerifiedMappingEvent { - subscriber_id: vec![1], - total_reward_points: 99, - timestamp: Utc::now(), - carrier_mapping_key: public_key_binary.clone(), - }, - }, - ]; - ( - FileInfoStream::new("default".to_string(), file_info, reports.clone()), - reports, - public_key_binary, - ) -} - -fn reports_to_shares( - reports: Vec, -) -> VerifiedSubscriberVerifiedMappingEventShares { - let mut reward_map: HashMap, i64> = HashMap::new(); - - for report in reports { - let event = report.report; - let entry = reward_map.entry(event.subscriber_id).or_insert(0); - *entry += event.total_reward_points as i64; - } - - reward_map - .into_iter() - .map( - |(subscriber_id, total_reward_points)| VerifiedSubscriberVerifiedMappingEventShare { - subscriber_id, - total_reward_points, - }, - ) - .collect() -} - -async fn select_events( - pool: &Pool, -) -> anyhow::Result> { - let rows = sqlx::query( - r#" - SELECT - subscriber_id, - total_reward_points, - received_timestamp - FROM subscriber_verified_mapping_event - "#, - ) - .fetch_all(pool) - .await?; - - let events = rows - .into_iter() - .map(|row| SubscriberVerifiedMappingEvent { - subscriber_id: row.get::, _>("subscriber_id"), - total_reward_points: row.get::("total_reward_points") as u64, - timestamp: row.get::, _>("received_timestamp"), - carrier_mapping_key: vec![].into(), - }) - .collect::>(); - - Ok(events) -} - -fn generate_keypair() -> Keypair { - Keypair::generate(KeyTag::default(), &mut OsRng) -} From c68a9fbe3e504f9ff5af51f7a716c498de06a443 Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Mon, 31 Mar 2025 09:33:51 -0400 Subject: [PATCH 07/13] Change to spawn tokio task and fix lifetime issues --- .../src/subscriber_mapping_activity.rs | 138 ++++++++++-------- 1 file changed, 77 insertions(+), 61 deletions(-) diff --git a/mobile_verifier/src/subscriber_mapping_activity.rs b/mobile_verifier/src/subscriber_mapping_activity.rs index 0d3076be0..5e066ded3 100644 --- a/mobile_verifier/src/subscriber_mapping_activity.rs +++ b/mobile_verifier/src/subscriber_mapping_activity.rs @@ -12,7 +12,7 @@ use file_store::{ }, FileStore, FileType, }; -use futures::{StreamExt, TryStreamExt}; +use futures::{StreamExt, TryFutureExt, TryStreamExt}; use helium_crypto::PublicKeyBinary; use helium_proto::services::{ mobile_config::NetworkKeyRole, @@ -42,8 +42,8 @@ pub struct SubscriberMappingActivityDaemon { impl SubscriberMappingActivityDaemon where - AV: AuthorizationVerifier, - EV: EntityVerifier + Send + Sync + 'static, + AV: AuthorizationVerifier + Clone, + EV: EntityVerifier + Clone + Send + Sync + 'static, { pub fn new( pool: Pool, @@ -131,11 +131,18 @@ where let activity_stream = stream .map(SubscriberMappingActivity::try_from) - .and_then(|sma| async move { - let status = self.verify_activity(&sma).await?; - Ok((sma, status)) + .and_then(|sma| { + let av = self.authorization_verifier.clone(); + let ev = self.entity_verifier.clone(); + async move { + let status = verify_activity(av, ev, &sma).await?; + Ok((sma, status)) + } + }) + .and_then(|(sma, status)| { + let sink = self.verified_sink.clone(); + async move { write_verified_report(sink, sma, status).await } }) - .and_then(|(sma, status)| self.write_verified_report(sma, status)) .try_filter_map(|(sma, status)| is_valid(sma, status)); db::save(&mut transaction, activity_stream).await?; @@ -143,57 +150,67 @@ where transaction.commit().await?; Ok(()) } +} - async fn write_verified_report( - &self, - mut activity: SubscriberMappingActivity, - status: SubscriberReportVerificationStatus, - ) -> anyhow::Result<( - SubscriberMappingActivity, - SubscriberReportVerificationStatus, - )> { - let verified_proto = VerifiedSubscriberMappingActivityReportV1 { - report: activity.take_original(), - status: status as i32, - timestamp: Utc::now().encode_timestamp_millis(), - }; +async fn write_verified_report( + sink: FileSinkClient, + mut activity: SubscriberMappingActivity, + status: SubscriberReportVerificationStatus, +) -> anyhow::Result<( + SubscriberMappingActivity, + SubscriberReportVerificationStatus, +)> { + let verified_proto = VerifiedSubscriberMappingActivityReportV1 { + report: activity.take_original(), + status: status as i32, + timestamp: Utc::now().encode_timestamp_millis(), + }; - self.verified_sink - .write(verified_proto, &[("status", status.as_str_name())]) - .await?; + sink.write(verified_proto, &[("status", status.as_str_name())]) + .await?; - Ok((activity, status)) - } + Ok((activity, status)) +} - async fn verify_activity( - &self, - activity: &SubscriberMappingActivity, - ) -> anyhow::Result { - if !self - .verify_known_carrier_key(&activity.carrier_pub_key) - .await? - { - return Ok(SubscriberReportVerificationStatus::InvalidCarrierKey); - }; - if !self.verify_subscriber_id(&activity.subscriber_id).await? { - return Ok(SubscriberReportVerificationStatus::InvalidSubscriberId); - }; - Ok(SubscriberReportVerificationStatus::Valid) - } +async fn verify_activity( + authorization_verifier: AV, + entity_verifier: EV, + activity: &SubscriberMappingActivity, +) -> anyhow::Result +where + AV: AuthorizationVerifier, + EV: EntityVerifier, +{ + if !verify_known_carrier_key(authorization_verifier, &activity.carrier_pub_key).await? { + return Ok(SubscriberReportVerificationStatus::InvalidCarrierKey); + }; + if !verify_subscriber_id(entity_verifier, &activity.subscriber_id).await? { + return Ok(SubscriberReportVerificationStatus::InvalidSubscriberId); + }; + Ok(SubscriberReportVerificationStatus::Valid) +} - async fn verify_known_carrier_key(&self, public_key: &PublicKeyBinary) -> anyhow::Result { - self.authorization_verifier - .verify_authorized_key(public_key, NetworkKeyRole::MobileCarrier) - .await - .map_err(anyhow::Error::from) - } +async fn verify_known_carrier_key( + authorization_verifier: AV, + public_key: &PublicKeyBinary, +) -> anyhow::Result +where + AV: AuthorizationVerifier, +{ + authorization_verifier + .verify_authorized_key(public_key, NetworkKeyRole::MobileCarrier) + .await + .map_err(anyhow::Error::from) +} - async fn verify_subscriber_id(&self, subscriber_id: &[u8]) -> anyhow::Result { - self.entity_verifier - .verify_rewardable_entity(subscriber_id) - .await - .map_err(anyhow::Error::from) - } +async fn verify_subscriber_id(entity_verifier: EV, subscriber_id: &[u8]) -> anyhow::Result +where + EV: EntityVerifier, +{ + entity_verifier + .verify_rewardable_entity(subscriber_id) + .await + .map_err(anyhow::Error::from) } async fn is_valid( @@ -209,20 +226,19 @@ async fn is_valid( impl ManagedTask for SubscriberMappingActivityDaemon where - AV: AuthorizationVerifier, - EV: EntityVerifier + Send + Sync + 'static, + AV: AuthorizationVerifier + Clone, + EV: EntityVerifier + Clone + Send + Sync + 'static, { fn start_task( self: Box, shutdown: triggered::Listener, ) -> futures::future::LocalBoxFuture<'static, anyhow::Result<()>> { - Box::pin(self.run(shutdown)) - // let handle = tokio::spawn(async move { self.run(shutdown).await }); - // Box::pin( - // handle - // .map_err(anyhow::Error::from) - // .and_then(|result| async move { result }), - // ) + let handle = tokio::spawn(self.run(shutdown)); + Box::pin( + handle + .map_err(anyhow::Error::from) + .and_then(|result| async move { result }), + ) } } From dc1ee7389c61a3f53846a04182652724d0d59655 Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Mon, 31 Mar 2025 18:00:39 -0400 Subject: [PATCH 08/13] remove commented out code --- mobile_verifier/src/rewarder.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index c8925a0c0..d04a10507 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -336,7 +336,6 @@ where subscriber_mapping_activity::db::clear(&mut transaction, reward_info.epoch_period.start) .await?; unique_connections::db::clear(&mut transaction, &reward_info.epoch_period.start).await?; - // subscriber_location::clear_location_shares(&mut transaction, &reward_period.end).await?; save_next_reward_epoch(&mut transaction, reward_info.epoch_day + 1).await?; From 4ee98a229f223bf0471f895ebf72685252665a6f Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Tue, 1 Apr 2025 14:03:14 -0400 Subject: [PATCH 09/13] refactoring of subscriber mapping activity processing --- .../src/subscriber_mapping_activity.rs | 84 ++++++++----------- .../tests/integrations/rewarder_mappers.rs | 4 - 2 files changed, 37 insertions(+), 51 deletions(-) diff --git a/mobile_verifier/src/subscriber_mapping_activity.rs b/mobile_verifier/src/subscriber_mapping_activity.rs index 5e066ded3..20dcb8a3b 100644 --- a/mobile_verifier/src/subscriber_mapping_activity.rs +++ b/mobile_verifier/src/subscriber_mapping_activity.rs @@ -1,4 +1,4 @@ -use std::time::Instant; +use std::{sync::Arc, time::Instant}; use chrono::{DateTime, Utc}; use file_store::{ @@ -34,16 +34,16 @@ pub mod db; pub struct SubscriberMappingActivityDaemon { pool: Pool, - authorization_verifier: AV, - entity_verifier: EV, + authorization_verifier: Arc, + entity_verifier: Arc, stream_receiver: Receiver>, verified_sink: FileSinkClient, } impl SubscriberMappingActivityDaemon where - AV: AuthorizationVerifier + Clone, - EV: EntityVerifier + Clone + Send + Sync + 'static, + AV: AuthorizationVerifier, + EV: EntityVerifier + Send + Sync + 'static, { pub fn new( pool: Pool, @@ -54,8 +54,8 @@ where ) -> Self { Self { pool, - authorization_verifier, - entity_verifier, + authorization_verifier: Arc::new(authorization_verifier), + entity_verifier: Arc::new(entity_verifier), stream_receiver, verified_sink, } @@ -130,20 +130,28 @@ where let stream = file_info_stream.into_stream(&mut transaction).await?; let activity_stream = stream - .map(SubscriberMappingActivity::try_from) - .and_then(|sma| { + .map(|proto| { + let activity = SubscriberMappingActivity::try_from(proto.clone())?; + Ok((activity, proto)) + }) + .and_then(|(activity, proto)| { let av = self.authorization_verifier.clone(); let ev = self.entity_verifier.clone(); async move { - let status = verify_activity(av, ev, &sma).await?; - Ok((sma, status)) + let status = verify_activity(av, ev, &activity).await?; + Ok((activity, proto, status)) } }) - .and_then(|(sma, status)| { + .and_then(|(activity, proto, status)| { let sink = self.verified_sink.clone(); - async move { write_verified_report(sink, sma, status).await } + async move { + write_verified_report(sink, proto, status).await?; + Ok((activity, status)) + } }) - .try_filter_map(|(sma, status)| is_valid(sma, status)); + .try_filter_map(|(activity, status)| async move { + Ok(matches!(status, SubscriberReportVerificationStatus::Valid).then_some(activity)) + }); db::save(&mut transaction, activity_stream).await?; self.verified_sink.commit().await?; @@ -154,14 +162,11 @@ where async fn write_verified_report( sink: FileSinkClient, - mut activity: SubscriberMappingActivity, + proto: SubscriberMappingActivityIngestReportV1, status: SubscriberReportVerificationStatus, -) -> anyhow::Result<( - SubscriberMappingActivity, - SubscriberReportVerificationStatus, -)> { +) -> anyhow::Result<()> { let verified_proto = VerifiedSubscriberMappingActivityReportV1 { - report: activity.take_original(), + report: Some(proto), status: status as i32, timestamp: Utc::now().encode_timestamp_millis(), }; @@ -169,12 +174,12 @@ async fn write_verified_report( sink.write(verified_proto, &[("status", status.as_str_name())]) .await?; - Ok((activity, status)) + Ok(()) } async fn verify_activity( - authorization_verifier: AV, - entity_verifier: EV, + authorization_verifier: impl AsRef, + entity_verifier: impl AsRef, activity: &SubscriberMappingActivity, ) -> anyhow::Result where @@ -191,43 +196,37 @@ where } async fn verify_known_carrier_key( - authorization_verifier: AV, + authorization_verifier: impl AsRef, public_key: &PublicKeyBinary, ) -> anyhow::Result where AV: AuthorizationVerifier, { authorization_verifier + .as_ref() .verify_authorized_key(public_key, NetworkKeyRole::MobileCarrier) .await .map_err(anyhow::Error::from) } -async fn verify_subscriber_id(entity_verifier: EV, subscriber_id: &[u8]) -> anyhow::Result +async fn verify_subscriber_id( + entity_verifier: impl AsRef, + subscriber_id: &[u8], +) -> anyhow::Result where EV: EntityVerifier, { entity_verifier + .as_ref() .verify_rewardable_entity(subscriber_id) .await .map_err(anyhow::Error::from) } -async fn is_valid( - activity: SubscriberMappingActivity, - status: SubscriberReportVerificationStatus, -) -> anyhow::Result> { - if status == SubscriberReportVerificationStatus::Valid { - Ok(Some(activity)) - } else { - Ok(None) - } -} - impl ManagedTask for SubscriberMappingActivityDaemon where - AV: AuthorizationVerifier + Clone, - EV: EntityVerifier + Clone + Send + Sync + 'static, + AV: AuthorizationVerifier, + EV: EntityVerifier + Send + Sync + 'static, { fn start_task( self: Box, @@ -248,20 +247,12 @@ pub struct SubscriberMappingActivity { pub verification_reward_shares: u64, pub received_timestamp: DateTime, pub carrier_pub_key: PublicKeyBinary, - pub original: Option, -} - -impl SubscriberMappingActivity { - fn take_original(&mut self) -> Option { - self.original.take() - } } impl TryFrom for SubscriberMappingActivity { type Error = anyhow::Error; fn try_from(value: SubscriberMappingActivityIngestReportV1) -> Result { - let original = value.clone(); let report = value .report .ok_or_else(|| anyhow::anyhow!("SubscriberMappingActivityReqV1 not found"))?; @@ -272,7 +263,6 @@ impl TryFrom for SubscriberMappingActiv verification_reward_shares: report.verification_reward_shares, received_timestamp: value.received_timestamp.to_timestamp_millis()?, carrier_pub_key: PublicKeyBinary::from(report.carrier_pub_key), - original: Some(original), }) } } diff --git a/mobile_verifier/tests/integrations/rewarder_mappers.rs b/mobile_verifier/tests/integrations/rewarder_mappers.rs index 437bb4087..9f047229a 100644 --- a/mobile_verifier/tests/integrations/rewarder_mappers.rs +++ b/mobile_verifier/tests/integrations/rewarder_mappers.rs @@ -136,7 +136,6 @@ async fn seed_mapping_data( discovery_reward_shares: 30, verification_reward_shares: 0, carrier_pub_key: PublicKeyBinary::from_str(HOTSPOT_1).unwrap(), - original: None, }, SubscriberMappingActivity { received_timestamp: ts - ChronoDuration::hours(2), @@ -144,7 +143,6 @@ async fn seed_mapping_data( discovery_reward_shares: 30, verification_reward_shares: 0, carrier_pub_key: PublicKeyBinary::from_str(HOTSPOT_1).unwrap(), - original: None, }, SubscriberMappingActivity { received_timestamp: ts - ChronoDuration::hours(1), @@ -152,7 +150,6 @@ async fn seed_mapping_data( discovery_reward_shares: 30, verification_reward_shares: 0, carrier_pub_key: PublicKeyBinary::from_str(HOTSPOT_1).unwrap(), - original: None, }, SubscriberMappingActivity { received_timestamp: ts - ChronoDuration::hours(1), @@ -160,7 +157,6 @@ async fn seed_mapping_data( discovery_reward_shares: 30, verification_reward_shares: 0, carrier_pub_key: PublicKeyBinary::from_str(HOTSPOT_1).unwrap(), - original: None, }, ]; From d326a72ab6154f6de4c245573d73d64ec8494c8a Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Wed, 2 Apr 2025 07:10:52 -0400 Subject: [PATCH 10/13] move send/sync to trait definition --- mobile_config/src/client/entity_client.rs | 2 +- mobile_verifier/src/subscriber_mapping_activity.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/mobile_config/src/client/entity_client.rs b/mobile_config/src/client/entity_client.rs index 129b1d6af..67626e7d0 100644 --- a/mobile_config/src/client/entity_client.rs +++ b/mobile_config/src/client/entity_client.rs @@ -10,7 +10,7 @@ use retainer::Cache; use std::{sync::Arc, time::Duration}; #[async_trait] -pub trait EntityVerifier { +pub trait EntityVerifier: Send + Sync + 'static { type Error: std::error::Error + Send + Sync + 'static; async fn verify_rewardable_entity(&self, entity_id: &[u8]) -> Result; diff --git a/mobile_verifier/src/subscriber_mapping_activity.rs b/mobile_verifier/src/subscriber_mapping_activity.rs index 20dcb8a3b..5d919e57f 100644 --- a/mobile_verifier/src/subscriber_mapping_activity.rs +++ b/mobile_verifier/src/subscriber_mapping_activity.rs @@ -43,7 +43,7 @@ pub struct SubscriberMappingActivityDaemon { impl SubscriberMappingActivityDaemon where AV: AuthorizationVerifier, - EV: EntityVerifier + Send + Sync + 'static, + EV: EntityVerifier, { pub fn new( pool: Pool, @@ -226,7 +226,7 @@ where impl ManagedTask for SubscriberMappingActivityDaemon where AV: AuthorizationVerifier, - EV: EntityVerifier + Send + Sync + 'static, + EV: EntityVerifier, { fn start_task( self: Box, From a223808432af1c6fb6cd3bac728f698eb27fcf84 Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Wed, 2 Apr 2025 07:34:38 -0400 Subject: [PATCH 11/13] remove associated type for AuthorizationVerifier and EntityVerifier --- ingest/tests/common/mod.rs | 8 ++------ mobile_config/src/client/authorization_client.rs | 6 +----- mobile_config/src/client/entity_client.rs | 6 +----- mobile_verifier/src/sp_boosted_rewards_bans.rs | 9 ++------- 4 files changed, 6 insertions(+), 23 deletions(-) diff --git a/ingest/tests/common/mod.rs b/ingest/tests/common/mod.rs index 210bf6399..cd6aedc0f 100644 --- a/ingest/tests/common/mod.rs +++ b/ingest/tests/common/mod.rs @@ -19,6 +19,7 @@ use helium_proto::services::{ }; use ingest::server_mobile::GrpcServer; use mobile_config::client::authorization_client::AuthorizationVerifier; +use mobile_config::client::ClientError; use prost::Message; use rand::rngs::OsRng; use std::{net::SocketAddr, sync::Arc, time::Duration}; @@ -40,18 +41,13 @@ impl MockAuthorizationClient { } } -#[derive(thiserror::Error, Debug)] -pub enum MockError {} - #[async_trait] impl AuthorizationVerifier for MockAuthorizationClient { - type Error = MockError; - async fn verify_authorized_key( &self, _pubkey: &PublicKeyBinary, _role: NetworkKeyRole, - ) -> Result { + ) -> Result { Ok(true) } } diff --git a/mobile_config/src/client/authorization_client.rs b/mobile_config/src/client/authorization_client.rs index cfc1472ee..0193af382 100644 --- a/mobile_config/src/client/authorization_client.rs +++ b/mobile_config/src/client/authorization_client.rs @@ -11,13 +11,11 @@ use std::{sync::Arc, time::Duration}; #[async_trait] pub trait AuthorizationVerifier: Send + Sync + 'static { - type Error: std::error::Error + Send + Sync + 'static; - async fn verify_authorized_key( &self, pubkey: &PublicKeyBinary, role: mobile_config::NetworkKeyRole, - ) -> Result; + ) -> Result; } #[derive(Clone)] @@ -51,8 +49,6 @@ impl AuthorizationClient { #[async_trait] impl AuthorizationVerifier for AuthorizationClient { - type Error = ClientError; - async fn verify_authorized_key( &self, pubkey: &PublicKeyBinary, diff --git a/mobile_config/src/client/entity_client.rs b/mobile_config/src/client/entity_client.rs index 67626e7d0..74da8f4f0 100644 --- a/mobile_config/src/client/entity_client.rs +++ b/mobile_config/src/client/entity_client.rs @@ -11,9 +11,7 @@ use std::{sync::Arc, time::Duration}; #[async_trait] pub trait EntityVerifier: Send + Sync + 'static { - type Error: std::error::Error + Send + Sync + 'static; - - async fn verify_rewardable_entity(&self, entity_id: &[u8]) -> Result; + async fn verify_rewardable_entity(&self, entity_id: &[u8]) -> Result; } #[derive(Clone)] @@ -27,8 +25,6 @@ pub struct EntityClient { #[async_trait] impl EntityVerifier for EntityClient { - type Error = ClientError; - async fn verify_rewardable_entity(&self, entity_id: &[u8]) -> Result { let entity_id = entity_id.to_vec(); if let Some(entity_found) = self.cache.get(&entity_id).await { diff --git a/mobile_verifier/src/sp_boosted_rewards_bans.rs b/mobile_verifier/src/sp_boosted_rewards_bans.rs index a95953bfb..5ce6a02e2 100644 --- a/mobile_verifier/src/sp_boosted_rewards_bans.rs +++ b/mobile_verifier/src/sp_boosted_rewards_bans.rs @@ -123,7 +123,6 @@ pub struct ServiceProviderBoostedRewardsBanIngestor { impl ManagedTask for ServiceProviderBoostedRewardsBanIngestor where AV: AuthorizationVerifier, - AV::Error: std::error::Error + Send + Sync + 'static, { fn start_task( self: Box, @@ -141,7 +140,6 @@ where impl ServiceProviderBoostedRewardsBanIngestor where AV: AuthorizationVerifier, - AV::Error: std::error::Error + Send + Sync + 'static, { pub async fn create_managed_task( pool: PgPool, @@ -420,6 +418,7 @@ mod tests { use helium_proto::services::poc_mobile::{ SeniorityUpdate as SeniorityUpdateProto, ServiceProviderBoostedRewardsBannedRadioReqV1, }; + use mobile_config::client::ClientError; use rand::rngs::OsRng; use tokio::sync::mpsc; @@ -427,19 +426,15 @@ mod tests { use super::*; - #[derive(thiserror::Error, Debug)] - enum TestError {} struct AllVerified; #[async_trait::async_trait] impl AuthorizationVerifier for AllVerified { - type Error = TestError; - async fn verify_authorized_key( &self, _pubkey: &PublicKeyBinary, _role: helium_proto::services::mobile_config::NetworkKeyRole, - ) -> Result { + ) -> Result { Ok(true) } } From 35ee0e0d2a90f5eaaec9c2020cc83c1c1449d7c2 Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Wed, 2 Apr 2025 13:26:47 -0400 Subject: [PATCH 12/13] update proto dep back to master --- Cargo.lock | 8 ++++---- Cargo.toml | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e88c0221b..0e9b62128 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1617,8 +1617,8 @@ dependencies = [ "byteorder", "helium-proto", "prost", - "rand 0.8.5", - "rand_chacha 0.3.0", + "rand 0.7.3", + "rand_chacha 0.2.2", "rust_decimal", "serde", "sha2 0.9.9", @@ -3816,7 +3816,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://www.github.com/helium/proto.git?branch=jg%2Fdisco-shares-v2#f8a26f6727ec310f2f03a0e208267d4775e9203d" +source = "git+https://github.com/helium/proto?branch=master#5686762b56fb03308cbe13d3f4bec9497982ad59" dependencies = [ "bytes", "prost", @@ -6064,7 +6064,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80b776a1b2dc779f5ee0641f8ade0125bc1298dd41a9a0c16d8bd57b42d222b1" dependencies = [ "bytes", - "heck 0.5.0", + "heck 0.4.0", "itertools", "log", "multimap", diff --git a/Cargo.toml b/Cargo.toml index 61a855f59..61e0729d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -136,6 +136,6 @@ sqlx = { git = "https://github.com/launchbadge/sqlx.git", rev = "42dd78fe931df65 # helium-proto = { path = "../../proto" } # beacon = { path = "../../proto" } -[patch.'https://github.com/helium/proto'] -helium-proto = { git = "https://www.github.com/helium/proto.git", branch = "jg/disco-shares-v2" } +# [patch.'https://github.com/helium/proto'] +# helium-proto = { git = "https://www.github.com/helium/proto.git", branch = "jg/disco-shares-v2" } From 8cc91156b26e6a095898bd8ad8bae8b76b36b3e6 Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Wed, 2 Apr 2025 13:36:57 -0400 Subject: [PATCH 13/13] update beacon --- Cargo.lock | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0e9b62128..7a1bfb354 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1611,17 +1611,17 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#8f84e2a1f4229354520bf380dd3e96590ae31c06" +source = "git+https://github.com/helium/proto?branch=master#5686762b56fb03308cbe13d3f4bec9497982ad59" dependencies = [ "base64 0.21.7", "byteorder", "helium-proto", "prost", - "rand 0.7.3", - "rand_chacha 0.2.2", + "rand 0.8.5", + "rand_chacha 0.3.0", "rust_decimal", "serde", - "sha2 0.9.9", + "sha2 0.10.8", "thiserror", ] @@ -3749,7 +3749,7 @@ dependencies = [ "bs58 0.5.0", "byteorder", "ed25519-compact", - "getrandom 0.1.16", + "getrandom 0.2.10", "k256", "lazy_static", "multihash", @@ -6064,7 +6064,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80b776a1b2dc779f5ee0641f8ade0125bc1298dd41a9a0c16d8bd57b42d222b1" dependencies = [ "bytes", - "heck 0.4.0", + "heck 0.5.0", "itertools", "log", "multimap", @@ -9320,7 +9320,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if", - "rand 0.7.3", + "rand 0.8.5", "static_assertions", ] @@ -9990,7 +9990,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "sha2 0.9.9", + "sha2 0.10.8", "thiserror", "twox-hash", "xorf",