Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ jobs:
- 5432:5432
localstack:
image: localstack/localstack:latest
environment:
env:
SERVICES: s3
EAGER_SERVICE_LOADING: 1
ports:
Expand Down
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion aws_local/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::sync::Mutex;
use tonic::transport::Uri;
use uuid::Uuid;

pub const AWSLOCAL_DEFAULT_ENDPOINT: &str = "http://127.0.0.1:4566";
pub const AWSLOCAL_DEFAULT_ENDPOINT: &str = "http://localstack:4566";

pub fn gen_bucket_name() -> String {
format!("mvr-{}-{}", Uuid::new_v4(), Utc::now().timestamp_millis())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE IF EXISTS subscriber_mapping_activity ADD COLUMN IF NOT EXISTS reward_override_entity_key TEXT;
4 changes: 4 additions & 0 deletions mobile_verifier/src/reward_shares.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ impl MapperShares {
subscriber_id: mas.subscriber_id,
discovery_location_amount,
verification_mapping_amount,
reward_override_entity_key: mas
.reward_override_entity_key
.unwrap_or_default(),
})),
},
)
Expand Down Expand Up @@ -831,6 +834,7 @@ mod test {
subscriber_id: n.encode_to_vec(),
discovery_reward_shares: 30,
verification_reward_shares: 30,
reward_override_entity_key: None,
})
}

Expand Down
57 changes: 53 additions & 4 deletions mobile_verifier/src/subscriber_mapping_activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,15 @@ where
if !verify_known_carrier_key(authorization_verifier, &activity.carrier_pub_key).await? {
return Ok(SubscriberReportVerificationStatus::InvalidCarrierKey);
};
if !verify_subscriber_id(entity_verifier, &activity.subscriber_id).await? {
if !verify_entity(&entity_verifier, &activity.subscriber_id).await? {
return Ok(SubscriberReportVerificationStatus::InvalidSubscriberId);
};
if let Some(rek) = &activity.reward_override_entity_key {
// use UTF8(key_serialization) as bytea
if !verify_entity(entity_verifier, &rek.clone().into_bytes()).await? {
return Ok(SubscriberReportVerificationStatus::InvalidRewardOverrideEntityKey);
};
}
Ok(SubscriberReportVerificationStatus::Valid)
}

Expand All @@ -209,16 +215,16 @@ where
.map_err(anyhow::Error::from)
}

