diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index f2d185c928bd..1fd6d9f48273 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -51,7 +51,7 @@ cargo run --example dataframe - [`examples/udf/advanced_udwf.rs`](examples/udf/advanced_udwf.rs): Define and invoke a more complicated User Defined Window Function (UDWF) - [`examples/data_io/parquet_advanced_index.rs`](examples/data_io/parquet_advanced_index.rs): Creates a detailed secondary index that covers the contents of several parquet files - [`examples/udf/async_udf.rs`](examples/udf/async_udf.rs): Define and invoke an asynchronous User Defined Scalar Function (UDF) -- [`analyzer_rule.rs`](examples/analyzer_rule.rs): Use a custom AnalyzerRule to change a query's semantics (row level access control) +- [`examples/query_planning/analyzer_rule.rs`](examples/query_planning/analyzer_rule.rs): Use a custom AnalyzerRule to change a query's semantics (row level access control) - [`examples/data_io/catalog.rs`](examples/data_io/catalog.rs): Register the table into a custom catalog - [`examples/data_io/json_shredding.rs`](examples/data_io/json_shredding.rs): Shows how to implement custom filter rewriting for JSON shredding - [`composed_extension_codec`](examples/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization @@ -65,22 +65,23 @@ cargo run --example dataframe - [`examples/builtin_functions/date_time`](examples/builtin_functions/date_time.rs): Examples of date-time related functions and queries - [`default_column_values.rs`](examples/default_column_values.rs): Implement custom default value handling for missing columns using field metadata and PhysicalExprAdapter - [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results (Arrow ArrayRefs) into Rust structs -- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s +- 
[`examples/query_planning/expr_api.rs`](examples/query_planning/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s - [`examples/custom_data_source/file_stream_provider.rs`](examples/custom_data_source/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks. - [`flight/sql_server.rs`](examples/flight/sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from Flight and and FlightSQL (e.g. JDBC) clients - [`examples/builtin_functions/function_factory.rs`](examples/builtin_functions/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros - [`memory_pool_tracking.rs`](examples/memory_pool_tracking.rs): Demonstrates TrackConsumersPool for memory tracking and debugging with enhanced error messages - [`memory_pool_execution_plan.rs`](examples/memory_pool_execution_plan.rs): Shows how to implement memory-aware ExecutionPlan with memory reservation and spilling -- [`optimizer_rule.rs`](examples/optimizer_rule.rs): Use a custom OptimizerRule to replace certain predicates +- [`examples/query_planning/optimizer_rule.rs`](examples/query_planning/optimizer_rule.rs): Use a custom OptimizerRule to replace certain predicates - [`examples/data_io/parquet_embedded_index.rs`](examples/data_io/parquet_embedded_index.rs): Store a custom index inside a Parquet file and use it to speed up queries - [`examples/data_io/parquet_encrypted.rs`](examples/data_io/parquet_encrypted.rs): Read and write encrypted Parquet files using DataFusion - [`examples/data_io/parquet_encrypted_with_kms.rs`](examples/data_io/parquet_encrypted_with_kms.rs): Read and write encrypted Parquet files using an encryption factory - [`examples/data_io/parquet_index.rs`](examples/data_io/parquet_index.rs): Create an secondary index over several parquet files and use it to speed up queries - 
[`examples/data_io/parquet_exec_visitor.rs`](examples/data_io/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution -- [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr`. -- [`plan_to_sql.rs`](examples/plan_to_sql.rs): Generate SQL from DataFusion `Expr` and `LogicalPlan` -- [`planner_api.rs`](examples/planner_api.rs) APIs to manipulate logical and physical plans -- [`pruning.rs`](examples/pruning.rs): Use pruning to rule out files based on statistics +- [`examples/query_planning/parse_sql_expr.rs`](examples/query_planning/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr`. +- [`examples/query_planning/plan_to_sql.rs`](examples/query_planning/plan_to_sql.rs): Generate SQL from DataFusion `Expr` and `LogicalPlan` +- [`examples/query_planning/planner_api.rs`](examples/query_planning/planner_api.rs): APIs to manipulate logical and physical plans +- [`examples/query_planning/pruning.rs`](examples/query_planning/pruning.rs): Use pruning to rule out files based on statistics +- [`examples/query_planning/thread_pools.rs`](examples/query_planning/thread_pools.rs): Demonstrates how to use separate thread pools (`Runtime`s) to run the IO and CPU intensive parts of DataFusion plans - [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3 - [`examples/data_io/query_http_csv.rs`](examples/data_io/query_http_csv.rs): Configure `object_store` and run a query against files via HTTP - [`examples/builtin_functions/regexp.rs`](examples/builtin_functions/regexp.rs): Examples of using regular expression functions diff --git a/datafusion-examples/examples/analyzer_rule.rs b/datafusion-examples/examples/query_planning/analyzer_rule.rs similarity index 99% rename from datafusion-examples/examples/analyzer_rule.rs rename to 
datafusion-examples/examples/query_planning/analyzer_rule.rs index cb81cd167a88..b6c97679cb43 100644 --- a/datafusion-examples/examples/analyzer_rule.rs +++ b/datafusion-examples/examples/query_planning/analyzer_rule.rs @@ -35,8 +35,7 @@ use std::sync::{Arc, Mutex}; /// level access control scheme by introducing a filter to the query. /// /// See [optimizer_rule.rs] for an example of a optimizer rule -#[tokio::main] -pub async fn main() -> Result<()> { +pub async fn analyzer_rule() -> Result<()> { // AnalyzerRules run before OptimizerRules. // // DataFusion includes several built in AnalyzerRules for tasks such as type diff --git a/datafusion-examples/examples/expr_api.rs b/datafusion-examples/examples/query_planning/expr_api.rs similarity index 99% rename from datafusion-examples/examples/expr_api.rs rename to datafusion-examples/examples/query_planning/expr_api.rs index 56f960870e58..236ac4319bb6 100644 --- a/datafusion-examples/examples/expr_api.rs +++ b/datafusion-examples/examples/query_planning/expr_api.rs @@ -55,8 +55,7 @@ use datafusion::prelude::*; /// 5. Analyze predicates for boundary ranges: [`range_analysis_demo`] /// 6. Get the types of the expressions: [`expression_type_demo`] /// 7. Apply type coercion to expressions: [`type_coercion_demo`] -#[tokio::main] -async fn main() -> Result<()> { +pub async fn expr_api() -> Result<()> { // The easiest way to do create expressions is to use the // "fluent"-style API: let expr = col("a") + lit(5); diff --git a/datafusion-examples/examples/query_planning/main.rs b/datafusion-examples/examples/query_planning/main.rs new file mode 100644 index 000000000000..a2b6f0925a6c --- /dev/null +++ b/datafusion-examples/examples/query_planning/main.rs @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! # Query planning and optimization examples +//! +//! These examples demonstrate internal mechanics of the query planning and optimization layers. +//! +//! ## Usage +//! ```bash +//! cargo run --example query_planning -- [analyzer_rule|expr_api|optimizer_rule|parse_sql_expr|plan_to_sql|planner_api|pruning|thread_pools] +//! ``` +//! +//! Each subcommand runs a corresponding example: +//! - `analyzer_rule` — use a custom AnalyzerRule to change a query's semantics (row level access control) +//! - `expr_api` — create, execute, simplify, analyze and coerce `Expr`s +//! - `optimizer_rule` — use a custom OptimizerRule to replace certain predicates +//! - `parse_sql_expr` — parse SQL text into DataFusion `Expr` +//! - `plan_to_sql` — generate SQL from DataFusion `Expr` and `LogicalPlan` +//! - `planner_api` — APIs to manipulate logical and physical plans +//! - `pruning` — use pruning to rule out files based on statistics +//! 
- `thread_pools` — demonstrate how to use separate thread pools (`Runtime`s) to run the IO and CPU intensive parts of DataFusion plans + +mod analyzer_rule; +mod expr_api; +mod optimizer_rule; +mod parse_sql_expr; +mod plan_to_sql; +mod planner_api; +mod pruning; +mod thread_pools; + +use std::str::FromStr; + +use datafusion::error::{DataFusionError, Result}; + +enum ExampleKind { + AnalyzerRule, + ExprApi, + OptimizerRule, + ParseSqlExpr, + PlanToSql, + PlannerApi, + Pruning, + ThreadPools, +} + +impl AsRef<str> for ExampleKind { + fn as_ref(&self) -> &str { + match self { + Self::AnalyzerRule => "analyzer_rule", + Self::ExprApi => "expr_api", + Self::OptimizerRule => "optimizer_rule", + Self::ParseSqlExpr => "parse_sql_expr", + Self::PlanToSql => "plan_to_sql", + Self::PlannerApi => "planner_api", + Self::Pruning => "pruning", + Self::ThreadPools => "thread_pools", + } + } +} + +impl FromStr for ExampleKind { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result<Self> { + match s { + "analyzer_rule" => Ok(Self::AnalyzerRule), + "expr_api" => Ok(Self::ExprApi), + "optimizer_rule" => Ok(Self::OptimizerRule), + "parse_sql_expr" => Ok(Self::ParseSqlExpr), + "plan_to_sql" => Ok(Self::PlanToSql), + "planner_api" => Ok(Self::PlannerApi), + "pruning" => Ok(Self::Pruning), + "thread_pools" => Ok(Self::ThreadPools), + _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))), + } + } +} + +impl ExampleKind { + const ALL: [Self; 8] = [ + Self::AnalyzerRule, + Self::ExprApi, + Self::OptimizerRule, + Self::ParseSqlExpr, + Self::PlanToSql, + Self::PlannerApi, + Self::Pruning, + Self::ThreadPools, + ]; + + const EXAMPLE_NAME: &str = "query_planning"; + + fn variants() -> Vec<&'static str> { + Self::ALL.iter().map(|x| x.as_ref()).collect() + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let usage = format!( + "Usage: cargo run --example {} -- [{}]", + ExampleKind::EXAMPLE_NAME, + 
ExampleKind::variants().join("|") + ); + + let arg = std::env::args().nth(1).ok_or_else(|| { + eprintln!("{usage}"); + DataFusionError::Execution("Missing argument".to_string()) + })?; + + match arg.parse::<ExampleKind>()? { + ExampleKind::AnalyzerRule => analyzer_rule::analyzer_rule().await?, + ExampleKind::ExprApi => expr_api::expr_api().await?, + ExampleKind::OptimizerRule => optimizer_rule::optimizer_rule().await?, + ExampleKind::ParseSqlExpr => parse_sql_expr::parse_sql_expr().await?, + ExampleKind::PlanToSql => plan_to_sql::plan_to_sql_examples().await?, + ExampleKind::PlannerApi => planner_api::planner_api().await?, + ExampleKind::Pruning => pruning::pruning().await?, + ExampleKind::ThreadPools => thread_pools::thread_pools().await?, + } + + Ok(()) +} diff --git a/datafusion-examples/examples/optimizer_rule.rs b/datafusion-examples/examples/query_planning/optimizer_rule.rs similarity index 99% rename from datafusion-examples/examples/optimizer_rule.rs rename to datafusion-examples/examples/query_planning/optimizer_rule.rs index 9c137b67432c..4af5ef50b3df 100644 --- a/datafusion-examples/examples/optimizer_rule.rs +++ b/datafusion-examples/examples/query_planning/optimizer_rule.rs @@ -37,8 +37,7 @@ use std::sync::Arc; /// /// See [analyzer_rule.rs] for an example of AnalyzerRules, which are for /// changing plan semantics. -#[tokio::main] -pub async fn main() -> Result<()> { +pub async fn optimizer_rule() -> Result<()> { // DataFusion includes many built in OptimizerRules for tasks such as outer // to inner join conversion and constant folding. 
// diff --git a/datafusion-examples/examples/parse_sql_expr.rs b/datafusion-examples/examples/query_planning/parse_sql_expr.rs similarity index 97% rename from datafusion-examples/examples/parse_sql_expr.rs rename to datafusion-examples/examples/query_planning/parse_sql_expr.rs index 5387e7c4a05d..44e6b3cf5f67 100644 --- a/datafusion-examples/examples/parse_sql_expr.rs +++ b/datafusion-examples/examples/query_planning/parse_sql_expr.rs @@ -32,17 +32,15 @@ use datafusion::{ /// The code in this example shows how to: /// /// 1. [`simple_session_context_parse_sql_expr_demo`]: Parse a simple SQL text into a logical -/// expression using a schema at [`SessionContext`]. +/// expression using a schema at [`SessionContext`]. /// /// 2. [`simple_dataframe_parse_sql_expr_demo`]: Parse a simple SQL text into a logical expression -/// using a schema at [`DataFrame`]. +/// using a schema at [`DataFrame`]. /// /// 3. [`query_parquet_demo`]: Query a parquet file using the parsed_sql_expr from a DataFrame. /// /// 4. [`round_trip_parse_sql_expr_demo`]: Parse a SQL text and convert it back to SQL using [`Unparser`]. - -#[tokio::main] -async fn main() -> Result<()> { +pub async fn parse_sql_expr() -> Result<()> { // See how to evaluate expressions simple_session_context_parse_sql_expr_demo()?; simple_dataframe_parse_sql_expr_demo().await?; diff --git a/datafusion-examples/examples/plan_to_sql.rs b/datafusion-examples/examples/query_planning/plan_to_sql.rs similarity index 96% rename from datafusion-examples/examples/plan_to_sql.rs rename to datafusion-examples/examples/query_planning/plan_to_sql.rs index 54483b143a16..f23f083acd4a 100644 --- a/datafusion-examples/examples/plan_to_sql.rs +++ b/datafusion-examples/examples/query_planning/plan_to_sql.rs @@ -43,28 +43,26 @@ use std::sync::Arc; /// The code in this example shows how to: /// /// 1. 
[`simple_expr_to_sql_demo`]: Create a simple expression [`Exprs`] with -/// fluent API and convert to sql suitable for passing to another database +/// fluent API and convert to sql suitable for passing to another database /// /// 2. [`simple_expr_to_pretty_sql_demo`] Create a simple expression -/// [`Exprs`] with fluent API and convert to sql without extra parentheses, -/// suitable for displaying to humans +/// [`Exprs`] with fluent API and convert to sql without extra parentheses, +/// suitable for displaying to humans /// /// 3. [`simple_expr_to_sql_demo_escape_mysql_style`]" Create a simple -/// expression [`Exprs`] with fluent API and convert to sql escaping column -/// names in MySQL style. +/// expression [`Exprs`] with fluent API and convert to sql escaping column +/// names in MySQL style. /// /// 4. [`simple_plan_to_sql_demo`]: Create a simple logical plan using the -/// DataFrames API and convert to sql string. +/// DataFrames API and convert to sql string. /// /// 5. [`round_trip_plan_to_sql_demo`]: Create a logical plan from a SQL string, modify it using the -/// DataFrames API and convert it back to a sql string. +/// DataFrames API and convert it back to a sql string. /// /// 6. [`unparse_my_logical_plan_as_statement`]: Create a custom logical plan and unparse it as a statement. /// /// 7. [`unparse_my_logical_plan_as_subquery`]: Create a custom logical plan and unparse it as a subquery. 
- -#[tokio::main] -async fn main() -> Result<()> { +pub async fn plan_to_sql_examples() -> Result<()> { // See how to evaluate expressions simple_expr_to_sql_demo()?; simple_expr_to_pretty_sql_demo()?; diff --git a/datafusion-examples/examples/planner_api.rs b/datafusion-examples/examples/query_planning/planner_api.rs similarity index 99% rename from datafusion-examples/examples/planner_api.rs rename to datafusion-examples/examples/query_planning/planner_api.rs index 55aec7b0108a..dd3643471ead 100644 --- a/datafusion-examples/examples/planner_api.rs +++ b/datafusion-examples/examples/query_planning/planner_api.rs @@ -32,8 +32,7 @@ use datafusion::prelude::*; /// physical plan: /// - Via the combined `create_physical_plan` API. /// - Utilizing the analyzer, optimizer, and query planner APIs separately. -#[tokio::main] -async fn main() -> Result<()> { +pub async fn planner_api() -> Result<()> { // Set up a DataFusion context and load a Parquet file let ctx = SessionContext::new(); let testdata = datafusion::test_util::parquet_test_data(); diff --git a/datafusion-examples/examples/pruning.rs b/datafusion-examples/examples/query_planning/pruning.rs similarity index 98% rename from datafusion-examples/examples/pruning.rs rename to datafusion-examples/examples/query_planning/pruning.rs index 9a61789662cd..7c42d151c05b 100644 --- a/datafusion-examples/examples/pruning.rs +++ b/datafusion-examples/examples/query_planning/pruning.rs @@ -22,6 +22,7 @@ use arrow::array::{ArrayRef, BooleanArray, Int32Array}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::common::pruning::PruningStatistics; use datafusion::common::{DFSchema, ScalarValue}; +use datafusion::error::Result; use datafusion::execution::context::ExecutionProps; use datafusion::physical_expr::create_physical_expr; use datafusion::physical_optimizer::pruning::PruningPredicate; @@ -40,8 +41,7 @@ use datafusion::prelude::*; /// one might do as part of a higher level storage engine. 
See /// `parquet_index.rs` for an example that uses pruning in the context of an /// individual query. -#[tokio::main] -async fn main() { +pub async fn pruning() -> Result<()> { // In this example, we'll use the PruningPredicate to determine if // the expression `x = 5 AND y = 10` can never be true based on statistics @@ -69,7 +69,7 @@ async fn main() { let predicate = create_pruning_predicate(expr, &my_catalog.schema); // Evaluate the predicate for the three files in the catalog - let prune_results = predicate.prune(&my_catalog).unwrap(); + let prune_results = predicate.prune(&my_catalog)?; println!("Pruning results: {prune_results:?}"); // The result is a `Vec` of bool values, one for each file in the catalog @@ -93,6 +93,8 @@ async fn main() { false ] ); + + Ok(()) } /// A simple model catalog that has information about the three files that store diff --git a/datafusion-examples/examples/thread_pools.rs b/datafusion-examples/examples/query_planning/thread_pools.rs similarity index 99% rename from datafusion-examples/examples/thread_pools.rs rename to datafusion-examples/examples/query_planning/thread_pools.rs index 9842cccfbfe8..768638d11a1d 100644 --- a/datafusion-examples/examples/thread_pools.rs +++ b/datafusion-examples/examples/query_planning/thread_pools.rs @@ -64,8 +64,7 @@ use url::Url; /// when using Rust libraries such as `tonic`. Using a separate `Runtime` for /// CPU bound tasks will often be simpler in larger applications, even though it /// makes this example slightly more complex. -#[tokio::main] -async fn main() -> Result<()> { +pub async fn thread_pools() -> Result<()> { // The first two examples read local files. Enabling the URL table feature // lets us treat filenames as tables in SQL. 
let ctx = SessionContext::new().enable_url_table(); @@ -121,7 +120,7 @@ async fn same_runtime(ctx: &SessionContext, sql: &str) -> Result<()> { // Executing the plan using this pattern intermixes any IO and CPU intensive // work on same Runtime while let Some(batch) = stream.next().await { - println!("{}", pretty_format_batches(&[batch?]).unwrap()); + println!("{}", pretty_format_batches(&[batch?])?); } Ok(()) }