From 9c8237284cad97a3ccf0e6d161440904d8c89ba6 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 4 Jun 2024 15:21:39 -0400 Subject: [PATCH 1/4] Split `SessionState` into its own module --- datafusion/core/src/execution/context/mod.rs | 1109 +---------------- datafusion/core/src/execution/mod.rs | 2 + datafusion/core/src/execution/state.rs | 1130 ++++++++++++++++++ 3 files changed, 1177 insertions(+), 1064 deletions(-) create mode 100644 datafusion/core/src/execution/state.rs diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 745eff550fae..acf65d4c5332 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -17,30 +17,20 @@ //! [`SessionContext`] API for registering data sources and executing queries -use std::collections::{hash_map::Entry, HashMap, HashSet}; +use std::collections::HashSet; use std::fmt::Debug; -use std::ops::ControlFlow; use std::sync::{Arc, Weak}; use super::options::ReadOptions; -#[cfg(feature = "array_expressions")] -use crate::functions_array; use crate::{ - catalog::information_schema::{InformationSchemaProvider, INFORMATION_SCHEMA}, catalog::listing_schema::ListingSchemaProvider, - catalog::schema::{MemorySchemaProvider, SchemaProvider}, - catalog::{ - CatalogProvider, CatalogProviderList, MemoryCatalogProvider, - MemoryCatalogProviderList, - }, - config::ConfigOptions, + catalog::schema::MemorySchemaProvider, + catalog::{CatalogProvider, CatalogProviderList, MemoryCatalogProvider}, dataframe::DataFrame, datasource::{ - cte_worktable::CteWorkTable, - function::{TableFunction, TableFunctionImpl}, + function::TableFunctionImpl, listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, - object_store::ObjectStoreUrl, - provider::{DefaultTableFactory, TableProviderFactory}, + provider::TableProviderFactory, }, datasource::{provider_as_source, MemTable, TableProvider, ViewTable}, error::{DataFusionError, Result}, @@ -50,51 +40,37 @@ use crate::{ logical_expr::{ CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction, CreateMemoryTable, CreateView, DropCatalogSchema, DropFunction, DropTable, - DropView, Explain, LogicalPlan, LogicalPlanBuilder, PlanType, SetVariable, - TableSource, TableType, ToStringifiedPlan, UNNAMED_TABLE, + DropView, LogicalPlan, LogicalPlanBuilder, SetVariable, TableType, UNNAMED_TABLE, }, - optimizer::analyzer::{Analyzer, AnalyzerRule}, - optimizer::optimizer::{Optimizer, OptimizerConfig, OptimizerRule}, - physical_expr::{create_physical_expr, PhysicalExpr}, - physical_optimizer::optimizer::{PhysicalOptimizer, PhysicalOptimizerRule}, + physical_expr::PhysicalExpr, physical_plan::ExecutionPlan, - physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}, variable::{VarProvider, VarType}, }; -use crate::{functions, functions_aggregate}; -use arrow::datatypes::{DataType, SchemaRef}; +use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use arrow_schema::Schema; use datafusion_common::{ - alias::AliasGenerator, config::{ConfigExtension, TableOptions}, - exec_err, not_impl_err, plan_datafusion_err, plan_err, - tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}, + exec_err, not_impl_err, plan_err, + tree_node::{TreeNodeRecursion, TreeNodeVisitor}, DFSchema, SchemaReference, TableReference, }; use datafusion_execution::registry::SerializerRegistry; use datafusion_expr::{ expr_rewriter::FunctionRewrite, logical_plan::{DdlStatement, Statement}, - simplify::SimplifyInfo, - 
var_provider::is_system_variables, - Expr, ExprSchemable, StringifiedPlan, UserDefinedLogicalNode, WindowUDF, -}; -use datafusion_optimizer::simplify_expressions::ExprSimplifier; -use datafusion_sql::{ - parser::{CopyToSource, CopyToStatement, DFParser}, - planner::{object_name_to_table_reference, ContextProvider, ParserOptions, SqlToRel}, - ResolvedTableReference, + Expr, UserDefinedLogicalNode, WindowUDF, }; -use sqlparser::dialect::dialect_from_str; + +// backwards compatibility +pub use crate::execution::state::SessionState; use async_trait::async_trait; use chrono::{DateTime, Utc}; use object_store::ObjectStore; use parking_lot::RwLock; use url::Url; -use uuid::Uuid; pub use datafusion_execution::config::SessionConfig; pub use datafusion_execution::TaskContext; @@ -330,7 +306,7 @@ impl SessionContext { /// Creates a new `SessionContext` using the provided [`SessionState`] pub fn new_with_state(state: SessionState) -> Self { Self { - session_id: state.session_id.clone(), + session_id: state.session_id().to_string(), session_start_time: Utc::now(), state: Arc::new(RwLock::new(state)), } @@ -395,7 +371,7 @@ impl SessionContext { /// Return the [RuntimeEnv] used to run queries with this `SessionContext` pub fn runtime_env(&self) -> Arc { - self.state.read().runtime_env.clone() + self.state.read().runtime_env().clone() } /// Returns an id that uniquely identifies this `SessionContext`. @@ -416,7 +392,7 @@ impl SessionContext { pub fn enable_ident_normalization(&self) -> bool { self.state .read() - .config + .config() .options() .sql_parser .enable_ident_normalization @@ -424,7 +400,7 @@ impl SessionContext { /// Return a copied version of config for this Session pub fn copied_config(&self) -> SessionConfig { - self.state.read().config.clone() + self.state.read().config().clone() } /// Return a copied version of table options for this Session @@ -705,8 +681,8 @@ impl SessionContext { let (catalog, schema_name) = match tokens.len() { 1 => { let state = self.state.read(); - let name = &state.config.options().catalog.default_catalog; - let catalog = state.catalog_list.catalog(name).ok_or_else(|| { + let name = &state.config().options().catalog.default_catalog; + let catalog = state.catalog_list().catalog(name).ok_or_else(|| { DataFusionError::Execution(format!( "Missing default catalog '{name}'" )) @@ -749,7 +725,7 @@ impl SessionContext { let new_catalog = Arc::new(MemoryCatalogProvider::new()); self.state .write() - .catalog_list + .catalog_list() .register_catalog(catalog_name, new_catalog); self.return_empty_dataframe() } @@ -800,7 +776,7 @@ impl SessionContext { state.config_options().catalog.default_catalog.to_string() } }; - if let Some(catalog) = state.catalog_list.catalog(&catalog_name) { + if let Some(catalog) = state.catalog_list().catalog(&catalog_name) { catalog } else if allow_missing { return self.return_empty_dataframe(); @@ -826,7 +802,7 @@ impl SessionContext { } = stmt; let mut state = self.state.write(); - state.config.options_mut().set(&variable, &value)?; + state.config_mut().options_mut().set(&variable, &value)?; drop(state); self.return_empty_dataframe() @@ -839,8 +815,8 @@ impl SessionContext { let state = self.state.read().clone(); let file_type = cmd.file_type.to_uppercase(); let factory = - &state - .table_factories + state + .table_factories() .get(file_type.as_str()) .ok_or_else(|| { DataFusionError::Execution(format!( @@ -863,7 +839,7 @@ impl SessionContext { let state = self.state.read(); let resolved = state.resolve_table_ref(table_ref); state - .catalog_list + 
.catalog_list() .catalog(&resolved.catalog) .and_then(|c| c.schema(&resolved.schema)) }; @@ -883,7 +859,7 @@ impl SessionContext { async fn create_function(&self, stmt: CreateFunction) -> Result { let function = { let state = self.state.read().clone(); - let function_factory = &state.function_factory; + let function_factory = state.function_factory(); match function_factory { Some(f) => f.create(&state, stmt).await?, @@ -937,16 +913,13 @@ impl SessionContext { ) { self.state .write() - .execution_props + .execution_props_mut() .add_var_provider(variable_type, provider); } /// Register a table UDF with this context pub fn register_udtf(&self, name: &str, fun: Arc) { - self.state.write().table_functions.insert( - name.to_owned(), - Arc::new(TableFunction::new(name.to_owned(), fun)), - ); + self.state.write().register_udtf(name, fun) } /// Registers a scalar UDF within this context. @@ -1176,18 +1149,18 @@ impl SessionContext { let name = name.into(); self.state .read() - .catalog_list + .catalog_list() .register_catalog(name, catalog) } /// Retrieves the list of available catalog names. pub fn catalog_names(&self) -> Vec { - self.state.read().catalog_list.catalog_names() + self.state.read().catalog_list().catalog_names() } /// Retrieves a [`CatalogProvider`] instance by name pub fn catalog(&self, name: &str) -> Option> { - self.state.read().catalog_list.catalog(name) + self.state.read().catalog_list().catalog(name) } /// Registers a [`TableProvider`] as a table that can be @@ -1280,7 +1253,7 @@ impl SessionContext { /// `query_execution_start_time` to the current time pub fn state(&self) -> SessionState { let mut state = self.state.read().clone(); - state.execution_props.start_execution(); + state.execution_props_mut().start_execution(); state } @@ -1291,7 +1264,7 @@ impl SessionContext { /// Register [`CatalogProviderList`] in [`SessionState`] pub fn register_catalog_list(&mut self, catalog_list: Arc) { - self.state.write().catalog_list = catalog_list; + self.state.write().register_catalog_list(catalog_list) } /// Registers a [`ConfigExtension`] as a table option extention that can be @@ -1299,7 +1272,7 @@ impl SessionContext { pub fn register_table_options_extension(&self, extension: T) { self.state .write() - .table_option_namespace + .default_table_options() .extensions .insert(extension) } @@ -1342,6 +1315,13 @@ impl FunctionRegistry for SessionContext { } } +/// Create a new task context instance from SessionContext +impl From<&SessionContext> for TaskContext { + fn from(session: &SessionContext) -> Self { + TaskContext::from(&*session.state.read()) + } +} + /// A planner used to add extensions to DataFusion logical and physical plans. #[async_trait] pub trait QueryPlanner { @@ -1353,23 +1333,6 @@ pub trait QueryPlanner { ) -> Result>; } -/// The query planner used if no user defined planner is provided -struct DefaultQueryPlanner {} - -#[async_trait] -impl QueryPlanner for DefaultQueryPlanner { - /// Given a `LogicalPlan`, create an [`ExecutionPlan`] suitable for execution - async fn create_physical_plan( - &self, - logical_plan: &LogicalPlan, - session_state: &SessionState, - ) -> Result> { - let planner = DefaultPhysicalPlanner::default(); - planner - .create_physical_plan(logical_plan, session_state) - .await - } -} /// A pluggable interface to handle `CREATE FUNCTION` statements /// and interact with [SessionState] to registers new udf, udaf or udwf. 
@@ -1395,990 +1358,6 @@ pub enum RegisterFunction { Table(String, Arc), } -/// Execution context for registering data sources and executing queries. -/// See [`SessionContext`] for a higher level API. -/// -/// Note that there is no `Default` or `new()` for SessionState, -/// to avoid accidentally running queries or other operations without passing through -/// the [`SessionConfig`] or [`RuntimeEnv`]. See [`SessionContext`]. -#[derive(Clone)] -pub struct SessionState { - /// A unique UUID that identifies the session - session_id: String, - /// Responsible for analyzing and rewrite a logical plan before optimization - analyzer: Analyzer, - /// Responsible for optimizing a logical plan - optimizer: Optimizer, - /// Responsible for optimizing a physical execution plan - physical_optimizers: PhysicalOptimizer, - /// Responsible for planning `LogicalPlan`s, and `ExecutionPlan` - query_planner: Arc, - /// Collection of catalogs containing schemas and ultimately TableProviders - catalog_list: Arc, - /// Table Functions - table_functions: HashMap>, - /// Scalar functions that are registered with the context - scalar_functions: HashMap>, - /// Aggregate functions registered in the context - aggregate_functions: HashMap>, - /// Window functions registered in the context - window_functions: HashMap>, - /// Deserializer registry for extensions. - serializer_registry: Arc, - /// Session configuration - config: SessionConfig, - /// Table options - table_option_namespace: TableOptions, - /// Execution properties - execution_props: ExecutionProps, - /// TableProviderFactories for different file formats. - /// - /// Maps strings like "JSON" to an instance of [`TableProviderFactory`] - /// - /// This is used to create [`TableProvider`] instances for the - /// `CREATE EXTERNAL TABLE ... STORED AS ` for custom file - /// formats other than those built into DataFusion - table_factories: HashMap>, - /// Runtime environment - runtime_env: Arc, - - /// [FunctionFactory] to support pluggable user defined function handler. - /// - /// It will be invoked on `CREATE FUNCTION` statements. - /// thus, changing dialect o PostgreSql is required - function_factory: Option>, -} - -impl Debug for SessionState { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("SessionState") - .field("session_id", &self.session_id) - // TODO should we print out more? - .finish() - } -} - -impl SessionState { - /// Returns new [`SessionState`] using the provided - /// [`SessionConfig`] and [`RuntimeEnv`]. - pub fn new_with_config_rt(config: SessionConfig, runtime: Arc) -> Self { - let catalog_list = - Arc::new(MemoryCatalogProviderList::new()) as Arc; - Self::new_with_config_rt_and_catalog_list(config, runtime, catalog_list) - } - - /// Returns new [`SessionState`] using the provided - /// [`SessionConfig`] and [`RuntimeEnv`]. 
- #[deprecated(since = "32.0.0", note = "Use SessionState::new_with_config_rt")] - pub fn with_config_rt(config: SessionConfig, runtime: Arc) -> Self { - Self::new_with_config_rt(config, runtime) - } - - /// Returns new [`SessionState`] using the provided - /// [`SessionConfig`], [`RuntimeEnv`], and [`CatalogProviderList`] - pub fn new_with_config_rt_and_catalog_list( - config: SessionConfig, - runtime: Arc, - catalog_list: Arc, - ) -> Self { - let session_id = Uuid::new_v4().to_string(); - - // Create table_factories for all default formats - let mut table_factories: HashMap> = - HashMap::new(); - #[cfg(feature = "parquet")] - table_factories.insert("PARQUET".into(), Arc::new(DefaultTableFactory::new())); - table_factories.insert("CSV".into(), Arc::new(DefaultTableFactory::new())); - table_factories.insert("JSON".into(), Arc::new(DefaultTableFactory::new())); - table_factories.insert("NDJSON".into(), Arc::new(DefaultTableFactory::new())); - table_factories.insert("AVRO".into(), Arc::new(DefaultTableFactory::new())); - table_factories.insert("ARROW".into(), Arc::new(DefaultTableFactory::new())); - - if config.create_default_catalog_and_schema() { - let default_catalog = MemoryCatalogProvider::new(); - - default_catalog - .register_schema( - &config.options().catalog.default_schema, - Arc::new(MemorySchemaProvider::new()), - ) - .expect("memory catalog provider can register schema"); - - Self::register_default_schema( - &config, - &table_factories, - &runtime, - &default_catalog, - ); - - catalog_list.register_catalog( - config.options().catalog.default_catalog.clone(), - Arc::new(default_catalog), - ); - } - - let mut new_self = SessionState { - session_id, - analyzer: Analyzer::new(), - optimizer: Optimizer::new(), - physical_optimizers: PhysicalOptimizer::new(), - query_planner: Arc::new(DefaultQueryPlanner {}), - catalog_list, - table_functions: HashMap::new(), - scalar_functions: HashMap::new(), - aggregate_functions: HashMap::new(), - window_functions: HashMap::new(), - serializer_registry: Arc::new(EmptySerializerRegistry), - table_option_namespace: TableOptions::default_from_session_config( - config.options(), - ), - config, - execution_props: ExecutionProps::new(), - runtime_env: runtime, - table_factories, - function_factory: None, - }; - - // register built in functions - functions::register_all(&mut new_self) - .expect("can not register built in functions"); - - // register crate of array expressions (if enabled) - #[cfg(feature = "array_expressions")] - functions_array::register_all(&mut new_self) - .expect("can not register array expressions"); - - functions_aggregate::register_all(&mut new_self) - .expect("can not register aggregate functions"); - - new_self - } - /// Returns new [`SessionState`] using the provided - /// [`SessionConfig`] and [`RuntimeEnv`]. 
- #[deprecated( - since = "32.0.0", - note = "Use SessionState::new_with_config_rt_and_catalog_list" - )] - pub fn with_config_rt_and_catalog_list( - config: SessionConfig, - runtime: Arc, - catalog_list: Arc, - ) -> Self { - Self::new_with_config_rt_and_catalog_list(config, runtime, catalog_list) - } - fn register_default_schema( - config: &SessionConfig, - table_factories: &HashMap>, - runtime: &Arc, - default_catalog: &MemoryCatalogProvider, - ) { - let url = config.options().catalog.location.as_ref(); - let format = config.options().catalog.format.as_ref(); - let (url, format) = match (url, format) { - (Some(url), Some(format)) => (url, format), - _ => return, - }; - let url = url.to_string(); - let format = format.to_string(); - - let url = Url::parse(url.as_str()).expect("Invalid default catalog location!"); - let authority = match url.host_str() { - Some(host) => format!("{}://{}", url.scheme(), host), - None => format!("{}://", url.scheme()), - }; - let path = &url.as_str()[authority.len()..]; - let path = object_store::path::Path::parse(path).expect("Can't parse path"); - let store = ObjectStoreUrl::parse(authority.as_str()) - .expect("Invalid default catalog url"); - let store = match runtime.object_store(store) { - Ok(store) => store, - _ => return, - }; - let factory = match table_factories.get(format.as_str()) { - Some(factory) => factory, - _ => return, - }; - let schema = - ListingSchemaProvider::new(authority, path, factory.clone(), store, format); - let _ = default_catalog - .register_schema("default", Arc::new(schema)) - .expect("Failed to register default schema"); - } - - fn resolve_table_ref( - &self, - table_ref: impl Into, - ) -> ResolvedTableReference { - let catalog = &self.config_options().catalog; - table_ref - .into() - .resolve(&catalog.default_catalog, &catalog.default_schema) - } - - pub(crate) fn schema_for_ref( - &self, - table_ref: impl Into, - ) -> Result> { - let resolved_ref = self.resolve_table_ref(table_ref); - if self.config.information_schema() && *resolved_ref.schema == *INFORMATION_SCHEMA - { - return Ok(Arc::new(InformationSchemaProvider::new( - self.catalog_list.clone(), - ))); - } - - self.catalog_list - .catalog(&resolved_ref.catalog) - .ok_or_else(|| { - plan_datafusion_err!( - "failed to resolve catalog: {}", - resolved_ref.catalog - ) - })? - .schema(&resolved_ref.schema) - .ok_or_else(|| { - plan_datafusion_err!("failed to resolve schema: {}", resolved_ref.schema) - }) - } - - /// Replace the random session id. - pub fn with_session_id(mut self, session_id: String) -> Self { - self.session_id = session_id; - self - } - - /// override default query planner with `query_planner` - pub fn with_query_planner( - mut self, - query_planner: Arc, - ) -> Self { - self.query_planner = query_planner; - self - } - - /// Override the [`AnalyzerRule`]s optimizer plan rules. 
- pub fn with_analyzer_rules( - mut self, - rules: Vec>, - ) -> Self { - self.analyzer = Analyzer::with_rules(rules); - self - } - - /// Replace the entire list of [`OptimizerRule`]s used to optimize plans - pub fn with_optimizer_rules( - mut self, - rules: Vec>, - ) -> Self { - self.optimizer = Optimizer::with_rules(rules); - self - } - - /// Replace the entire list of [`PhysicalOptimizerRule`]s used to optimize plans - pub fn with_physical_optimizer_rules( - mut self, - physical_optimizers: Vec>, - ) -> Self { - self.physical_optimizers = PhysicalOptimizer::with_rules(physical_optimizers); - self - } - - /// Add `analyzer_rule` to the end of the list of - /// [`AnalyzerRule`]s used to rewrite queries. - pub fn add_analyzer_rule( - mut self, - analyzer_rule: Arc, - ) -> Self { - self.analyzer.rules.push(analyzer_rule); - self - } - - /// Add `optimizer_rule` to the end of the list of - /// [`OptimizerRule`]s used to rewrite queries. - pub fn add_optimizer_rule( - mut self, - optimizer_rule: Arc, - ) -> Self { - self.optimizer.rules.push(optimizer_rule); - self - } - - /// Add `physical_optimizer_rule` to the end of the list of - /// [`PhysicalOptimizerRule`]s used to rewrite queries. - pub fn add_physical_optimizer_rule( - mut self, - physical_optimizer_rule: Arc, - ) -> Self { - self.physical_optimizers.rules.push(physical_optimizer_rule); - self - } - - /// Adds a new [`ConfigExtension`] to TableOptions - pub fn add_table_options_extension( - mut self, - extension: T, - ) -> Self { - self.table_option_namespace.extensions.insert(extension); - self - } - - /// Registers a [`FunctionFactory`] to handle `CREATE FUNCTION` statements - pub fn with_function_factory( - mut self, - function_factory: Arc, - ) -> Self { - self.function_factory = Some(function_factory); - self - } - - /// Registers a [`FunctionFactory`] to handle `CREATE FUNCTION` statements - pub fn set_function_factory(&mut self, function_factory: Arc) { - self.function_factory = Some(function_factory); - } - - /// Replace the extension [`SerializerRegistry`] - pub fn with_serializer_registry( - mut self, - registry: Arc, - ) -> Self { - self.serializer_registry = registry; - self - } - - /// Get the table factories - pub fn table_factories(&self) -> &HashMap> { - &self.table_factories - } - - /// Get the table factories - pub fn table_factories_mut( - &mut self, - ) -> &mut HashMap> { - &mut self.table_factories - } - - /// Parse an SQL string into an DataFusion specific AST - /// [`Statement`]. See [`SessionContext::sql`] for running queries. - pub fn sql_to_statement( - &self, - sql: &str, - dialect: &str, - ) -> Result { - let dialect = dialect_from_str(dialect).ok_or_else(|| { - plan_datafusion_err!( - "Unsupported SQL dialect: {dialect}. Available dialects: \ - Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \ - MsSQL, ClickHouse, BigQuery, Ansi." - ) - })?; - let mut statements = DFParser::parse_sql_with_dialect(sql, dialect.as_ref())?; - if statements.len() > 1 { - return not_impl_err!( - "The context currently only supports a single SQL statement" - ); - } - let statement = statements.pop_front().ok_or_else(|| { - DataFusionError::NotImplemented( - "The context requires a statement!".to_string(), - ) - })?; - Ok(statement) - } - - /// Resolve all table references in the SQL statement. 
- pub fn resolve_table_references( - &self, - statement: &datafusion_sql::parser::Statement, - ) -> Result> { - use crate::catalog::information_schema::INFORMATION_SCHEMA_TABLES; - use datafusion_sql::parser::Statement as DFStatement; - use sqlparser::ast::*; - - // Getting `TableProviders` is async but planing is not -- thus pre-fetch - // table providers for all relations referenced in this query - let mut relations = hashbrown::HashSet::with_capacity(10); - - struct RelationVisitor<'a>(&'a mut hashbrown::HashSet); - - impl<'a> RelationVisitor<'a> { - /// Record that `relation` was used in this statement - fn insert(&mut self, relation: &ObjectName) { - self.0.get_or_insert_with(relation, |_| relation.clone()); - } - } - - impl<'a> Visitor for RelationVisitor<'a> { - type Break = (); - - fn pre_visit_relation(&mut self, relation: &ObjectName) -> ControlFlow<()> { - self.insert(relation); - ControlFlow::Continue(()) - } - - fn pre_visit_statement(&mut self, statement: &Statement) -> ControlFlow<()> { - if let Statement::ShowCreate { - obj_type: ShowCreateObject::Table | ShowCreateObject::View, - obj_name, - } = statement - { - self.insert(obj_name) - } - ControlFlow::Continue(()) - } - } - - let mut visitor = RelationVisitor(&mut relations); - fn visit_statement(statement: &DFStatement, visitor: &mut RelationVisitor<'_>) { - match statement { - DFStatement::Statement(s) => { - let _ = s.as_ref().visit(visitor); - } - DFStatement::CreateExternalTable(table) => { - visitor - .0 - .insert(ObjectName(vec![Ident::from(table.name.as_str())])); - } - DFStatement::CopyTo(CopyToStatement { source, .. }) => match source { - CopyToSource::Relation(table_name) => { - visitor.insert(table_name); - } - CopyToSource::Query(query) => { - query.visit(visitor); - } - }, - DFStatement::Explain(explain) => { - visit_statement(&explain.statement, visitor) - } - } - } - - visit_statement(statement, &mut visitor); - - // Always include information_schema if available - if self.config.information_schema() { - for s in INFORMATION_SCHEMA_TABLES { - relations.insert(ObjectName(vec![ - Ident::new(INFORMATION_SCHEMA), - Ident::new(*s), - ])); - } - } - - let enable_ident_normalization = - self.config.options().sql_parser.enable_ident_normalization; - relations - .into_iter() - .map(|x| object_name_to_table_reference(x, enable_ident_normalization)) - .collect::>() - } - - /// Convert an AST Statement into a LogicalPlan - pub async fn statement_to_plan( - &self, - statement: datafusion_sql::parser::Statement, - ) -> Result { - let references = self.resolve_table_references(&statement)?; - - let mut provider = SessionContextProvider { - state: self, - tables: HashMap::with_capacity(references.len()), - }; - - let enable_ident_normalization = - self.config.options().sql_parser.enable_ident_normalization; - let parse_float_as_decimal = - self.config.options().sql_parser.parse_float_as_decimal; - for reference in references { - let resolved = &self.resolve_table_ref(reference); - if let Entry::Vacant(v) = provider.tables.entry(resolved.to_string()) { - if let Ok(schema) = self.schema_for_ref(resolved.clone()) { - if let Some(table) = schema.table(&resolved.table).await? { - v.insert(provider_as_source(table)); - } - } - } - } - - let query = SqlToRel::new_with_options( - &provider, - ParserOptions { - parse_float_as_decimal, - enable_ident_normalization, - }, - ); - query.statement_to_plan(statement) - } - - /// Creates a [`LogicalPlan`] from the provided SQL string. 
This - /// interface will plan any SQL DataFusion supports, including DML - /// like `CREATE TABLE`, and `COPY` (which can write to local - /// files. - /// - /// See [`SessionContext::sql`] and - /// [`SessionContext::sql_with_options`] for a higher-level - /// interface that handles DDL and verification of allowed - /// statements. - pub async fn create_logical_plan(&self, sql: &str) -> Result { - let dialect = self.config.options().sql_parser.dialect.as_str(); - let statement = self.sql_to_statement(sql, dialect)?; - let plan = self.statement_to_plan(statement).await?; - Ok(plan) - } - - /// Optimizes the logical plan by applying optimizer rules. - pub fn optimize(&self, plan: &LogicalPlan) -> Result { - if let LogicalPlan::Explain(e) = plan { - let mut stringified_plans = e.stringified_plans.clone(); - - // analyze & capture output of each rule - let analyzer_result = self.analyzer.execute_and_check( - e.plan.as_ref().clone(), - self.options(), - |analyzed_plan, analyzer| { - let analyzer_name = analyzer.name().to_string(); - let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name }; - stringified_plans.push(analyzed_plan.to_stringified(plan_type)); - }, - ); - let analyzed_plan = match analyzer_result { - Ok(plan) => plan, - Err(DataFusionError::Context(analyzer_name, err)) => { - let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name }; - stringified_plans - .push(StringifiedPlan::new(plan_type, err.to_string())); - - return Ok(LogicalPlan::Explain(Explain { - verbose: e.verbose, - plan: e.plan.clone(), - stringified_plans, - schema: e.schema.clone(), - logical_optimization_succeeded: false, - })); - } - Err(e) => return Err(e), - }; - - // to delineate the analyzer & optimizer phases in explain output - stringified_plans - .push(analyzed_plan.to_stringified(PlanType::FinalAnalyzedLogicalPlan)); - - // optimize the child plan, capturing the output of each optimizer - let optimized_plan = self.optimizer.optimize( - analyzed_plan, - self, - |optimized_plan, optimizer| { - let optimizer_name = optimizer.name().to_string(); - let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name }; - stringified_plans.push(optimized_plan.to_stringified(plan_type)); - }, - ); - let (plan, logical_optimization_succeeded) = match optimized_plan { - Ok(plan) => (Arc::new(plan), true), - Err(DataFusionError::Context(optimizer_name, err)) => { - let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name }; - stringified_plans - .push(StringifiedPlan::new(plan_type, err.to_string())); - (e.plan.clone(), false) - } - Err(e) => return Err(e), - }; - - Ok(LogicalPlan::Explain(Explain { - verbose: e.verbose, - plan, - stringified_plans, - schema: e.schema.clone(), - logical_optimization_succeeded, - })) - } else { - let analyzed_plan = self.analyzer.execute_and_check( - plan.clone(), - self.options(), - |_, _| {}, - )?; - self.optimizer.optimize(analyzed_plan, self, |_, _| {}) - } - } - - /// Creates a physical [`ExecutionPlan`] plan from a [`LogicalPlan`]. - /// - /// Note: this first calls [`Self::optimize`] on the provided - /// plan. - /// - /// This function will error for [`LogicalPlan`]s such as catalog DDL like - /// `CREATE TABLE`, which do not have corresponding physical plans and must - /// be handled by another layer, typically [`SessionContext`]. 
- pub async fn create_physical_plan( - &self, - logical_plan: &LogicalPlan, - ) -> Result> { - let logical_plan = self.optimize(logical_plan)?; - self.query_planner - .create_physical_plan(&logical_plan, self) - .await - } - - /// Create a [`PhysicalExpr`] from an [`Expr`] after applying type - /// coercion, and function rewrites. - /// - /// Note: The expression is not [simplified] or otherwise optimized: `a = 1 - /// + 2` will not be simplified to `a = 3` as this is a more involved process. - /// See the [expr_api] example for how to simplify expressions. - /// - /// # See Also: - /// * [`SessionContext::create_physical_expr`] for a higher-level API - /// * [`create_physical_expr`] for a lower-level API - /// - /// [simplified]: datafusion_optimizer::simplify_expressions - /// [expr_api]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/expr_api.rs - pub fn create_physical_expr( - &self, - expr: Expr, - df_schema: &DFSchema, - ) -> Result> { - let simplifier = - ExprSimplifier::new(SessionSimplifyProvider::new(self, df_schema)); - // apply type coercion here to ensure types match - let mut expr = simplifier.coerce(expr, df_schema)?; - - // rewrite Exprs to functions if necessary - let config_options = self.config_options(); - for rewrite in self.analyzer.function_rewrites() { - expr = expr - .transform_up(|expr| rewrite.rewrite(expr, df_schema, config_options))? - .data; - } - create_physical_expr(&expr, df_schema, self.execution_props()) - } - - /// Return the session ID - pub fn session_id(&self) -> &str { - &self.session_id - } - - /// Return the runtime env - pub fn runtime_env(&self) -> &Arc { - &self.runtime_env - } - - /// Return the execution properties - pub fn execution_props(&self) -> &ExecutionProps { - &self.execution_props - } - - /// Return the [`SessionConfig`] - pub fn config(&self) -> &SessionConfig { - &self.config - } - - /// Return the mutable [`SessionConfig`]. 
- pub fn config_mut(&mut self) -> &mut SessionConfig { - &mut self.config - } - - /// Return the physical optimizers - pub fn physical_optimizers(&self) -> &[Arc] { - &self.physical_optimizers.rules - } - - /// return the configuration options - pub fn config_options(&self) -> &ConfigOptions { - self.config.options() - } - - /// return the TableOptions options with its extensions - pub fn default_table_options(&self) -> TableOptions { - self.table_option_namespace - .combine_with_session_config(self.config_options()) - } - - /// Get a new TaskContext to run in this session - pub fn task_ctx(&self) -> Arc { - Arc::new(TaskContext::from(self)) - } - - /// Return catalog list - pub fn catalog_list(&self) -> Arc { - self.catalog_list.clone() - } - - /// Return reference to scalar_functions - pub fn scalar_functions(&self) -> &HashMap> { - &self.scalar_functions - } - - /// Return reference to aggregate_functions - pub fn aggregate_functions(&self) -> &HashMap> { - &self.aggregate_functions - } - - /// Return reference to window functions - pub fn window_functions(&self) -> &HashMap> { - &self.window_functions - } - - /// Return [SerializerRegistry] for extensions - pub fn serializer_registry(&self) -> Arc { - self.serializer_registry.clone() - } - - /// Return version of the cargo package that produced this query - pub fn version(&self) -> &str { - env!("CARGO_PKG_VERSION") - } -} - -struct SessionSimplifyProvider<'a> { - state: &'a SessionState, - df_schema: &'a DFSchema, -} - -impl<'a> SessionSimplifyProvider<'a> { - fn new(state: &'a SessionState, df_schema: &'a DFSchema) -> Self { - Self { state, df_schema } - } -} - -impl<'a> SimplifyInfo for SessionSimplifyProvider<'a> { - fn is_boolean_type(&self, expr: &Expr) -> Result { - Ok(expr.get_type(self.df_schema)? 
== DataType::Boolean) - } - - fn nullable(&self, expr: &Expr) -> Result { - expr.nullable(self.df_schema) - } - - fn execution_props(&self) -> &ExecutionProps { - self.state.execution_props() - } - - fn get_data_type(&self, expr: &Expr) -> Result { - expr.get_type(self.df_schema) - } -} - -struct SessionContextProvider<'a> { - state: &'a SessionState, - tables: HashMap>, -} - -impl<'a> ContextProvider for SessionContextProvider<'a> { - fn get_table_source(&self, name: TableReference) -> Result> { - let name = self.state.resolve_table_ref(name).to_string(); - self.tables - .get(&name) - .cloned() - .ok_or_else(|| plan_datafusion_err!("table '{name}' not found")) - } - - fn get_table_function_source( - &self, - name: &str, - args: Vec, - ) -> Result> { - let tbl_func = self - .state - .table_functions - .get(name) - .cloned() - .ok_or_else(|| plan_datafusion_err!("table function '{name}' not found"))?; - let provider = tbl_func.create_table_provider(&args)?; - - Ok(provider_as_source(provider)) - } - - /// Create a new CTE work table for a recursive CTE logical plan - /// This table will be used in conjunction with a Worktable physical plan - /// to read and write each iteration of a recursive CTE - fn create_cte_work_table( - &self, - name: &str, - schema: SchemaRef, - ) -> Result> { - let table = Arc::new(CteWorkTable::new(name, schema)); - Ok(provider_as_source(table)) - } - - fn get_function_meta(&self, name: &str) -> Option> { - self.state.scalar_functions().get(name).cloned() - } - - fn get_aggregate_meta(&self, name: &str) -> Option> { - self.state.aggregate_functions().get(name).cloned() - } - - fn get_window_meta(&self, name: &str) -> Option> { - self.state.window_functions().get(name).cloned() - } - - fn get_variable_type(&self, variable_names: &[String]) -> Option { - if variable_names.is_empty() { - return None; - } - - let provider_type = if is_system_variables(variable_names) { - VarType::System - } else { - VarType::UserDefined - }; - - self.state - .execution_props - .var_providers - .as_ref() - .and_then(|provider| provider.get(&provider_type)?.get_type(variable_names)) - } - - fn options(&self) -> &ConfigOptions { - self.state.config_options() - } - - fn udf_names(&self) -> Vec { - self.state.scalar_functions().keys().cloned().collect() - } - - fn udaf_names(&self) -> Vec { - self.state.aggregate_functions().keys().cloned().collect() - } - - fn udwf_names(&self) -> Vec { - self.state.window_functions().keys().cloned().collect() - } -} - -impl FunctionRegistry for SessionState { - fn udfs(&self) -> HashSet { - self.scalar_functions.keys().cloned().collect() - } - - fn udf(&self, name: &str) -> Result> { - let result = self.scalar_functions.get(name); - - result.cloned().ok_or_else(|| { - plan_datafusion_err!("There is no UDF named \"{name}\" in the registry") - }) - } - - fn udaf(&self, name: &str) -> Result> { - let result = self.aggregate_functions.get(name); - - result.cloned().ok_or_else(|| { - plan_datafusion_err!("There is no UDAF named \"{name}\" in the registry") - }) - } - - fn udwf(&self, name: &str) -> Result> { - let result = self.window_functions.get(name); - - result.cloned().ok_or_else(|| { - plan_datafusion_err!("There is no UDWF named \"{name}\" in the registry") - }) - } - - fn register_udf(&mut self, udf: Arc) -> Result>> { - udf.aliases().iter().for_each(|alias| { - self.scalar_functions.insert(alias.clone(), udf.clone()); - }); - Ok(self.scalar_functions.insert(udf.name().into(), udf)) - } - - fn register_udaf( - &mut self, - udaf: Arc, - ) -> Result>> { 
- udaf.aliases().iter().for_each(|alias| { - self.aggregate_functions.insert(alias.clone(), udaf.clone()); - }); - Ok(self.aggregate_functions.insert(udaf.name().into(), udaf)) - } - - fn register_udwf(&mut self, udwf: Arc) -> Result>> { - udwf.aliases().iter().for_each(|alias| { - self.window_functions.insert(alias.clone(), udwf.clone()); - }); - Ok(self.window_functions.insert(udwf.name().into(), udwf)) - } - - fn deregister_udf(&mut self, name: &str) -> Result>> { - let udf = self.scalar_functions.remove(name); - if let Some(udf) = &udf { - for alias in udf.aliases() { - self.scalar_functions.remove(alias); - } - } - Ok(udf) - } - - fn deregister_udaf(&mut self, name: &str) -> Result>> { - let udaf = self.aggregate_functions.remove(name); - if let Some(udaf) = &udaf { - for alias in udaf.aliases() { - self.aggregate_functions.remove(alias); - } - } - Ok(udaf) - } - - fn deregister_udwf(&mut self, name: &str) -> Result>> { - let udwf = self.window_functions.remove(name); - if let Some(udwf) = &udwf { - for alias in udwf.aliases() { - self.window_functions.remove(alias); - } - } - Ok(udwf) - } - - fn register_function_rewrite( - &mut self, - rewrite: Arc, - ) -> Result<()> { - self.analyzer.add_function_rewrite(rewrite); - Ok(()) - } -} - -impl OptimizerConfig for SessionState { - fn query_execution_start_time(&self) -> DateTime { - self.execution_props.query_execution_start_time - } - - fn alias_generator(&self) -> Arc { - self.execution_props.alias_generator.clone() - } - - fn options(&self) -> &ConfigOptions { - self.config_options() - } - - fn function_registry(&self) -> Option<&dyn FunctionRegistry> { - Some(self) - } -} - -/// Create a new task context instance from SessionContext -impl From<&SessionContext> for TaskContext { - fn from(session: &SessionContext) -> Self { - TaskContext::from(&*session.state.read()) - } -} - -/// Create a new task context instance from SessionState -impl From<&SessionState> for TaskContext { - fn from(state: &SessionState) -> Self { - let task_id = None; - TaskContext::new( - task_id, - state.session_id.clone(), - state.config.clone(), - state.scalar_functions.clone(), - state.aggregate_functions.clone(), - state.window_functions.clone(), - state.runtime_env.clone(), - ) - } -} - /// Default implementation of [SerializerRegistry] that throws unimplemented error /// for all requests. pub struct EmptySerializerRegistry; @@ -2505,6 +1484,8 @@ mod tests { use datafusion_common_runtime::SpawnedTask; + use crate::catalog::schema::SchemaProvider; + use crate::physical_planner::PhysicalPlanner; use async_trait::async_trait; use tempfile::TempDir; @@ -2806,7 +1787,7 @@ mod tests { let catalog_list_weak = { let state = ctx.state.read(); - Arc::downgrade(&state.catalog_list) + Arc::downgrade(&state.catalog_list()) }; drop(ctx); diff --git a/datafusion/core/src/execution/mod.rs b/datafusion/core/src/execution/mod.rs index 7e757fabac8e..4e13d78f0d08 100644 --- a/datafusion/core/src/execution/mod.rs +++ b/datafusion/core/src/execution/mod.rs @@ -18,6 +18,8 @@ //! Shared state for query planning and execution. pub mod context; +pub mod state; + // backwards compatibility pub use crate::datasource::file_format::options; diff --git a/datafusion/core/src/execution/state.rs b/datafusion/core/src/execution/state.rs new file mode 100644 index 000000000000..124e20fecb62 --- /dev/null +++ b/datafusion/core/src/execution/state.rs @@ -0,0 +1,1130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`SessionState`]: information required to run queries in a session + +use crate::catalog::information_schema::{InformationSchemaProvider, INFORMATION_SCHEMA}; +use crate::catalog::listing_schema::ListingSchemaProvider; +use crate::catalog::schema::{MemorySchemaProvider, SchemaProvider}; +use crate::catalog::{ + CatalogProvider, CatalogProviderList, MemoryCatalogProvider, + MemoryCatalogProviderList, +}; +use crate::datasource::cte_worktable::CteWorkTable; +use crate::datasource::function::{TableFunction, TableFunctionImpl}; +use crate::datasource::provider::{DefaultTableFactory, TableProviderFactory}; +use crate::datasource::provider_as_source; +use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner}; +#[cfg(feature = "array_expressions")] +use crate::functions_array; +use crate::physical_optimizer::optimizer::PhysicalOptimizer; +use crate::physical_optimizer::PhysicalOptimizerRule; +use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; +use crate::{functions, functions_aggregate}; +use arrow_schema::{DataType, SchemaRef}; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use datafusion_common::alias::AliasGenerator; +use datafusion_common::config::{ConfigExtension, ConfigOptions, TableOptions}; +use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan}; +use datafusion_common::tree_node::TreeNode; +use datafusion_common::{ + not_impl_err, plan_datafusion_err, DFSchema, DataFusionError, ResolvedTableReference, + TableReference, +}; +use datafusion_execution::config::SessionConfig; +use datafusion_execution::object_store::ObjectStoreUrl; +use datafusion_execution::runtime_env::RuntimeEnv; +use datafusion_execution::TaskContext; +use datafusion_expr::execution_props::ExecutionProps; +use datafusion_expr::expr_rewriter::FunctionRewrite; +use datafusion_expr::registry::{FunctionRegistry, SerializerRegistry}; +use datafusion_expr::simplify::SimplifyInfo; +use datafusion_expr::var_provider::{is_system_variables, VarType}; +use datafusion_expr::{ + AggregateUDF, Explain, Expr, ExprSchemable, LogicalPlan, ScalarUDF, TableSource, + WindowUDF, +}; +use datafusion_optimizer::simplify_expressions::ExprSimplifier; +use datafusion_optimizer::{ + Analyzer, AnalyzerRule, Optimizer, OptimizerConfig, OptimizerRule, +}; +use datafusion_physical_expr::create_physical_expr; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +use datafusion_physical_plan::ExecutionPlan; +use datafusion_sql::parser::{CopyToSource, CopyToStatement, DFParser}; +use datafusion_sql::planner::{ + object_name_to_table_reference, ContextProvider, ParserOptions, SqlToRel, +}; +use sqlparser::dialect::dialect_from_str; +use std::collections::hash_map::Entry; +use std::collections::{HashMap, HashSet}; +use std::fmt::Debug; +use 
std::ops::ControlFlow; +use std::sync::Arc; +use url::Url; +use uuid::Uuid; + +/// Execution context for registering data sources and executing queries. +/// See [`SessionContext`] for a higher level API. +/// +/// Note that there is no `Default` or `new()` for SessionState, +/// to avoid accidentally running queries or other operations without passing through +/// the [`SessionConfig`] or [`RuntimeEnv`]. See [`SessionContext`]. +#[derive(Clone)] +pub struct SessionState { + /// A unique UUID that identifies the session + session_id: String, + /// Responsible for analyzing and rewrite a logical plan before optimization + analyzer: Analyzer, + /// Responsible for optimizing a logical plan + optimizer: Optimizer, + /// Responsible for optimizing a physical execution plan + physical_optimizers: PhysicalOptimizer, + /// Responsible for planning `LogicalPlan`s, and `ExecutionPlan` + query_planner: Arc, + /// Collection of catalogs containing schemas and ultimately TableProviders + catalog_list: Arc, + /// Table Functions + table_functions: HashMap>, + /// Scalar functions that are registered with the context + scalar_functions: HashMap>, + /// Aggregate functions registered in the context + aggregate_functions: HashMap>, + /// Window functions registered in the context + window_functions: HashMap>, + /// Deserializer registry for extensions. + serializer_registry: Arc, + /// Session configuration + config: SessionConfig, + /// Table options + table_options: TableOptions, + /// Execution properties + execution_props: ExecutionProps, + /// TableProviderFactories for different file formats. + /// + /// Maps strings like "JSON" to an instance of [`TableProviderFactory`] + /// + /// This is used to create [`TableProvider`] instances for the + /// `CREATE EXTERNAL TABLE ... STORED AS ` for custom file + /// formats other than those built into DataFusion + table_factories: HashMap>, + /// Runtime environment + runtime_env: Arc, + + /// [FunctionFactory] to support pluggable user defined function handler. + /// + /// It will be invoked on `CREATE FUNCTION` statements. + /// thus, changing dialect o PostgreSql is required + function_factory: Option>, +} + +impl Debug for SessionState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SessionState") + .field("session_id", &self.session_id) + // TODO should we print out more? + .finish() + } +} + +impl SessionState { + /// Returns new [`SessionState`] using the provided + /// [`SessionConfig`] and [`RuntimeEnv`]. + pub fn new_with_config_rt(config: SessionConfig, runtime: Arc) -> Self { + let catalog_list = + Arc::new(MemoryCatalogProviderList::new()) as Arc; + Self::new_with_config_rt_and_catalog_list(config, runtime, catalog_list) + } + + /// Returns new [`SessionState`] using the provided + /// [`SessionConfig`] and [`RuntimeEnv`]. 
+ #[deprecated(since = "32.0.0", note = "Use SessionState::new_with_config_rt")] + pub fn with_config_rt(config: SessionConfig, runtime: Arc) -> Self { + Self::new_with_config_rt(config, runtime) + } + + /// Returns new [`SessionState`] using the provided + /// [`SessionConfig`], [`RuntimeEnv`], and [`CatalogProviderList`] + pub fn new_with_config_rt_and_catalog_list( + config: SessionConfig, + runtime: Arc, + catalog_list: Arc, + ) -> Self { + let session_id = Uuid::new_v4().to_string(); + + // Create table_factories for all default formats + let mut table_factories: HashMap> = + HashMap::new(); + #[cfg(feature = "parquet")] + table_factories.insert("PARQUET".into(), Arc::new(DefaultTableFactory::new())); + table_factories.insert("CSV".into(), Arc::new(DefaultTableFactory::new())); + table_factories.insert("JSON".into(), Arc::new(DefaultTableFactory::new())); + table_factories.insert("NDJSON".into(), Arc::new(DefaultTableFactory::new())); + table_factories.insert("AVRO".into(), Arc::new(DefaultTableFactory::new())); + table_factories.insert("ARROW".into(), Arc::new(DefaultTableFactory::new())); + + if config.create_default_catalog_and_schema() { + let default_catalog = MemoryCatalogProvider::new(); + + default_catalog + .register_schema( + &config.options().catalog.default_schema, + Arc::new(MemorySchemaProvider::new()), + ) + .expect("memory catalog provider can register schema"); + + Self::register_default_schema( + &config, + &table_factories, + &runtime, + &default_catalog, + ); + + catalog_list.register_catalog( + config.options().catalog.default_catalog.clone(), + Arc::new(default_catalog), + ); + } + + let mut new_self = SessionState { + session_id, + analyzer: Analyzer::new(), + optimizer: Optimizer::new(), + physical_optimizers: PhysicalOptimizer::new(), + query_planner: Arc::new(DefaultQueryPlanner {}), + catalog_list, + table_functions: HashMap::new(), + scalar_functions: HashMap::new(), + aggregate_functions: HashMap::new(), + window_functions: HashMap::new(), + serializer_registry: Arc::new(EmptySerializerRegistry), + table_options: TableOptions::default_from_session_config(config.options()), + config, + execution_props: ExecutionProps::new(), + runtime_env: runtime, + table_factories, + function_factory: None, + }; + + // register built in functions + functions::register_all(&mut new_self) + .expect("can not register built in functions"); + + // register crate of array expressions (if enabled) + #[cfg(feature = "array_expressions")] + functions_array::register_all(&mut new_self) + .expect("can not register array expressions"); + + functions_aggregate::register_all(&mut new_self) + .expect("can not register aggregate functions"); + + new_self + } + /// Returns new [`SessionState`] using the provided + /// [`SessionConfig`] and [`RuntimeEnv`]. 
+ #[deprecated( + since = "32.0.0", + note = "Use SessionState::new_with_config_rt_and_catalog_list" + )] + pub fn with_config_rt_and_catalog_list( + config: SessionConfig, + runtime: Arc, + catalog_list: Arc, + ) -> Self { + Self::new_with_config_rt_and_catalog_list(config, runtime, catalog_list) + } + fn register_default_schema( + config: &SessionConfig, + table_factories: &HashMap>, + runtime: &Arc, + default_catalog: &MemoryCatalogProvider, + ) { + let url = config.options().catalog.location.as_ref(); + let format = config.options().catalog.format.as_ref(); + let (url, format) = match (url, format) { + (Some(url), Some(format)) => (url, format), + _ => return, + }; + let url = url.to_string(); + let format = format.to_string(); + + let url = Url::parse(url.as_str()).expect("Invalid default catalog location!"); + let authority = match url.host_str() { + Some(host) => format!("{}://{}", url.scheme(), host), + None => format!("{}://", url.scheme()), + }; + let path = &url.as_str()[authority.len()..]; + let path = object_store::path::Path::parse(path).expect("Can't parse path"); + let store = ObjectStoreUrl::parse(authority.as_str()) + .expect("Invalid default catalog url"); + let store = match runtime.object_store(store) { + Ok(store) => store, + _ => return, + }; + let factory = match table_factories.get(format.as_str()) { + Some(factory) => factory, + _ => return, + }; + let schema = + ListingSchemaProvider::new(authority, path, factory.clone(), store, format); + let _ = default_catalog + .register_schema("default", Arc::new(schema)) + .expect("Failed to register default schema"); + } + + pub(crate) fn resolve_table_ref( + &self, + table_ref: impl Into, + ) -> ResolvedTableReference { + let catalog = &self.config_options().catalog; + table_ref + .into() + .resolve(&catalog.default_catalog, &catalog.default_schema) + } + + pub(crate) fn schema_for_ref( + &self, + table_ref: impl Into, + ) -> datafusion_common::Result> { + let resolved_ref = self.resolve_table_ref(table_ref); + if self.config.information_schema() && *resolved_ref.schema == *INFORMATION_SCHEMA + { + return Ok(Arc::new(InformationSchemaProvider::new( + self.catalog_list.clone(), + ))); + } + + self.catalog_list + .catalog(&resolved_ref.catalog) + .ok_or_else(|| { + plan_datafusion_err!( + "failed to resolve catalog: {}", + resolved_ref.catalog + ) + })? + .schema(&resolved_ref.schema) + .ok_or_else(|| { + plan_datafusion_err!("failed to resolve schema: {}", resolved_ref.schema) + }) + } + + /// Replace the random session id. + pub fn with_session_id(mut self, session_id: String) -> Self { + self.session_id = session_id; + self + } + + /// override default query planner with `query_planner` + pub fn with_query_planner( + mut self, + query_planner: Arc, + ) -> Self { + self.query_planner = query_planner; + self + } + + /// Override the [`AnalyzerRule`]s optimizer plan rules. 
+ pub fn with_analyzer_rules( + mut self, + rules: Vec>, + ) -> Self { + self.analyzer = Analyzer::with_rules(rules); + self + } + + /// Replace the entire list of [`OptimizerRule`]s used to optimize plans + pub fn with_optimizer_rules( + mut self, + rules: Vec>, + ) -> Self { + self.optimizer = Optimizer::with_rules(rules); + self + } + + /// Replace the entire list of [`PhysicalOptimizerRule`]s used to optimize plans + pub fn with_physical_optimizer_rules( + mut self, + physical_optimizers: Vec>, + ) -> Self { + self.physical_optimizers = PhysicalOptimizer::with_rules(physical_optimizers); + self + } + + /// Add `analyzer_rule` to the end of the list of + /// [`AnalyzerRule`]s used to rewrite queries. + pub fn add_analyzer_rule( + mut self, + analyzer_rule: Arc, + ) -> Self { + self.analyzer.rules.push(analyzer_rule); + self + } + + /// Add `optimizer_rule` to the end of the list of + /// [`OptimizerRule`]s used to rewrite queries. + pub fn add_optimizer_rule( + mut self, + optimizer_rule: Arc, + ) -> Self { + self.optimizer.rules.push(optimizer_rule); + self + } + + /// Add `physical_optimizer_rule` to the end of the list of + /// [`PhysicalOptimizerRule`]s used to rewrite queries. + pub fn add_physical_optimizer_rule( + mut self, + physical_optimizer_rule: Arc, + ) -> Self { + self.physical_optimizers.rules.push(physical_optimizer_rule); + self + } + + /// Adds a new [`ConfigExtension`] to TableOptions + pub fn add_table_options_extension( + mut self, + extension: T, + ) -> Self { + self.table_options.extensions.insert(extension); + self + } + + /// Registers a [`FunctionFactory`] to handle `CREATE FUNCTION` statements + pub fn with_function_factory( + mut self, + function_factory: Arc, + ) -> Self { + self.function_factory = Some(function_factory); + self + } + + /// Registers a [`FunctionFactory`] to handle `CREATE FUNCTION` statements + pub fn set_function_factory(&mut self, function_factory: Arc) { + self.function_factory = Some(function_factory); + } + + /// Replace the extension [`SerializerRegistry`] + pub fn with_serializer_registry( + mut self, + registry: Arc, + ) -> Self { + self.serializer_registry = registry; + self + } + + /// Get the function factory + pub fn function_factory(&self) -> Option<&Arc> { + self.function_factory.as_ref() + } + + /// Get the table factories + pub fn table_factories(&self) -> &HashMap> { + &self.table_factories + } + + /// Get the table factories + pub fn table_factories_mut( + &mut self, + ) -> &mut HashMap> { + &mut self.table_factories + } + + /// Parse an SQL string into an DataFusion specific AST + /// [`Statement`]. See [`SessionContext::sql`] for running queries. + pub fn sql_to_statement( + &self, + sql: &str, + dialect: &str, + ) -> datafusion_common::Result { + let dialect = dialect_from_str(dialect).ok_or_else(|| { + plan_datafusion_err!( + "Unsupported SQL dialect: {dialect}. Available dialects: \ + Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \ + MsSQL, ClickHouse, BigQuery, Ansi." + ) + })?; + let mut statements = DFParser::parse_sql_with_dialect(sql, dialect.as_ref())?; + if statements.len() > 1 { + return not_impl_err!( + "The context currently only supports a single SQL statement" + ); + } + let statement = statements.pop_front().ok_or_else(|| { + DataFusionError::NotImplemented( + "The context requires a statement!".to_string(), + ) + })?; + Ok(statement) + } + + /// Resolve all table references in the SQL statement. 
+ pub fn resolve_table_references( + &self, + statement: &datafusion_sql::parser::Statement, + ) -> datafusion_common::Result> { + use crate::catalog::information_schema::INFORMATION_SCHEMA_TABLES; + use datafusion_sql::parser::Statement as DFStatement; + use sqlparser::ast::*; + + // Getting `TableProviders` is async but planing is not -- thus pre-fetch + // table providers for all relations referenced in this query + let mut relations = hashbrown::HashSet::with_capacity(10); + + struct RelationVisitor<'a>(&'a mut hashbrown::HashSet); + + impl<'a> RelationVisitor<'a> { + /// Record that `relation` was used in this statement + fn insert(&mut self, relation: &ObjectName) { + self.0.get_or_insert_with(relation, |_| relation.clone()); + } + } + + impl<'a> Visitor for RelationVisitor<'a> { + type Break = (); + + fn pre_visit_relation(&mut self, relation: &ObjectName) -> ControlFlow<()> { + self.insert(relation); + ControlFlow::Continue(()) + } + + fn pre_visit_statement(&mut self, statement: &Statement) -> ControlFlow<()> { + if let Statement::ShowCreate { + obj_type: ShowCreateObject::Table | ShowCreateObject::View, + obj_name, + } = statement + { + self.insert(obj_name) + } + ControlFlow::Continue(()) + } + } + + let mut visitor = RelationVisitor(&mut relations); + fn visit_statement(statement: &DFStatement, visitor: &mut RelationVisitor<'_>) { + match statement { + DFStatement::Statement(s) => { + let _ = s.as_ref().visit(visitor); + } + DFStatement::CreateExternalTable(table) => { + visitor + .0 + .insert(ObjectName(vec![Ident::from(table.name.as_str())])); + } + DFStatement::CopyTo(CopyToStatement { source, .. }) => match source { + CopyToSource::Relation(table_name) => { + visitor.insert(table_name); + } + CopyToSource::Query(query) => { + query.visit(visitor); + } + }, + DFStatement::Explain(explain) => { + visit_statement(&explain.statement, visitor) + } + } + } + + visit_statement(statement, &mut visitor); + + // Always include information_schema if available + if self.config.information_schema() { + for s in INFORMATION_SCHEMA_TABLES { + relations.insert(ObjectName(vec![ + Ident::new(INFORMATION_SCHEMA), + Ident::new(*s), + ])); + } + } + + let enable_ident_normalization = + self.config.options().sql_parser.enable_ident_normalization; + relations + .into_iter() + .map(|x| object_name_to_table_reference(x, enable_ident_normalization)) + .collect::>() + } + + /// Convert an AST Statement into a LogicalPlan + pub async fn statement_to_plan( + &self, + statement: datafusion_sql::parser::Statement, + ) -> datafusion_common::Result { + let references = self.resolve_table_references(&statement)?; + + let mut provider = SessionContextProvider { + state: self, + tables: HashMap::with_capacity(references.len()), + }; + + let enable_ident_normalization = + self.config.options().sql_parser.enable_ident_normalization; + let parse_float_as_decimal = + self.config.options().sql_parser.parse_float_as_decimal; + for reference in references { + let resolved = &self.resolve_table_ref(reference); + if let Entry::Vacant(v) = provider.tables.entry(resolved.to_string()) { + if let Ok(schema) = self.schema_for_ref(resolved.clone()) { + if let Some(table) = schema.table(&resolved.table).await? { + v.insert(provider_as_source(table)); + } + } + } + } + + let query = SqlToRel::new_with_options( + &provider, + ParserOptions { + parse_float_as_decimal, + enable_ident_normalization, + }, + ); + query.statement_to_plan(statement) + } + + /// Creates a [`LogicalPlan`] from the provided SQL string. 
This + /// interface will plan any SQL DataFusion supports, including DML + /// like `CREATE TABLE`, and `COPY` (which can write to local + /// files. + /// + /// See [`SessionContext::sql`] and + /// [`SessionContext::sql_with_options`] for a higher-level + /// interface that handles DDL and verification of allowed + /// statements. + pub async fn create_logical_plan( + &self, + sql: &str, + ) -> datafusion_common::Result { + let dialect = self.config.options().sql_parser.dialect.as_str(); + let statement = self.sql_to_statement(sql, dialect)?; + let plan = self.statement_to_plan(statement).await?; + Ok(plan) + } + + /// Optimizes the logical plan by applying optimizer rules. + pub fn optimize(&self, plan: &LogicalPlan) -> datafusion_common::Result { + if let LogicalPlan::Explain(e) = plan { + let mut stringified_plans = e.stringified_plans.clone(); + + // analyze & capture output of each rule + let analyzer_result = self.analyzer.execute_and_check( + e.plan.as_ref().clone(), + self.options(), + |analyzed_plan, analyzer| { + let analyzer_name = analyzer.name().to_string(); + let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name }; + stringified_plans.push(analyzed_plan.to_stringified(plan_type)); + }, + ); + let analyzed_plan = match analyzer_result { + Ok(plan) => plan, + Err(DataFusionError::Context(analyzer_name, err)) => { + let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name }; + stringified_plans + .push(StringifiedPlan::new(plan_type, err.to_string())); + + return Ok(LogicalPlan::Explain(Explain { + verbose: e.verbose, + plan: e.plan.clone(), + stringified_plans, + schema: e.schema.clone(), + logical_optimization_succeeded: false, + })); + } + Err(e) => return Err(e), + }; + + // to delineate the analyzer & optimizer phases in explain output + stringified_plans + .push(analyzed_plan.to_stringified(PlanType::FinalAnalyzedLogicalPlan)); + + // optimize the child plan, capturing the output of each optimizer + let optimized_plan = self.optimizer.optimize( + analyzed_plan, + self, + |optimized_plan, optimizer| { + let optimizer_name = optimizer.name().to_string(); + let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name }; + stringified_plans.push(optimized_plan.to_stringified(plan_type)); + }, + ); + let (plan, logical_optimization_succeeded) = match optimized_plan { + Ok(plan) => (Arc::new(plan), true), + Err(DataFusionError::Context(optimizer_name, err)) => { + let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name }; + stringified_plans + .push(StringifiedPlan::new(plan_type, err.to_string())); + (e.plan.clone(), false) + } + Err(e) => return Err(e), + }; + + Ok(LogicalPlan::Explain(Explain { + verbose: e.verbose, + plan, + stringified_plans, + schema: e.schema.clone(), + logical_optimization_succeeded, + })) + } else { + let analyzed_plan = self.analyzer.execute_and_check( + plan.clone(), + self.options(), + |_, _| {}, + )?; + self.optimizer.optimize(analyzed_plan, self, |_, _| {}) + } + } + + /// Creates a physical [`ExecutionPlan`] plan from a [`LogicalPlan`]. + /// + /// Note: this first calls [`Self::optimize`] on the provided + /// plan. + /// + /// This function will error for [`LogicalPlan`]s such as catalog DDL like + /// `CREATE TABLE`, which do not have corresponding physical plans and must + /// be handled by another layer, typically [`SessionContext`]. 
+    pub async fn create_physical_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+        let logical_plan = self.optimize(logical_plan)?;
+        self.query_planner
+            .create_physical_plan(&logical_plan, self)
+            .await
+    }
+
+    /// Create a [`PhysicalExpr`] from an [`Expr`] after applying type
+    /// coercion, and function rewrites.
+    ///
+    /// Note: The expression is not [simplified] or otherwise optimized: `a = 1
+    /// + 2` will not be simplified to `a = 3` as this is a more involved process.
+    /// See the [expr_api] example for how to simplify expressions.
+    ///
+    /// # See Also:
+    /// * [`SessionContext::create_physical_expr`] for a higher-level API
+    /// * [`create_physical_expr`] for a lower-level API
+    ///
+    /// [simplified]: datafusion_optimizer::simplify_expressions
+    /// [expr_api]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/expr_api.rs
+    pub fn create_physical_expr(
+        &self,
+        expr: Expr,
+        df_schema: &DFSchema,
+    ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
+        let simplifier =
+            ExprSimplifier::new(SessionSimplifyProvider::new(self, df_schema));
+        // apply type coercion here to ensure types match
+        let mut expr = simplifier.coerce(expr, df_schema)?;
+
+        // rewrite Exprs to functions if necessary
+        let config_options = self.config_options();
+        for rewrite in self.analyzer.function_rewrites() {
+            expr = expr
+                .transform_up(|expr| rewrite.rewrite(expr, df_schema, config_options))?
+                .data;
+        }
+        create_physical_expr(&expr, df_schema, self.execution_props())
+    }
+
+    /// Return the session ID
+    pub fn session_id(&self) -> &str {
+        &self.session_id
+    }
+
+    /// Return the runtime env
+    pub fn runtime_env(&self) -> &Arc<RuntimeEnv> {
+        &self.runtime_env
+    }
+
+    /// Return the execution properties
+    pub fn execution_props(&self) -> &ExecutionProps {
+        &self.execution_props
+    }
+
+    /// Return mutable execution properties
+    pub fn execution_props_mut(&mut self) -> &mut ExecutionProps {
+        &mut self.execution_props
+    }
+
+    /// Return the [`SessionConfig`]
+    pub fn config(&self) -> &SessionConfig {
+        &self.config
+    }
+
+    /// Return the mutable [`SessionConfig`].
+    pub fn config_mut(&mut self) -> &mut SessionConfig {
+        &mut self.config
+    }
+
+    /// Return the physical optimizers
+    pub fn physical_optimizers(&self) -> &[Arc<dyn PhysicalOptimizerRule + Send + Sync>] {
+        &self.physical_optimizers.rules
+    }
+
+    /// return the configuration options
+    pub fn config_options(&self) -> &ConfigOptions {
+        self.config.options()
+    }
+
+    /// return the TableOptions options with its extensions
+    pub fn default_table_options(&self) -> TableOptions {
+        self.table_options
+            .combine_with_session_config(self.config_options())
+    }
+
+    /// Return the table options
+    pub fn table_options(&self) -> &TableOptions {
+        &self.table_options
+    }
+
+    /// Return mutable table options
+    pub fn table_options_mut(&mut self) -> &mut TableOptions {
+        &mut self.table_options
+    }
+
+    /// Get a new TaskContext to run in this session
+    pub fn task_ctx(&self) -> Arc<TaskContext> {
+        Arc::new(TaskContext::from(self))
+    }
+
+    /// Return catalog list
+    pub fn catalog_list(&self) -> Arc<dyn CatalogProviderList> {
+        self.catalog_list.clone()
+    }
+
+    /// set the catalog list
+    pub(crate) fn register_catalog_list(
+        &mut self,
+        catalog_list: Arc<dyn CatalogProviderList>,
+    ) {
+        self.catalog_list = catalog_list;
+    }
+
+    /// Return reference to scalar_functions
+    pub fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
+        &self.scalar_functions
+    }
+
+    /// Return reference to aggregate_functions
+    pub fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
+        &self.aggregate_functions
+    }
+
+    /// Return reference to window functions
+    pub fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
+        &self.window_functions
+    }
+
+    /// Return [SerializerRegistry] for extensions
+    pub fn serializer_registry(&self) -> Arc<dyn SerializerRegistry> {
+        self.serializer_registry.clone()
+    }
+
+    /// Return version of the cargo package that produced this query
+    pub fn version(&self) -> &str {
+        env!("CARGO_PKG_VERSION")
+    }
+
+    /// Register a user defined table function
+    pub fn register_udtf(&mut self, name: &str, fun: Arc<dyn TableFunctionImpl>) {
+        self.table_functions.insert(
+            name.to_owned(),
+            Arc::new(TableFunction::new(name.to_owned(), fun)),
+        );
+    }
+}
+
+struct SessionContextProvider<'a> {
+    state: &'a SessionState,
+    tables: HashMap<String, Arc<dyn TableSource>>,
+}
+
+impl<'a> ContextProvider for SessionContextProvider<'a> {
+    fn get_table_source(
+        &self,
+        name: TableReference,
+    ) -> datafusion_common::Result<Arc<dyn TableSource>> {
+        let name = self.state.resolve_table_ref(name).to_string();
+        self.tables
+            .get(&name)
+            .cloned()
+            .ok_or_else(|| plan_datafusion_err!("table '{name}' not found"))
+    }
+
+    fn get_table_function_source(
+        &self,
+        name: &str,
+        args: Vec<Expr>,
+    ) -> datafusion_common::Result<Arc<dyn TableSource>> {
+        let tbl_func = self
+            .state
+            .table_functions
+            .get(name)
+            .cloned()
+            .ok_or_else(|| plan_datafusion_err!("table function '{name}' not found"))?;
+        let provider = tbl_func.create_table_provider(&args)?;
+
+        Ok(provider_as_source(provider))
+    }
+
+    /// Create a new CTE work table for a recursive CTE logical plan
+    /// This table will be used in conjunction with a Worktable physical plan
+    /// to read and write each iteration of a recursive CTE
+    fn create_cte_work_table(
+        &self,
+        name: &str,
+        schema: SchemaRef,
+    ) -> datafusion_common::Result<Arc<dyn TableSource>> {
+        let table = Arc::new(CteWorkTable::new(name, schema));
+        Ok(provider_as_source(table))
+    }
+
+    fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
+        self.state.scalar_functions().get(name).cloned()
+    }
+
+    fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
+        self.state.aggregate_functions().get(name).cloned()
+    }
+
+    fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>> {
+        self.state.window_functions().get(name).cloned()
+    }
+
+    fn get_variable_type(&self, variable_names:
&[String]) -> Option { + if variable_names.is_empty() { + return None; + } + + let provider_type = if is_system_variables(variable_names) { + VarType::System + } else { + VarType::UserDefined + }; + + self.state + .execution_props + .var_providers + .as_ref() + .and_then(|provider| provider.get(&provider_type)?.get_type(variable_names)) + } + + fn options(&self) -> &ConfigOptions { + self.state.config_options() + } + + fn udf_names(&self) -> Vec { + self.state.scalar_functions().keys().cloned().collect() + } + + fn udaf_names(&self) -> Vec { + self.state.aggregate_functions().keys().cloned().collect() + } + + fn udwf_names(&self) -> Vec { + self.state.window_functions().keys().cloned().collect() + } +} + +impl FunctionRegistry for SessionState { + fn udfs(&self) -> HashSet { + self.scalar_functions.keys().cloned().collect() + } + + fn udf(&self, name: &str) -> datafusion_common::Result> { + let result = self.scalar_functions.get(name); + + result.cloned().ok_or_else(|| { + plan_datafusion_err!("There is no UDF named \"{name}\" in the registry") + }) + } + + fn udaf(&self, name: &str) -> datafusion_common::Result> { + let result = self.aggregate_functions.get(name); + + result.cloned().ok_or_else(|| { + plan_datafusion_err!("There is no UDAF named \"{name}\" in the registry") + }) + } + + fn udwf(&self, name: &str) -> datafusion_common::Result> { + let result = self.window_functions.get(name); + + result.cloned().ok_or_else(|| { + plan_datafusion_err!("There is no UDWF named \"{name}\" in the registry") + }) + } + + fn register_udf( + &mut self, + udf: Arc, + ) -> datafusion_common::Result>> { + udf.aliases().iter().for_each(|alias| { + self.scalar_functions.insert(alias.clone(), udf.clone()); + }); + Ok(self.scalar_functions.insert(udf.name().into(), udf)) + } + + fn register_udaf( + &mut self, + udaf: Arc, + ) -> datafusion_common::Result>> { + udaf.aliases().iter().for_each(|alias| { + self.aggregate_functions.insert(alias.clone(), udaf.clone()); + }); + Ok(self.aggregate_functions.insert(udaf.name().into(), udaf)) + } + + fn register_udwf( + &mut self, + udwf: Arc, + ) -> datafusion_common::Result>> { + udwf.aliases().iter().for_each(|alias| { + self.window_functions.insert(alias.clone(), udwf.clone()); + }); + Ok(self.window_functions.insert(udwf.name().into(), udwf)) + } + + fn deregister_udf( + &mut self, + name: &str, + ) -> datafusion_common::Result>> { + let udf = self.scalar_functions.remove(name); + if let Some(udf) = &udf { + for alias in udf.aliases() { + self.scalar_functions.remove(alias); + } + } + Ok(udf) + } + + fn deregister_udaf( + &mut self, + name: &str, + ) -> datafusion_common::Result>> { + let udaf = self.aggregate_functions.remove(name); + if let Some(udaf) = &udaf { + for alias in udaf.aliases() { + self.aggregate_functions.remove(alias); + } + } + Ok(udaf) + } + + fn deregister_udwf( + &mut self, + name: &str, + ) -> datafusion_common::Result>> { + let udwf = self.window_functions.remove(name); + if let Some(udwf) = &udwf { + for alias in udwf.aliases() { + self.window_functions.remove(alias); + } + } + Ok(udwf) + } + + fn register_function_rewrite( + &mut self, + rewrite: Arc, + ) -> datafusion_common::Result<()> { + self.analyzer.add_function_rewrite(rewrite); + Ok(()) + } +} + +impl OptimizerConfig for SessionState { + fn query_execution_start_time(&self) -> DateTime { + self.execution_props.query_execution_start_time + } + + fn alias_generator(&self) -> Arc { + self.execution_props.alias_generator.clone() + } + + fn options(&self) -> &ConfigOptions { + 
self.config_options() + } + + fn function_registry(&self) -> Option<&dyn FunctionRegistry> { + Some(self) + } +} + +/// Create a new task context instance from SessionState +impl From<&SessionState> for TaskContext { + fn from(state: &SessionState) -> Self { + let task_id = None; + TaskContext::new( + task_id, + state.session_id.clone(), + state.config.clone(), + state.scalar_functions.clone(), + state.aggregate_functions.clone(), + state.window_functions.clone(), + state.runtime_env.clone(), + ) + } +} + +/// The query planner used if no user defined planner is provided +struct DefaultQueryPlanner {} + +#[async_trait] +impl QueryPlanner for DefaultQueryPlanner { + /// Given a `LogicalPlan`, create an [`ExecutionPlan`] suitable for execution + async fn create_physical_plan( + &self, + logical_plan: &LogicalPlan, + session_state: &SessionState, + ) -> datafusion_common::Result> { + let planner = DefaultPhysicalPlanner::default(); + planner + .create_physical_plan(logical_plan, session_state) + .await + } +} + +struct SessionSimplifyProvider<'a> { + state: &'a SessionState, + df_schema: &'a DFSchema, +} + +impl<'a> SessionSimplifyProvider<'a> { + fn new(state: &'a SessionState, df_schema: &'a DFSchema) -> Self { + Self { state, df_schema } + } +} + +impl<'a> SimplifyInfo for SessionSimplifyProvider<'a> { + fn is_boolean_type(&self, expr: &Expr) -> datafusion_common::Result { + Ok(expr.get_type(self.df_schema)? == DataType::Boolean) + } + + fn nullable(&self, expr: &Expr) -> datafusion_common::Result { + expr.nullable(self.df_schema) + } + + fn execution_props(&self) -> &ExecutionProps { + self.state.execution_props() + } + + fn get_data_type(&self, expr: &Expr) -> datafusion_common::Result { + expr.get_type(self.df_schema) + } +} From fa12ece15a349a41050a9c482dccd9e2cc7c0b52 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 4 Jun 2024 16:29:32 -0400 Subject: [PATCH 2/4] fix docs --- datafusion/core/src/execution/state.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/execution/state.rs b/datafusion/core/src/execution/state.rs index 124e20fecb62..4d0b87bc336b 100644 --- a/datafusion/core/src/execution/state.rs +++ b/datafusion/core/src/execution/state.rs @@ -66,7 +66,7 @@ use datafusion_optimizer::{ use datafusion_physical_expr::create_physical_expr; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_plan::ExecutionPlan; -use datafusion_sql::parser::{CopyToSource, CopyToStatement, DFParser}; +use datafusion_sql::parser::{CopyToSource, CopyToStatement, DFParser, Statement}; use datafusion_sql::planner::{ object_name_to_table_reference, ContextProvider, ParserOptions, SqlToRel, }; @@ -85,6 +85,8 @@ use uuid::Uuid; /// Note that there is no `Default` or `new()` for SessionState, /// to avoid accidentally running queries or other operations without passing through /// the [`SessionConfig`] or [`RuntimeEnv`]. See [`SessionContext`]. +/// +/// [`SessionContext`]: crate::execution::context::SessionContext #[derive(Clone)] pub struct SessionState { /// A unique UUID that identifies the session @@ -122,6 +124,8 @@ pub struct SessionState { /// This is used to create [`TableProvider`] instances for the /// `CREATE EXTERNAL TABLE ... 
STORED AS ` for custom file /// formats other than those built into DataFusion + /// + /// [`TableProvider`]: crate::datasource::provider::TableProvider table_factories: HashMap>, /// Runtime environment runtime_env: Arc, @@ -446,11 +450,13 @@ impl SessionState { /// Parse an SQL string into an DataFusion specific AST /// [`Statement`]. See [`SessionContext::sql`] for running queries. + /// + /// [`SessionContext::sql`]: crate::execution::context::SessionContext::sql pub fn sql_to_statement( &self, sql: &str, dialect: &str, - ) -> datafusion_common::Result { + ) -> datafusion_common::Result { let dialect = dialect_from_str(dialect).ok_or_else(|| { plan_datafusion_err!( "Unsupported SQL dialect: {dialect}. Available dialects: \ @@ -605,6 +611,9 @@ impl SessionState { /// [`SessionContext::sql_with_options`] for a higher-level /// interface that handles DDL and verification of allowed /// statements. + /// + /// [`SessionContext::sql`]: crate::execution::context::SessionContext::sql + /// [`SessionContext::sql_with_options`]: crate::execution::context::SessionContext::sql_with_options pub async fn create_logical_plan( &self, sql: &str, @@ -698,6 +707,8 @@ impl SessionState { /// This function will error for [`LogicalPlan`]s such as catalog DDL like /// `CREATE TABLE`, which do not have corresponding physical plans and must /// be handled by another layer, typically [`SessionContext`]. + /// + /// [`SessionContext`]: crate::execution::context::SessionContext pub async fn create_physical_plan( &self, logical_plan: &LogicalPlan, @@ -721,6 +732,7 @@ impl SessionState { /// /// [simplified]: datafusion_optimizer::simplify_expressions /// [expr_api]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/expr_api.rs + /// [`SessionContext::create_physical_expr`]: crate::execution::context::SessionContext::create_physical_expr pub fn create_physical_expr( &self, expr: Expr, From 1b0b39fe0354c469734e50df848ee3a7d99a0b3e Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 4 Jun 2024 16:33:31 -0400 Subject: [PATCH 3/4] fix test --- datafusion/core/src/execution/context/mod.rs | 4 +--- datafusion/core/src/execution/state.rs | 6 ++++++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index acf65d4c5332..785ec4b6c777 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1272,9 +1272,7 @@ impl SessionContext { pub fn register_table_options_extension(&self, extension: T) { self.state .write() - .default_table_options() - .extensions - .insert(extension) + .register_table_options_extension(extension) } } diff --git a/datafusion/core/src/execution/state.rs b/datafusion/core/src/execution/state.rs index 4d0b87bc336b..3b072ebb7384 100644 --- a/datafusion/core/src/execution/state.rs +++ b/datafusion/core/src/execution/state.rs @@ -809,6 +809,12 @@ impl SessionState { &mut self.table_options } + /// Registers a [`ConfigExtension`] as a table option extention that can be + /// referenced from SQL statements executed against this context. 
+    pub fn register_table_options_extension<T: ConfigExtension>(&mut self, extension: T) {
+        self.table_options.extensions.insert(extension)
+    }
+
     /// Get a new TaskContext to run in this session
     pub fn task_ctx(&self) -> Arc<TaskContext> {
         Arc::new(TaskContext::from(self))

From 0c1dbbe4f14b8c285aad6be579bbe23cc9bda581 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Wed, 5 Jun 2024 11:46:43 -0400
Subject: [PATCH 4/4] Rename state.rs to session_state.rs

---
 datafusion/core/src/execution/context/mod.rs                 | 2 +-
 datafusion/core/src/execution/mod.rs                         | 2 +-
 datafusion/core/src/execution/{state.rs => session_state.rs} | 0
 3 files changed, 2 insertions(+), 2 deletions(-)
 rename datafusion/core/src/execution/{state.rs => session_state.rs} (100%)

diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index 785ec4b6c777..e247263964cd 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -64,7 +64,7 @@ use datafusion_expr::{
 };
 
 // backwards compatibility
-pub use crate::execution::state::SessionState;
+pub use crate::execution::session_state::SessionState;
 
 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
diff --git a/datafusion/core/src/execution/mod.rs b/datafusion/core/src/execution/mod.rs
index 4e13d78f0d08..ac02c7317256 100644
--- a/datafusion/core/src/execution/mod.rs
+++ b/datafusion/core/src/execution/mod.rs
@@ -18,7 +18,7 @@
 //! Shared state for query planning and execution.
 
 pub mod context;
-pub mod state;
+pub mod session_state;
 
 // backwards compatibility
 pub use crate::datasource::file_format::options;
diff --git a/datafusion/core/src/execution/state.rs b/datafusion/core/src/execution/session_state.rs
similarity index 100%
rename from datafusion/core/src/execution/state.rs
rename to datafusion/core/src/execution/session_state.rs
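
Because the old path is kept as a re-export (the "// backwards compatibility" `pub use` updated in PATCH 4/4 above), downstream imports of `SessionState` should keep compiling after the module rename. A minimal sketch of the two equivalent import paths, assuming the usual `datafusion` facade over the core crate:

// Pre-existing path, still valid via the re-export in execution/context/mod.rs
use datafusion::execution::context::SessionState;

// New canonical location after the state.rs -> session_state.rs rename
// use datafusion::execution::session_state::SessionState;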
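
PATCH 1/4 also switches `SessionContext` from direct field access to the new accessor methods (for example `state.session_id.clone()` becomes `state.session_id().to_string()`, and `state.config` becomes `state.config()`). A short sketch of downstream code written against those accessors; only the accessor names shown in the diff come from this series, the rest is illustrative:

// `state` is a SessionState; session_id() and config() are added in PATCH 1/4
fn describe_session(state: &datafusion::execution::session_state::SessionState) -> String {
    let id = state.session_id().to_string();
    let default_catalog = state.config().options().catalog.default_catalog.clone();
    format!("session {id}, default catalog {default_catalog}")
}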
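
For code that wants the lower-level planning path now exposed on `SessionState` (parse a statement, build and optimize the logical plan, then lower it to an `ExecutionPlan`), a sketch under the assumption that a `SessionState` is obtained from a `SessionContext` outside this function; the query string and function name are illustrative:

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::session_state::SessionState;
use datafusion::physical_plan::ExecutionPlan;

// Parse a single SQL statement with the session's configured dialect, plan it,
// then let create_physical_plan run the optimizer and the physical planner
// (see the doc comments on sql_to_statement, statement_to_plan, and
// create_physical_plan above).
async fn plan_one(state: &SessionState, sql: &str) -> Result<Arc<dyn ExecutionPlan>> {
    let dialect = state.config().options().sql_parser.dialect.clone();
    let statement = state.sql_to_statement(sql, &dialect)?;
    let logical_plan = state.statement_to_plan(statement).await?;
    state.create_physical_plan(&logical_plan).await
}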