Merged
121 changes: 113 additions & 8 deletions datafusion-cli/src/object_storage.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
use datafusion::common::config::{
ConfigEntry, ConfigExtension, ConfigField, ExtensionOptions, TableOptions, Visit,
};
use datafusion::common::{exec_datafusion_err, exec_err, internal_err};
use datafusion::common::{config_err, exec_datafusion_err, exec_err};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionState;
use datafusion::prelude::SessionContext;
@@ -39,17 +39,26 @@ pub async fn get_s3_object_store_builder(
url: &Url,
aws_options: &AwsOptions,
) -> Result<AmazonS3Builder> {
let AwsOptions {
access_key_id,
secret_access_key,
session_token,
region,
endpoint,
Contributor Author: One bug was that endpoint was not read here, so I changed this to destructure the object so the compiler can check we don't forget fields.
allow_http,
} = aws_options;
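
Editor's note: a minimal, standalone sketch of the exhaustive-destructuring pattern the comment above describes. The DemoOptions struct and field names are hypothetical, not part of the PR; the point is that with no `..` rest pattern, adding a field to the options struct becomes a compile error until this code handles it.

// Hypothetical illustration of compiler-checked destructuring.
struct DemoOptions {
    endpoint: Option<String>,
    allow_http: Option<bool>,
}

fn apply(options: &DemoOptions) {
    // No `..` rest pattern: if a new field is added to DemoOptions and not
    // listed here, this binding fails to compile, so the field cannot be
    // silently ignored the way options.endpoint was before this PR.
    let DemoOptions {
        endpoint,
        allow_http,
    } = options;

    if let Some(endpoint) = endpoint {
        println!("endpoint: {endpoint}");
    }
    if let Some(allow_http) = allow_http {
        println!("allow_http: {allow_http}");
    }
}

fn main() {
    apply(&DemoOptions {
        endpoint: Some("http://localhost:9000".to_string()),
        allow_http: Some(true),
    });
}
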

let bucket_name = get_bucket_name(url)?;
let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket_name);

if let (Some(access_key_id), Some(secret_access_key)) =
(&aws_options.access_key_id, &aws_options.secret_access_key)
(access_key_id, secret_access_key)
{
builder = builder
.with_access_key_id(access_key_id)
.with_secret_access_key(secret_access_key);

if let Some(session_token) = &aws_options.session_token {
if let Some(session_token) = session_token {
builder = builder.with_token(session_token);
}
} else {
Expand All @@ -72,10 +81,30 @@ pub async fn get_s3_object_store_builder(
builder = builder.with_credentials(credentials);
}

if let Some(region) = &aws_options.region {
if let Some(region) = region {
builder = builder.with_region(region);
}

if let Some(endpoint) = endpoint {
// Make a nicer error if the user hasn't allowed http and the endpoint
// is http as the default message is "URL scheme is not allowed"
if let Ok(endpoint_url) = Url::try_from(endpoint.as_str()) {
if !matches!(allow_http, Some(true)) && endpoint_url.scheme() == "http" {
return config_err!(
"Invalid endpoint: {endpoint}. \
HTTP is not allowed for S3 endpoints. \
To allow HTTP, set 'aws.allow_http' to true"
);
Contributor (on lines +93 to +97): ❤️
}
}

builder = builder.with_endpoint(endpoint);
}

if let Some(allow_http) = allow_http {
builder = builder.with_allow_http(*allow_http);
}

Ok(builder)
}
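
Editor's note: a hedged sketch (not part of the PR) of hitting the new endpoint check by calling get_s3_object_store_builder directly with an http:// endpoint, instead of going through SQL as the tests at the bottom of the file do. It assumes AwsOptions implements Default, as its use as a config extension elsewhere in this file suggests; the fake credential and endpoint values are placeholders.

// Hedged sketch: an http:// endpoint is rejected as a configuration error
// unless aws.allow_http is set to true. Assumes AwsOptions: Default.
async fn endpoint_check_sketch() -> Result<()> {
    let url = Url::parse("s3://bucket/path/file.parquet")
        .map_err(|e| exec_datafusion_err!("{e}"))?;

    // http endpoint without allow_http => configuration error
    let opts = AwsOptions {
        access_key_id: Some("fake_access_key_id".to_string()),
        secret_access_key: Some("fake_secret_access_key".to_string()),
        endpoint: Some("http://localhost:9000".to_string()),
        ..Default::default()
    };
    assert!(get_s3_object_store_builder(&url, &opts).await.is_err());

    // same endpoint with allow_http = true => the builder is returned
    let opts = AwsOptions {
        allow_http: Some(true),
        ..opts
    };
    let _builder = get_s3_object_store_builder(&url, &opts).await?;
    Ok(())
}
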

@@ -188,6 +217,8 @@ pub struct AwsOptions {
pub region: Option<String>,
/// OSS or COS Endpoint
pub endpoint: Option<String>,
/// Allow HTTP (otherwise will always use https)
Contributor Author: BTW I found the reworked configuration option system that @metesynnada added in #9382 really nice to work with ❤️
pub allow_http: Option<bool>,
}
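
Editor's note: the comment above refers to the configuration extension system from #9382. As a hedged illustration (not part of this PR), the sketch below shows how a string key such as 'aws.allow_http' reaches AwsOptions::set, mirroring the exact calls used by the tests at the bottom of this file (register_options, default_table_options, alter_with_string_hash_map, extensions.get).

// Hedged sketch of routing 'aws.*' string options into AwsOptions.
use std::collections::HashMap;

fn route_aws_options_sketch() -> Result<()> {
    let ctx = SessionContext::new();
    // register_options (defined in this file) attaches AwsOptions to the
    // session's table options for the "s3" scheme.
    register_options(&ctx, "s3");

    let mut table_options = ctx.state().default_table_options().clone();
    let mut opts = HashMap::new();
    opts.insert("aws.allow_http".to_string(), "true".to_string());
    opts.insert("aws.endpoint".to_string(), "http://localhost:9000".to_string());

    // Each "aws."-prefixed key is dispatched to AwsOptions::set, which
    // matches on the remainder ("allow_http", "endpoint", ...).
    table_options.alter_with_string_hash_map(&opts)?;

    let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
    assert_eq!(aws_options.allow_http, Some(true));
    assert_eq!(aws_options.endpoint.as_deref(), Some("http://localhost:9000"));
    Ok(())
}
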

impl ExtensionOptions for AwsOptions {
@@ -219,11 +250,14 @@ impl ExtensionOptions for AwsOptions {
"region" => {
self.region.set(rem, value)?;
}
"oss" | "cos" => {
"oss" | "cos" | "endpoint" => {
self.endpoint.set(rem, value)?;
}
"allow_http" => {
self.allow_http.set(rem, value)?;
}
_ => {
return internal_err!("Config value \"{}\" not found on AwsOptions", rem);
return config_err!("Config value \"{}\" not found on AwsOptions", rem);
Contributor Author: drive-by cleanup -- this is not an internal error
}
}
Ok(())
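
Editor's note: a small hedged example of the distinction the drive-by-cleanup comment is making: config_err! surfaces as a user-facing "Invalid or Unsupported Configuration" error (the prefix asserted in the allow_http test further down), whereas internal_err! is reserved for invariant violations inside DataFusion itself. The helper name is hypothetical.

// Hedged illustration: an unknown user-supplied key is a configuration
// error, not an internal error.
fn reject_unknown_key(rem: &str) -> Result<()> {
    config_err!("Config value \"{}\" not found on AwsOptions", rem)
}

// Example (prefix matches the assertion used by the allow_http test below):
// reject_unknown_key("allow_htp").unwrap_err().to_string()
//   starts with "Invalid or Unsupported Configuration: "
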
@@ -262,6 +296,7 @@ impl ExtensionOptions for AwsOptions {
self.session_token.visit(&mut v, "session_token", "");
self.region.visit(&mut v, "region", "");
self.endpoint.visit(&mut v, "endpoint", "");
self.allow_http.visit(&mut v, "allow_http", "");
v.0
}
}
@@ -307,7 +342,7 @@ impl ExtensionOptions for GcpOptions {
self.application_credentials_path.set(rem, value)?;
}
_ => {
return internal_err!("Config value \"{}\" not found on GcpOptions", rem);
return config_err!("Config value \"{}\" not found on GcpOptions", rem);
}
}
Ok(())
@@ -479,12 +514,21 @@ mod tests {
let access_key_id = "fake_access_key_id";
let secret_access_key = "fake_secret_access_key";
let region = "fake_us-east-2";
let endpoint = "endpoint33";
let session_token = "fake_session_token";
let location = "s3://bucket/path/file.parquet";

let table_url = ListingTableUrl::parse(location)?;
let scheme = table_url.scheme();
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}', 'aws.region' '{region}', 'aws.session_token' {session_token}) LOCATION '{location}'");
let sql = format!(
"CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
('aws.access_key_id' '{access_key_id}', \
'aws.secret_access_key' '{secret_access_key}', \
'aws.region' '{region}', \
'aws.session_token' {session_token}, \
'aws.endpoint' '{endpoint}'\
) LOCATION '{location}'"
);

let ctx = SessionContext::new();
let mut plan = ctx.state().create_logical_plan(&sql).await?;
@@ -501,6 +545,7 @@
(AmazonS3ConfigKey::AccessKeyId, access_key_id),
(AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
(AmazonS3ConfigKey::Region, region),
(AmazonS3ConfigKey::Endpoint, endpoint),
(AmazonS3ConfigKey::Token, session_token),
];
for (key, value) in config {
@@ -513,6 +558,66 @@
Ok(())
}

#[tokio::test]
async fn s3_object_store_builder_allow_http_error() -> Result<()> {
let access_key_id = "fake_access_key_id";
let secret_access_key = "fake_secret_access_key";
let endpoint = "http://endpoint33";
let location = "s3://bucket/path/file.parquet";

let table_url = ListingTableUrl::parse(location)?;
let scheme = table_url.scheme();
let sql = format!(
"CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
('aws.access_key_id' '{access_key_id}', \
'aws.secret_access_key' '{secret_access_key}', \
'aws.endpoint' '{endpoint}'\
) LOCATION '{location}'"
);

let ctx = SessionContext::new();
let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
register_options(&ctx, scheme);
let mut table_options = ctx.state().default_table_options().clone();
table_options.alter_with_string_hash_map(&cmd.options)?;
let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
let err = get_s3_object_store_builder(table_url.as_ref(), aws_options)
.await
.unwrap_err();

assert_eq!(err.to_string(), "Invalid or Unsupported Configuration: Invalid endpoint: http://endpoint33. HTTP is not allowed for S3 endpoints. To allow HTTP, set 'aws.allow_http' to true");
Contributor: should we also test with aws.allow_http enabled?

Contributor Author: I think I added that test below (demonstrating it doesn't error)
} else {
return plan_err!("LogicalPlan is not a CreateExternalTable");
}

// Now add `allow_http` to the options and check if it works
let sql = format!(
"CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
('aws.access_key_id' '{access_key_id}', \
'aws.secret_access_key' '{secret_access_key}', \
'aws.endpoint' '{endpoint}',\
'aws.allow_http' 'true'\
) LOCATION '{location}'"
);

let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
register_options(&ctx, scheme);
let mut table_options = ctx.state().default_table_options().clone();
table_options.alter_with_string_hash_map(&cmd.options)?;
let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
// ensure this isn't an error
get_s3_object_store_builder(table_url.as_ref(), aws_options).await?;
} else {
return plan_err!("LogicalPlan is not a CreateExternalTable");
}

Ok(())
}

#[tokio::test]
async fn oss_object_store_builder() -> Result<()> {
let access_key_id = "fake_access_key_id";