Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,7 @@ sqlx = { git = "https://github.com/launchbadge/sqlx.git", rev = "42dd78fe931df65
# [patch.'https://github.com/helium/proto']
# helium-proto = { path = "../../proto" }
# beacon = { path = "../../proto" }

# [patch.'https://github.com/helium/proto']
# helium-proto = { git = "https://www.github.com/helium/proto.git", branch = "jg/disco-shares-v2" }

18 changes: 18 additions & 0 deletions file_store/src/file_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ pub const VERIFIED_PROMOTION_REWARD: &str = "verified_promotion_reward";
pub const SERVICE_PROVIDER_PROMOTION_FUND: &str = "service_provider_promotion_fund";
pub const UNIQUE_CONNECTIONS_REPORT: &str = "unique_connections_report";
pub const VERIFIED_UNIQUE_CONNECTIONS_REPORT: &str = "verified_unique_connections_report";
pub const SUBSCRIBER_MAPPING_ACTIVITY_INGEST_REPORT: &str =
"subscriber_mapping_activity_ingest_report";
pub const VERIFIED_SUBSCRIBER_MAPPING_ACTIVITY_REPORT: &str =
"verified_subscriber_mapping_activity_report";

#[derive(Debug, PartialEq, Eq, Clone, Serialize, Copy, strum::EnumCount)]
#[serde(rename_all = "snake_case")]
Expand Down Expand Up @@ -241,6 +245,8 @@ pub enum FileType {
RadioUsageStatsReq,
UniqueConnectionsReport,
VerifiedUniqueConnectionsReport,
SubscriberMappingActivityIngestReport,
VerifiedSubscriberMappingActivityReport,
}

impl fmt::Display for FileType {
Expand Down Expand Up @@ -328,6 +334,12 @@ impl FileType {
Self::RadioUsageStatsReq => RADIO_USAGE_STATS_REQ,
Self::UniqueConnectionsReport => UNIQUE_CONNECTIONS_REPORT,
Self::VerifiedUniqueConnectionsReport => VERIFIED_UNIQUE_CONNECTIONS_REPORT,
Self::SubscriberMappingActivityIngestReport => {
SUBSCRIBER_MAPPING_ACTIVITY_INGEST_REPORT
}
Self::VerifiedSubscriberMappingActivityReport => {
VERIFIED_SUBSCRIBER_MAPPING_ACTIVITY_REPORT
}
}
}
}
Expand Down Expand Up @@ -411,6 +423,12 @@ impl FromStr for FileType {
RADIO_USAGE_STATS_REQ => Self::RadioUsageStatsReq,
UNIQUE_CONNECTIONS_REPORT => Self::UniqueConnectionsReport,
VERIFIED_UNIQUE_CONNECTIONS_REPORT => Self::VerifiedUniqueConnectionsReport,
SUBSCRIBER_MAPPING_ACTIVITY_INGEST_REPORT => {
Self::SubscriberMappingActivityIngestReport
}
VERIFIED_SUBSCRIBER_MAPPING_ACTIVITY_REPORT => {
Self::VerifiedSubscriberMappingActivityReport
}
_ => return Err(Error::from(io::Error::from(io::ErrorKind::InvalidInput))),
};
Ok(result)
Expand Down
11 changes: 11 additions & 0 deletions file_store/src/traits/file_sink_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,3 +293,14 @@ impl_file_sink!(
FileType::RewardManifest.to_str(),
"reward_manifest"
);
impl_file_sink!(
poc_mobile::SubscriberMappingActivityIngestReportV1,
FileType::SubscriberMappingActivityIngestReport.to_str(),
"subscriber_mapping_activity_ingest_report"
);

impl_file_sink!(
poc_mobile::VerifiedSubscriberMappingActivityReportV1,
FileType::VerifiedSubscriberMappingActivityReport.to_str(),
"verified_subscriber_mapping_activity_report"
);
1 change: 1 addition & 0 deletions file_store/src/traits/msg_verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ impl_msg_verify!(poc_mobile::SubscriberVerifiedMappingEventReqV1, signature);
impl_msg_verify!(poc_mobile::HexUsageStatsReqV1, signature);
impl_msg_verify!(poc_mobile::RadioUsageStatsReqV1, signature);
impl_msg_verify!(poc_mobile::UniqueConnectionsReqV1, signature);
impl_msg_verify!(poc_mobile::SubscriberMappingActivityReqV1, signature);

