-
Notifications
You must be signed in to change notification settings - Fork 35
Add localstack integration and DataSetDownloaderDaemon test #1000
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
cfe1f77
e4e9e52
0828767
9732976
cc922b1
6329977
293e453
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| [package] | ||
| name = "aws-local" | ||
| version = "0.1.0" | ||
| authors.workspace = true | ||
| license.workspace = true | ||
| edition.workspace = true | ||
|
|
||
| [dependencies] | ||
| aws-config = {workspace = true} | ||
| aws-sdk-s3 = {workspace = true} | ||
| aws-types = {workspace = true, features = ["hardcoded-credentials"]} | ||
| tokio = {workspace = true} | ||
| triggered = {workspace = true} | ||
| tonic = {workspace = true} | ||
| chrono = {workspace = true} | ||
| prost = {workspace = true} | ||
| anyhow = {workspace = true} | ||
| uuid = {workspace = true} | ||
| tempfile = {workspace = true} | ||
| file-store = { path = "../file_store", features = ["local"] } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| It helps to run tests with [localstack](https://www.localstack.cloud/) |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,166 @@ | ||||||||||||||||||||||
| use anyhow::{anyhow, Result}; | ||||||||||||||||||||||
| use aws_config::meta::region::RegionProviderChain; | ||||||||||||||||||||||
| use aws_sdk_s3::{Client, Endpoint, Region}; | ||||||||||||||||||||||
| use chrono::Utc; | ||||||||||||||||||||||
| use file_store::traits::MsgBytes; | ||||||||||||||||||||||
| use file_store::{file_sink, file_upload, FileStore, FileType, Settings}; | ||||||||||||||||||||||
| use std::path::Path; | ||||||||||||||||||||||
| use std::{str::FromStr, sync::Arc}; | ||||||||||||||||||||||
| use tempfile::TempDir; | ||||||||||||||||||||||
| use tokio::sync::Mutex; | ||||||||||||||||||||||
| use tonic::transport::Uri; | ||||||||||||||||||||||
| use uuid::Uuid; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| pub const AWSLOCAL_DEFAULT_ENDPOINT: &str = "http://127.0.0.1:4566"; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| pub fn gen_bucket_name() -> String { | ||||||||||||||||||||||
| format!("mvr-{}-{}", Uuid::new_v4(), Utc::now().timestamp_millis()) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Interacts with the locastack. | ||||||||||||||||||||||
| // Used to create mocked aws buckets and files. | ||||||||||||||||||||||
| pub struct AwsLocal { | ||||||||||||||||||||||
| pub fs_settings: Settings, | ||||||||||||||||||||||
| pub file_store: FileStore, | ||||||||||||||||||||||
| pub aws_client: aws_sdk_s3::Client, | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| impl AwsLocal { | ||||||||||||||||||||||
| async fn create_aws_client(settings: &Settings) -> aws_sdk_s3::Client { | ||||||||||||||||||||||
| let endpoint: Option<Endpoint> = match &settings.endpoint { | ||||||||||||||||||||||
| Some(endpoint) => Uri::from_str(endpoint) | ||||||||||||||||||||||
| .map(Endpoint::immutable) | ||||||||||||||||||||||
| .map(Some) | ||||||||||||||||||||||
| .unwrap(), | ||||||||||||||||||||||
| _ => None, | ||||||||||||||||||||||
| }; | ||||||||||||||||||||||
| let region = Region::new(settings.region.clone()); | ||||||||||||||||||||||
| let region_provider = RegionProviderChain::first_try(region).or_default_provider(); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| let mut config = aws_config::from_env().region(region_provider); | ||||||||||||||||||||||
| config = config.endpoint_resolver(endpoint.unwrap()); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| let creds = aws_types::credentials::Credentials::from_keys( | ||||||||||||||||||||||
| settings.access_key_id.as_ref().unwrap(), | ||||||||||||||||||||||
| settings.secret_access_key.as_ref().unwrap(), | ||||||||||||||||||||||
| None, | ||||||||||||||||||||||
| ); | ||||||||||||||||||||||
| config = config.credentials_provider(creds); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| let config = config.load().await; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| Client::new(&config) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| pub async fn new(endpoint: &str, bucket: &str) -> AwsLocal { | ||||||||||||||||||||||
| let settings = Settings { | ||||||||||||||||||||||
| bucket: bucket.into(), | ||||||||||||||||||||||
| endpoint: Some(endpoint.into()), | ||||||||||||||||||||||
| region: "us-east-1".into(), | ||||||||||||||||||||||
| access_key_id: Some("random".into()), | ||||||||||||||||||||||
| secret_access_key: Some("random2".into()), | ||||||||||||||||||||||
| }; | ||||||||||||||||||||||
| let client = Self::create_aws_client(&settings).await; | ||||||||||||||||||||||
| client.create_bucket().bucket(bucket).send().await.unwrap(); | ||||||||||||||||||||||
| AwsLocal { | ||||||||||||||||||||||
| aws_client: client, | ||||||||||||||||||||||
| fs_settings: settings.clone(), | ||||||||||||||||||||||
| file_store: file_store::FileStore::from_settings(&settings) | ||||||||||||||||||||||
| .await | ||||||||||||||||||||||
| .unwrap(), | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| pub fn fs_settings(&self) -> Settings { | ||||||||||||||||||||||
| self.fs_settings.clone() | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| pub async fn put_proto_to_aws<T: prost::Message + MsgBytes>( | ||||||||||||||||||||||
| &self, | ||||||||||||||||||||||
| items: Vec<T>, | ||||||||||||||||||||||
| file_type: FileType, | ||||||||||||||||||||||
| metric_name: &'static str, | ||||||||||||||||||||||
| ) -> Result<String> { | ||||||||||||||||||||||
| let tmp_dir = TempDir::new()?; | ||||||||||||||||||||||
| let tmp_dir_path = tmp_dir.path().to_owned(); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| let (shutdown_trigger, shutdown_listener) = triggered::trigger(); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| let (file_upload, file_upload_server) = | ||||||||||||||||||||||
| file_upload::FileUpload::from_settings_tm(&self.fs_settings) | ||||||||||||||||||||||
| .await | ||||||||||||||||||||||
| .unwrap(); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| let (item_sink, item_server) = | ||||||||||||||||||||||
| file_sink::FileSinkBuilder::new(file_type, &tmp_dir_path, file_upload, metric_name) | ||||||||||||||||||||||
| .auto_commit(false) | ||||||||||||||||||||||
| .roll_time(std::time::Duration::new(15, 0)) | ||||||||||||||||||||||
| .create::<T>() | ||||||||||||||||||||||
| .await | ||||||||||||||||||||||
| .unwrap(); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| for item in items { | ||||||||||||||||||||||
| item_sink.write(item, &[]).await.unwrap(); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| let item_recv = item_sink.commit().await.unwrap(); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| let uploaded_file = Arc::new(Mutex::new(String::default())); | ||||||||||||||||||||||
| let up_2 = uploaded_file.clone(); | ||||||||||||||||||||||
| let mut timeout = std::time::Duration::new(5, 0); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| tokio::spawn(async move { | ||||||||||||||||||||||
| let uploaded_files = item_recv.await.unwrap().unwrap(); | ||||||||||||||||||||||
| assert!(uploaded_files.len() == 1); | ||||||||||||||||||||||
|
Comment on lines
+111
to
+113
|
||||||||||||||||||||||
| tokio::spawn(async move { | |
| let uploaded_files = item_recv.await.unwrap().unwrap(); | |
| assert!(uploaded_files.len() == 1); | |
| let (tx, rx) = tokio::sync::oneshot::channel(); | |
| tokio::spawn(async move { | |
| let uploaded_files = item_recv.await.unwrap().unwrap(); | |
| if uploaded_files.len() != 1 { | |
| let _ = tx.send(Err(anyhow!("Expected exactly one uploaded file, but found {}", uploaded_files.len()))); | |
| return; | |
| } |
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -315,7 +315,7 @@ impl DataSetDownloaderDaemon { | |||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
| async fn check_for_new_data_sets(&mut self) -> anyhow::Result<()> { | ||||||||
| pub async fn check_for_new_data_sets(&mut self) -> anyhow::Result<()> { | ||||||||
| let new_urbanized = self | ||||||||
| .data_sets | ||||||||
| .urbanization | ||||||||
|
|
@@ -392,8 +392,7 @@ impl DataSetDownloaderDaemon { | |||||||
| Ok(()) | ||||||||
| } | ||||||||
|
|
||||||||
| pub async fn run(mut self) -> anyhow::Result<()> { | ||||||||
| tracing::info!("Starting data set downloader task"); | ||||||||
| pub async fn fetch_first_datasets(&mut self) -> anyhow::Result<()> { | ||||||||
| self.data_sets | ||||||||
| .urbanization | ||||||||
| .fetch_first_data_set(&self.pool, &self.data_set_directory) | ||||||||
|
|
@@ -410,6 +409,12 @@ impl DataSetDownloaderDaemon { | |||||||
| .service_provider_override | ||||||||
| .fetch_first_data_set(&self.pool, &self.data_set_directory) | ||||||||
| .await?; | ||||||||
| Ok(()) | ||||||||
| } | ||||||||
|
|
||||||||
| pub async fn run(mut self) -> anyhow::Result<()> { | ||||||||
| tracing::info!("Starting data set downloader task"); | ||||||||
| self.fetch_first_datasets().await?; | ||||||||
|
||||||||
| self.fetch_first_datasets().await?; | |
| self.fetch_first_datasets().await?; | |
| self.check_for_new_data_sets().await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Calling
endpoint.unwrap()will panic ifsettings.endpointisNone. Consider handling theOptionsafely (e.g., usingif let Some(ep) = endpointor returning an error with a clear message).