diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 4c8ebea213..6373f2fd61 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -84,6 +84,7 @@ rusoto_core = { version = "0.47", default-features = false, optional = true }
 rusoto_credential = { version = "0.47", optional = true }
 rusoto_sts = { version = "0.47", default-features = false, optional = true }
 rusoto_dynamodb = { version = "0.47", default-features = false, optional = true }
+rusoto_s3 = { version = "0.47", default-features = false, optional = true }
 
 # Glue
 rusoto_glue = { version = "0.47", default-features = false, optional = true }
@@ -157,6 +158,7 @@ s3-native-tls = [
   "rusoto_credential",
   "rusoto_sts/native-tls",
   "rusoto_dynamodb/native-tls",
+  "rusoto_s3/native-tls",
   "dynamodb_lock/native-tls",
   "object_store/aws",
 ]
@@ -165,6 +167,7 @@ s3 = [
   "rusoto_credential",
   "rusoto_sts/rustls",
   "rusoto_dynamodb/rustls",
+  "rusoto_s3/rustls",
   "dynamodb_lock/rustls",
   "object_store/aws",
 ]
diff --git a/rust/src/test_utils.rs b/rust/src/test_utils.rs
index a11bff8c17..9af745d8c1 100644
--- a/rust/src/test_utils.rs
+++ b/rust/src/test_utils.rs
@@ -19,7 +19,7 @@ pub struct IntegrationContext {
 }
 
 impl IntegrationContext {
-    pub fn new(
+    pub async fn new(
         integration: StorageIntegration,
     ) -> Result<Self, Box<dyn std::error::Error + 'static>> {
         // environment variables are loaded from .env files if found. Otherwise
@@ -46,7 +46,7 @@ impl IntegrationContext {
                 account_path.as_path().to_str().unwrap(),
             );
         }
-        integration.create_bucket(&bucket)?;
+        integration.create_bucket(&bucket).await?;
         let store_uri = match integration {
             StorageIntegration::Amazon => format!("s3://{}", &bucket),
             StorageIntegration::Microsoft => format!("az://{}", &bucket),
@@ -140,7 +140,7 @@ impl Drop for IntegrationContext {
     fn drop(&mut self) {
         match self.integration {
             StorageIntegration::Amazon => {
-                s3_cli::delete_bucket(self.root_uri()).unwrap();
+                s3_cli::delete_bucket(&self.bucket);
                 s3_cli::delete_lock_table().unwrap();
             }
             StorageIntegration::Microsoft => {
@@ -177,14 +177,14 @@ impl StorageIntegration {
         }
     }
 
-    fn create_bucket(&self, name: impl AsRef<str>) -> std::io::Result<()> {
+    async fn create_bucket(&self, name: impl AsRef<str>) -> std::io::Result<()> {
         match self {
             Self::Microsoft => {
                 az_cli::create_container(name)?;
                 Ok(())
             }
             Self::Amazon => {
-                s3_cli::create_bucket(format!("s3://{}", name.as_ref()))?;
+                s3_cli::create_bucket(name.as_ref()).await;
                 set_env_if_not_set(
                     "DYNAMO_LOCK_PARTITION_KEY_VALUE",
                     format!("s3://{}", name.as_ref()),
@@ -335,45 +335,94 @@ pub mod az_cli {
 pub mod s3_cli {
     use super::set_env_if_not_set;
     use crate::builder::s3_storage_options;
+    use rusoto_core::{HttpClient, Region};
+    use rusoto_credential::EnvironmentProvider;
+    use rusoto_s3::{
+        CreateBucketRequest, Delete, DeleteBucketRequest, DeleteObjectsRequest,
+        ListObjectsV2Request, ObjectIdentifier, S3Client, S3,
+    };
     use std::process::{Command, ExitStatus, Stdio};
 
-    /// Create a new bucket
-    pub fn create_bucket(bucket_name: impl AsRef<str>) -> std::io::Result<ExitStatus> {
-        let endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL)
-            .expect("variable ENDPOINT must be set to connect to S3");
+    pub async fn create_bucket(bucket_name: impl AsRef<str>) {
         let region = std::env::var(s3_storage_options::AWS_REGION)
             .expect("variable AWS_REGION must be set to connect to S3");
-        let mut child = Command::new("aws")
-            .args([
-                "s3",
-                "mb",
-                bucket_name.as_ref(),
-                "--endpoint-url",
-                &endpoint,
-                "--region",
-                &region,
-            ])
-            .spawn()
-            .expect("aws command is installed");
-        child.wait()
+        let endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL)
+            .expect("variable ENDPOINT must be set to connect to S3");
+
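+        // NOTE: the client reads credentials from the environment and talks to
+        // the endpoint configured above (e.g. a local MinIO or localstack
+        // instance) instead of the AWS default for this region.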
+        let s3_client = S3Client::new_with(
+            HttpClient::new().unwrap(),
+            EnvironmentProvider::default(),
+            Region::Custom {
+                name: region,
+                endpoint,
+            },
+        );
+        s3_client
+            .create_bucket(CreateBucketRequest {
+                bucket: bucket_name.as_ref().to_string(),
+                ..CreateBucketRequest::default()
+            })
+            .await
+            .unwrap();
     }
 
     /// delete bucket
-    pub fn delete_bucket(bucket_name: impl AsRef<str>) -> std::io::Result<ExitStatus> {
+    pub fn delete_bucket(bucket_name: impl AsRef<str>) {
+        let region = std::env::var(s3_storage_options::AWS_REGION)
+            .expect("variable AWS_REGION must be set to connect to S3");
         let endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL)
             .expect("variable ENDPOINT must be set to connect to S3");
-        let mut child = Command::new("aws")
-            .args([
-                "s3",
-                "rb",
-                bucket_name.as_ref(),
-                "--endpoint-url",
-                &endpoint,
-                "--force",
-            ])
-            .spawn()
-            .expect("aws command is installed");
-        child.wait()
+
+        let s3_client = S3Client::new_with(
+            HttpClient::new().unwrap(),
+            EnvironmentProvider::default(),
+            Region::Custom {
+                name: region,
+                endpoint,
+            },
+        );
+
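+        // NOTE: this helper is called from IntegrationContext's Drop impl,
+        // which cannot be async, so the async rusoto calls are driven to
+        // completion with block_on.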
+        futures::executor::block_on(async {
+            // objects must be deleted before the bucket can be deleted
+            let objects: Vec<ObjectIdentifier> = s3_client
+                .list_objects_v2(ListObjectsV2Request {
+                    bucket: bucket_name.as_ref().to_string(),
+                    ..ListObjectsV2Request::default()
+                })
+                .await
+                .unwrap()
+                .contents
+                .into_iter()
+                .flatten()
+                .filter_map(|x| x.key)
+                .map(|key| ObjectIdentifier {
+                    key,
+                    version_id: None,
+                })
+                .collect();
+
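+            // NOTE: a single DeleteObjects request accepts at most 1000 keys and
+            // list_objects_v2 returns only one page; the buckets created by
+            // these tests are assumed to stay well below that limit.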
+            if !objects.is_empty() {
+                s3_client
+                    .delete_objects(DeleteObjectsRequest {
+                        bucket: bucket_name.as_ref().to_string(),
+                        delete: Delete {
+                            objects,
+                            quiet: Some(true),
+                        },
+                        ..DeleteObjectsRequest::default()
+                    })
+                    .await
+                    .unwrap();
+            }
+
+            s3_client
+                .delete_bucket(DeleteBucketRequest {
+                    bucket: bucket_name.as_ref().to_string(),
+                    ..DeleteBucketRequest::default()
+                })
+                .await
+                .unwrap();
+        });
     }
 
     /// copy directory
diff --git a/rust/tests/command_filesystem_check.rs b/rust/tests/command_filesystem_check.rs
index ced317d990..94d746112e 100644
--- a/rust/tests/command_filesystem_check.rs
+++ b/rust/tests/command_filesystem_check.rs
@@ -16,7 +16,7 @@ async fn test_filesystem_check_local() -> TestResult {
 }
 
 #[cfg(any(feature = "s3", feature = "s3-native-tls"))]
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread")]
 #[serial]
 async fn test_filesystem_check_aws() -> TestResult {
     set_env_if_not_set("AWS_S3_ALLOW_UNSAFE_RENAME", "true");
@@ -46,7 +46,7 @@ async fn test_filesystem_check_hdfs() -> TestResult {
 }
 
 async fn test_filesystem_check(storage: StorageIntegration) -> TestResult {
-    let context = IntegrationContext::new(storage)?;
+    let context = IntegrationContext::new(storage).await?;
     context.load_table(TestTables::Simple).await?;
     let file = "part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet";
     let path = Path::from_iter([&TestTables::Simple.as_name(), file]);
@@ -89,7 +89,7 @@ async fn test_filesystem_check(storage: StorageIntegration) -> TestResult {
 #[serial]
 async fn test_filesystem_check_partitioned() -> TestResult {
     let storage = StorageIntegration::Local;
-    let context = IntegrationContext::new(storage)?;
+    let context = IntegrationContext::new(storage).await?;
     context
         .load_table(TestTables::Delta0_8_0Partitioned)
         .await?;
@@ -122,7 +122,7 @@ async fn test_filesystem_check_partitioned() -> TestResult {
 #[serial]
 async fn test_filesystem_check_fails_for_concurrent_delete() -> TestResult {
     // Validate failure when a non dry only executes on the latest version
-    let context = IntegrationContext::new(StorageIntegration::Local)?;
+    let context = IntegrationContext::new(StorageIntegration::Local).await?;
     context.load_table(TestTables::Simple).await?;
     let file = "part-00003-53f42606-6cda-4f13-8d07-599a21197296-c000.snappy.parquet";
     let path = Path::from_iter([&TestTables::Simple.as_name(), file]);
@@ -150,7 +150,7 @@ async fn test_filesystem_check_fails_for_concurrent_delete() -> TestResult {
 #[ignore = "should this actually fail? with conflcit resolution, we are re-trying again."]
 async fn test_filesystem_check_outdated() -> TestResult {
     // Validate failure when a non dry only executes on the latest version
-    let context = IntegrationContext::new(StorageIntegration::Local)?;
+    let context = IntegrationContext::new(StorageIntegration::Local).await?;
     context.load_table(TestTables::Simple).await?;
     let file = "part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet";
     let path = Path::from_iter([&TestTables::Simple.as_name(), file]);
diff --git a/rust/tests/integration_checkpoint.rs b/rust/tests/integration_checkpoint.rs
index c4361ac7bf..fe64f8eb84 100644
--- a/rust/tests/integration_checkpoint.rs
+++ b/rust/tests/integration_checkpoint.rs
@@ -13,16 +13,16 @@ use tokio::time::sleep;
 
 #[tokio::test]
 async fn cleanup_metadata_fs_test() -> TestResult {
-    let context = IntegrationContext::new(StorageIntegration::Local)?;
+    let context = IntegrationContext::new(StorageIntegration::Local).await?;
     cleanup_metadata_test(&context).await?;
     Ok(())
 }
 
 #[cfg(any(feature = "s3", feature = "s3-native-tls"))]
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread")]
 #[serial]
 async fn cleanup_metadata_aws_test() -> TestResult {
-    let context = IntegrationContext::new(StorageIntegration::Amazon)?;
+    let context = IntegrationContext::new(StorageIntegration::Amazon).await?;
     cleanup_metadata_test(&context).await?;
     Ok(())
 }
diff --git a/rust/tests/integration_concurrent_writes.rs b/rust/tests/integration_concurrent_writes.rs
index 314e9ce9a6..a26d6690a8 100644
--- a/rust/tests/integration_concurrent_writes.rs
+++ b/rust/tests/integration_concurrent_writes.rs
@@ -17,7 +17,7 @@ async fn test_concurrent_writes_local() -> TestResult {
 }
 
 #[cfg(feature = "s3")]
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread")]
 async fn concurrent_writes_s3() -> TestResult {
     test_concurrent_writes(StorageIntegration::Amazon).await?;
     Ok(())
@@ -40,7 +40,7 @@ async fn test_concurrent_writes_hdfs() -> TestResult {
 }
 
 async fn test_concurrent_writes(integration: StorageIntegration) -> TestResult {
-    let context = IntegrationContext::new(integration)?;
+    let context = IntegrationContext::new(integration).await?;
     let (_table, table_uri) = prepare_table(&context).await?;
     run_test(|name| Worker::new(&table_uri, name)).await;
     Ok(())
diff --git a/rust/tests/integration_object_store.rs b/rust/tests/integration_object_store.rs
index 6fd31d759b..7546b05a29 100644
--- a/rust/tests/integration_object_store.rs
+++ b/rust/tests/integration_object_store.rs
@@ -23,7 +23,7 @@ async fn test_object_store_azure() -> TestResult {
 }
 
 #[cfg(feature = "s3")]
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread")]
 #[serial]
 async fn test_object_store_aws() -> TestResult {
     test_object_store(StorageIntegration::Amazon, true).await?;
@@ -49,7 +49,7 @@ async fn test_object_store_hdfs() -> TestResult {
 }
 
 async fn test_object_store(integration: StorageIntegration, skip_copy: bool) -> TestResult {
-    let context = IntegrationContext::new(integration)?;
+    let context = IntegrationContext::new(integration).await?;
     let delta_store = DeltaTableBuilder::from_uri(&context.root_uri())
         .with_allow_http(true)
         .build_storage()?;
@@ -425,7 +425,7 @@ async fn test_object_store_prefixes_local() -> TestResult {
 }
 
 async fn test_object_store_prefixes(integration: StorageIntegration) -> TestResult {
-    let context = IntegrationContext::new(integration)?;
+    let context = IntegrationContext::new(integration).await?;
     let prefixes = &["table path", "table path/hello%3F", "你好/😊"];
     for prefix in prefixes {
         let rooturi = format!("{}/{}", context.root_uri(), prefix);
diff --git a/rust/tests/integration_read.rs b/rust/tests/integration_read.rs
index b38149b613..13d783c030 100644
--- a/rust/tests/integration_read.rs
+++ b/rust/tests/integration_read.rs
@@ -37,7 +37,7 @@ mod azure {
 mod local {
     use super::*;
 
-    #[tokio::test]
+    #[tokio::test(flavor = "multi_thread")]
     #[serial]
     async fn test_read_tables_local() -> TestResult {
         read_tables(StorageIntegration::Local).await?;
@@ -104,7 +104,7 @@ mod hdfs {
 #[cfg(any(feature = "s3", feature = "s3-native-tls"))]
 mod s3 {
     use super::*;
-    #[tokio::test]
+    #[tokio::test(flavor = "multi_thread")]
     #[serial]
     async fn test_read_tables_aws() -> TestResult {
         read_tables(StorageIntegration::Amazon).await?;
@@ -118,7 +118,7 @@ mod s3 {
 }
 
 async fn read_tables(storage: StorageIntegration) -> TestResult {
-    let context = IntegrationContext::new(storage)?;
+    let context = IntegrationContext::new(storage).await?;
     context.load_table(TestTables::Simple).await?;
     context.load_table(TestTables::Golden).await?;
     context
@@ -137,7 +137,7 @@ async fn read_table_paths(
     table_root: &str,
     upload_path: &str,
 ) -> TestResult {
-    let context = IntegrationContext::new(storage)?;
+    let context = IntegrationContext::new(storage).await?;
     context
         .load_table_with_name(TestTables::Delta0_8_0SpecialPartitioned, upload_path)
         .await?;
diff --git a/rust/tests/repair_s3_rename_test.rs b/rust/tests/repair_s3_rename_test.rs
index 25a9a5e060..f55f23baaa 100644
--- a/rust/tests/repair_s3_rename_test.rs
+++ b/rust/tests/repair_s3_rename_test.rs
@@ -42,7 +42,9 @@ async fn repair_when_worker_pauses_after_rename_test() {
 
 async fn run_repair_test_case(path: &str, pause_copy: bool) -> Result<(), ObjectStoreError> {
     std::env::set_var("AWS_S3_LOCKING_PROVIDER", "dynamodb");
     std::env::set_var("DYNAMO_LOCK_LEASE_DURATION", "2");
-    let context = IntegrationContext::new(StorageIntegration::Amazon).unwrap();
+    let context = IntegrationContext::new(StorageIntegration::Amazon)
+        .await
+        .unwrap();
     let root_path = Path::from(path);
     let src1 = root_path.child("src1");