Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: remove s3_cli from test_utils.rs #1570

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ rusoto_core = { version = "0.47", default-features = false, optional = true }
rusoto_credential = { version = "0.47", optional = true }
rusoto_sts = { version = "0.47", default-features = false, optional = true }
rusoto_dynamodb = { version = "0.47", default-features = false, optional = true }
rusoto_s3 = { version = "0.47", default-features = false, optional = true }

# Glue
rusoto_glue = { version = "0.47", default-features = false, optional = true }
Expand Down Expand Up @@ -157,6 +158,7 @@ s3-native-tls = [
"rusoto_credential",
"rusoto_sts/native-tls",
"rusoto_dynamodb/native-tls",
"rusoto_s3/native-tls",
"dynamodb_lock/native-tls",
"object_store/aws",
]
Expand All @@ -165,6 +167,7 @@ s3 = [
"rusoto_credential",
"rusoto_sts/rustls",
"rusoto_dynamodb/rustls",
"rusoto_s3/rustls",
"dynamodb_lock/rustls",
"object_store/aws",
]
Expand Down
119 changes: 84 additions & 35 deletions rust/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub struct IntegrationContext {
}

impl IntegrationContext {
pub fn new(
pub async fn new(
integration: StorageIntegration,
) -> Result<Self, Box<dyn std::error::Error + 'static>> {
// environment variables are loaded from .env files if found. Otherwise
Expand All @@ -46,7 +46,7 @@ impl IntegrationContext {
account_path.as_path().to_str().unwrap(),
);
}
integration.create_bucket(&bucket)?;
integration.create_bucket(&bucket).await?;
let store_uri = match integration {
StorageIntegration::Amazon => format!("s3://{}", &bucket),
StorageIntegration::Microsoft => format!("az://{}", &bucket),
Expand Down Expand Up @@ -140,7 +140,7 @@ impl Drop for IntegrationContext {
fn drop(&mut self) {
match self.integration {
StorageIntegration::Amazon => {
s3_cli::delete_bucket(self.root_uri()).unwrap();
s3_cli::delete_bucket(&self.bucket);
s3_cli::delete_lock_table().unwrap();
}
StorageIntegration::Microsoft => {
Expand Down Expand Up @@ -177,14 +177,14 @@ impl StorageIntegration {
}
}

fn create_bucket(&self, name: impl AsRef<str>) -> std::io::Result<()> {
async fn create_bucket(&self, name: impl AsRef<str>) -> std::io::Result<()> {
match self {
Self::Microsoft => {
az_cli::create_container(name)?;
Ok(())
}
Self::Amazon => {
s3_cli::create_bucket(format!("s3://{}", name.as_ref()))?;
s3_cli::create_bucket(name.as_ref()).await;
set_env_if_not_set(
"DYNAMO_LOCK_PARTITION_KEY_VALUE",
format!("s3://{}", name.as_ref()),
Expand Down Expand Up @@ -335,45 +335,94 @@ pub mod az_cli {
pub mod s3_cli {
use super::set_env_if_not_set;
use crate::builder::s3_storage_options;
use rusoto_core::{HttpClient, Region};
use rusoto_credential::EnvironmentProvider;
use rusoto_s3::{
CreateBucketRequest, Delete, DeleteBucketRequest, DeleteObjectsRequest,
ListObjectsV2Request, ObjectIdentifier, S3Client, S3,
};
use std::process::{Command, ExitStatus, Stdio};

/// Create a new bucket
pub fn create_bucket(bucket_name: impl AsRef<str>) -> std::io::Result<ExitStatus> {
let endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL)
.expect("variable ENDPOINT must be set to connect to S3");
pub async fn create_bucket(bucket_name: impl AsRef<str>) {
let region = std::env::var(s3_storage_options::AWS_REGION)
.expect("variable AWS_REGION must be set to connect to S3");
let mut child = Command::new("aws")
.args([
"s3",
"mb",
bucket_name.as_ref(),
"--endpoint-url",
&endpoint,
"--region",
&region,
])
.spawn()
.expect("aws command is installed");
child.wait()
let endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL)
.expect("variable ENDPOINT must be set to connect to S3");

let s3_client = S3Client::new_with(
HttpClient::new().unwrap(),
EnvironmentProvider::default(),
Region::Custom {
name: region,
endpoint: endpoint,
},
);
s3_client
.create_bucket(CreateBucketRequest {
bucket: bucket_name.as_ref().to_string(),
..CreateBucketRequest::default()
})
.await
.unwrap();
}

/// delete bucket
pub fn delete_bucket(bucket_name: impl AsRef<str>) -> std::io::Result<ExitStatus> {
pub fn delete_bucket(bucket_name: impl AsRef<str>) {
let region = std::env::var(s3_storage_options::AWS_REGION)
.expect("variable AWS_REGION must be set to connect to S3");
let endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL)
.expect("variable ENDPOINT must be set to connect to S3");
let mut child = Command::new("aws")
.args([
"s3",
"rb",
bucket_name.as_ref(),
"--endpoint-url",
&endpoint,
"--force",
])
.spawn()
.expect("aws command is installed");
child.wait()

let s3_client = S3Client::new_with(
HttpClient::new().unwrap(),
EnvironmentProvider::default(),
Region::Custom {
name: region,
endpoint: endpoint,
},
);

futures::executor::block_on(async {
// objects must be deleted before the bucket can be deleted
let objects: Vec<ObjectIdentifier> = s3_client
.list_objects_v2(ListObjectsV2Request {
bucket: bucket_name.as_ref().to_string(),
..ListObjectsV2Request::default()
})
.await
.unwrap()
.contents
.into_iter()
.flatten()
.filter_map(|x| x.key)
.map(|key| ObjectIdentifier {
key: key,
version_id: None,
})
.collect();

if !objects.is_empty() {
s3_client
.delete_objects(DeleteObjectsRequest {
bucket: bucket_name.as_ref().to_string(),
delete: Delete {
objects: objects,
quiet: Some(true),
},
..DeleteObjectsRequest::default()
})
.await
.unwrap();
}

s3_client
.delete_bucket(DeleteBucketRequest {
bucket: bucket_name.as_ref().to_string(),
..DeleteBucketRequest::default()
})
.await
.unwrap();
});
}

/// copy directory
Expand Down
10 changes: 5 additions & 5 deletions rust/tests/command_filesystem_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn test_filesystem_check_local() -> TestResult {
}

#[cfg(any(feature = "s3", feature = "s3-native-tls"))]
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn test_filesystem_check_aws() -> TestResult {
set_env_if_not_set("AWS_S3_ALLOW_UNSAFE_RENAME", "true");
Expand Down Expand Up @@ -46,7 +46,7 @@ async fn test_filesystem_check_hdfs() -> TestResult {
}

async fn test_filesystem_check(storage: StorageIntegration) -> TestResult {
let context = IntegrationContext::new(storage)?;
let context = IntegrationContext::new(storage).await?;
context.load_table(TestTables::Simple).await?;
let file = "part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet";
let path = Path::from_iter([&TestTables::Simple.as_name(), file]);
Expand Down Expand Up @@ -89,7 +89,7 @@ async fn test_filesystem_check(storage: StorageIntegration) -> TestResult {
#[serial]
async fn test_filesystem_check_partitioned() -> TestResult {
let storage = StorageIntegration::Local;
let context = IntegrationContext::new(storage)?;
let context = IntegrationContext::new(storage).await?;
context
.load_table(TestTables::Delta0_8_0Partitioned)
.await?;
Expand Down Expand Up @@ -122,7 +122,7 @@ async fn test_filesystem_check_partitioned() -> TestResult {
#[serial]
async fn test_filesystem_check_fails_for_concurrent_delete() -> TestResult {
// Validate failure when a non dry only executes on the latest version
let context = IntegrationContext::new(StorageIntegration::Local)?;
let context = IntegrationContext::new(StorageIntegration::Local).await?;
context.load_table(TestTables::Simple).await?;
let file = "part-00003-53f42606-6cda-4f13-8d07-599a21197296-c000.snappy.parquet";
let path = Path::from_iter([&TestTables::Simple.as_name(), file]);
Expand Down Expand Up @@ -150,7 +150,7 @@ async fn test_filesystem_check_fails_for_concurrent_delete() -> TestResult {
#[ignore = "should this actually fail? with conflict resolution, we are re-trying again."]
async fn test_filesystem_check_outdated() -> TestResult {
// Validate failure when a non dry only executes on the latest version
let context = IntegrationContext::new(StorageIntegration::Local)?;
let context = IntegrationContext::new(StorageIntegration::Local).await?;
context.load_table(TestTables::Simple).await?;
let file = "part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet";
let path = Path::from_iter([&TestTables::Simple.as_name(), file]);
Expand Down
6 changes: 3 additions & 3 deletions rust/tests/integration_checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ use tokio::time::sleep;

#[tokio::test]
async fn cleanup_metadata_fs_test() -> TestResult {
let context = IntegrationContext::new(StorageIntegration::Local)?;
let context = IntegrationContext::new(StorageIntegration::Local).await?;
cleanup_metadata_test(&context).await?;
Ok(())
}

#[cfg(any(feature = "s3", feature = "s3-native-tls"))]
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn cleanup_metadata_aws_test() -> TestResult {
let context = IntegrationContext::new(StorageIntegration::Amazon)?;
let context = IntegrationContext::new(StorageIntegration::Amazon).await?;
cleanup_metadata_test(&context).await?;
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions rust/tests/integration_concurrent_writes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async fn test_concurrent_writes_local() -> TestResult {
}

#[cfg(feature = "s3")]
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn concurrent_writes_s3() -> TestResult {
test_concurrent_writes(StorageIntegration::Amazon).await?;
Ok(())
Expand All @@ -40,7 +40,7 @@ async fn test_concurrent_writes_hdfs() -> TestResult {
}

async fn test_concurrent_writes(integration: StorageIntegration) -> TestResult {
let context = IntegrationContext::new(integration)?;
let context = IntegrationContext::new(integration).await?;
let (_table, table_uri) = prepare_table(&context).await?;
run_test(|name| Worker::new(&table_uri, name)).await;
Ok(())
Expand Down
6 changes: 3 additions & 3 deletions rust/tests/integration_object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn test_object_store_azure() -> TestResult {
}

#[cfg(feature = "s3")]
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn test_object_store_aws() -> TestResult {
test_object_store(StorageIntegration::Amazon, true).await?;
Expand All @@ -49,7 +49,7 @@ async fn test_object_store_hdfs() -> TestResult {
}

async fn test_object_store(integration: StorageIntegration, skip_copy: bool) -> TestResult {
let context = IntegrationContext::new(integration)?;
let context = IntegrationContext::new(integration).await?;
let delta_store = DeltaTableBuilder::from_uri(&context.root_uri())
.with_allow_http(true)
.build_storage()?;
Expand Down Expand Up @@ -425,7 +425,7 @@ async fn test_object_store_prefixes_local() -> TestResult {
}

async fn test_object_store_prefixes(integration: StorageIntegration) -> TestResult {
let context = IntegrationContext::new(integration)?;
let context = IntegrationContext::new(integration).await?;
let prefixes = &["table path", "table path/hello%3F", "你好/😊"];
for prefix in prefixes {
let rooturi = format!("{}/{}", context.root_uri(), prefix);
Expand Down
8 changes: 4 additions & 4 deletions rust/tests/integration_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ mod azure {
mod local {
use super::*;

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn test_read_tables_local() -> TestResult {
read_tables(StorageIntegration::Local).await?;
Expand Down Expand Up @@ -104,7 +104,7 @@ mod hdfs {
#[cfg(any(feature = "s3", feature = "s3-native-tls"))]
mod s3 {
use super::*;
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn test_read_tables_aws() -> TestResult {
read_tables(StorageIntegration::Amazon).await?;
Expand All @@ -118,7 +118,7 @@ mod s3 {
}

async fn read_tables(storage: StorageIntegration) -> TestResult {
let context = IntegrationContext::new(storage)?;
let context = IntegrationContext::new(storage).await?;
context.load_table(TestTables::Simple).await?;
context.load_table(TestTables::Golden).await?;
context
Expand All @@ -137,7 +137,7 @@ async fn read_table_paths(
table_root: &str,
upload_path: &str,
) -> TestResult {
let context = IntegrationContext::new(storage)?;
let context = IntegrationContext::new(storage).await?;
context
.load_table_with_name(TestTables::Delta0_8_0SpecialPartitioned, upload_path)
.await?;
Expand Down
4 changes: 3 additions & 1 deletion rust/tests/repair_s3_rename_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ async fn repair_when_worker_pauses_after_rename_test() {
async fn run_repair_test_case(path: &str, pause_copy: bool) -> Result<(), ObjectStoreError> {
std::env::set_var("AWS_S3_LOCKING_PROVIDER", "dynamodb");
std::env::set_var("DYNAMO_LOCK_LEASE_DURATION", "2");
let context = IntegrationContext::new(StorageIntegration::Amazon).unwrap();
let context = IntegrationContext::new(StorageIntegration::Amazon)
.await
.unwrap();

let root_path = Path::from(path);
let src1 = root_path.child("src1");
Expand Down