#[cfg(test)]
mod test {
Expand Down
45 changes: 44 additions & 1 deletion ingest/src/server_mobile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use helium_proto::services::poc_mobile::{
RadioUsageStatsResV1, ServiceProviderBoostedRewardsBannedRadioIngestReportV1,
ServiceProviderBoostedRewardsBannedRadioReqV1, ServiceProviderBoostedRewardsBannedRadioRespV1,
SpeedtestIngestReportV1, SpeedtestReqV1, SpeedtestRespV1, SubscriberLocationIngestReportV1,
SubscriberLocationReqV1, SubscriberLocationRespV1,
SubscriberLocationReqV1, SubscriberLocationRespV1, SubscriberMappingActivityIngestReportV1,
SubscriberMappingActivityReqV1, SubscriberMappingActivityResV1,
SubscriberVerifiedMappingEventIngestReportV1, SubscriberVerifiedMappingEventReqV1,
SubscriberVerifiedMappingEventResV1, UniqueConnectionsIngestReportV1,
WifiHeartbeatIngestReportV1, WifiHeartbeatReqV1, WifiHeartbeatRespV1,
Expand Down Expand Up @@ -55,6 +56,7 @@ pub struct GrpcServer<AV> {
hex_usage_stats_event_sink: FileSinkClient<HexUsageStatsIngestReportV1>,
radio_usage_stats_event_sink: FileSinkClient<RadioUsageStatsIngestReportV1>,
unique_connections_sink: FileSinkClient<UniqueConnectionsIngestReportV1>,
subscriber_mapping_activity_sink: FileSinkClient<SubscriberMappingActivityIngestReportV1>,
required_network: Network,
address: SocketAddr,
api_token: MetadataValue<Ascii>,
Expand Down Expand Up @@ -103,6 +105,7 @@ where
hex_usage_stats_event_sink: FileSinkClient<HexUsageStatsIngestReportV1>,
radio_usage_stats_event_sink: FileSinkClient<RadioUsageStatsIngestReportV1>,
unique_connections_sink: FileSinkClient<UniqueConnectionsIngestReportV1>,
subscriber_mapping_activity_sink: FileSinkClient<SubscriberMappingActivityIngestReportV1>,
required_network: Network,
address: SocketAddr,
api_token: MetadataValue<Ascii>,
Expand All @@ -121,6 +124,7 @@ where
hex_usage_stats_event_sink,
radio_usage_stats_event_sink,
unique_connections_sink,
subscriber_mapping_activity_sink,
required_network,
address,
api_token,
Expand Down Expand Up @@ -478,6 +482,33 @@ where
Ok(Response::new(SubscriberVerifiedMappingEventResV1 { id }))
}

async fn submit_subscriber_mapping_activity(
&self,
request: Request<SubscriberMappingActivityReqV1>,
) -> GrpcResult<SubscriberMappingActivityResV1> {
let timestamp = Utc::now().timestamp_millis() as u64;
let event = request.into_inner();

custom_tracing::record_b58("subscriber_id", &event.subscriber_id);
custom_tracing::record_b58("pub_key", &event.carrier_pub_key);

let report = self
.verify_public_key(&event.carrier_pub_key)
.and_then(|public_key| self.verify_network(public_key))
.and_then(|public_key| self.verify_signature(public_key, event))
.map(|(_, event)| SubscriberMappingActivityIngestReportV1 {
received_timestamp: timestamp,
report: Some(event),
})?;

_ = self
.subscriber_mapping_activity_sink
.write(report, [])
.await;

Ok(Response::new(SubscriberMappingActivityResV1 { timestamp }))
}

async fn submit_hex_usage_stats_report(
&self,
request: Request<HexUsageStatsReqV1>,
Expand Down Expand Up @@ -690,6 +721,16 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
)
.await?;

let (subscriber_mapping_activity_sink, subscriber_mapping_activity_server) =
SubscriberMappingActivityIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;

let Some(api_token) = settings
.token
.as_ref()
Expand All @@ -715,6 +756,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
hex_usage_stats_event_sink,
radio_usage_stats_event_sink,
unique_connections_sink,
subscriber_mapping_activity_sink,
settings.network,
settings.listen_addr,
api_token,
Expand All @@ -741,6 +783,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
.add_task(hex_usage_stats_event_server)
.add_task(radio_usage_stats_event_server)
.add_task(unique_connections_server)
.add_task(subscriber_mapping_activity_server)
.add_task(grpc_server)
.build()
.start()
Expand Down
8 changes: 5 additions & 3 deletions ingest/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use helium_proto::services::{
};
use ingest::server_mobile::GrpcServer;
use mobile_config::client::authorization_client::AuthorizationVerifier;
use mobile_config::client::ClientError;
use prost::Message;
use rand::rngs::OsRng;
use std::{net::SocketAddr, sync::Arc, time::Duration};
Expand All @@ -42,13 +43,11 @@ impl MockAuthorizationClient {

#[async_trait]
impl AuthorizationVerifier for MockAuthorizationClient {
type Error = anyhow::Error;

async fn verify_authorized_key(
&self,
_pubkey: &PublicKeyBinary,
_role: NetworkKeyRole,
) -> anyhow::Result<bool> {
) -> Result<bool, ClientError> {
Ok(true)
}
}
Expand Down Expand Up @@ -80,6 +79,8 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> {
let (hex_usage_stat_tx, hex_usage_stat_rx) = tokio::sync::mpsc::channel(10);
let (radio_usage_stat_tx, radio_usage_stat_rx) = tokio::sync::mpsc::channel(10);
let (unique_connections_tx, unique_connections_rx) = tokio::sync::mpsc::channel(10);
let (subscriber_mapping_activity_tx, _subscriber_mapping_activity_rx) =
tokio::sync::mpsc::channel(10);

let auth_client = MockAuthorizationClient::new();

Expand All @@ -97,6 +98,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> {
FileSinkClient::new(hex_usage_stat_tx, "hex_usage_test_file_sink"),
FileSinkClient::new(radio_usage_stat_tx, "radio_usage_test_file_sink"),
FileSinkClient::new(unique_connections_tx, "noop"),
FileSinkClient::new(subscriber_mapping_activity_tx, "noop"),
Network::MainNet,
socket_addr,
api_token,
Expand Down
6 changes: 1 addition & 5 deletions mobile_config/src/client/authorization_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@ use std::{sync::Arc, time::Duration};

#[async_trait]
pub trait AuthorizationVerifier: Send + Sync + 'static {
type Error;

async fn verify_authorized_key(
&self,
pubkey: &PublicKeyBinary,
role: mobile_config::NetworkKeyRole,
) -> Result<bool, Self::Error>;
) -> Result<bool, ClientError>;
}

#[derive(Clone)]
Expand Down Expand Up @@ -51,8 +49,6 @@ impl AuthorizationClient {

#[async_trait]
impl AuthorizationVerifier for AuthorizationClient {
type Error = ClientError;

async fn verify_authorized_key(
&self,
pubkey: &PublicKeyBinary,
Expand Down
8 changes: 2 additions & 6 deletions mobile_config/src/client/entity_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ use retainer::Cache;
use std::{sync::Arc, time::Duration};

#[async_trait]
pub trait EntityVerifier {
type Error;

async fn verify_rewardable_entity(&self, entity_id: &[u8]) -> Result<bool, Self::Error>;
pub trait EntityVerifier: Send + Sync + 'static {
async fn verify_rewardable_entity(&self, entity_id: &[u8]) -> Result<bool, ClientError>;
}

#[derive(Clone)]
Expand All @@ -27,8 +25,6 @@ pub struct EntityClient {

#[async_trait]
impl EntityVerifier for EntityClient {
type Error = ClientError;

async fn verify_rewardable_entity(&self, entity_id: &[u8]) -> Result<bool, ClientError> {
let entity_id = entity_id.to_vec();
if let Some(entity_found) = self.cache.get(&entity_id).await {
Expand Down
18 changes: 18 additions & 0 deletions mobile_verifier/migrations/42_subscriber_mapping_activity.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
CREATE TABLE IF NOT EXISTS subscriber_mapping_activity (
subscriber_id BYTEA NOT NULL,
discovery_reward_shares BIGINT NOT NULL,
verification_reward_shares BIGINT NOT NULL,
received_timestamp TIMESTAMPTZ NOT NULL,
inserted_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (subscriber_id, received_timestamp)
);

INSERT INTO subscriber_mapping_activity(subscriber_id, discovery_reward_shares, verification_reward_shares, received_timestamp, inserted_at)
SELECT subscriber_id, 30, 0, received_timestamp, created_at AS inserted_at
FROM subscriber_loc_verified;

UPDATE subscriber_mapping_activity sma
SET verification_reward_shares = svme.total_reward_points
FROM subscriber_verified_mapping_event svme
WHERE sma.subscriber_id = svme.subscriber_id
AND sma.received_timestamp::date = svme.received_timestamp::date;
16 changes: 2 additions & 14 deletions mobile_verifier/src/cli/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ use crate::{
rewarder::Rewarder,
sp_boosted_rewards_bans::ServiceProviderBoostedRewardsBanIngestor,
speedtests::SpeedtestDaemon,
subscriber_location::SubscriberLocationIngestor,
subscriber_verified_mapping_event::SubscriberVerifiedMappingEventDaemon,
subscriber_mapping_activity::SubscriberMappingActivityDaemon,
telemetry,
unique_connections::ingestor::UniqueConnectionsIngestor,
Settings,
Expand Down Expand Up @@ -127,7 +126,7 @@ impl Cmd {
.await?,
)
.add_task(
SubscriberVerifiedMappingEventDaemon::create_managed_task(
SubscriberMappingActivityDaemon::create_managed_task(
pool.clone(),
settings,
auth_client.clone(),
Expand Down Expand Up @@ -157,17 +156,6 @@ impl Cmd {
)
.await?,
)
.add_task(
SubscriberLocationIngestor::create_managed_task(
pool.clone(),
settings,
file_upload.clone(),
report_ingest.clone(),
auth_client.clone(),
entity_client.clone(),
)
.await?,
)
.add_task(
RadioThresholdIngestor::create_managed_task(
pool.clone(),
Expand Down
3 changes: 1 addition & 2 deletions mobile_verifier/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ mod settings;
pub mod sp_boosted_rewards_bans;
pub mod speedtests;
pub mod speedtests_average;
pub mod subscriber_location;
pub mod subscriber_verified_mapping_event;
pub mod subscriber_mapping_activity;
pub mod telemetry;
pub mod unique_connections;

Expand Down
Loading