async fn verify_subscriber_id<EV>(
async fn verify_entity<EV>(
entity_verifier: impl AsRef<EV>,
subscriber_id: &[u8],
entity_id: &[u8],
) -> anyhow::Result<bool>
where
EV: EntityVerifier,
{
entity_verifier
.as_ref()
.verify_rewardable_entity(subscriber_id)
.verify_rewardable_entity(entity_id)
.await
.map_err(anyhow::Error::from)
}
Expand Down Expand Up @@ -247,6 +253,7 @@ pub struct SubscriberMappingActivity {
pub verification_reward_shares: u64,
pub received_timestamp: DateTime<Utc>,
pub carrier_pub_key: PublicKeyBinary,
pub reward_override_entity_key: Option<String>,
}

impl TryFrom<SubscriberMappingActivityIngestReportV1> for SubscriberMappingActivity {
Expand All @@ -257,12 +264,19 @@ impl TryFrom<SubscriberMappingActivityIngestReportV1> for SubscriberMappingActiv
.report
.ok_or_else(|| anyhow::anyhow!("SubscriberMappingActivityReqV1 not found"))?;

let reward_override_entity_key = if report.reward_override_entity_key.is_empty() {
Copy link

Copilot AI Jun 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider using report.reward_override_entity_key.trim().is_empty() to also handle cases where the string may contain only whitespace.

Suggested change
let reward_override_entity_key = if report.reward_override_entity_key.is_empty() {
let reward_override_entity_key = if report.reward_override_entity_key.trim().is_empty() {

Copilot uses AI. Check for mistakes.
None
} else {
Some(report.reward_override_entity_key)
};

Ok(Self {
subscriber_id: report.subscriber_id,
discovery_reward_shares: report.discovery_reward_shares,
verification_reward_shares: report.verification_reward_shares,
received_timestamp: value.received_timestamp.to_timestamp_millis()?,
carrier_pub_key: PublicKeyBinary::from(report.carrier_pub_key),
reward_override_entity_key,
})
}
}
Expand All @@ -274,4 +288,39 @@ pub struct SubscriberMappingShares {
pub discovery_reward_shares: u64,
#[sqlx(try_from = "i64")]
pub verification_reward_shares: u64,

pub reward_override_entity_key: Option<String>,
}

#[cfg(test)]
mod tests {
use super::SubscriberMappingActivity;
use helium_proto::services::poc_mobile::SubscriberMappingActivityIngestReportV1;

#[test]
fn try_from_subscriber_mapping_activity_check_entity_key() {
// Make sure reward_override_entity_key empty string transforms to None
let smair = SubscriberMappingActivityIngestReportV1 {
received_timestamp: 1,
report: Some({
helium_proto::services::poc_mobile::SubscriberMappingActivityReqV1 {
subscriber_id: vec![10],
discovery_reward_shares: 2,
verification_reward_shares: 3,
timestamp: 4,
carrier_pub_key: vec![11],
signature: vec![12],
reward_override_entity_key: "".to_string(),
}
}),
};
let mut smair2 = smair.clone();
smair2.report.as_mut().unwrap().reward_override_entity_key = "key".to_string();

let res = SubscriberMappingActivity::try_from(smair).unwrap();
assert!(res.reward_override_entity_key.is_none());

let res = SubscriberMappingActivity::try_from(smair2).unwrap();
assert_eq!(res.reward_override_entity_key, Some("key".to_string()));
}
}
10 changes: 6 additions & 4 deletions mobile_verifier/src/subscriber_mapping_activity/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,22 @@ pub async fn save(
transaction: &mut Transaction<'_, Postgres>,
ingest_reports: impl Stream<Item = anyhow::Result<SubscriberMappingActivity>>,
) -> anyhow::Result<()> {
const NUM_IN_BATCH: usize = (u16::MAX / 5) as usize;
const NUM_IN_BATCH: usize = (u16::MAX / 6) as usize;

ingest_reports
.try_chunks(NUM_IN_BATCH)
.err_into::<anyhow::Error>()
.try_fold(transaction, |txn, chunk| async move {
QueryBuilder::new("INSERT INTO subscriber_mapping_activity(subscriber_id, discovery_reward_shares, verification_reward_shares, received_timestamp, inserted_at)")
QueryBuilder::new(r#"INSERT INTO subscriber_mapping_activity(
subscriber_id, discovery_reward_shares, verification_reward_shares, received_timestamp, inserted_at, reward_override_entity_key)"#)
.push_values(chunk, |mut b, activity| {

b.push_bind(activity.subscriber_id)
.push_bind(activity.discovery_reward_shares as i64)
.push_bind(activity.verification_reward_shares as i64)
.push_bind(activity.received_timestamp)
.push_bind(Utc::now());
.push_bind(Utc::now())
.push_bind(activity.reward_override_entity_key);
})
.push("ON CONFLICT (subscriber_id, received_timestamp) DO NOTHING")
.build()
Expand All @@ -45,7 +47,7 @@ pub async fn rewardable_mapping_activity(
) -> anyhow::Result<Vec<SubscriberMappingShares>> {
sqlx::query_as(
r#"
SELECT DISTINCT ON (subscriber_id) subscriber_id, discovery_reward_shares, verification_reward_shares
SELECT DISTINCT ON (subscriber_id) subscriber_id, discovery_reward_shares, verification_reward_shares, reward_override_entity_key
FROM subscriber_mapping_activity
WHERE received_timestamp >= $1
AND received_timestamp < $2
Expand Down
10 changes: 9 additions & 1 deletion mobile_verifier/tests/integrations/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use mobile_config::{
sub_dao_epoch_reward_info::EpochRewardInfo,
};
use mobile_verifier::{
boosting_oracles::AssignedCoverageObjects, GatewayResolution, GatewayResolver, PriceInfo,
boosting_oracles::AssignedCoverageObjects,
subscriber_mapping_activity::SubscriberMappingShares, GatewayResolution, GatewayResolver,
PriceInfo,
};
use rust_decimal::{prelude::ToPrimitive, Decimal};
use rust_decimal_macros::dec;
Expand Down Expand Up @@ -446,6 +448,12 @@ impl AsStringKeyedMapKey for PromotionReward {
self.entity.to_owned()
}
}
impl AsStringKeyedMapKey for SubscriberMappingShares {
fn key(&self) -> String {
use helium_proto::Message;
String::decode(self.subscriber_id.as_bytes()).expect("decode subscriber id")
}
}

impl<V: AsStringKeyedMapKey + Clone> AsStringKeyedMap<V> for Vec<V> {
fn as_keyed_map(&self) -> HashMap<String, V>
Expand Down
32 changes: 32 additions & 0 deletions mobile_verifier/tests/integrations/rewarder_mappers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,34 @@ async fn test_mapper_rewards(pool: PgPool) -> anyhow::Result<()> {
Ok(())
}

#[sqlx::test]
async fn reward_mapper_check_entity_key_db(pool: PgPool) {
let reward_info = reward_info_24_hours();
// seed db
let mut txn = pool.clone().begin().await.unwrap();
seed_mapping_data(reward_info.epoch_period.end, &mut txn)
.await
.unwrap();
txn.commit().await.expect("db txn failed");

let rewardable_mapping_activity = subscriber_mapping_activity::db::rewardable_mapping_activity(
&pool,
&reward_info.epoch_period,
)
.await
.unwrap();

let sub_map = rewardable_mapping_activity.as_keyed_map();
let sub_1 = sub_map.get(SUBSCRIBER_1).expect("sub 1");
let sub_3 = sub_map.get(SUBSCRIBER_3).expect("sub 3");

assert!(sub_1.reward_override_entity_key.is_none());
assert_eq!(
sub_3.reward_override_entity_key,
Some("entity key".to_string())
);
Comment on lines +90 to +98
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duplicate checks?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeap, good catch

}

async fn seed_mapping_data(
ts: DateTime<Utc>,
txn: &mut Transaction<'_, Postgres>,
Expand All @@ -84,27 +112,31 @@ async fn seed_mapping_data(
discovery_reward_shares: 30,
verification_reward_shares: 0,
carrier_pub_key: PublicKeyBinary::from_str(HOTSPOT_1).unwrap(),
reward_override_entity_key: None,
},
SubscriberMappingActivity {
received_timestamp: ts - ChronoDuration::hours(2),
subscriber_id: SUBSCRIBER_1.to_string().encode_to_vec(),
discovery_reward_shares: 30,
verification_reward_shares: 0,
carrier_pub_key: PublicKeyBinary::from_str(HOTSPOT_1).unwrap(),
reward_override_entity_key: None,
},
SubscriberMappingActivity {
received_timestamp: ts - ChronoDuration::hours(1),
subscriber_id: SUBSCRIBER_2.to_string().encode_to_vec(),
discovery_reward_shares: 30,
verification_reward_shares: 0,
carrier_pub_key: PublicKeyBinary::from_str(HOTSPOT_1).unwrap(),
reward_override_entity_key: None,
},
SubscriberMappingActivity {
received_timestamp: ts - ChronoDuration::hours(1),
subscriber_id: SUBSCRIBER_3.to_string().encode_to_vec(),
discovery_reward_shares: 30,
verification_reward_shares: 0,
carrier_pub_key: PublicKeyBinary::from_str(HOTSPOT_1).unwrap(),
reward_override_entity_key: Some("entity key".to_string()),
},
];

Expand Down
Loading
Loading