Custom / Dynamic table provider factories (#3311)
* Custom table provider factories

* Rebase
Brent Gardner committed Sep 5, 2022
1 parent b175f9a commit bb08d31
Showing 18 changed files with 292 additions and 207 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/rust.yml
@@ -280,7 +280,9 @@ jobs:
rustup default stable
rustup component add rustfmt
- name: Run
run: ci/scripts/rust_fmt.sh
run: |
echo '' > datafusion/proto/src/generated/datafusion.rs
ci/scripts/rust_fmt.sh
coverage:
name: coverage
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
@@ -93,6 +93,7 @@ url = "2.2"
uuid = { version = "1.0", features = ["v4"] }

[dev-dependencies]
async-trait = "0.1.53"
criterion = "0.3"
csv = "1.1.6"
ctor = "0.1.22"
9 changes: 9 additions & 0 deletions datafusion/core/src/datasource/datasource.rs
@@ -72,3 +72,12 @@ pub trait TableProvider: Sync + Send {
Ok(TableProviderFilterPushDown::Unsupported)
}
}

/// A factory which creates [`TableProvider`]s at runtime given a URL.
///
/// For example, this can be used to create a table "on the fly"
/// from a directory of files only when that name is referenced.
pub trait TableProviderFactory: Sync + Send {
    /// Create a `TableProvider` given a name and URL
fn create(&self, name: &str, url: &str) -> Arc<dyn TableProvider>;
}
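
As an aside, a minimal sketch of what an implementor of the new trait could look like, assuming the `datafusion::datasource::datasource` module path used elsewhere in this diff. The `MemTableFactory` name and its hard-coded one-column table are illustrative only, not part of the commit:

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::datasource::TableProviderFactory;
use datafusion::datasource::{MemTable, TableProvider};

/// Hypothetical factory: ignores the name and URL and always serves a
/// small in-memory table, just to show the shape of the trait.
struct MemTableFactory;

impl TableProviderFactory for MemTableFactory {
    fn create(&self, _name: &str, _url: &str) -> Arc<dyn TableProvider> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_new(schema.clone(), vec![column])
            .expect("schema and column agree");
        // One partition containing one batch.
        Arc::new(MemTable::try_new(schema, vec![vec![batch]]).expect("valid partitions"))
    }
}
```
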
162 changes: 98 additions & 64 deletions datafusion/core/src/execution/context.rs
@@ -68,7 +68,7 @@ use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
use crate::logical_plan::{
provider_as_source, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateMemoryTable, CreateView, DropTable, FileType, FunctionRegistry, LogicalPlan,
CreateMemoryTable, CreateView, DropTable, FunctionRegistry, LogicalPlan,
LogicalPlanBuilder, UNNAMED_TABLE,
};
use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
@@ -90,6 +90,7 @@ use crate::config::{
ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_SKIP_FAILED_RULES,
};
use crate::datasource::datasource::TableProviderFactory;
use crate::execution::runtime_env::RuntimeEnv;
use crate::logical_plan::plan::Explain;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
@@ -175,6 +176,8 @@ pub struct SessionContext {
pub session_start_time: DateTime<Utc>,
/// Shared session state for the session
pub state: Arc<RwLock<SessionState>>,
/// Dynamic table providers
pub table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
}

impl Default for SessionContext {
Expand Down Expand Up @@ -202,6 +205,7 @@ impl SessionContext {
session_id: state.session_id.clone(),
session_start_time: chrono::Utc::now(),
state: Arc::new(RwLock::new(state)),
table_factories: HashMap::default(),
}
}

@@ -211,9 +215,19 @@ impl SessionContext {
session_id: state.session_id.clone(),
session_start_time: chrono::Utc::now(),
state: Arc::new(RwLock::new(state)),
table_factories: HashMap::default(),
}
}

/// Register a `TableProviderFactory` for a given `file_type` identifier
pub fn register_table_factory(
&mut self,
file_type: &str,
factory: Arc<dyn TableProviderFactory>,
) {
self.table_factories.insert(file_type.to_string(), factory);
}

/// Return the [RuntimeEnv] used to run queries with this [SessionContext]
pub fn runtime_env(&self) -> Arc<RuntimeEnv> {
self.state.read().runtime_env.clone()
Expand All @@ -236,70 +250,12 @@ impl SessionContext {
pub async fn sql(&self, sql: &str) -> Result<Arc<DataFrame>> {
let plan = self.create_logical_plan(sql)?;
match plan {
LogicalPlan::CreateExternalTable(CreateExternalTable {
ref schema,
ref name,
ref location,
ref file_type,
ref has_header,
ref delimiter,
ref table_partition_cols,
ref if_not_exists,
}) => {
let (file_format, file_extension) = match file_type {
FileType::CSV => (
Arc::new(
CsvFormat::default()
.with_has_header(*has_header)
.with_delimiter(*delimiter as u8),
) as Arc<dyn FileFormat>,
DEFAULT_CSV_EXTENSION,
),
FileType::Parquet => (
Arc::new(ParquetFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_PARQUET_EXTENSION,
),
FileType::Avro => (
Arc::new(AvroFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_AVRO_EXTENSION,
),
FileType::NdJson => (
Arc::new(JsonFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_JSON_EXTENSION,
),
};
let table = self.table(name.as_str());
match (if_not_exists, table) {
(true, Ok(_)) => self.return_empty_dataframe(),
(_, Err(_)) => {
// TODO make schema in CreateExternalTable optional instead of empty
let provided_schema = if schema.fields().is_empty() {
None
} else {
Some(Arc::new(schema.as_ref().to_owned().into()))
};
let options = ListingOptions {
format: file_format,
collect_stat: false,
file_extension: file_extension.to_owned(),
target_partitions: self.copied_config().target_partitions,
table_partition_cols: table_partition_cols.clone(),
};
self.register_listing_table(
name,
location,
options,
provided_schema,
)
.await?;
self.return_empty_dataframe()
}
(false, Ok(_)) => Err(DataFusionError::Execution(format!(
"Table '{:?}' already exists",
name
))),
LogicalPlan::CreateExternalTable(cmd) => match cmd.file_type.as_str() {
"PARQUET" | "CSV" | "JSON" | "AVRO" => {
self.create_listing_table(&cmd).await
}
}
_ => self.create_custom_table(&cmd).await,
},

LogicalPlan::CreateMemoryTable(CreateMemoryTable {
name,
@@ -480,6 +436,84 @@ impl SessionContext {
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}

async fn create_custom_table(
&self,
cmd: &CreateExternalTable,
) -> Result<Arc<DataFrame>> {
let factory = &self.table_factories.get(&cmd.file_type).ok_or_else(|| {
DataFusionError::Execution(format!(
"Unable to find factory for {}",
cmd.file_type
))
})?;
let table = (*factory).create(cmd.name.as_str(), cmd.location.as_str());
self.register_table(cmd.name.as_str(), table)?;
let plan = LogicalPlanBuilder::empty(false).build()?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}

async fn create_listing_table(
&self,
cmd: &CreateExternalTable,
) -> Result<Arc<DataFrame>> {
let (file_format, file_extension) = match cmd.file_type.as_str() {
"CSV" => (
Arc::new(
CsvFormat::default()
.with_has_header(cmd.has_header)
.with_delimiter(cmd.delimiter as u8),
) as Arc<dyn FileFormat>,
DEFAULT_CSV_EXTENSION,
),
"PARQUET" => (
Arc::new(ParquetFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_PARQUET_EXTENSION,
),
"AVRO" => (
Arc::new(AvroFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_AVRO_EXTENSION,
),
"JSON" => (
Arc::new(JsonFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_JSON_EXTENSION,
),
_ => Err(DataFusionError::Execution(
"Only known FileTypes can be ListingTables!".to_string(),
))?,
};
let table = self.table(cmd.name.as_str());
match (cmd.if_not_exists, table) {
(true, Ok(_)) => self.return_empty_dataframe(),
(_, Err(_)) => {
// TODO make schema in CreateExternalTable optional instead of empty
let provided_schema = if cmd.schema.fields().is_empty() {
None
} else {
Some(Arc::new(cmd.schema.as_ref().to_owned().into()))
};
let options = ListingOptions {
format: file_format,
collect_stat: false,
file_extension: file_extension.to_owned(),
target_partitions: self.copied_config().target_partitions,
table_partition_cols: cmd.table_partition_cols.clone(),
};
self.register_listing_table(
cmd.name.as_str(),
cmd.location.clone(),
options,
provided_schema,
)
.await?;
self.return_empty_dataframe()
}
(false, Ok(_)) => Err(DataFusionError::Execution(format!(
"Table '{:?}' already exists",
cmd.name
))),
}
}

fn find_and_deregister<'a>(
&self,
table_ref: impl Into<TableReference<'a>>,
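
With the dispatch above, `PARQUET`, `CSV`, `JSON`, and `AVRO` keep the listing-table path, while any other `STORED AS` identifier is resolved through `table_factories`. A hedged end-to-end sketch, reusing the hypothetical `MemTableFactory` from the earlier sketch (the `MEMTABLE` identifier is invented for illustration):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    // `register_table_factory` takes `&mut self`, so the context must be mutable.
    let mut ctx = SessionContext::new();
    // `MemTableFactory` is the hypothetical factory sketched earlier.
    ctx.register_table_factory("MEMTABLE", Arc::new(MemTableFactory));

    // The factory receives the table name ("t") and the location string;
    // our stub ignores both.
    ctx.sql("CREATE EXTERNAL TABLE t STORED AS MEMTABLE LOCATION 'ignored://'")
        .await?;
    ctx.sql("SELECT a FROM t").await?.show().await?;
    Ok(())
}
```

This mirrors the `create_custom_table` test added below, which registers a stub provider instead of a real in-memory table.
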
7 changes: 3 additions & 4 deletions datafusion/core/src/logical_plan/mod.rs
@@ -45,10 +45,9 @@ pub use datafusion_expr::{
build_join_schema, union_with_alias, LogicalPlanBuilder, UNNAMED_TABLE,
},
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
CreateView, CrossJoin, DropTable, EmptyRelation, FileType, JoinConstraint,
JoinType, Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition,
StringifiedPlan, Subquery, TableScan, ToStringifiedPlan, Union,
UserDefinedLogicalNode, Values,
CreateView, CrossJoin, DropTable, EmptyRelation, JoinConstraint, JoinType, Limit,
LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition, StringifiedPlan,
Subquery, TableScan, ToStringifiedPlan, Union, UserDefinedLogicalNode, Values,
},
lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now, nullif,
octet_length, or, power, random, regexp_match, regexp_replace, repeat, replace,
4 changes: 2 additions & 2 deletions datafusion/core/src/logical_plan/plan.rs
@@ -23,8 +23,8 @@ pub use datafusion_expr::{
display::{GraphvizVisitor, IndentVisitor},
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, EmptyRelation,
Explain, Extension, FileType, Filter, Join, JoinConstraint, JoinType, Limit,
LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
UserDefinedLogicalNode, Values, Window,
},
75 changes: 75 additions & 0 deletions datafusion/core/tests/sql/create_drop.rs
@@ -15,8 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use async_trait::async_trait;
use std::any::Any;
use std::io::Write;

use datafusion::datasource::datasource::TableProviderFactory;
use datafusion::execution::context::SessionState;
use datafusion_expr::TableType;
use tempfile::TempDir;

use super::*;
@@ -360,6 +365,76 @@ async fn create_pipe_delimited_csv_table() -> Result<()> {
Ok(())
}

struct TestTableProvider {}

impl TestTableProvider {}

#[async_trait]
impl TableProvider for TestTableProvider {
fn as_any(&self) -> &dyn Any {
unimplemented!("TestTableProvider is a stub for testing.")
}

fn schema(&self) -> SchemaRef {
unimplemented!("TestTableProvider is a stub for testing.")
}

fn table_type(&self) -> TableType {
unimplemented!("TestTableProvider is a stub for testing.")
}

async fn scan(
&self,
_ctx: &SessionState,
_projection: &Option<Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!("TestTableProvider is a stub for testing.")
}
}

struct TestTableFactory {}

impl TableProviderFactory for TestTableFactory {
fn create(&self, _name: &str, _path: &str) -> Arc<dyn TableProvider> {
Arc::new(TestTableProvider {})
}
}

#[tokio::test]
async fn create_custom_table() -> Result<()> {
let mut ctx = SessionContext::new();
ctx.register_table_factory("DELTATABLE", Arc::new(TestTableFactory {}));

let sql = "CREATE EXTERNAL TABLE dt STORED AS DELTATABLE LOCATION 's3://bucket/schema/table';";
ctx.sql(sql).await.unwrap();

let cat = ctx.catalog("datafusion").unwrap();
let schema = cat.schema("public").unwrap();
let exists = schema.table_exist("dt");
assert!(exists, "Table should have been created!");

Ok(())
}

#[tokio::test]
async fn create_bad_custom_table() {
let ctx = SessionContext::new();

let sql = "CREATE EXTERNAL TABLE dt STORED AS DELTATABLE LOCATION 's3://bucket/schema/table';";
let res = ctx.sql(sql).await;
match res {
Ok(_) => panic!("Registration of tables without factories should fail"),
Err(e) => {
assert!(
e.to_string().contains("Unable to find factory for"),
"Registration of tables without factories should throw correct error"
)
}
}
}

#[tokio::test]
async fn create_csv_table_empty_file() -> Result<()> {
let ctx =
1 change: 1 addition & 0 deletions datafusion/core/tests/sql/timestamp.rs
@@ -1398,6 +1398,7 @@ async fn timestamp_sub_interval_days() -> Result<()> {
}

#[tokio::test]
#[ignore] // https://github.com/apache/arrow-datafusion/issues/3327
async fn timestamp_add_interval_months() -> Result<()> {
let ctx = SessionContext::new();

6 changes: 3 additions & 3 deletions datafusion/expr/src/logical_plan/mod.rs
@@ -24,9 +24,9 @@ pub use builder::{table_scan, LogicalPlanBuilder};
pub use plan::{
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, DropView,
EmptyRelation, Explain, Extension, FileType, Filter, Join, JoinConstraint, JoinType,
Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition,
Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit,
LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
Values, Window,
};
