Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move DynamicFileCatalog back to core #10745

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
424 changes: 155 additions & 269 deletions datafusion-cli/Cargo.lock

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ readme = "README.md"
[dependencies]
arrow = "51.0.0"
async-trait = "0.1.41"
aws-config = "0.55"
aws-credential-types = "0.55"
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { path = "../datafusion/core", version = "38.0.0", features = [
"avro",
Expand Down
6 changes: 4 additions & 2 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ use crate::print_format::PrintFormat;
use crate::{
command::{Command, OutputFormat},
helper::{unescape_input, CliHelper},
object_storage::{get_object_store, register_options},
print_options::{MaxRows, PrintOptions},
};

use datafusion::common::instant::Instant;
use datafusion::common::plan_datafusion_err;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::{
file_format::object_storage::{get_object_store, register_options},
listing::ListingTableUrl,
};
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{DdlStatement, LogicalPlan};
use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties};
Expand Down
2 changes: 0 additions & 2 deletions datafusion-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
#![doc = include_str!("../README.md")]
pub const DATAFUSION_CLI_VERSION: &str = env!("CARGO_PKG_VERSION");

pub mod catalog;
pub mod command;
pub mod exec;
pub mod functions;
pub mod helper;
pub mod highlighter;
pub mod object_storage;
pub mod print_format;
pub mod print_options;
2 changes: 1 addition & 1 deletion datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ use std::process::ExitCode;
use std::str::FromStr;
use std::sync::{Arc, OnceLock};

use datafusion::catalog::dynamic_file_catalog::DynamicFileCatalog;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionConfig;
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicFileCatalog;
use datafusion_cli::functions::ParquetMetadataFunc;
use datafusion_cli::{
exec,
Expand Down
11 changes: 11 additions & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
datafusion-sql = { workspace = true }
dirs = "4.0.0"
flate2 = { version = "1.0.24", optional = true }
futures = { workspace = true }
glob = "0.3.0"
Expand Down Expand Up @@ -158,6 +159,16 @@ tokio-postgres = "0.7.7"
[target.'cfg(not(target_os = "windows"))'.dev-dependencies]
nix = { version = "0.29.0", features = ["fs"] }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
aws-config = "0.101.0"
aws-credential-types = "0.101.0"
# The default features cause a libunwind error when testing on the macOS platform.
object_store = { version = "0.9.1", default-features = false, features = ["aws", "gcp", "http"] }
Copy link
Contributor Author

@goldmedal goldmedal Jun 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I don't know which default feature causes the error on macOS. I just used trial and error many times, and eventually, it worked. I noticed that we disabled the default features in the root Cargo.toml by #8095, but I can't find the reason. Could someone explain this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is going to be problematic -- I don't think we can add these dependencies to the core datafusion crate without massively increasing the dependency footprint.

The idea is that the core of datafusion can be used from many places (including wasm) that don't want the (very large) additional dependency of the aws sdk. Having the dependencies in the DataFusion CLI is fine as it is the standalone app (rather than a library)


[target.'cfg(target_arch = "wasm32")'.dependencies]
# The default features cause a libunwind error when testing on the macOS platform.
object_store = { version = "0.9.1", default-features = false }

[[bench]]
harness = false
name = "aggregate_query_sql"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,24 @@
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::sync::{Arc, Weak};

use crate::object_storage::{get_object_store, AwsOptions, GcpOptions};

use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
use datafusion::common::plan_datafusion_err;
use datafusion::datasource::listing::{
ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::SessionState;

//! Implementation of a dynamic file catalog that automatically creates table providers
//! for the file paths.

use crate::catalog::schema::SchemaProvider;
use crate::catalog::{CatalogProvider, CatalogProviderList};
use crate::datasource::file_format::get_object_store;
#[cfg(not(target_arch = "wasm32"))]
use crate::datasource::file_format::object_storage::{AwsOptions, GcpOptions};
use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
use crate::datasource::TableProvider;
use crate::execution::context::SessionState;
use async_trait::async_trait;
use datafusion_common::plan_datafusion_err;
use datafusion_common::Result;
use dirs::home_dir;
use parking_lot::RwLock;
use std::any::Any;
use std::sync::{Arc, Weak};

/// Wraps another catalog, automatically creating table providers
/// for local files if needed
Expand All @@ -42,6 +42,7 @@ pub struct DynamicFileCatalog {
}

impl DynamicFileCatalog {
/// Create a new dynamic file catalog
pub fn new(
inner: Arc<dyn CatalogProviderList>,
state: Weak<RwLock<SessionState>>,
Expand Down Expand Up @@ -176,15 +177,7 @@ impl SchemaProvider for DynamicFileSchemaProvider {
Err(_) => {
// Register the store for this URL. Here we don't have access
// to any command options so the only choice is to use an empty collection
match scheme {
"s3" | "oss" | "cos" => {
state = state.add_table_options_extension(AwsOptions::default());
}
"gs" | "gcs" => {
state = state.add_table_options_extension(GcpOptions::default())
}
_ => {}
};
state = add_extension_option(state, scheme);
let store = get_object_store(
&state,
table_url.scheme(),
Expand Down Expand Up @@ -215,6 +208,24 @@ impl SchemaProvider for DynamicFileSchemaProvider {
self.inner.table_exist(name)
}
}

/// Registers the default table-options extension that matches `scheme` on `state`.
///
/// `s3`, `oss` and `cos` get a default [`AwsOptions`]; `gs` and `gcs` get a default
/// [`GcpOptions`]. Any other scheme returns `state` untouched.
#[cfg(not(target_arch = "wasm32"))]
fn add_extension_option(state: SessionState, scheme: &str) -> SessionState {
    match scheme {
        "s3" | "oss" | "cos" => state.add_table_options_extension(AwsOptions::default()),
        "gs" | "gcs" => state.add_table_options_extension(GcpOptions::default()),
        _ => state,
    }
}

/// `wasm32` counterpart of `add_extension_option`: the scheme is ignored and `state`
/// is handed back unchanged.
#[cfg(target_arch = "wasm32")]
fn add_extension_option(state: SessionState, _scheme: &str) -> SessionState {
    state
}

fn substitute_tilde(cur: String) -> String {
if let Some(usr_dir_path) = home_dir() {
if let Some(usr_dir) = usr_dir_path.to_str() {
Expand All @@ -230,8 +241,8 @@ fn substitute_tilde(cur: String) -> String {
mod tests {
use super::*;

use datafusion::catalog::schema::SchemaProvider;
use datafusion::prelude::SessionContext;
use crate::catalog::schema::SchemaProvider;
use crate::prelude::SessionContext;

fn setup_context() -> (SessionContext, Arc<dyn SchemaProvider>) {
let mut ctx = SessionContext::new();
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Interfaces and default implementations of catalogs and schemas.

pub mod dynamic_file_catalog;
pub mod information_schema;
pub mod listing_schema;
pub mod schema;
Expand Down
28 changes: 27 additions & 1 deletion datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub mod avro;
pub mod csv;
pub mod file_compression_type;
pub mod json;
#[cfg(not(target_arch = "wasm32"))]
pub mod object_storage;
pub mod options;
#[cfg(feature = "parquet")]
pub mod parquet;
Expand All @@ -41,11 +43,13 @@ use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::{ExecutionPlan, Statistics};

use datafusion_common::not_impl_err;
use datafusion_common::{not_impl_err, DataFusionError};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};

use async_trait::async_trait;
use datafusion_common::config::TableOptions;
use object_store::{ObjectMeta, ObjectStore};
use url::Url;

/// This trait abstracts all the file format specific implementations
/// from the [`TableProvider`]. This helps code re-utilization across
Expand Down Expand Up @@ -106,6 +110,28 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
}
}

/// Resolves the [`ObjectStore`] for `url` under the given `scheme`.
///
/// This variant is compiled for every target except `wasm32` and simply forwards its
/// arguments to [`object_storage::get_object_store`].
#[cfg(not(target_arch = "wasm32"))]
pub async fn get_object_store(
    state: &SessionState,
    scheme: &str,
    url: &Url,
    table_options: &TableOptions,
) -> Result<Arc<dyn ObjectStore>, DataFusionError> {
    // Build the delegated future first, then drive it to completion.
    let lookup = object_storage::get_object_store(state, scheme, url, table_options);
    lookup.await
}

/// Fallback for `wasm32` targets, where the `object_storage` module is not compiled.
///
/// The signature mirrors the non-wasm variant so that callers build on every target,
/// but no object store can be constructed here.
///
/// # Errors
///
/// Always returns a "not implemented" error stating that object storage is not
/// supported in WASM. An error is returned rather than panicking so that callers can
/// propagate the failure with `?`.
#[cfg(target_arch = "wasm32")]
pub async fn get_object_store(
    state: &SessionState,
    scheme: &str,
    url: &Url,
    table_options: &TableOptions,
) -> Result<Arc<dyn ObjectStore>, DataFusionError> {
    // The arguments exist only to keep the signature identical across targets.
    let _ = (state, scheme, url, table_options);
    not_impl_err!("Object storage is not supported in WASM")
}

#[cfg(test)]
pub(crate) mod test_util {
use std::ops::Range;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,30 @@
// specific language governing permissions and limitations
// under the License.

//! Implementations for object storage builders for different cloud storage schemes.

use std::any::Any;
use std::fmt::{Debug, Display};
use std::sync::Arc;

use datafusion::common::config::{
use crate::common::config::{
ConfigEntry, ConfigExtension, ConfigField, ExtensionOptions, TableOptions, Visit,
};
use datafusion::common::{config_err, exec_datafusion_err, exec_err};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionState;
use datafusion::prelude::SessionContext;
use crate::common::{config_err, exec_datafusion_err, exec_err};
use crate::error::{DataFusionError, Result};
use crate::execution::context::SessionState;
use crate::prelude::SessionContext;

use async_trait::async_trait;
use aws_config::BehaviorVersion;
use aws_credential_types::provider::ProvideCredentials;
use object_store::aws::{AmazonS3Builder, AwsCredential};
use object_store::gcp::GoogleCloudStorageBuilder;
use object_store::http::HttpBuilder;
use object_store::{CredentialProvider, ObjectStore};
use url::Url;

pub async fn get_s3_object_store_builder(
pub(crate) async fn get_s3_object_store_builder(
url: &Url,
aws_options: &AwsOptions,
) -> Result<AmazonS3Builder> {
Expand All @@ -62,7 +65,7 @@ pub async fn get_s3_object_store_builder(
builder = builder.with_token(session_token);
}
} else {
let config = aws_config::from_env().load().await;
let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
if let Some(region) = config.region() {
builder = builder.with_region(region.to_string());
}
Expand Down Expand Up @@ -132,14 +135,14 @@ impl CredentialProvider for S3CredentialProvider {
}
}

pub fn get_oss_object_store_builder(
pub(crate) fn get_oss_object_store_builder(
url: &Url,
aws_options: &AwsOptions,
) -> Result<AmazonS3Builder> {
get_object_store_builder(url, aws_options, true)
}

pub fn get_cos_object_store_builder(
pub(crate) fn get_cos_object_store_builder(
url: &Url,
aws_options: &AwsOptions,
) -> Result<AmazonS3Builder> {
Expand Down Expand Up @@ -173,7 +176,7 @@ fn get_object_store_builder(
Ok(builder)
}

pub fn get_gcs_object_store_builder(
pub(crate) fn get_gcs_object_store_builder(
url: &Url,
gs_options: &GcpOptions,
) -> Result<GoogleCloudStorageBuilder> {
Expand Down Expand Up @@ -416,7 +419,7 @@ impl ConfigExtension for GcpOptions {
/// Google Cloud Storage.
///
/// NOTE: This function will not perform any action when given an unsupported scheme.
pub(crate) fn register_options(ctx: &SessionContext, scheme: &str) {
pub fn register_options(ctx: &SessionContext, scheme: &str) {
// Match the provided scheme against supported cloud storage schemes:
match scheme {
// For Amazon S3 or Alibaba Cloud OSS
Expand All @@ -434,7 +437,8 @@ pub(crate) fn register_options(ctx: &SessionContext, scheme: &str) {
}
}

pub(crate) async fn get_object_store(
/// Used to get an object store based on the given scheme and options.
pub async fn get_object_store(
state: &SessionState,
scheme: &str,
url: &Url,
Expand Down Expand Up @@ -500,8 +504,8 @@ pub(crate) async fn get_object_store(
mod tests {
use super::*;

use datafusion::common::plan_err;
use datafusion::{
use crate::common::plan_err;
use crate::{
datasource::listing::ListingTableUrl,
logical_expr::{DdlStatement, LogicalPlan},
prelude::SessionContext,
Expand Down Expand Up @@ -587,7 +591,8 @@ mod tests {
.await
.unwrap_err();

assert_eq!(err.to_string(), "Invalid or Unsupported Configuration: Invalid endpoint: http://endpoint33. HTTP is not allowed for S3 endpoints. To allow HTTP, set 'aws.allow_http' to true");
// The error message may also contain a backtrace, so we only check that it contains the expected text
assert!(err.to_string().contains("Invalid or Unsupported Configuration: Invalid endpoint: http://endpoint33. HTTP is not allowed for S3 endpoints. To allow HTTP, set 'aws.allow_http' to true"));
} else {
return plan_err!("LogicalPlan is not a CreateExternalTable");
}
Expand Down