diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 3ad471e73..bda7fbc65 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -118,6 +118,13 @@ jobs: --health-retries 5 ports: - 5432:5432 + localstack: + image: localstack/localstack:latest + environment: + SERVICES: s3 + EAGER_SERVICE_LOADING: 1 + ports: + - 4566:4566 steps: - name: Install Dependencies run: | @@ -229,4 +236,5 @@ jobs: RELEASE_VERSION: ${{ github.event.inputs.release_version }} run: | chmod +x ./.github/scripts/make_debian.sh - ./.github/scripts/make_debian.sh \ No newline at end of file + ./.github/scripts/make_debian.sh + diff --git a/.github/workflows/DockerCI.yml b/.github/workflows/DockerCI.yml index 358ea932c..c71f1da43 100644 --- a/.github/workflows/DockerCI.yml +++ b/.github/workflows/DockerCI.yml @@ -176,6 +176,13 @@ jobs: --health-retries 5 ports: - 5432:5432 + localstack: + image: localstack/localstack:latest + env: + SERVICES: s3 + EAGER_SERVICE_LOADING: 1 + ports: + - 4566:4566 steps: - name: Checkout Repository uses: actions/checkout@v4 @@ -218,4 +225,4 @@ jobs: cargo test -p ${{ matrix.package }} -- --include-ignored - name: Fix Permissions for Caching - run: sudo chown -R $(whoami):$(whoami) target \ No newline at end of file + run: sudo chown -R $(whoami):$(whoami) target diff --git a/Cargo.lock b/Cargo.lock index da2a653aa..4c9488cc8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -840,6 +840,23 @@ dependencies = [ "tracing", ] +[[package]] +name = "aws-local" +version = "0.1.0" +dependencies = [ + "anyhow", + "aws-config", + "aws-sdk-s3", + "aws-types 0.51.0", + "chrono", + "file-store", + "prost", + "tokio", + "tonic", + "triggered", + "uuid", +] + [[package]] name = "aws-sdk-s3" version = "0.21.0" @@ -2364,6 +2381,29 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" +[[package]] +name = "dataset-downloader" +version = "0.1.0" 
+dependencies = [ + "anyhow", + "async-compression", + "async-trait", + "aws-local", + "chrono", + "file-store", + "futures-util", + "hex-assignments", + "hextree", + "lazy_static", + "regex", + "sqlx", + "tempfile", + "tokio", + "tokio-util", + "tracing", + "uuid", +] + [[package]] name = "db-store" version = "0.1.0" @@ -4971,6 +5011,7 @@ dependencies = [ "anyhow", "async-compression", "async-trait", + "aws-local", "base64 0.22.1", "chrono", "clap", @@ -4978,6 +5019,7 @@ dependencies = [ "coverage-map", "coverage-point-calculator", "custom-tracing", + "dataset-downloader", "db-store", "derive_builder", "file-store", @@ -5013,6 +5055,7 @@ dependencies = [ "solana", "sqlx", "task-manager", + "tempfile", "thiserror 1.0.69", "tokio", "tokio-stream", diff --git a/Cargo.toml b/Cargo.toml index fecdf7788..405157344 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,8 @@ members = [ "solana", "task_manager", "hex_assignments", -] + "aws_local" +, "dataset_downloader"] resolver = "2" [workspace.package] @@ -113,6 +114,11 @@ tokio-util = "0" uuid = { version = "1", features = ["v4", "serde"] } tower-http = { version = "0", features = ["trace"] } derive_builder = "0" +aws-config = "0.51" +aws-sdk-s3 = "0.21" +aws-types = { version = "0.51", features = ["hardcoded-credentials"]} +regex = "1" +async-compression = { version = "0", features = ["tokio", "gzip"] } [patch.crates-io] anchor-lang = { git = "https://github.com/madninja/anchor.git", branch = "madninja/const_pubkey" } diff --git a/aws_local/Cargo.toml b/aws_local/Cargo.toml new file mode 100644 index 000000000..5db78dbfb --- /dev/null +++ b/aws_local/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "aws-local" +version = "0.1.0" +authors.workspace = true +license.workspace = true +edition.workspace = true + +[dependencies] +aws-config = {workspace = true} +aws-sdk-s3 = {workspace = true} +aws-types = {workspace = true, features = ["hardcoded-credentials"]} +tokio = {workspace = true} +triggered = {workspace = true} 
+tonic = {workspace = true} +chrono = {workspace = true} +prost = {workspace = true} +anyhow = {workspace = true} +uuid = {workspace = true} +file-store = { path = "../file_store", features = ["local"] } + diff --git a/aws_local/src/README.md b/aws_local/src/README.md new file mode 100644 index 000000000..d5da3c71c --- /dev/null +++ b/aws_local/src/README.md @@ -0,0 +1 @@ +It helps to run tests with [localstack](https://www.localstack.cloud/) diff --git a/aws_local/src/lib.rs b/aws_local/src/lib.rs new file mode 100644 index 000000000..a8d47b657 --- /dev/null +++ b/aws_local/src/lib.rs @@ -0,0 +1,159 @@ +use anyhow::{anyhow, Result}; +use aws_config::meta::region::RegionProviderChain; +use aws_sdk_s3::{Client, Endpoint, Region}; +use chrono::Utc; +use file_store::traits::MsgBytes; +use file_store::{file_sink, file_upload, FileStore, FileType, Settings}; +use std::path::Path; +use std::{str::FromStr, sync::Arc}; +use tokio::sync::Mutex; +use tonic::transport::Uri; +use uuid::Uuid; + +pub const AWSLOCAL_DEFAULT_ENDPOINT: &str = "http://127.0.0.1:4566"; + +pub fn gen_bucket_name() -> String { + format!("mvr-{}-{}", Uuid::new_v4(), Utc::now().timestamp_millis()) +} + +// Interacts with localstack. +// Used to create mocked aws buckets and files.
+pub struct AwsLocal { + pub fs_settings: Settings, + pub file_store: FileStore, + pub aws_client: aws_sdk_s3::Client, +} + +impl AwsLocal { + async fn create_aws_client(settings: &Settings) -> aws_sdk_s3::Client { + let endpoint: Option = match &settings.endpoint { + Some(endpoint) => Uri::from_str(endpoint) + .map(Endpoint::immutable) + .map(Some) + .unwrap(), + _ => None, + }; + let region = Region::new(settings.region.clone()); + let region_provider = RegionProviderChain::first_try(region).or_default_provider(); + + let mut config = aws_config::from_env().region(region_provider); + config = config.endpoint_resolver(endpoint.unwrap()); + + let creds = aws_types::credentials::Credentials::from_keys( + settings.access_key_id.as_ref().unwrap(), + settings.secret_access_key.as_ref().unwrap(), + None, + ); + config = config.credentials_provider(creds); + + let config = config.load().await; + + Client::new(&config) + } + + pub async fn new(endpoint: &str, bucket: &str) -> AwsLocal { + let settings = Settings { + bucket: bucket.into(), + endpoint: Some(endpoint.into()), + region: "us-east-1".into(), + access_key_id: Some("random".into()), + secret_access_key: Some("random2".into()), + }; + let client = Self::create_aws_client(&settings).await; + client.create_bucket().bucket(bucket).send().await.unwrap(); + AwsLocal { + aws_client: client, + fs_settings: settings.clone(), + file_store: file_store::FileStore::from_settings(&settings) + .await + .unwrap(), + } + } + pub async fn put_proto_to_aws( + &self, + items: Vec, + file_type: FileType, + metric_name: &'static str, + ) -> Result { + // Uuid is used as a random component to avoid collisions + let uuid: Uuid = Uuid::new_v4(); + let dir_path = format!("/tmp/{}/{}", uuid, self.fs_settings.bucket); + let store_base_path = std::path::Path::new(&dir_path); + let (shutdown_trigger, shutdown_listener) = triggered::trigger(); + + let (file_upload, file_upload_server) = + file_upload::FileUpload::from_settings_tm(&self.fs_settings) + .await + 
.unwrap(); + + let (item_sink, item_server) = + file_sink::FileSinkBuilder::new(file_type, store_base_path, file_upload, metric_name) + .auto_commit(false) + .roll_time(std::time::Duration::new(15, 0)) + .create::() + .await + .unwrap(); + + for item in items { + item_sink.write(item, &[]).await.unwrap(); + } + let item_recv = item_sink.commit().await.unwrap(); + + let dir_path_clone = dir_path.clone(); + let uploaded_file = Arc::new(Mutex::new(String::default())); + let up_2 = uploaded_file.clone(); + let mut timeout = std::time::Duration::new(5, 0); + + tokio::spawn(async move { + let uploaded_files = item_recv.await.unwrap().unwrap(); + assert!(uploaded_files.len() == 1); + let mut val = up_2.lock().await; + *val = uploaded_files.first().unwrap().to_string(); + + // After files uploaded to aws the must be removed. + // So we wait when dir will be empty. + // It means all files are uploaded to aws + loop { + if is_dir_has_files(&dir_path_clone) { + let dur = std::time::Duration::from_millis(10); + tokio::time::sleep(dur).await; + timeout -= dur; + continue; + } + break; + } + + shutdown_trigger.trigger(); + }); + + tokio::try_join!( + file_upload_server.run(shutdown_listener.clone()), + item_server.run(shutdown_listener.clone()) + ) + .unwrap(); + std::fs::remove_dir_all(dir_path).unwrap(); + let res = uploaded_file.lock().await; + Ok(res.clone()) + } + pub async fn put_file_to_aws(&self, file_path: &Path) -> Result<()> { + let path_str = file_path.display(); + if !file_path.exists() { + return Err(anyhow!("File {path_str} is absent")); + } + if !file_path.is_file() { + return Err(anyhow!("File {path_str} is not a file")); + } + self.file_store.put(file_path).await?; + + Ok(()) + } +} + +fn is_dir_has_files(dir_path: &str) -> bool { + let entries = std::fs::read_dir(dir_path) + .unwrap() + .map(|res| res.map(|e| e.path().is_dir())) + .collect::, std::io::Error>>() + .unwrap(); + entries.contains(&false) +} diff --git a/dataset_downloader/Cargo.toml 
b/dataset_downloader/Cargo.toml new file mode 100644 index 000000000..fe8d8c71a --- /dev/null +++ b/dataset_downloader/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "dataset-downloader" +version = "0.1.0" +authors.workspace = true +license.workspace = true +edition.workspace = true + +[dependencies] +sqlx = { workspace = true } +anyhow = { workspace = true } +uuid = { workspace = true } +async-trait = { workspace = true } +regex = { workspace = true } +hextree = { workspace = true } +tokio = { workspace = true } +tokio-util = { workspace = true } +lazy_static = { workspace = true } +tracing = { workspace = true } +chrono = { workspace = true } +futures-util = { workspace = true } +async-compression = { workspace = true } +file-store = { path = "../file_store" } +hex-assignments = { path = "../hex_assignments" } + +[dev-dependencies] +aws-local = { path = "../aws_local" } +tempfile = "3" + diff --git a/dataset_downloader/src/lib.rs b/dataset_downloader/src/lib.rs new file mode 100644 index 000000000..5b9ec1a5f --- /dev/null +++ b/dataset_downloader/src/lib.rs @@ -0,0 +1,670 @@ +use std::path::{Path, PathBuf}; + +use chrono::{DateTime, Utc}; +use futures_util::{Stream, StreamExt}; +use hex_assignments::assignment::HexAssignments; +use hex_assignments::HexBoostDataAssignmentsExt; +use hextree::disktree::DiskTreeMap; +use lazy_static::lazy_static; +use regex::Regex; +use sqlx::{FromRow, PgPool, Postgres, Transaction}; +use sqlx::{PgConnection, Type}; +use tokio::{fs::File, io::AsyncWriteExt}; + +use file_store::{traits::TimestampDecode, FileStore}; +use hex_assignments::{ + footfall::Footfall, landtype::Landtype, service_provider_override::ServiceProviderOverride, + urbanization::Urbanization, HexAssignment, HexBoostData, +}; + +#[derive(Copy, Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Type)] +#[sqlx(type_name = "signal_level")] +#[sqlx(rename_all = "lowercase")] +pub enum SignalLevel { + None, + Low, + Medium, + High, +} +#[derive(FromRow)] +pub struct 
UnassignedHex { + pub uuid: uuid::Uuid, + #[sqlx(try_from = "i64")] + pub hex: u64, + pub signal_level: SignalLevel, + pub signal_power: i32, +} + +pub struct AssignedHex { + pub uuid: uuid::Uuid, + pub hex: u64, + pub signal_level: SignalLevel, + pub signal_power: i32, + pub assignments: HexAssignments, +} + +impl UnassignedHex { + pub fn assign( + self, + data_sets: &impl HexBoostDataAssignmentsExt, + ) -> anyhow::Result { + let cell = hextree::Cell::try_from(self.hex)?; + + Ok(AssignedHex { + uuid: self.uuid, + hex: self.hex, + signal_level: self.signal_level, + signal_power: self.signal_power, + assignments: data_sets.assignments(cell)?, + }) + } +} + +#[async_trait::async_trait] +pub trait NewDataSetHandler: Send + Sync + 'static { + // Calls when new data set arrived but before it marked as processed + // If this function fails, new data sets will not be marked as processed. + // Don't call txn.commit() inside callback + async fn callback( + &self, + txn: &mut Transaction<'_, Postgres>, + data_sets: &HexBoostData, + ) -> anyhow::Result<()>; +} + +#[async_trait::async_trait] +pub trait DataSet: HexAssignment + Send + Sync + 'static { + const TYPE: DataSetType; + + fn timestamp(&self) -> Option>; + + fn update(&mut self, path: &Path, time_to_use: DateTime) -> anyhow::Result<()>; + + fn is_ready(&self) -> bool; + + async fn fetch_first_data_set( + &mut self, + pool: &PgPool, + data_set_directory: &Path, + ) -> anyhow::Result<()> { + let Some(first_data_set) = db::fetch_latest_processed_data_set(pool, Self::TYPE).await? 
+ else { + return Ok(()); + }; + let path = get_data_set_path(data_set_directory, Self::TYPE, first_data_set.time_to_use); + self.update(Path::new(&path), first_data_set.time_to_use)?; + Ok(()) + } + + async fn check_for_available_data_sets( + &self, + store: &FileStore, + pool: &PgPool, + ) -> anyhow::Result<()> { + tracing::info!("Checking for new {} data sets", Self::TYPE.to_prefix()); + let mut new_data_sets = store.list(Self::TYPE.to_prefix(), self.timestamp(), None); + while let Some(new_data_set) = new_data_sets.next().await.transpose()? { + db::insert_new_data_set(pool, &new_data_set.key, Self::TYPE, new_data_set.timestamp) + .await?; + } + Ok(()) + } + + async fn fetch_next_available_data_set( + &mut self, + store: &FileStore, + pool: &PgPool, + data_set_directory: &Path, + ) -> anyhow::Result> { + self.check_for_available_data_sets(store, pool).await?; + + let latest_unprocessed_data_set = + db::fetch_latest_unprocessed_data_set(pool, Self::TYPE, self.timestamp()).await?; + + let Some(latest_unprocessed_data_set) = latest_unprocessed_data_set else { + return Ok(None); + }; + + let path = get_data_set_path( + data_set_directory, + Self::TYPE, + latest_unprocessed_data_set.time_to_use, + ); + + if !latest_unprocessed_data_set.status.is_downloaded() { + download_data_set(store, &latest_unprocessed_data_set.filename, &path).await?; + let con = &mut pool.acquire().await?; + latest_unprocessed_data_set.mark_as_downloaded(con).await?; + tracing::info!( + data_set = latest_unprocessed_data_set.filename, + "Data set download complete" + ); + } + + self.update(Path::new(&path), latest_unprocessed_data_set.time_to_use)?; + + Ok(Some(latest_unprocessed_data_set)) + } +} + +#[async_trait::async_trait] +impl DataSet for Footfall { + const TYPE: DataSetType = DataSetType::Footfall; + + fn timestamp(&self) -> Option> { + self.timestamp + } + + fn update(&mut self, path: &Path, time_to_use: DateTime) -> anyhow::Result<()> { + self.footfall = 
Some(DiskTreeMap::open(path)?); + self.timestamp = Some(time_to_use); + Ok(()) + } + + fn is_ready(&self) -> bool { + self.footfall.is_some() + } +} + +#[async_trait::async_trait] +impl DataSet for Landtype { + const TYPE: DataSetType = DataSetType::Landtype; + + fn timestamp(&self) -> Option> { + self.timestamp + } + + fn update(&mut self, path: &Path, time_to_use: DateTime) -> anyhow::Result<()> { + self.landtype = Some(DiskTreeMap::open(path)?); + self.timestamp = Some(time_to_use); + Ok(()) + } + + fn is_ready(&self) -> bool { + self.landtype.is_some() + } +} + +#[async_trait::async_trait] +impl DataSet for Urbanization { + const TYPE: DataSetType = DataSetType::Urbanization; + + fn timestamp(&self) -> Option> { + self.timestamp + } + + fn update(&mut self, path: &Path, time_to_use: DateTime) -> anyhow::Result<()> { + self.urbanized = Some(DiskTreeMap::open(path)?); + self.timestamp = Some(time_to_use); + Ok(()) + } + + fn is_ready(&self) -> bool { + self.urbanized.is_some() + } +} + +#[async_trait::async_trait] +impl DataSet for ServiceProviderOverride { + const TYPE: DataSetType = DataSetType::ServiceProviderOverride; + + fn timestamp(&self) -> Option> { + self.timestamp + } + + fn update(&mut self, path: &Path, time_to_use: DateTime) -> anyhow::Result<()> { + self.service_provider_override = Some(DiskTreeMap::open(path)?); + self.timestamp = Some(time_to_use); + Ok(()) + } + + fn is_ready(&self) -> bool { + self.service_provider_override.is_some() + } +} + +pub struct DataSetDownloader { + pool: PgPool, + store: FileStore, + data_set_directory: PathBuf, +} + +#[derive(FromRow, Debug)] +pub struct NewDataSet { + filename: String, + time_to_use: DateTime, + status: DataSetStatus, +} + +impl NewDataSet { + async fn mark_as_downloaded(&self, con: &mut PgConnection) -> anyhow::Result<()> { + db::set_data_set_status(con, &self.filename, DataSetStatus::Downloaded).await?; + Ok(()) + } + + async fn mark_as_processed(&self, con: &mut PgConnection) -> 
anyhow::Result<()> { + db::set_data_set_status(con, &self.filename, DataSetStatus::Processed).await?; + Ok(()) + } + + pub fn filename(&self) -> &String { + &self.filename + } +} + +#[derive(Copy, Clone, sqlx::Type, Debug)] +#[sqlx(type_name = "data_set_status")] +#[sqlx(rename_all = "lowercase")] +pub enum DataSetStatus { + Pending, + Downloaded, + Processed, +} + +impl DataSetStatus { + pub fn is_downloaded(&self) -> bool { + matches!(self, Self::Downloaded) + } +} + +pub fn is_hex_boost_data_ready(data_sets: &HexBoostData) -> bool { + let h = &data_sets; + h.urbanization.is_ready() + && h.footfall.is_ready() + && h.landtype.is_ready() + && h.service_provider_override.is_ready() +} + +impl DataSetDownloader { + pub fn new(pool: PgPool, store: FileStore, data_set_directory: PathBuf) -> Self { + Self { + pool, + store, + data_set_directory, + } + } + + pub async fn check_for_new_data_sets( + &mut self, + data_set_processor: Option<&dyn NewDataSetHandler>, + mut data_sets: HexBoostData, + ) -> anyhow::Result { + let new_urbanized = data_sets + .urbanization + .fetch_next_available_data_set(&self.store, &self.pool, &self.data_set_directory) + .await?; + let new_footfall = data_sets + .footfall + .fetch_next_available_data_set(&self.store, &self.pool, &self.data_set_directory) + .await?; + let new_landtype = data_sets + .landtype + .fetch_next_available_data_set(&self.store, &self.pool, &self.data_set_directory) + .await?; + let new_service_provider_override = data_sets + .service_provider_override + .fetch_next_available_data_set(&self.store, &self.pool, &self.data_set_directory) + .await?; + + let new_data_set = new_urbanized.is_some() + || new_footfall.is_some() + || new_landtype.is_some() + || new_service_provider_override.is_some(); + + if !new_data_set { + return Ok(data_sets); + } + + let mut txn = self.pool.begin().await?; + + if let Some(dsp) = data_set_processor { + if is_hex_boost_data_ready(&data_sets) { + tracing::info!("Processing new data sets"); + 
dsp.callback(&mut txn, &data_sets).await?; + } + } + + // Mark the new data sets as processed and delete the old ones + if let Some(ref new_urbanized) = new_urbanized { + new_urbanized.mark_as_processed(&mut txn).await?; + } + + if let Some(ref new_footfall) = new_footfall { + new_footfall.mark_as_processed(&mut txn).await?; + } + if let Some(ref new_landtype) = new_landtype { + new_landtype.mark_as_processed(&mut txn).await?; + } + + if let Some(ref new_service_provider_override) = new_service_provider_override { + new_service_provider_override + .mark_as_processed(&mut txn) + .await?; + } + txn.commit().await?; + + // Ignoring tracing error messages can be critical if server out of space + if let Some(new_urbanized) = new_urbanized { + if let Err(err) = delete_old_data_sets( + &self.data_set_directory, + DataSetType::Urbanization, + new_urbanized.time_to_use, + ) + .await + { + tracing::error!( + error = ?err, + data_set_directory = ?self.data_set_directory, + time_to_use = ?new_urbanized.time_to_use, + "Deleting old urbanized data set file is failed." + ); + } + } + if let Some(new_footfall) = new_footfall { + if let Err(err) = delete_old_data_sets( + &self.data_set_directory, + DataSetType::Footfall, + new_footfall.time_to_use, + ) + .await + { + tracing::error!( + error = ?err, + data_set_directory = ?self.data_set_directory, + time_to_use = ?new_footfall.time_to_use, + "Deleting old fotfall data set file is failed." + ); + } + } + if let Some(new_landtype) = new_landtype { + if let Err(err) = delete_old_data_sets( + &self.data_set_directory, + DataSetType::Landtype, + new_landtype.time_to_use, + ) + .await + { + tracing::error!( + error = ?err, + data_set_directory = ?self.data_set_directory, + time_to_use = ?new_landtype.time_to_use, + "Deleting old landtype data set file is failed." 
+ ); + } + } + if let Some(new_service_provider_override) = new_service_provider_override { + if let Err(err) = delete_old_data_sets( + &self.data_set_directory, + DataSetType::ServiceProviderOverride, + new_service_provider_override.time_to_use, + ) + .await + { + tracing::error!( + error = ?err, + data_set_directory = ?self.data_set_directory, + time_to_use = ?new_service_provider_override.time_to_use, + "Deleting old service_provider_override data set file is failed." + ); + } + } + + Ok(data_sets) + } + pub async fn fetch_first_datasets( + &self, + mut data_sets: HexBoostData, + ) -> anyhow::Result { + data_sets + .urbanization + .fetch_first_data_set(&self.pool, &self.data_set_directory) + .await?; + data_sets + .footfall + .fetch_first_data_set(&self.pool, &self.data_set_directory) + .await?; + data_sets + .landtype + .fetch_first_data_set(&self.pool, &self.data_set_directory) + .await?; + data_sets + .service_provider_override + .fetch_first_data_set(&self.pool, &self.data_set_directory) + .await?; + Ok(data_sets) + } +} + +fn get_data_set_path( + data_set_directory: &Path, + data_set_type: DataSetType, + time_to_use: DateTime, +) -> PathBuf { + let path = PathBuf::from(format!( + "{}.{}.{}.h3tree", + data_set_type.to_prefix(), + time_to_use.timestamp_millis(), + data_set_type.to_hex_res_prefix(), + )); + let mut dir = data_set_directory.to_path_buf(); + dir.push(path); + dir +} + +lazy_static! { + static ref RE: Regex = Regex::new(r"([a-z,_]+).(\d+)(.res[0-9]{1,2}.h3tree)?").unwrap(); +} + +async fn delete_old_data_sets( + data_set_directory: &Path, + data_set_type: DataSetType, + time_to_use: DateTime, +) -> anyhow::Result<()> { + let mut data_sets = tokio::fs::read_dir(data_set_directory).await?; + while let Some(data_set) = data_sets.next_entry().await? 
{ + let file_name = data_set.file_name(); + let file_name = file_name.to_string_lossy(); + let Some(cap) = RE.captures(&file_name) else { + tracing::warn!("Could not determine data set file type: {}", file_name); + continue; + }; + let prefix = &cap[1]; + let timestamp = cap[2].parse::()?.to_timestamp_millis()?; + if prefix == data_set_type.to_prefix() && timestamp < time_to_use { + tracing::info!(data_set = &*file_name, "Deleting old data set file"); + tokio::fs::remove_file(data_set.path()).await?; + } + } + Ok(()) +} + +async fn download_data_set( + store: &FileStore, + in_file_name: &str, + out_path: &Path, +) -> anyhow::Result<()> { + tracing::info!("Downloading new data set: {}", out_path.to_string_lossy()); + let stream = store.get_raw(in_file_name).await?; + let mut bytes = tokio_util::codec::FramedRead::new( + async_compression::tokio::bufread::GzipDecoder::new(tokio_util::io::StreamReader::new( + stream, + )), + tokio_util::codec::BytesCodec::new(), + ); + let mut file = File::create(&out_path).await?; + while let Some(bytes) = bytes.next().await.transpose()? 
{ + file.write_all(&bytes).await?; + } + Ok(()) +} + +#[derive(Copy, Clone, sqlx::Type)] +#[sqlx(type_name = "data_set_type")] +#[sqlx(rename_all = "snake_case")] +pub enum DataSetType { + Urbanization, + Footfall, + Landtype, + ServiceProviderOverride, +} + +impl DataSetType { + pub fn to_prefix(self) -> &'static str { + match self { + Self::Urbanization => "urbanization", + Self::Footfall => "footfall", + Self::Landtype => "landtype", + Self::ServiceProviderOverride => "service_provider_override", + } + } + + pub fn to_hex_res_prefix(self) -> &'static str { + match self { + Self::Urbanization => "res10", + Self::Footfall => "res10", + Self::Landtype => "res10", + Self::ServiceProviderOverride => "res12", + } + } +} +pub mod db { + use super::*; + + pub async fn fetch_latest_file_date( + pool: &PgPool, + data_set_type: DataSetType, + ) -> sqlx::Result>> { + sqlx::query_scalar("SELECT time_to_use FROM hex_assignment_data_set_status WHERE data_set = $1 ORDER BY time_to_use DESC LIMIT 1") + .bind(data_set_type) + .fetch_optional(pool) + .await + } + + pub async fn insert_new_data_set( + pool: &PgPool, + filename: &str, + data_set_type: DataSetType, + time_to_use: DateTime, + ) -> sqlx::Result<()> { + sqlx::query( + r#" + INSERT INTO hex_assignment_data_set_status (filename, data_set, time_to_use, status) + VALUES ($1, $2, $3, 'pending') + ON CONFLICT DO NOTHING + "#, + ) + .bind(filename) + .bind(data_set_type) + .bind(time_to_use) + .execute(pool) + .await?; + Ok(()) + } + + pub async fn fetch_latest_unprocessed_data_set( + pool: &PgPool, + data_set_type: DataSetType, + since: Option>, + ) -> sqlx::Result> { + sqlx::query_as( + "SELECT filename, time_to_use, status FROM hex_assignment_data_set_status WHERE status != 'processed' AND data_set = $1 AND COALESCE(time_to_use > $2, TRUE) AND time_to_use <= $3 ORDER BY time_to_use DESC LIMIT 1" + ) + .bind(data_set_type) + .bind(since) + .bind(Utc::now()) + .fetch_optional(pool) + .await + } + + pub async fn 
fetch_latest_processed_data_set( + pool: &PgPool, + data_set_type: DataSetType, + ) -> sqlx::Result> { + sqlx::query_as( + "SELECT filename, time_to_use, status FROM hex_assignment_data_set_status WHERE status = 'processed' AND data_set = $1 ORDER BY time_to_use DESC LIMIT 1" + ) + .bind(data_set_type) + .fetch_optional(pool) + .await + } + + pub async fn set_data_set_status( + con: &mut PgConnection, + filename: &str, + status: DataSetStatus, + ) -> sqlx::Result<()> { + sqlx::query("UPDATE hex_assignment_data_set_status SET status = $1 WHERE filename = $2") + .bind(status) + .bind(filename) + .execute(con) + .await?; + Ok(()) + } + + pub async fn fetch_time_of_latest_processed_data_set( + pool: &PgPool, + data_set_type: DataSetType, + ) -> sqlx::Result>> { + sqlx::query_scalar( + "SELECT time_to_use FROM hex_assignment_data_set_status WHERE status = 'processed' AND data_set = $1 ORDER BY time_to_use DESC LIMIT 1" + ) + .bind(data_set_type) + .fetch_optional(pool) + .await + } + + /// Check if there are any pending or downloaded files prior to the given reward period + pub async fn check_for_unprocessed_data_sets( + pool: &PgPool, + period_end: DateTime, + ) -> sqlx::Result { + Ok(sqlx::query_scalar( + "SELECT COUNT(*) > 0 FROM hex_assignment_data_set_status WHERE time_to_use <= $1 AND status != 'processed'", + ) + .bind(period_end) + .fetch_one(pool) + .await? + || sqlx::query_scalar( + r#" + SELECT COUNT(*) > 0 FROM coverage_objects + WHERE inserted_at < $1 AND uuid IN ( + SELECT + DISTINCT uuid + FROM + hexes + WHERE + urbanized IS NULL + OR footfall IS NULL + OR landtype IS NULL + OR service_provider_override IS NULL + ) + "#, + ) + .bind(period_end) + .fetch_one(pool) + .await?) 
+ } + + pub fn fetch_all_hexes( + con: &mut PgConnection, + ) -> impl Stream> + '_ { + sqlx::query_as("SELECT uuid, hex, signal_level, signal_power FROM hexes").fetch(con) + } + + pub fn fetch_hexes_with_null_assignments( + con: &mut PgConnection, + ) -> impl Stream> + '_ { + sqlx::query_as( + "SELECT + uuid, hex, signal_level, signal_power + FROM + hexes + WHERE + urbanized IS NULL + OR footfall IS NULL + OR landtype IS NULL + OR service_provider_override IS NULL", + ) + .fetch(con) + } +} diff --git a/dataset_downloader/tests/downloader_test.rs b/dataset_downloader/tests/downloader_test.rs new file mode 100644 index 000000000..7fa6b89cb --- /dev/null +++ b/dataset_downloader/tests/downloader_test.rs @@ -0,0 +1,174 @@ +use std::path::PathBuf; +use std::str::FromStr; + +use aws_local::{gen_bucket_name, AwsLocal, AWSLOCAL_DEFAULT_ENDPOINT}; +use dataset_downloader::db::{fetch_latest_processed_data_set, fetch_latest_unprocessed_data_set}; +use dataset_downloader::{DataSetDownloader, DataSetType, NewDataSetHandler}; +use sqlx::{PgPool, Postgres, Transaction}; +use tempfile::TempDir; + +use hex_assignments::HexBoostData; + +pub async fn create_data_set_downloader( + pool: PgPool, + file_paths: Vec, + tmp_dir: &TempDir, +) -> (DataSetDownloader, HexBoostData, String) { + let bucket_name = gen_bucket_name(); + + let awsl = AwsLocal::new(AWSLOCAL_DEFAULT_ENDPOINT, &bucket_name).await; + + for file_path in file_paths { + awsl.put_file_to_aws(&file_path).await.unwrap(); + } + + let data_set_directory = tmp_dir.path(); + tokio::fs::create_dir_all(data_set_directory).await.unwrap(); + + let file_store = awsl.file_store.clone(); + + let mut dsd = DataSetDownloader::new(pool, file_store, data_set_directory.to_path_buf()); + + let mut hbd = HexBoostData::default(); + + hbd = dsd.fetch_first_datasets(hbd).await.unwrap(); + hbd = dsd.check_for_new_data_sets(None, hbd).await.unwrap(); + + (dsd, hbd, bucket_name) +} + +pub async fn hex_assignment_file_exist(pool: &PgPool, filename: 
&str) -> bool { + sqlx::query_scalar::<_, bool>( + r#" + SELECT EXISTS(SELECT 1 FROM hex_assignment_data_set_status WHERE filename = $1) + "#, + ) + .bind(filename) + .fetch_one(pool) + .await + .unwrap() +} + +#[sqlx::test(migrations = "../mobile_verifier/migrations")] +async fn test_dataset_downloader_new_file(pool: PgPool) { + // Scenario: + // 1. DataSetDownloader downloads initial files + // 2. Upload a new file + // 3. DataSetDownloader downloads new file + + let paths = [ + "footfall.1722895200000.gz", + "urbanization.1722895200000.gz", + "landtype.1722895200000.gz", + "service_provider_override.1739404800000.gz", + ]; + + let file_paths: Vec = paths + .iter() + .map(|f| PathBuf::from(format!("./tests/fixtures/{}", f))) + .collect(); + + let tmp_dir = TempDir::new().expect("Unable to create temp dir"); + let (mut data_set_downloader, data_sets, bucket_name) = + create_data_set_downloader(pool.clone(), file_paths, &tmp_dir).await; + assert!(hex_assignment_file_exist(&pool, "footfall.1722895200000.gz").await); + assert!(hex_assignment_file_exist(&pool, "urbanization.1722895200000.gz").await); + assert!(hex_assignment_file_exist(&pool, "landtype.1722895200000.gz").await); + assert!(hex_assignment_file_exist(&pool, "service_provider_override.1739404800000.gz").await); + + let awsl = AwsLocal::new(AWSLOCAL_DEFAULT_ENDPOINT, &bucket_name).await; + awsl.put_file_to_aws(&PathBuf::from_str("./tests/fixtures/footfall.1732895200000.gz").unwrap()) + .await + .unwrap(); + data_set_downloader + .check_for_new_data_sets(None, data_sets) + .await + .unwrap(); + assert!(hex_assignment_file_exist(&pool, "footfall.1732895200000.gz").await); +} + +struct TestDatasetHandler {} + +#[async_trait::async_trait] +impl NewDataSetHandler for TestDatasetHandler { + async fn callback( + &self, + _txn: &mut Transaction<'_, Postgres>, + _data_sets: &HexBoostData, + ) -> anyhow::Result<()> { + Err(anyhow::anyhow!("Err")) + } +} + +#[sqlx::test(migrations = "../mobile_verifier/migrations")] 
+async fn test_dataset_downloader_callback_failed(pool: PgPool) { + // Scenario: + // 1. DataSetDownloader downloads initial files + // 2. Upload a new file + // 3. Callback fails + // 4. Uploaded file should be processed again + // 3. Callback successful, file marked as processed + + let paths = [ + "footfall.1722895200000.gz", + "urbanization.1722895200000.gz", + "landtype.1722895200000.gz", + "service_provider_override.1739404800000.gz", + ]; + + let file_paths: Vec = paths + .iter() + .map(|f| PathBuf::from(format!("./tests/fixtures/{}", f))) + .collect(); + + let tmp_dir = TempDir::new().expect("Unable to create temp dir"); + let (mut data_set_downloader, data_sets, bucket_name) = + create_data_set_downloader(pool.clone(), file_paths.clone(), &tmp_dir).await; + assert!(hex_assignment_file_exist(&pool, "footfall.1722895200000.gz").await); + assert!(hex_assignment_file_exist(&pool, "urbanization.1722895200000.gz").await); + assert!(hex_assignment_file_exist(&pool, "landtype.1722895200000.gz").await); + assert!(hex_assignment_file_exist(&pool, "service_provider_override.1739404800000.gz").await); + + let dh = TestDatasetHandler {}; + + let awsl = AwsLocal::new(AWSLOCAL_DEFAULT_ENDPOINT, &bucket_name).await; + awsl.put_file_to_aws(&PathBuf::from_str("./tests/fixtures/footfall.1732895200000.gz").unwrap()) + .await + .unwrap(); + + assert!(data_set_downloader + .check_for_new_data_sets(Some(&dh), data_sets) + .await + .is_err()); + + let last_processed = fetch_latest_processed_data_set(&pool, DataSetType::Footfall) + .await + .unwrap() + .unwrap(); + let last_unprocessed = fetch_latest_unprocessed_data_set(&pool, DataSetType::Footfall, None) + .await + .unwrap() + .unwrap(); + + assert!(hex_assignment_file_exist(&pool, "footfall.1732895200000.gz").await); + + assert_eq!(last_processed.filename(), "footfall.1722895200000.gz"); + assert_eq!(last_unprocessed.filename(), "footfall.1732895200000.gz"); + + // initialized datasets and fetches new ones + 
create_data_set_downloader(pool.clone(), file_paths, &tmp_dir).await; + + let last_processed = fetch_latest_processed_data_set(&pool, DataSetType::Footfall) + .await + .unwrap() + .unwrap(); + assert!( + fetch_latest_unprocessed_data_set(&pool, DataSetType::Footfall, None) + .await + .unwrap() + .is_none() + ); + assert!(hex_assignment_file_exist(&pool, "footfall.1732895200000.gz").await); + + assert_eq!(last_processed.filename(), "footfall.1732895200000.gz"); +} diff --git a/dataset_downloader/tests/fixtures/footfall.1722895200000.gz b/dataset_downloader/tests/fixtures/footfall.1722895200000.gz new file mode 100644 index 000000000..40cdd6818 Binary files /dev/null and b/dataset_downloader/tests/fixtures/footfall.1722895200000.gz differ diff --git a/dataset_downloader/tests/fixtures/footfall.1732895200000.gz b/dataset_downloader/tests/fixtures/footfall.1732895200000.gz new file mode 100644 index 000000000..40cdd6818 Binary files /dev/null and b/dataset_downloader/tests/fixtures/footfall.1732895200000.gz differ diff --git a/dataset_downloader/tests/fixtures/landtype.1722895200000.gz b/dataset_downloader/tests/fixtures/landtype.1722895200000.gz new file mode 100644 index 000000000..0fb00d726 Binary files /dev/null and b/dataset_downloader/tests/fixtures/landtype.1722895200000.gz differ diff --git a/dataset_downloader/tests/fixtures/service_provider_override.1739404800000.gz b/dataset_downloader/tests/fixtures/service_provider_override.1739404800000.gz new file mode 100644 index 000000000..c7e8498b3 Binary files /dev/null and b/dataset_downloader/tests/fixtures/service_provider_override.1739404800000.gz differ diff --git a/dataset_downloader/tests/fixtures/urbanization.1722895200000.gz b/dataset_downloader/tests/fixtures/urbanization.1722895200000.gz new file mode 100644 index 000000000..66f369c81 Binary files /dev/null and b/dataset_downloader/tests/fixtures/urbanization.1722895200000.gz differ diff --git a/file_store/Cargo.toml b/file_store/Cargo.toml index 
74f99de33..21cbf855e 100644 --- a/file_store/Cargo.toml +++ b/file_store/Cargo.toml @@ -17,7 +17,7 @@ tokio = { workspace = true } tokio-util = { workspace = true } tokio-stream = {workspace = true} triggered = {workspace = true} -async-compression = {version = "0", features = ["tokio", "gzip"]} +async-compression = { workspace = true } futures = {workspace = true} futures-util = {workspace = true} prost = {workspace = true} @@ -30,9 +30,9 @@ helium-proto = {workspace = true} helium-crypto = {workspace = true} csv = "*" http = {workspace = true} -aws-config = "0.51" -aws-sdk-s3 = "0.21" -aws-types = { version = "0.51", features = ["hardcoded-credentials"], optional = true} +aws-config = {workspace = true} +aws-sdk-s3 = {workspace = true} +aws-types = {workspace = true, optional = true} strum = {version = "0", features = ["derive"]} strum_macros = "0" sha2 = {workspace = true} diff --git a/mobile_verifier/Cargo.toml b/mobile_verifier/Cargo.toml index 4181e5d84..8062bc869 100644 --- a/mobile_verifier/Cargo.toml +++ b/mobile_verifier/Cargo.toml @@ -62,6 +62,10 @@ poc-metrics = { path = "../metrics" } reward-scheduler = { path = "../reward_scheduler" } solana = { path = "../solana" } task-manager = { path = "../task_manager" } +dataset-downloader = { path = "../dataset_downloader" } [dev-dependencies] proptest = "1.5.0" +aws-local = { path = "../aws_local" } +tempfile = "3" + diff --git a/mobile_verifier/src/boosting_oracles/data_sets.rs b/mobile_verifier/src/boosting_oracles/data_sets.rs index c7f6200bd..a8a49b53f 100644 --- a/mobile_verifier/src/boosting_oracles/data_sets.rs +++ b/mobile_verifier/src/boosting_oracles/data_sets.rs @@ -1,243 +1,37 @@ -use std::{ - collections::HashMap, - path::{Path, PathBuf}, - pin::pin, - time::Duration, -}; +use std::{collections::HashMap, path::PathBuf, pin::pin, time::Duration}; -use chrono::{DateTime, Utc}; +use chrono::Utc; +use dataset_downloader::{ + db, is_hex_boost_data_ready, AssignedHex, DataSetDownloader, 
NewDataSetHandler, UnassignedHex, +}; use file_store::{ file_sink::FileSinkClient, file_upload::FileUpload, - traits::{ - FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, TimestampDecode, - TimestampEncode, - }, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, TimestampEncode}, FileStore, }; -use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt}; +use futures_util::{Stream, TryFutureExt, TryStreamExt}; use helium_proto::services::poc_mobile::{self as proto, OracleBoostingReportV1}; -use hextree::disktree::DiskTreeMap; -use lazy_static::lazy_static; -use regex::Regex; use rust_decimal::prelude::ToPrimitive; use rust_decimal_macros::dec; -use sqlx::{FromRow, PgPool, QueryBuilder}; +use sqlx::{PgPool, Postgres, QueryBuilder, Transaction}; use task_manager::{ManagedTask, TaskManager}; -use tokio::{fs::File, io::AsyncWriteExt, time::Instant}; - -use crate::{ - coverage::{NewCoverageObjectNotification, SignalLevel}, - Settings, -}; - -use hex_assignments::{ - assignment::HexAssignments, footfall::Footfall, landtype::Landtype, - service_provider_override::ServiceProviderOverride, urbanization::Urbanization, HexAssignment, - HexBoostData, HexBoostDataAssignmentsExt, -}; - -#[async_trait::async_trait] -pub trait DataSet: HexAssignment + Send + Sync + 'static { - const TYPE: DataSetType; - - fn timestamp(&self) -> Option>; +use tokio::time::Instant; - fn update(&mut self, path: &Path, time_to_use: DateTime) -> anyhow::Result<()>; +use crate::{coverage::NewCoverageObjectNotification, Settings}; - fn is_ready(&self) -> bool; - - async fn fetch_first_data_set( - &mut self, - pool: &PgPool, - data_set_directory: &Path, - ) -> anyhow::Result<()> { - let Some(first_data_set) = db::fetch_latest_processed_data_set(pool, Self::TYPE).await? 
- else { - return Ok(()); - }; - let path = get_data_set_path(data_set_directory, Self::TYPE, first_data_set.time_to_use); - self.update(Path::new(&path), first_data_set.time_to_use)?; - Ok(()) - } - - async fn check_for_available_data_sets( - &self, - store: &FileStore, - pool: &PgPool, - ) -> anyhow::Result<()> { - tracing::info!("Checking for new {} data sets", Self::TYPE.to_prefix()); - let mut new_data_sets = store.list(Self::TYPE.to_prefix(), self.timestamp(), None); - while let Some(new_data_set) = new_data_sets.next().await.transpose()? { - db::insert_new_data_set(pool, &new_data_set.key, Self::TYPE, new_data_set.timestamp) - .await?; - } - Ok(()) - } - - async fn fetch_next_available_data_set( - &mut self, - store: &FileStore, - pool: &PgPool, - data_set_directory: &Path, - ) -> anyhow::Result> { - self.check_for_available_data_sets(store, pool).await?; - - let latest_unprocessed_data_set = - db::fetch_latest_unprocessed_data_set(pool, Self::TYPE, self.timestamp()).await?; - - let Some(latest_unprocessed_data_set) = latest_unprocessed_data_set else { - return Ok(None); - }; - - let path = get_data_set_path( - data_set_directory, - Self::TYPE, - latest_unprocessed_data_set.time_to_use, - ); - - if !latest_unprocessed_data_set.status.is_downloaded() { - download_data_set(store, &latest_unprocessed_data_set.filename, &path).await?; - latest_unprocessed_data_set.mark_as_downloaded(pool).await?; - tracing::info!( - data_set = latest_unprocessed_data_set.filename, - "Data set download complete" - ); - } - - self.update(Path::new(&path), latest_unprocessed_data_set.time_to_use)?; - - Ok(Some(latest_unprocessed_data_set)) - } -} - -#[async_trait::async_trait] -impl DataSet for Footfall { - const TYPE: DataSetType = DataSetType::Footfall; - - fn timestamp(&self) -> Option> { - self.timestamp - } - - fn update(&mut self, path: &Path, time_to_use: DateTime) -> anyhow::Result<()> { - self.footfall = Some(DiskTreeMap::open(path)?); - self.timestamp = Some(time_to_use); 
- Ok(()) - } - - fn is_ready(&self) -> bool { - self.footfall.is_some() - } -} - -#[async_trait::async_trait] -impl DataSet for Landtype { - const TYPE: DataSetType = DataSetType::Landtype; - - fn timestamp(&self) -> Option> { - self.timestamp - } - - fn update(&mut self, path: &Path, time_to_use: DateTime) -> anyhow::Result<()> { - self.landtype = Some(DiskTreeMap::open(path)?); - self.timestamp = Some(time_to_use); - Ok(()) - } - - fn is_ready(&self) -> bool { - self.landtype.is_some() - } -} - -#[async_trait::async_trait] -impl DataSet for Urbanization { - const TYPE: DataSetType = DataSetType::Urbanization; - - fn timestamp(&self) -> Option> { - self.timestamp - } - - fn update(&mut self, path: &Path, time_to_use: DateTime) -> anyhow::Result<()> { - self.urbanized = Some(DiskTreeMap::open(path)?); - self.timestamp = Some(time_to_use); - Ok(()) - } - - fn is_ready(&self) -> bool { - self.urbanized.is_some() - } -} - -#[async_trait::async_trait] -impl DataSet for ServiceProviderOverride { - const TYPE: DataSetType = DataSetType::ServiceProviderOverride; - - fn timestamp(&self) -> Option> { - self.timestamp - } - - fn update(&mut self, path: &Path, time_to_use: DateTime) -> anyhow::Result<()> { - self.service_provider_override = Some(DiskTreeMap::open(path)?); - self.timestamp = Some(time_to_use); - Ok(()) - } - - fn is_ready(&self) -> bool { - self.service_provider_override.is_some() - } -} - -pub fn is_hex_boost_data_ready(h: &HexBoostData) -> bool { - h.urbanization.is_ready() - && h.footfall.is_ready() - && h.landtype.is_ready() - && h.service_provider_override.is_ready() -} +use hex_assignments::{HexBoostData, HexBoostDataAssignmentsExt}; pub struct DataSetDownloaderDaemon { - pool: PgPool, + data_set_downloader: DataSetDownloader, data_sets: HexBoostData, - store: FileStore, - data_set_processor: FileSinkClient, - data_set_directory: PathBuf, + + pool: PgPool, + oracle_boostring_writer: OracleBoostingWriter, new_coverage_object_notification: 
NewCoverageObjectNotification, poll_duration: Duration, } -#[derive(FromRow)] -pub struct NewDataSet { - filename: String, - time_to_use: DateTime, - status: DataSetStatus, -} - -impl NewDataSet { - async fn mark_as_downloaded(&self, pool: &PgPool) -> anyhow::Result<()> { - db::set_data_set_status(pool, &self.filename, DataSetStatus::Downloaded).await?; - Ok(()) - } - - async fn mark_as_processed(&self, pool: &PgPool) -> anyhow::Result<()> { - db::set_data_set_status(pool, &self.filename, DataSetStatus::Processed).await?; - Ok(()) - } -} - -#[derive(Copy, Clone, sqlx::Type)] -#[sqlx(type_name = "data_set_status")] -#[sqlx(rename_all = "lowercase")] -pub enum DataSetStatus { - Pending, - Downloaded, - Processed, -} - -impl DataSetStatus { - pub fn is_downloaded(&self) -> bool { - matches!(self, Self::Downloaded) - } -} - impl ManagedTask for DataSetDownloaderDaemon { fn start_task( self: Box, @@ -294,6 +88,27 @@ impl DataSetDownloaderDaemon { } } +struct OracleBoostingWriter { + data_set_processor: FileSinkClient, +} + +#[async_trait::async_trait] +impl NewDataSetHandler for OracleBoostingWriter { + async fn callback( + &self, + txn: &mut Transaction<'_, Postgres>, + data_sets: &HexBoostData, + ) -> anyhow::Result<()> { + let assigned_coverage_objs = + AssignedCoverageObjects::assign_hex_stream(db::fetch_all_hexes(txn), data_sets).await?; + assigned_coverage_objs + .write(&self.data_set_processor) + .await?; + assigned_coverage_objs.save(txn).await?; + Ok(()) + } +} + impl DataSetDownloaderDaemon { pub fn new( pool: PgPool, @@ -304,118 +119,36 @@ impl DataSetDownloaderDaemon { new_coverage_object_notification: NewCoverageObjectNotification, poll_duration: Duration, ) -> Self { + let data_set_downloader = DataSetDownloader::new(pool.clone(), store, data_set_directory); + let oracle_boostring_writer = OracleBoostingWriter { data_set_processor }; Self { - pool, - data_sets, - store, - data_set_processor, - data_set_directory, + oracle_boostring_writer, + 
data_set_downloader, new_coverage_object_notification, poll_duration, + data_sets, + pool, } } - async fn check_for_new_data_sets(&mut self) -> anyhow::Result<()> { - let new_urbanized = self - .data_sets - .urbanization - .fetch_next_available_data_set(&self.store, &self.pool, &self.data_set_directory) - .await?; - let new_footfall = self - .data_sets - .footfall - .fetch_next_available_data_set(&self.store, &self.pool, &self.data_set_directory) - .await?; - let new_landtype = self - .data_sets - .landtype - .fetch_next_available_data_set(&self.store, &self.pool, &self.data_set_directory) - .await?; - let new_service_provider_override = self - .data_sets - .service_provider_override - .fetch_next_available_data_set(&self.store, &self.pool, &self.data_set_directory) - .await?; - - // If all of the data sets are ready and there is at least one new one, re-process all - // hex assignments: - let new_data_set = new_urbanized.is_some() - || new_footfall.is_some() - || new_landtype.is_some() - || new_service_provider_override.is_some(); - if is_hex_boost_data_ready(&self.data_sets) && new_data_set { - tracing::info!("Processing new data sets"); - self.data_set_processor - .set_all_oracle_boosting_assignments(&self.pool, &self.data_sets) - .await?; - } - - // Mark the new data sets as processed and delete the old ones - if let Some(new_urbanized) = new_urbanized { - new_urbanized.mark_as_processed(&self.pool).await?; - delete_old_data_sets( - &self.data_set_directory, - DataSetType::Urbanization, - new_urbanized.time_to_use, - ) - .await?; - } - if let Some(new_footfall) = new_footfall { - new_footfall.mark_as_processed(&self.pool).await?; - delete_old_data_sets( - &self.data_set_directory, - DataSetType::Footfall, - new_footfall.time_to_use, - ) - .await?; - } - if let Some(new_landtype) = new_landtype { - new_landtype.mark_as_processed(&self.pool).await?; - delete_old_data_sets( - &self.data_set_directory, - DataSetType::Landtype, - new_landtype.time_to_use, - ) - 
.await?; - } - if let Some(new_service_provider_override) = new_service_provider_override { - new_service_provider_override - .mark_as_processed(&self.pool) - .await?; - delete_old_data_sets( - &self.data_set_directory, - DataSetType::ServiceProviderOverride, - new_service_provider_override.time_to_use, - ) - .await?; - } - Ok(()) - } - pub async fn run(mut self) -> anyhow::Result<()> { tracing::info!("Starting data set downloader task"); - self.data_sets - .urbanization - .fetch_first_data_set(&self.pool, &self.data_set_directory) - .await?; - self.data_sets - .footfall - .fetch_first_data_set(&self.pool, &self.data_set_directory) - .await?; - self.data_sets - .landtype - .fetch_first_data_set(&self.pool, &self.data_set_directory) - .await?; - self.data_sets - .service_provider_override - .fetch_first_data_set(&self.pool, &self.data_set_directory) + self.data_sets = self + .data_set_downloader + .fetch_first_datasets(self.data_sets) .await?; + // Attempt to fill in any unassigned hexes. This is for the edge case in // which we shutdown before a coverage object updates. if is_hex_boost_data_ready(&self.data_sets) { - self.data_set_processor - .set_unassigned_oracle_boosting_assignments(&self.pool, &self.data_sets) + let mut txn = self.pool.begin().await?; + + self.oracle_boostring_writer + .data_set_processor + .set_unassigned_oracle_boosting_assignments(&mut txn, &self.data_sets) .await?; + + txn.commit().await?; } let mut wakeup = Instant::now() + self.poll_duration; @@ -423,17 +156,22 @@ impl DataSetDownloaderDaemon { #[rustfmt::skip] tokio::select! 
{ _ = self.new_coverage_object_notification.await_new_coverage_object() => { + let mut txn = self.pool.begin().await?; + // If we see a new coverage object, we want to assign only those hexes // that don't have an assignment if is_hex_boost_data_ready(&self.data_sets) { - self.data_set_processor.set_unassigned_oracle_boosting_assignments( - &self.pool, + self.oracle_boostring_writer.data_set_processor.set_unassigned_oracle_boosting_assignments( + &mut txn, &self.data_sets, ).await?; } + + txn.commit().await?; + }, _ = tokio::time::sleep_until(wakeup) => { - self.check_for_new_data_sets().await?; + self.data_sets = self.data_set_downloader.check_for_new_data_sets(Some(&self.oracle_boostring_writer), self.data_sets).await?; wakeup = Instant::now() + self.poll_duration; } } @@ -441,305 +179,71 @@ impl DataSetDownloaderDaemon { } } -fn get_data_set_path( - data_set_directory: &Path, - data_set_type: DataSetType, - time_to_use: DateTime, -) -> PathBuf { - let path = PathBuf::from(format!( - "{}.{}.{}.h3tree", - data_set_type.to_prefix(), - time_to_use.timestamp_millis(), - data_set_type.to_hex_res_prefix(), - )); - let mut dir = data_set_directory.to_path_buf(); - dir.push(path); - dir -} - -lazy_static! { - static ref RE: Regex = Regex::new(r"([a-z,_]+).(\d+)(.res[0-9]{1,2}.h3tree)?").unwrap(); -} - -async fn delete_old_data_sets( - data_set_directory: &Path, - data_set_type: DataSetType, - time_to_use: DateTime, -) -> anyhow::Result<()> { - let mut data_sets = tokio::fs::read_dir(data_set_directory).await?; - while let Some(data_set) = data_sets.next_entry().await? 
{ - let file_name = data_set.file_name(); - let file_name = file_name.to_string_lossy(); - let Some(cap) = RE.captures(&file_name) else { - tracing::warn!("Could not determine data set file type: {}", file_name); - continue; - }; - let prefix = &cap[1]; - let timestamp = cap[2].parse::()?.to_timestamp_millis()?; - if prefix == data_set_type.to_prefix() && timestamp < time_to_use { - tracing::info!(data_set = &*file_name, "Deleting old data set file"); - tokio::fs::remove_file(data_set.path()).await?; - } - } - Ok(()) -} - -async fn download_data_set( - store: &FileStore, - in_file_name: &str, - out_path: &Path, -) -> anyhow::Result<()> { - tracing::info!("Downloading new data set: {}", out_path.to_string_lossy()); - let stream = store.get_raw(in_file_name).await?; - let mut bytes = tokio_util::codec::FramedRead::new( - async_compression::tokio::bufread::GzipDecoder::new(tokio_util::io::StreamReader::new( - stream, - )), - tokio_util::codec::BytesCodec::new(), - ); - let mut file = File::create(&out_path).await?; - while let Some(bytes) = bytes.next().await.transpose()? 
{ - file.write_all(&bytes).await?; - } - Ok(()) -} - -#[derive(Copy, Clone, sqlx::Type)] -#[sqlx(type_name = "data_set_type")] -#[sqlx(rename_all = "snake_case")] -pub enum DataSetType { - Urbanization, - Footfall, - Landtype, - ServiceProviderOverride, -} - -impl DataSetType { - pub fn to_prefix(self) -> &'static str { - match self { - Self::Urbanization => "urbanization", - Self::Footfall => "footfall", - Self::Landtype => "landtype", - Self::ServiceProviderOverride => "service_provider_override", - } - } - - pub fn to_hex_res_prefix(self) -> &'static str { - match self { - Self::Urbanization => "res10", - Self::Footfall => "res10", - Self::Landtype => "res10", - Self::ServiceProviderOverride => "res12", - } - } -} - #[async_trait::async_trait] -pub trait DataSetProcessor: Send + Sync + 'static { +pub trait DataSetProcessor<'a>: Send + Sync + 'static { async fn set_all_oracle_boosting_assignments( &self, - pool: &PgPool, + txn: &mut Transaction<'_, Postgres>, data_sets: &impl HexBoostDataAssignmentsExt, ) -> anyhow::Result<()>; async fn set_unassigned_oracle_boosting_assignments( &self, - pool: &PgPool, + txn: &mut Transaction<'_, Postgres>, data_sets: &impl HexBoostDataAssignmentsExt, ) -> anyhow::Result<()>; } #[async_trait::async_trait] -impl DataSetProcessor for FileSinkClient { +impl DataSetProcessor<'_> for FileSinkClient { async fn set_all_oracle_boosting_assignments( &self, - pool: &PgPool, + txn: &mut Transaction<'_, Postgres>, data_sets: &impl HexBoostDataAssignmentsExt, ) -> anyhow::Result<()> { let assigned_coverage_objs = - AssignedCoverageObjects::assign_hex_stream(db::fetch_all_hexes(pool), data_sets) - .await?; + AssignedCoverageObjects::assign_hex_stream(db::fetch_all_hexes(txn), data_sets).await?; assigned_coverage_objs.write(self).await?; - assigned_coverage_objs.save(pool).await?; + assigned_coverage_objs.save(txn).await?; Ok(()) } async fn set_unassigned_oracle_boosting_assignments( &self, - pool: &PgPool, + txn: &mut Transaction<'_, 
Postgres>, data_sets: &impl HexBoostDataAssignmentsExt, ) -> anyhow::Result<()> { let assigned_coverage_objs = AssignedCoverageObjects::assign_hex_stream( - db::fetch_hexes_with_null_assignments(pool), + db::fetch_hexes_with_null_assignments(txn), data_sets, ) .await?; assigned_coverage_objs.write(self).await?; - assigned_coverage_objs.save(pool).await?; + assigned_coverage_objs.save(txn).await?; Ok(()) } } -pub struct NopDataSetProcessor; - -#[async_trait::async_trait] -impl DataSetProcessor for NopDataSetProcessor { - async fn set_all_oracle_boosting_assignments( - &self, - _pool: &PgPool, - _data_sets: &impl HexBoostDataAssignmentsExt, - ) -> anyhow::Result<()> { - Ok(()) - } - - async fn set_unassigned_oracle_boosting_assignments( - &self, - _pool: &PgPool, - _data_sets: &impl HexBoostDataAssignmentsExt, - ) -> anyhow::Result<()> { - Ok(()) - } -} - -pub mod db { - use super::*; - - pub async fn fetch_latest_file_date( - pool: &PgPool, - data_set_type: DataSetType, - ) -> sqlx::Result>> { - sqlx::query_scalar("SELECT time_to_use FROM hex_assignment_data_set_status WHERE data_set = $1 ORDER BY time_to_use DESC LIMIT 1") - .bind(data_set_type) - .fetch_optional(pool) - .await - } - - pub async fn insert_new_data_set( - pool: &PgPool, - filename: &str, - data_set_type: DataSetType, - time_to_use: DateTime, - ) -> sqlx::Result<()> { - sqlx::query( - r#" - INSERT INTO hex_assignment_data_set_status (filename, data_set, time_to_use, status) - VALUES ($1, $2, $3, 'pending') - ON CONFLICT DO NOTHING - "#, - ) - .bind(filename) - .bind(data_set_type) - .bind(time_to_use) - .execute(pool) - .await?; - Ok(()) - } - - pub async fn fetch_latest_unprocessed_data_set( - pool: &PgPool, - data_set_type: DataSetType, - since: Option>, - ) -> sqlx::Result> { - sqlx::query_as( - "SELECT filename, time_to_use, status FROM hex_assignment_data_set_status WHERE status != 'processed' AND data_set = $1 AND COALESCE(time_to_use > $2, TRUE) AND time_to_use <= $3 ORDER BY time_to_use DESC 
LIMIT 1" - ) - .bind(data_set_type) - .bind(since) - .bind(Utc::now()) - .fetch_optional(pool) - .await - } - - pub async fn fetch_latest_processed_data_set( - pool: &PgPool, - data_set_type: DataSetType, - ) -> sqlx::Result> { - sqlx::query_as( - "SELECT filename, time_to_use, status FROM hex_assignment_data_set_status WHERE status = 'processed' AND data_set = $1 ORDER BY time_to_use DESC LIMIT 1" - ) - .bind(data_set_type) - .fetch_optional(pool) - .await - } - - pub async fn set_data_set_status( - pool: &PgPool, - filename: &str, - status: DataSetStatus, - ) -> sqlx::Result<()> { - sqlx::query("UPDATE hex_assignment_data_set_status SET status = $1 WHERE filename = $2") - .bind(status) - .bind(filename) - .execute(pool) - .await?; - Ok(()) - } - - pub async fn fetch_time_of_latest_processed_data_set( - pool: &PgPool, - data_set_type: DataSetType, - ) -> sqlx::Result>> { - sqlx::query_scalar( - "SELECT time_to_use FROM hex_assignment_data_set_status WHERE status = 'processed' AND data_set = $1 ORDER BY time_to_use DESC LIMIT 1" - ) - .bind(data_set_type) - .fetch_optional(pool) - .await - } - - /// Check if there are any pending or downloaded files prior to the given reward period - pub async fn check_for_unprocessed_data_sets( - pool: &PgPool, - period_end: DateTime, - ) -> sqlx::Result { - Ok(sqlx::query_scalar( - "SELECT COUNT(*) > 0 FROM hex_assignment_data_set_status WHERE time_to_use <= $1 AND status != 'processed'", - ) - .bind(period_end) - .fetch_one(pool) - .await? - || sqlx::query_scalar( - r#" - SELECT COUNT(*) > 0 FROM coverage_objects - WHERE inserted_at < $1 AND uuid IN ( - SELECT - DISTINCT uuid - FROM - hexes - WHERE - urbanized IS NULL - OR footfall IS NULL - OR landtype IS NULL - OR service_provider_override IS NULL - ) - "#, - ) - .bind(period_end) - .fetch_one(pool) - .await?) 
- } - - pub fn fetch_all_hexes(pool: &PgPool) -> impl Stream> + '_ { - sqlx::query_as("SELECT uuid, hex, signal_level, signal_power FROM hexes").fetch(pool) - } - - pub fn fetch_hexes_with_null_assignments( - pool: &PgPool, - ) -> impl Stream> + '_ { - sqlx::query_as( - "SELECT - uuid, hex, signal_level, signal_power - FROM - hexes - WHERE - urbanized IS NULL - OR footfall IS NULL - OR landtype IS NULL - OR service_provider_override IS NULL", - ) - .fetch(pool) - } -} +// pub struct NopDataSetProcessor; +// +// #[async_trait::async_trait] +// impl DataSetProcessor for NopDataSetProcessor { +// async fn set_all_oracle_boosting_assignments( +// &self, +// _pool: &PgPool, +// _data_sets: &impl HexBoostDataAssignmentsExt, +// ) -> anyhow::Result<()> { +// Ok(()) +// } +// +// async fn set_unassigned_oracle_boosting_assignments( +// &self, +// _pool: &PgPool, +// _data_sets: &impl HexBoostDataAssignmentsExt, +// ) -> anyhow::Result<()> { +// Ok(()) +// } +// } pub struct AssignedCoverageObjects { pub coverage_objs: HashMap>, @@ -798,7 +302,7 @@ impl AssignedCoverageObjects { Ok(()) } - pub async fn save(self, pool: &PgPool) -> anyhow::Result<()> { + pub async fn save(self, txn: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { const NUMBER_OF_FIELDS_IN_QUERY: u16 = 8; const ASSIGNMENTS_MAX_BATCH_ENTRIES: usize = (u16::MAX / NUMBER_OF_FIELDS_IN_QUERY) as usize; @@ -828,41 +332,10 @@ impl AssignedCoverageObjects { "#, ) .build() - .execute(pool) + .execute(&mut **txn) .await?; } Ok(()) } } - -#[derive(FromRow)] -pub struct UnassignedHex { - uuid: uuid::Uuid, - #[sqlx(try_from = "i64")] - hex: u64, - signal_level: SignalLevel, - signal_power: i32, -} - -impl UnassignedHex { - fn assign(self, data_sets: &impl HexBoostDataAssignmentsExt) -> anyhow::Result { - let cell = hextree::Cell::try_from(self.hex)?; - - Ok(AssignedHex { - uuid: self.uuid, - hex: self.hex, - signal_level: self.signal_level, - signal_power: self.signal_power, - assignments: 
data_sets.assignments(cell)?, - }) - } -} - -pub struct AssignedHex { - pub uuid: uuid::Uuid, - pub hex: u64, - pub signal_level: SignalLevel, - pub signal_power: i32, - pub assignments: HexAssignments, -} diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 1a066815d..2a8416f89 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -1,7 +1,5 @@ use crate::{ - banning, - boosting_oracles::db::check_for_unprocessed_data_sets, - coverage, data_session, + banning, coverage, data_session, heartbeats::{self, HeartbeatReward}, radio_threshold, resolve_subdao_pubkey, reward_shares::{ @@ -15,6 +13,7 @@ use crate::{ }; use anyhow::bail; use chrono::{DateTime, TimeZone, Utc}; +use dataset_downloader::db::check_for_unprocessed_data_sets; use db_store::meta; use file_store::{ file_sink::FileSinkClient, diff --git a/mobile_verifier/tests/integrations/boosting_oracles.rs b/mobile_verifier/tests/integrations/boosting_oracles.rs index af4213e7b..51edc8df1 100644 --- a/mobile_verifier/tests/integrations/boosting_oracles.rs +++ b/mobile_verifier/tests/integrations/boosting_oracles.rs @@ -3,12 +3,15 @@ use anyhow::Context; use chrono::{DateTime, Duration, Utc}; use file_store::{ coverage::RadioHexSignalLevel, + file_upload::{self, FileUpload}, speedtest::CellSpeedtest, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, wifi_heartbeat::{WifiHeartbeat, WifiHeartbeatIngestReport}, }; use futures::stream::{self, StreamExt}; use h3o::CellIndex; use helium_crypto::PublicKeyBinary; +use helium_proto::services::poc_mobile::OracleBoostingReportV1; use helium_proto::services::poc_mobile::{ CoverageObjectValidity, LocationSource, OracleBoostingHexAssignment, SignalLevel, }; @@ -16,7 +19,11 @@ use hex_assignments::Assignment; use mobile_config::boosted_hex_info::BoostedHexes; use mobile_verifier::{ banning::BannedRadios, - coverage::{CoverageClaimTimeCache, CoverageObject, CoverageObjectCache}, + 
boosting_oracles::DataSetDownloaderDaemon, + coverage::{ + new_coverage_object_notification_channel, CoverageClaimTimeCache, CoverageObject, + CoverageObjectCache, NewCoverageObjectNotification, + }, geofence::GeofenceValidator, heartbeats::{last_location::LocationCache, Heartbeat, HeartbeatReward, ValidatedHeartbeat}, reward_shares::CoverageShares, @@ -30,6 +37,9 @@ use rust_decimal::Decimal; use rust_decimal_macros::dec; use sqlx::PgPool; use std::{collections::HashMap, pin::pin}; +use task_manager::{ManagedTask, TaskManager}; +use tempfile::TempDir; +use tokio::task::spawn_local; use uuid::Uuid; #[derive(Clone)] @@ -96,6 +106,146 @@ fn hex_cell(loc: &str) -> hextree::Cell { hextree::Cell::from_raw(u64::from_str_radix(loc, 16).unwrap()).unwrap() } +use aws_local::*; +use hex_assignments::HexBoostData; +use std::{path::PathBuf, str::FromStr}; + +pub async fn create_data_set_downloader( + pool: PgPool, + file_paths: Vec, + file_upload: FileUpload, + new_coverage_object_notification: NewCoverageObjectNotification, + tmp_dir: &TempDir, + bucket_name: String, +) -> impl ManagedTask { + let awsl = AwsLocal::new(AWSLOCAL_DEFAULT_ENDPOINT, &bucket_name).await; + + for file_path in file_paths { + awsl.put_file_to_aws(&file_path).await.unwrap(); + } + + let uuid: Uuid = Uuid::new_v4(); + let data_set_directory = tmp_dir.path().join(uuid.to_string()); + tokio::fs::create_dir_all(data_set_directory.clone()) + .await + .unwrap(); + + let file_store = awsl.file_store.clone(); + let poll_duration = std::time::Duration::from_millis(25); + + let (oracle_boosting_reports, oracle_boosting_reports_server) = + OracleBoostingReportV1::file_sink( + tmp_dir.path(), + file_upload.clone(), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(std::time::Duration::from_secs(15 * 60)), + env!("CARGO_PKG_NAME"), + ) + .await + .unwrap(); + + let dsdd = DataSetDownloaderDaemon::new( + pool, + HexBoostData::default(), + file_store, + oracle_boosting_reports, + 
data_set_directory.clone(), + new_coverage_object_notification, + poll_duration, + ); + + TaskManager::builder() + .add_task(oracle_boosting_reports_server) + .add_task(dsdd) + .build() +} + +pub async fn hex_assignment_file_exist(pool: &PgPool, filename: &str) -> bool { + sqlx::query_scalar::<_, bool>( + r#" + SELECT EXISTS(SELECT 1 FROM hex_assignment_data_set_status WHERE filename = $1) + "#, + ) + .bind(filename) + .fetch_one(pool) + .await + .unwrap() +} + +#[sqlx::test] +async fn test_dataset_downloader_daemon(pool: PgPool) { + // Scenario: + // 1. DataSetDownloaderDaemon downloads initial files + // 2. Upload a new file + // 3. DataSetDownloaderDaemon downloads new file + + let paths = [ + "footfall.1722895200000.gz", + "urbanization.1722895200000.gz", + "landtype.1722895200000.gz", + "service_provider_override.1739404800000.gz", + ]; + + let file_paths: Vec = paths + .iter() + .map(|f| PathBuf::from(format!("./tests/integrations/fixtures/{}", f))) + .collect(); + + let (file_upload_tx, _file_upload_rx) = file_upload::message_channel(); + let file_upload = FileUpload { + sender: file_upload_tx, + }; + + let (_, new_coverage_obj_notification) = new_coverage_object_notification_channel(); + + let tmp_dir = TempDir::new().expect("Unable to create temp dir"); + let bucket_name = gen_bucket_name(); + + let task = create_data_set_downloader( + pool.clone(), + file_paths, + file_upload, + new_coverage_obj_notification, + &tmp_dir, + bucket_name.clone(), + ) + .await; + + let local = tokio::task::LocalSet::new(); + + local + .run_until(async move { + spawn_local(async { + TaskManager::builder() + .add_task(task) + .build() + .start() + .await + .unwrap(); + }); + tokio::time::sleep(std::time::Duration::from_millis(300)).await; + + assert!(hex_assignment_file_exist(&pool, "footfall.1722895200000.gz").await); + assert!(hex_assignment_file_exist(&pool, "urbanization.1722895200000.gz").await); + assert!(hex_assignment_file_exist(&pool, 
"landtype.1722895200000.gz").await); + assert!( + hex_assignment_file_exist(&pool, "service_provider_override.1739404800000.gz") + .await + ); + + let awsl = AwsLocal::new(AWSLOCAL_DEFAULT_ENDPOINT, &bucket_name).await; + awsl.put_file_to_aws( + &PathBuf::from_str("./tests/integrations/fixtures/footfall.1732895200000.gz") + .unwrap(), + ) + .await + .unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(300)).await; + assert!(hex_assignment_file_exist(&pool, "footfall.1732895200000.gz").await); + }) + .await; +} + #[sqlx::test] async fn test_footfall_and_urbanization_report(pool: PgPool) -> anyhow::Result<()> { let uuid = Uuid::new_v4(); diff --git a/mobile_verifier/tests/integrations/common/mod.rs b/mobile_verifier/tests/integrations/common/mod.rs index 07361afe8..a92114fc3 100644 --- a/mobile_verifier/tests/integrations/common/mod.rs +++ b/mobile_verifier/tests/integrations/common/mod.rs @@ -200,8 +200,10 @@ pub async fn set_unassigned_oracle_boosting_assignments( pool: &PgPool, data_sets: &impl HexBoostDataAssignmentsExt, ) -> anyhow::Result> { + let mut tx = pool.begin().await?; + let assigned_coverage_objs = AssignedCoverageObjects::assign_hex_stream( - mobile_verifier::boosting_oracles::data_sets::db::fetch_hexes_with_null_assignments(pool), + dataset_downloader::db::fetch_hexes_with_null_assignments(&mut tx), data_sets, ) .await?; @@ -231,7 +233,9 @@ pub async fn set_unassigned_oracle_boosting_assignments( timestamp, }); } - assigned_coverage_objs.save(pool).await?; + + assigned_coverage_objs.save(&mut tx).await?; + tx.commit().await?; Ok(output) } diff --git a/mobile_verifier/tests/integrations/fixtures/footfall.1722895200000.gz b/mobile_verifier/tests/integrations/fixtures/footfall.1722895200000.gz new file mode 100644 index 000000000..40cdd6818 Binary files /dev/null and b/mobile_verifier/tests/integrations/fixtures/footfall.1722895200000.gz differ diff --git a/mobile_verifier/tests/integrations/fixtures/footfall.1732895200000.gz 
b/mobile_verifier/tests/integrations/fixtures/footfall.1732895200000.gz new file mode 100644 index 000000000..40cdd6818 Binary files /dev/null and b/mobile_verifier/tests/integrations/fixtures/footfall.1732895200000.gz differ diff --git a/mobile_verifier/tests/integrations/fixtures/landtype.1722895200000.gz b/mobile_verifier/tests/integrations/fixtures/landtype.1722895200000.gz new file mode 100644 index 000000000..0fb00d726 Binary files /dev/null and b/mobile_verifier/tests/integrations/fixtures/landtype.1722895200000.gz differ diff --git a/mobile_verifier/tests/integrations/fixtures/service_provider_override.1739404800000.gz b/mobile_verifier/tests/integrations/fixtures/service_provider_override.1739404800000.gz new file mode 100644 index 000000000..c7e8498b3 Binary files /dev/null and b/mobile_verifier/tests/integrations/fixtures/service_provider_override.1739404800000.gz differ diff --git a/mobile_verifier/tests/integrations/fixtures/urbanization.1722895200000.gz b/mobile_verifier/tests/integrations/fixtures/urbanization.1722895200000.gz new file mode 100644 index 000000000..66f369c81 Binary files /dev/null and b/mobile_verifier/tests/integrations/fixtures/urbanization.1722895200000.gz differ