diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml
index 3ad471e73..bda7fbc65 100644
--- a/.github/workflows/CI.yml
+++ b/.github/workflows/CI.yml
@@ -118,6 +118,13 @@ jobs:
           --health-retries 5
         ports:
           - 5432:5432
+      localstack:
+        image: localstack/localstack:latest
+        env:
+          SERVICES: s3
+          EAGER_SERVICE_LOADING: 1
+        ports:
+          - 4566:4566
     steps:
       - name: Install Dependencies
         run: |
@@ -229,4 +236,5 @@ jobs:
           RELEASE_VERSION: ${{ github.event.inputs.release_version }}
         run: |
           chmod +x ./.github/scripts/make_debian.sh
-          ./.github/scripts/make_debian.sh
\ No newline at end of file
+          ./.github/scripts/make_debian.sh
+
diff --git a/.github/workflows/DockerCI.yml b/.github/workflows/DockerCI.yml
index 358ea932c..c71f1da43 100644
--- a/.github/workflows/DockerCI.yml
+++ b/.github/workflows/DockerCI.yml
@@ -176,6 +176,13 @@ jobs:
           --health-retries 5
         ports:
           - 5432:5432
+      localstack:
+        image: localstack/localstack:latest
+        env:
+          SERVICES: s3
+          EAGER_SERVICE_LOADING: 1
+        ports:
+          - 4566:4566
     steps:
       - name: Checkout Repository
         uses: actions/checkout@v4
@@ -218,4 +225,4 @@ jobs:
           cargo test -p ${{ matrix.package }} -- --include-ignored
 
       - name: Fix Permissions for Caching
-        run: sudo chown -R $(whoami):$(whoami) target
\ No newline at end of file
+        run: sudo chown -R $(whoami):$(whoami) target
diff --git a/Cargo.lock b/Cargo.lock
index da2a653aa..bc4819134 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -840,6 +840,24 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "aws-local"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "aws-config",
+ "aws-sdk-s3",
+ "aws-types 0.51.0",
+ "chrono",
+ "file-store",
+ "prost",
+ "tempfile",
+ "tokio",
+ "tonic",
+ "triggered",
+ "uuid",
+]
+
 [[package]]
 name = "aws-sdk-s3"
 version = "0.21.0"
@@ -4971,6 +4989,7 @@ dependencies = [
  "anyhow",
  "async-compression",
  "async-trait",
+ "aws-local",
  "base64 0.22.1",
  "chrono",
  "clap",
@@ -5013,6 +5032,7 @@ dependencies = [
  "solana",
  "sqlx",
  "task-manager",
+ "tempfile",
  "thiserror 1.0.69",
  "tokio",
  "tokio-stream",
diff --git a/Cargo.toml b/Cargo.toml
index fecdf7788..c4cb9c124 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -26,6 +26,7 @@ members = [
     "solana",
     "task_manager",
     "hex_assignments",
+    "aws_local",
 ]
 resolver = "2"
 
@@ -113,6 +114,10 @@ tokio-util = "0"
 uuid = { version = "1", features = ["v4", "serde"] }
 tower-http = { version = "0", features = ["trace"] }
 derive_builder = "0"
+aws-config = "0.51"
+aws-sdk-s3 = "0.21"
+aws-types = { version = "0.51", features = ["hardcoded-credentials"] }
+tempfile = "3"
 
 [patch.crates-io]
 anchor-lang = { git = "https://github.com/madninja/anchor.git", branch = "madninja/const_pubkey" }
diff --git a/aws_local/Cargo.toml b/aws_local/Cargo.toml
new file mode 100644
index 000000000..097a5abc2
--- /dev/null
+++ b/aws_local/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "aws-local"
+version = "0.1.0"
+authors.workspace = true
+license.workspace = true
+edition.workspace = true
+
+[dependencies]
+aws-config = {workspace = true}
+aws-sdk-s3 = {workspace = true}
+aws-types = {workspace = true, features = ["hardcoded-credentials"]}
+tokio = {workspace = true}
+triggered = {workspace = true}
+tonic = {workspace = true}
+chrono = {workspace = true}
+prost = {workspace = true}
+anyhow = {workspace = true}
+uuid = {workspace = true}
+tempfile = {workspace = true}
+file-store = { path = "../file_store", features = ["local"] }
diff --git a/aws_local/src/README.md b/aws_local/src/README.md
new file mode 100644
index 000000000..d5da3c71c
--- /dev/null
+++ b/aws_local/src/README.md
@@ -0,0 +1,17 @@
+Helper crate for running tests against [localstack](https://www.localstack.cloud/).
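+
+A minimal usage sketch (assumes localstack is listening on its default endpoint,
+as configured in CI; the file path is illustrative):
+
+```rust
+use aws_local::{gen_bucket_name, AwsLocal, AWSLOCAL_DEFAULT_ENDPOINT};
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    // Create a client against localstack and a fresh mock bucket.
+    let awsl = AwsLocal::new(AWSLOCAL_DEFAULT_ENDPOINT, &gen_bucket_name()).await;
+    // Upload a local file (illustrative path) into the mock bucket.
+    awsl.put_file_to_aws(std::path::Path::new("fixture.gz")).await?;
+    Ok(())
+}
+```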
diff --git a/aws_local/src/lib.rs b/aws_local/src/lib.rs
new file mode 100644
index 000000000..65ed6a660
--- /dev/null
+++ b/aws_local/src/lib.rs
@@ -0,0 +1,173 @@
+use anyhow::{anyhow, Result};
+use aws_config::meta::region::RegionProviderChain;
+use aws_sdk_s3::{Client, Endpoint, Region};
+use chrono::Utc;
+use file_store::traits::MsgBytes;
+use file_store::{file_sink, file_upload, FileStore, FileType, Settings};
+use std::path::Path;
+use std::{str::FromStr, sync::Arc};
+use tempfile::TempDir;
+use tokio::sync::Mutex;
+use tonic::transport::Uri;
+use uuid::Uuid;
+
+pub const AWSLOCAL_DEFAULT_ENDPOINT: &str = "http://127.0.0.1:4566";
+
+pub fn gen_bucket_name() -> String {
+    format!("mvr-{}-{}", Uuid::new_v4(), Utc::now().timestamp_millis())
+}
+
+/// Interacts with localstack.
+/// Used to create mocked AWS buckets and files.
+pub struct AwsLocal {
+    pub fs_settings: Settings,
+    pub file_store: FileStore,
+    pub aws_client: aws_sdk_s3::Client,
+}
+
+impl AwsLocal {
+    async fn create_aws_client(settings: &Settings) -> aws_sdk_s3::Client {
+        let endpoint: Option<Endpoint> = match &settings.endpoint {
+            Some(endpoint) => Uri::from_str(endpoint)
+                .map(Endpoint::immutable)
+                .map(Some)
+                .unwrap(),
+            _ => None,
+        };
+        let region = Region::new(settings.region.clone());
+        let region_provider = RegionProviderChain::first_try(region).or_default_provider();
+
+        let mut config = aws_config::from_env().region(region_provider);
+        if let Some(endpoint) = endpoint {
+            config = config.endpoint_resolver(endpoint);
+        }
+
+        let creds = aws_types::credentials::Credentials::from_keys(
+            settings.access_key_id.as_ref().unwrap(),
+            settings.secret_access_key.as_ref().unwrap(),
+            None,
+        );
+        config = config.credentials_provider(creds);
+
+        let config = config.load().await;
+
+        Client::new(&config)
+    }
+
+    pub async fn new(endpoint: &str, bucket: &str) -> AwsLocal {
+        let settings = Settings {
+            bucket: bucket.into(),
+            endpoint: Some(endpoint.into()),
+            region: "us-east-1".into(),
+            access_key_id: Some("random".into()),
+            secret_access_key: Some("random2".into()),
+        };
+        let client = Self::create_aws_client(&settings).await;
+        client.create_bucket().bucket(bucket).send().await.unwrap();
+        AwsLocal {
+            aws_client: client,
+            fs_settings: settings.clone(),
+            file_store: file_store::FileStore::from_settings(&settings)
+                .await
+                .unwrap(),
+        }
+    }
+
+    pub fn fs_settings(&self) -> Settings {
+        self.fs_settings.clone()
+    }
+
+    /// Writes `items` to a local file sink, waits for the resulting file to
+    /// be uploaded to the mock bucket, and returns the uploaded file name.
+    pub async fn put_proto_to_aws<T: MsgBytes>(
+        &self,
+        items: Vec<T>,
+        file_type: FileType,
+        metric_name: &'static str,
+    ) -> Result<String> {
+        let tmp_dir = TempDir::new()?;
+        let tmp_dir_path = tmp_dir.path().to_owned();
+
+        let (shutdown_trigger, shutdown_listener) = triggered::trigger();
+
+        let (file_upload, file_upload_server) =
+            file_upload::FileUpload::from_settings_tm(&self.fs_settings)
+                .await
+                .unwrap();
+
+        let (item_sink, item_server) =
+            file_sink::FileSinkBuilder::new(file_type, &tmp_dir_path, file_upload, metric_name)
+                .auto_commit(false)
+                .roll_time(std::time::Duration::new(15, 0))
+                .create::<T>()
+                .await
+                .unwrap();
+
+        for item in items {
+            item_sink.write(item, &[]).await.unwrap();
+        }
+        let item_recv = item_sink.commit().await.unwrap();
+
+        let uploaded_file = Arc::new(Mutex::new(String::default()));
+        let up_2 = uploaded_file.clone();
+        let mut timeout = std::time::Duration::new(5, 0);
+
+        tokio::spawn(async move {
+            let uploaded_files = item_recv.await.unwrap().unwrap();
+            assert_eq!(uploaded_files.len(), 1);
+            let mut val = up_2.lock().await;
+            *val = uploaded_files.first().unwrap().to_string();
+
+            // After files are uploaded to AWS they must be removed locally,
+            // so wait until the dir is empty: that means every file has been
+            // uploaded. Panic if it takes longer than `timeout` so the test
+            // fails instead of hanging.
+            loop {
+                if dir_has_files(&tmp_dir_path) {
+                    let dur = std::time::Duration::from_millis(10);
+                    tokio::time::sleep(dur).await;
+                    timeout = timeout
+                        .checked_sub(dur)
+                        .expect("timed out waiting for file upload");
+                    continue;
+                }
+                break;
+            }
+
+            shutdown_trigger.trigger();
+        });
+
+        tokio::try_join!(
+            file_upload_server.run(shutdown_listener.clone()),
+            item_server.run(shutdown_listener.clone())
+        )
+        .unwrap();
+
+        tmp_dir.close()?;
+
+        let res = uploaded_file.lock().await;
+        Ok(res.clone())
+    }
+
+    pub async fn put_file_to_aws(&self, file_path: &Path) -> Result<()> {
+        let path_str = file_path.display();
+        if !file_path.exists() {
+            return Err(anyhow!("File {path_str} does not exist"));
+        }
+        if !file_path.is_file() {
+            return Err(anyhow!("File {path_str} is not a file"));
+        }
+        self.file_store.put(file_path).await?;
+
+        Ok(())
+    }
+}
+
+fn dir_has_files(dir_path: &Path) -> bool {
+    let entries = std::fs::read_dir(dir_path)
+        .unwrap()
+        .map(|res| res.map(|e| e.path().is_dir()))
+        .collect::<Result<Vec<_>, std::io::Error>>()
+        .unwrap();
+    entries.contains(&false)
+}
diff --git a/file_store/Cargo.toml b/file_store/Cargo.toml
index 74f99de33..f1cba02f1 100644
--- a/file_store/Cargo.toml
+++ b/file_store/Cargo.toml
@@ -30,9 +30,9 @@ helium-proto = {workspace = true}
 helium-crypto = {workspace = true}
 csv = "*"
 http = {workspace = true}
-aws-config = "0.51"
-aws-sdk-s3 = "0.21"
-aws-types = { version = "0.51", features = ["hardcoded-credentials"], optional = true}
+aws-config = {workspace = true}
+aws-sdk-s3 = {workspace = true}
+aws-types = {workspace = true, optional = true}
 strum = {version = "0", features = ["derive"]}
 strum_macros = "0"
 sha2 = {workspace = true}
@@ -53,7 +53,7 @@ task-manager = { path = "../task_manager" }
 
 [dev-dependencies]
 hex-literal = "0"
-tempfile = "3"
+tempfile = { workspace = true }
 
 [features]
 default = ["sqlx-postgres"]
diff --git a/mobile_verifier/Cargo.toml b/mobile_verifier/Cargo.toml
index 4181e5d84..747de4738 100644
--- a/mobile_verifier/Cargo.toml
+++ b/mobile_verifier/Cargo.toml
@@ -65,3 +65,6 @@ task-manager = { path = "../task_manager" }
 
 [dev-dependencies]
 proptest = "1.5.0"
+aws-local = { path = "../aws_local" }
+tempfile = { workspace = true }
+
diff --git a/mobile_verifier/src/boosting_oracles/data_sets.rs b/mobile_verifier/src/boosting_oracles/data_sets.rs
index c7f6200bd..d3b4d9a28 100644
--- a/mobile_verifier/src/boosting_oracles/data_sets.rs
+++ b/mobile_verifier/src/boosting_oracles/data_sets.rs
@@ -315,7 +315,7 @@ impl DataSetDownloaderDaemon {
         }
     }
 
-    async fn check_for_new_data_sets(&mut self) -> anyhow::Result<()> {
+    pub async fn check_for_new_data_sets(&mut self) -> anyhow::Result<()> {
         let new_urbanized = self
             .data_sets
             .urbanization
@@ -392,8 +392,7 @@ impl DataSetDownloaderDaemon {
         Ok(())
     }
 
-    pub async fn run(mut self) -> anyhow::Result<()> {
-        tracing::info!("Starting data set downloader task");
+    pub async fn fetch_first_datasets(&mut self) -> anyhow::Result<()> {
         self.data_sets
             .urbanization
             .fetch_first_data_set(&self.pool, &self.data_set_directory)
@@ -410,6 +409,12 @@ impl DataSetDownloaderDaemon {
             .service_provider_override
             .fetch_first_data_set(&self.pool, &self.data_set_directory)
             .await?;
+        Ok(())
+    }
+
+    pub async fn run(mut self) -> anyhow::Result<()> {
+        tracing::info!("Starting data set downloader task");
+        self.fetch_first_datasets().await?;
         // Attempt to fill in any unassigned hexes. This is for the edge case in
         // which we shutdown before a coverage object updates.
         if is_hex_boost_data_ready(&self.data_sets) {
diff --git a/mobile_verifier/tests/integrations/boosting_oracles.rs b/mobile_verifier/tests/integrations/boosting_oracles.rs
index af4213e7b..ee3ddf735 100644
--- a/mobile_verifier/tests/integrations/boosting_oracles.rs
+++ b/mobile_verifier/tests/integrations/boosting_oracles.rs
@@ -3,12 +3,15 @@ use anyhow::Context;
 use chrono::{DateTime, Duration, Utc};
 use file_store::{
     coverage::RadioHexSignalLevel,
+    file_upload::{self, FileUpload},
     speedtest::CellSpeedtest,
+    traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt},
     wifi_heartbeat::{WifiHeartbeat, WifiHeartbeatIngestReport},
 };
 use futures::stream::{self, StreamExt};
 use h3o::CellIndex;
 use helium_crypto::PublicKeyBinary;
+use helium_proto::services::poc_mobile::OracleBoostingReportV1;
 use helium_proto::services::poc_mobile::{
     CoverageObjectValidity, LocationSource, OracleBoostingHexAssignment, SignalLevel,
 };
@@ -16,7 +19,11 @@ use hex_assignments::Assignment;
 use mobile_config::boosted_hex_info::BoostedHexes;
 use mobile_verifier::{
     banning::BannedRadios,
-    coverage::{CoverageClaimTimeCache, CoverageObject, CoverageObjectCache},
+    boosting_oracles::DataSetDownloaderDaemon,
+    coverage::{
+        new_coverage_object_notification_channel, CoverageClaimTimeCache, CoverageObject,
+        CoverageObjectCache, NewCoverageObjectNotification,
+    },
     geofence::GeofenceValidator,
     heartbeats::{last_location::LocationCache, Heartbeat, HeartbeatReward, ValidatedHeartbeat},
     reward_shares::CoverageShares,
@@ -30,6 +37,7 @@ use rust_decimal::Decimal;
 use rust_decimal_macros::dec;
 use sqlx::PgPool;
 use std::{collections::HashMap, pin::pin};
+use tempfile::TempDir;
 use uuid::Uuid;
 
 #[derive(Clone)]
@@ -96,6 +104,121 @@ fn hex_cell(loc: &str) -> hextree::Cell {
     hextree::Cell::from_raw(u64::from_str_radix(loc, 16).unwrap()).unwrap()
 }
 
+use aws_local::*;
+use hex_assignments::HexBoostData;
+use std::{path::PathBuf, str::FromStr};
+
+pub async fn create_data_set_downloader(
+    pool: PgPool,
+    file_paths: Vec<PathBuf>,
+    file_upload: FileUpload,
+    new_coverage_object_notification: NewCoverageObjectNotification,
+    tmp_dir: &TempDir,
+) -> (DataSetDownloaderDaemon, PathBuf, String) {
+    let bucket_name = gen_bucket_name();
+
+    let awsl = AwsLocal::new(AWSLOCAL_DEFAULT_ENDPOINT, &bucket_name).await;
+
+    for file_path in file_paths {
+        awsl.put_file_to_aws(&file_path).await.unwrap();
+    }
+
+    let uuid: Uuid = Uuid::new_v4();
+    let data_set_directory = tmp_dir.path().join(uuid.to_string());
+    tokio::fs::create_dir_all(data_set_directory.clone())
+        .await
+        .unwrap();
+
+    let file_store = awsl.file_store.clone();
+    let poll_duration = std::time::Duration::from_secs(4);
+
+    let (oracle_boosting_reports, _) = OracleBoostingReportV1::file_sink(
+        tmp_dir.path(),
+        file_upload.clone(),
+        FileSinkCommitStrategy::Automatic,
+        FileSinkRollTime::Duration(std::time::Duration::from_secs(15 * 60)),
+        env!("CARGO_PKG_NAME"),
+    )
+    .await
+    .unwrap();
+
+    let mut data_set_downloader = DataSetDownloaderDaemon::new(
+        pool,
+        HexBoostData::default(),
+        file_store,
+        oracle_boosting_reports,
+        data_set_directory.clone(),
+        new_coverage_object_notification,
+        poll_duration,
+    );
+
+    data_set_downloader.fetch_first_datasets().await.unwrap();
+    data_set_downloader.check_for_new_data_sets().await.unwrap();
+    (data_set_downloader, data_set_directory, bucket_name)
+}
+
+pub async fn hex_assignment_file_exists(pool: &PgPool, filename: &str) -> bool {
+    sqlx::query_scalar::<_, bool>(
+        r#"
+            SELECT EXISTS(SELECT 1 FROM hex_assignment_data_set_status WHERE filename = $1)
+        "#,
+    )
+    .bind(filename)
+    .fetch_one(pool)
+    .await
+    .unwrap()
+}
+
+#[sqlx::test]
+async fn test_dataset_downloader(pool: PgPool) {
+    // Scenario:
+    // 1. DataSetDownloader downloads initial files
+    // 2. Upload a new file
+    // 3. DataSetDownloader downloads the new file
+
+    let paths = [
+        "footfall.1722895200000.gz",
+        "urbanization.1722895200000.gz",
+        "landtype.1722895200000.gz",
+        "service_provider_override.1739404800000.gz",
+    ];
+
+    let file_paths: Vec<PathBuf> = paths
+        .iter()
+        .map(|f| PathBuf::from(format!("./tests/integrations/fixtures/{}", f)))
+        .collect();
+
+    let (file_upload_tx, _file_upload_rx) = file_upload::message_channel();
+    let file_upload = FileUpload {
+        sender: file_upload_tx,
+    };
+
+    let (_, new_coverage_obj_notification) = new_coverage_object_notification_channel();
+
+    let tmp_dir = TempDir::new().expect("Unable to create temp dir");
+    let (mut data_set_downloader, _, bucket_name) = create_data_set_downloader(
+        pool.clone(),
+        file_paths,
+        file_upload,
+        new_coverage_obj_notification,
+        &tmp_dir,
+    )
+    .await;
+    assert!(hex_assignment_file_exists(&pool, "footfall.1722895200000.gz").await);
+    assert!(hex_assignment_file_exists(&pool, "urbanization.1722895200000.gz").await);
+    assert!(hex_assignment_file_exists(&pool, "landtype.1722895200000.gz").await);
+    assert!(hex_assignment_file_exists(&pool, "service_provider_override.1739404800000.gz").await);
+
+    let awsl = AwsLocal::new(AWSLOCAL_DEFAULT_ENDPOINT, &bucket_name).await;
+    awsl.put_file_to_aws(
+        &PathBuf::from_str("./tests/integrations/fixtures/footfall.1732895200000.gz").unwrap(),
+    )
+    .await
+    .unwrap();
+    data_set_downloader.check_for_new_data_sets().await.unwrap();
+    assert!(hex_assignment_file_exists(&pool, "footfall.1732895200000.gz").await);
+}
+
 #[sqlx::test]
 async fn test_footfall_and_urbanization_report(pool: PgPool) -> anyhow::Result<()> {
     let uuid = Uuid::new_v4();
diff --git a/mobile_verifier/tests/integrations/fixtures/footfall.1722895200000.gz b/mobile_verifier/tests/integrations/fixtures/footfall.1722895200000.gz
new file mode 100644
index 000000000..40cdd6818
Binary files /dev/null and b/mobile_verifier/tests/integrations/fixtures/footfall.1722895200000.gz differ
diff --git a/mobile_verifier/tests/integrations/fixtures/footfall.1732895200000.gz b/mobile_verifier/tests/integrations/fixtures/footfall.1732895200000.gz
new file mode 100644
index 000000000..40cdd6818
Binary files /dev/null and b/mobile_verifier/tests/integrations/fixtures/footfall.1732895200000.gz differ
diff --git a/mobile_verifier/tests/integrations/fixtures/landtype.1722895200000.gz b/mobile_verifier/tests/integrations/fixtures/landtype.1722895200000.gz
new file mode 100644
index 000000000..0fb00d726
Binary files /dev/null and b/mobile_verifier/tests/integrations/fixtures/landtype.1722895200000.gz differ
diff --git a/mobile_verifier/tests/integrations/fixtures/service_provider_override.1739404800000.gz b/mobile_verifier/tests/integrations/fixtures/service_provider_override.1739404800000.gz
new file mode 100644
index 000000000..c7e8498b3
Binary files /dev/null and b/mobile_verifier/tests/integrations/fixtures/service_provider_override.1739404800000.gz differ
diff --git a/mobile_verifier/tests/integrations/fixtures/urbanization.1722895200000.gz b/mobile_verifier/tests/integrations/fixtures/urbanization.1722895200000.gz
new file mode 100644
index 000000000..66f369c81
Binary files /dev/null and b/mobile_verifier/tests/integrations/fixtures/urbanization.1722895200000.gz differ