-
Notifications
You must be signed in to change notification settings - Fork 35
Add localstack integration and DataSetDownloaderDaemon test #1000
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces the aws_local crate for mocking S3 interactions via LocalStack, integrates it into the oracles workspace, and adds an initial dataset downloader with integration tests.
- Adds
aws_localhelper crate and wires it into CI - Implements
DataSetDownloaderDaemonwith separatefetch_first_datasetsandcheck_for_new_data_setsAPIs - Adds integration tests for dataset downloading in
mobile_verifier
Reviewed Changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| mobile_verifier/tests/integrations/boosting_oracles.rs | Adds helper functions and integration test for downloader |
| mobile_verifier/src/boosting_oracles/data_sets.rs | Refactors run, exposes check_for_new_data_sets, and adds fetch_first_datasets |
| mobile_verifier/Cargo.toml | Registers aws-local and tempfile |
| file_store/Cargo.toml | Switches AWS crates to workspace references |
| aws_local/src/lib.rs | Implements AwsLocal wrapper over LocalStack S3 |
| aws_local/src/README.md | Adds LocalStack usage note |
| aws_local/Cargo.toml | Declares aws-local package |
| Cargo.toml | Adds aws_local workspace member and patches S3 crates |
| .github/workflows/{DockerCI,CI}.yml | Spins up LocalStack in CI |
Comments suppressed due to low confidence (1)
mobile_verifier/tests/integrations/boosting_oracles.rs:160
- [nitpick] Function name
hex_assignment_file_existis grammatically inconsistent; consider renaming tohex_assignment_file_existsfor clarity.
pub async fn hex_assignment_file_exist(pool: &PgPool, filename: &str) -> bool {
|
|
||
| pub async fn run(mut self) -> anyhow::Result<()> { | ||
| tracing::info!("Starting data set downloader task"); | ||
| self.fetch_first_datasets().await?; |
Copilot
AI
May 16, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The run method no longer calls check_for_new_data_sets, so it only fetches initial datasets but never polls for new updates. Consider invoking self.check_for_new_data_sets().await? (or a looping mechanism) within run to restore the intended behavior.
| self.fetch_first_datasets().await?; | |
| self.fetch_first_datasets().await?; | |
| self.check_for_new_data_sets().await?; |
| .unwrap(), | ||
| _ => None, | ||
| }; | ||
| let region = Region::new(settings.region.clone()); | ||
| let region_provider = RegionProviderChain::first_try(region).or_default_provider(); | ||
|
|
||
| let mut config = aws_config::from_env().region(region_provider); | ||
| config = config.endpoint_resolver(endpoint.unwrap()); |
Copilot
AI
May 16, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Calling endpoint.unwrap() will panic if settings.endpoint is None. Consider handling the Option safely (e.g., using if let Some(ep) = endpoint or returning an error with a clear message).
| .unwrap(), | |
| _ => None, | |
| }; | |
| let region = Region::new(settings.region.clone()); | |
| let region_provider = RegionProviderChain::first_try(region).or_default_provider(); | |
| let mut config = aws_config::from_env().region(region_provider); | |
| config = config.endpoint_resolver(endpoint.unwrap()); | |
| .map_err(|e| anyhow!("Invalid endpoint URI: {}", e))?, | |
| None => return Err(anyhow!("AWS endpoint is not set in the settings")), | |
| }; | |
| let region = Region::new(settings.region.clone()); | |
| let region_provider = RegionProviderChain::first_try(region).or_default_provider(); | |
| let mut config = aws_config::from_env().region(region_provider); | |
| if let Some(endpoint) = endpoint { | |
| config = config.endpoint_resolver(endpoint); | |
| } |
aws_local/src/lib.rs
Outdated
| // So we wait when dir will be empty. | ||
| // It means all files are uploaded to aws | ||
| loop { | ||
| if is_dir_has_files(&dir_path_clone) { |
Copilot
AI
May 16, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The loop waits for directory emptiness but never breaks on timeout expiry, risking an infinite loop. Consider checking timeout and breaking or returning an error when elapsed.
| if is_dir_has_files(&dir_path_clone) { | |
| if is_dir_has_files(&dir_path_clone) { | |
| if timeout <= std::time::Duration::ZERO { | |
| eprintln!("Timeout expired while waiting for directory to become empty."); | |
| break; | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
timeout -= dur;
Causes panic and breaks loop if timeout happens
| tokio::spawn(async move { | ||
| let uploaded_files = item_recv.await.unwrap().unwrap(); | ||
| assert!(uploaded_files.len() == 1); |
Copilot
AI
May 16, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using assert! inside a spawned task can cause panics that aren’t observed by the main test harness. Consider returning errors through a channel or awaiting the task’s result to propagate failures.
| tokio::spawn(async move { | |
| let uploaded_files = item_recv.await.unwrap().unwrap(); | |
| assert!(uploaded_files.len() == 1); | |
| let (tx, rx) = tokio::sync::oneshot::channel(); | |
| tokio::spawn(async move { | |
| let uploaded_files = item_recv.await.unwrap().unwrap(); | |
| if uploaded_files.len() != 1 { | |
| let _ = tx.send(Err(anyhow!("Expected exactly one uploaded file, but found {}", uploaded_files.len()))); | |
| return; | |
| } |
| pub async fn create_data_set_downloader( | ||
| pool: PgPool, | ||
| file_paths: Vec<PathBuf>, | ||
| file_upload: FileUpload, | ||
| new_coverage_object_notification: NewCoverageObjectNotification, | ||
| tmp_dir: &TempDir, | ||
| ) -> (DataSetDownloaderDaemon, PathBuf, String) { |
Copilot
AI
May 16, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Returning a tuple (DataSetDownloaderDaemon, PathBuf, String) can be unclear to callers. Consider defining a small struct to name each field for better readability and maintainability.
| pub async fn create_data_set_downloader( | |
| pool: PgPool, | |
| file_paths: Vec<PathBuf>, | |
| file_upload: FileUpload, | |
| new_coverage_object_notification: NewCoverageObjectNotification, | |
| tmp_dir: &TempDir, | |
| ) -> (DataSetDownloaderDaemon, PathBuf, String) { | |
| pub struct DataSetDownloaderResult { | |
| pub downloader: DataSetDownloaderDaemon, | |
| pub data_set_directory: PathBuf, | |
| pub bucket_name: String, | |
| } | |
| pub async fn create_data_set_downloader( | |
| pool: PgPool, | |
| file_paths: Vec<PathBuf>, | |
| file_upload: FileUpload, | |
| new_coverage_object_notification: NewCoverageObjectNotification, | |
| tmp_dir: &TempDir, | |
| ) -> DataSetDownloaderResult { |
aws_local/src/lib.rs
Outdated
| ) -> Result<String> { | ||
| // Uuid uses as random to avoid colisions | ||
| let uuid: Uuid = Uuid::new_v4(); | ||
| let dir_path = format!("/tmp/{}/{}", uuid, self.fs_settings.bucket); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use tempdir here as well?
We don't need to do any cleanup ourselves. All files are removed when the TempDir is dropped.
In the PR we first encountered AwsLocal (https://github.com/novalabsxyz/mobile-rewards-estimator/pull/20), this function was never used. Is it in fact not used in any of your projects and we can drop it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@michaeldjeffrey
This function is used at least in the CAS. It serves as the basis for functions that want to put a particular type of proto file.
https://github.com/novalabsxyz/mobile-coverage/blob/master/cas/tests/common/aws_mock.rs#L141
Can we use tempdir here as well?
Yes, changed
AwsLocal is the code I use in 3 different projects to mock proto files in AWS.
So, I think it would be nice to have the core AwsLocal as workspace member in
oraclesrepo to have capability to reuse it.https://github.com/novalabsxyz/mobile-rewards-estimator/pull/20#issuecomment-2695429042