15 changes: 8 additions & 7 deletions datafusion-examples/README.md
@@ -51,7 +51,7 @@ cargo run --example dataframe
- [`examples/udf/advanced_udwf.rs`](examples/udf/advanced_udwf.rs): Define and invoke a more complicated User Defined Window Function (UDWF)
- [`examples/data_io/parquet_advanced_index.rs`](examples/data_io/parquet_advanced_index.rs): Creates a detailed secondary index that covers the contents of several parquet files
- [`examples/udf/async_udf.rs`](examples/udf/async_udf.rs): Define and invoke an asynchronous User Defined Scalar Function (UDF)
- [`analyzer_rule.rs`](examples/analyzer_rule.rs): Use a custom AnalyzerRule to change a query's semantics (row level access control)
- [`examples/query_planning/analyzer_rule.rs`](examples/query_planning/analyzer_rule.rs): Use a custom AnalyzerRule to change a query's semantics (row level access control)
- [`examples/data_io/catalog.rs`](examples/data_io/catalog.rs): Register the table into a custom catalog
- [`examples/data_io/json_shredding.rs`](examples/data_io/json_shredding.rs): Shows how to implement custom filter rewriting for JSON shredding
- [`composed_extension_codec`](examples/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization
@@ -65,22 +65,23 @@ cargo run --example dataframe
- [`examples/builtin_functions/date_time`](examples/builtin_functions/date_time.rs): Examples of date-time related functions and queries
- [`default_column_values.rs`](examples/default_column_values.rs): Implement custom default value handling for missing columns using field metadata and PhysicalExprAdapter
- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results (Arrow ArrayRefs) into Rust structs
- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s
- [`examples/query_planning/expr_api.rs`](examples/query_planning/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s
- [`examples/custom_data_source/file_stream_provider.rs`](examples/custom_data_source/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.
- [`flight/sql_server.rs`](examples/flight/sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from Flight and FlightSQL (e.g. JDBC) clients
- [`examples/builtin_functions/function_factory.rs`](examples/builtin_functions/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros
- [`memory_pool_tracking.rs`](examples/memory_pool_tracking.rs): Demonstrates TrackConsumersPool for memory tracking and debugging with enhanced error messages
- [`memory_pool_execution_plan.rs`](examples/memory_pool_execution_plan.rs): Shows how to implement memory-aware ExecutionPlan with memory reservation and spilling
- [`optimizer_rule.rs`](examples/optimizer_rule.rs): Use a custom OptimizerRule to replace certain predicates
- [`examples/query_planning/optimizer_rule.rs`](examples/query_planning/optimizer_rule.rs): Use a custom OptimizerRule to replace certain predicates
- [`examples/data_io/parquet_embedded_index.rs`](examples/data_io/parquet_embedded_index.rs): Store a custom index inside a Parquet file and use it to speed up queries
- [`examples/data_io/parquet_encrypted.rs`](examples/data_io/parquet_encrypted.rs): Read and write encrypted Parquet files using DataFusion
- [`examples/data_io/parquet_encrypted_with_kms.rs`](examples/data_io/parquet_encrypted_with_kms.rs): Read and write encrypted Parquet files using an encryption factory
- [`examples/data_io/parquet_index.rs`](examples/data_io/parquet_index.rs): Create a secondary index over several parquet files and use it to speed up queries
- [`examples/data_io/parquet_exec_visitor.rs`](examples/data_io/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution
- [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr`.
- [`plan_to_sql.rs`](examples/plan_to_sql.rs): Generate SQL from DataFusion `Expr` and `LogicalPlan`
- [`planner_api.rs`](examples/planner_api.rs) APIs to manipulate logical and physical plans
- [`pruning.rs`](examples/pruning.rs): Use pruning to rule out files based on statistics
- [`examples/query_planning/parse_sql_expr.rs`](examples/query_planning/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr`.
- [`examples/query_planning/plan_to_sql.rs`](examples/query_planning/plan_to_sql.rs): Generate SQL from DataFusion `Expr` and `LogicalPlan`
- [`examples/query_planning/planner_api.rs`](examples/query_planning/planner_api.rs): APIs to manipulate logical and physical plans
- [`examples/query_planning/pruning.rs`](examples/query_planning/pruning.rs): Use pruning to rule out files based on statistics
- [`examples/query_planning/thread_pools.rs`](examples/query_planning/thread_pools.rs): Shows how to use separate thread pools (Tokio Runtimes) so CPU-bound query execution does not interfere with IO-bound work
- [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3
- [`examples/data_io/query_http_csv.rs`](examples/data_io/query_http_csv.rs): Configure `object_store` and run a query against files via HTTP
- [`examples/builtin_functions/regexp.rs`](examples/builtin_functions/regexp.rs): Examples of using regular expression functions
datafusion-examples/examples/query_planning/analyzer_rule.rs
@@ -35,8 +35,7 @@ use std::sync::{Arc, Mutex};
/// level access control scheme by introducing a filter to the query.
///
/// See [optimizer_rule.rs] for an example of an optimizer rule
#[tokio::main]
pub async fn main() -> Result<()> {
pub async fn analyzer_rule() -> Result<()> {
// AnalyzerRules run before OptimizerRules.
//
// DataFusion includes several built in AnalyzerRules for tasks such as type
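For context, a minimal sketch of the kind of rule this example implements: an `AnalyzerRule` that injects a filter into every plan it sees. The struct name, column, and literal are illustrative rather than taken from the example, and the sketch assumes the `AnalyzerRule` trait under `datafusion::optimizer::analyzer`.

```rust
use datafusion::common::config::ConfigOptions;
use datafusion::error::Result;
use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};
use datafusion::optimizer::analyzer::AnalyzerRule;
use datafusion::prelude::{col, lit};

/// Hypothetical row-level access control: only rows owned by "alice" are visible.
#[derive(Debug, Default)]
struct RowLevelFilter;

impl AnalyzerRule for RowLevelFilter {
    fn name(&self) -> &str {
        "row_level_filter"
    }

    // AnalyzerRules see the logical plan before the optimizer runs and are
    // allowed to change its semantics; here the whole plan is wrapped in a filter.
    fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result<LogicalPlan> {
        LogicalPlanBuilder::from(plan)
            .filter(col("owner").eq(lit("alice")))?
            .build()
    }
}
```

Registering such a rule on the session, rather than calling `analyze` by hand, is what makes every query planned through that context pick up the extra predicate.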
datafusion-examples/examples/query_planning/expr_api.rs
@@ -55,8 +55,7 @@ use datafusion::prelude::*;
/// 5. Analyze predicates for boundary ranges: [`range_analysis_demo`]
/// 6. Get the types of the expressions: [`expression_type_demo`]
/// 7. Apply type coercion to expressions: [`type_coercion_demo`]
#[tokio::main]
async fn main() -> Result<()> {
pub async fn expr_api() -> Result<()> {
// The easiest way to create expressions is to use the
// "fluent"-style API:
let expr = col("a") + lit(5);
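The fluent style shown in the hunk above composes into larger expressions; a self-contained illustration (column names and constants are arbitrary):

```rust
use datafusion::prelude::{col, lit, Expr};

/// Builds the predicate `a + 5 > 10 AND b LIKE 'prefix%'` with the fluent API.
fn build_predicate() -> Expr {
    (col("a") + lit(5))
        .gt(lit(10))
        .and(col("b").like(lit("prefix%")))
}
```

An `Expr` built this way can then be executed, simplified, analyzed for ranges, or coerced, which is what the rest of `expr_api.rs` walks through.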
138 changes: 138 additions & 0 deletions datafusion-examples/examples/query_planning/main.rs
@@ -0,0 +1,138 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! # Query planning and optimization examples
//!
//! These examples demonstrate internal mechanics of the query planning and optimization layers.
//!
//! ## Usage
//! ```bash
//! cargo run --example query_planning -- [analyzer_rule|expr_api|optimizer_rule|parse_sql_expr|plan_to_sql|planner_api|pruning|thread_pools]
//! ```
//!
//! Each subcommand runs a corresponding example:
//! - `analyzer_rule` — use a custom AnalyzerRule to change a query's semantics (row level access control)
//! - `expr_api` — create, execute, simplify, analyze and coerce `Expr`s
//! - `optimizer_rule` — use a custom OptimizerRule to replace certain predicates
//! - `parse_sql_expr` — parse SQL text into DataFusion `Expr`
//! - `plan_to_sql` — generate SQL from DataFusion `Expr` and `LogicalPlan`
//! - `planner_api` — APIs to manipulate logical and physical plans
//! - `pruning` — use pruning to rule out files based on statistics
//! - `thread_pools` — use separate thread pools (Tokio Runtimes) so CPU-bound query execution does not interfere with IO-bound work

mod analyzer_rule;
mod expr_api;
mod optimizer_rule;
mod parse_sql_expr;
mod plan_to_sql;
mod planner_api;
mod pruning;
mod thread_pools;

use std::str::FromStr;

use datafusion::error::{DataFusionError, Result};

enum ExampleKind {
    AnalyzerRule,
    ExprApi,
    OptimizerRule,
    ParseSqlExpr,
    PlanToSql,
    PlannerApi,
    Pruning,
    ThreadPools,
}

impl AsRef<str> for ExampleKind {
    fn as_ref(&self) -> &str {
        match self {
            Self::AnalyzerRule => "analyzer_rule",
            Self::ExprApi => "expr_api",
            Self::OptimizerRule => "optimizer_rule",
            Self::ParseSqlExpr => "parse_sql_expr",
            Self::PlanToSql => "plan_to_sql",
            Self::PlannerApi => "planner_api",
            Self::Pruning => "pruning",
            Self::ThreadPools => "thread_pools",
        }
    }
}

impl FromStr for ExampleKind {
    type Err = DataFusionError;

    fn from_str(s: &str) -> Result<Self> {
        match s {
            "analyzer_rule" => Ok(Self::AnalyzerRule),
            "expr_api" => Ok(Self::ExprApi),
            "optimizer_rule" => Ok(Self::OptimizerRule),
            "parse_sql_expr" => Ok(Self::ParseSqlExpr),
            "plan_to_sql" => Ok(Self::PlanToSql),
            "planner_api" => Ok(Self::PlannerApi),
            "pruning" => Ok(Self::Pruning),
            "thread_pools" => Ok(Self::ThreadPools),
            _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))),
        }
    }
}

impl ExampleKind {
    const ALL: [Self; 8] = [
        Self::AnalyzerRule,
        Self::ExprApi,
        Self::OptimizerRule,
        Self::ParseSqlExpr,
        Self::PlanToSql,
        Self::PlannerApi,
        Self::Pruning,
        Self::ThreadPools,
    ];

    const EXAMPLE_NAME: &str = "query_planning";

    fn variants() -> Vec<&'static str> {
        Self::ALL.iter().map(|x| x.as_ref()).collect()
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let usage = format!(
        "Usage: cargo run --example {} -- [{}]",
        ExampleKind::EXAMPLE_NAME,
        ExampleKind::variants().join("|")
    );

    let arg = std::env::args().nth(1).ok_or_else(|| {
        eprintln!("{usage}");
        DataFusionError::Execution("Missing argument".to_string())
    })?;

    match arg.parse::<ExampleKind>()? {
        ExampleKind::AnalyzerRule => analyzer_rule::analyzer_rule().await?,
        ExampleKind::ExprApi => expr_api::expr_api().await?,
        ExampleKind::OptimizerRule => optimizer_rule::optimizer_rule().await?,
        ExampleKind::ParseSqlExpr => parse_sql_expr::parse_sql_expr().await?,
        ExampleKind::PlanToSql => plan_to_sql::plan_to_sql_examples().await?,
        ExampleKind::PlannerApi => planner_api::planner_api().await?,
        ExampleKind::Pruning => pruning::pruning().await?,
        ExampleKind::ThreadPools => thread_pools::thread_pools().await?,
    }

    Ok(())
}
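Since the CLI name and the enum are kept in sync by hand in three places (`AsRef`, `FromStr`, and `ALL`), a small unit test along these lines could guard against them drifting apart; this is a suggested addition, not code from the PR:

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    // Every variant's string form must parse back to a variant with the same
    // string form, so the usage string and the dispatcher stay consistent.
    #[test]
    fn example_kind_round_trips() {
        for kind in ExampleKind::ALL {
            let name = kind.as_ref();
            let parsed = ExampleKind::from_str(name).expect("known name must parse");
            assert_eq!(parsed.as_ref(), name);
        }
    }
}
```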
datafusion-examples/examples/query_planning/optimizer_rule.rs
@@ -37,8 +37,7 @@ use std::sync::Arc;
///
/// See [analyzer_rule.rs] for an example of AnalyzerRules, which are for
/// changing plan semantics.
#[tokio::main]
pub async fn main() -> Result<()> {
pub async fn optimizer_rule() -> Result<()> {
// DataFusion includes many built in OptimizerRules for tasks such as outer
// to inner join conversion and constant folding.
//
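For orientation, roughly the trait surface the example implements, sketched as a rule that returns every plan unchanged. Recent DataFusion releases use the `rewrite`/`Transformed` form assumed below; the real example rewrites certain predicates instead of passing the plan through.

```rust
use datafusion::common::tree_node::Transformed;
use datafusion::error::Result;
use datafusion::logical_expr::LogicalPlan;
use datafusion::optimizer::{OptimizerConfig, OptimizerRule};

/// Placeholder rule: a real rule returns `Transformed::yes(new_plan)` when it
/// actually changes something.
#[derive(Debug, Default)]
struct PassThroughRule;

impl OptimizerRule for PassThroughRule {
    fn name(&self) -> &str {
        "pass_through_rule"
    }

    // Unlike an AnalyzerRule, an OptimizerRule must not change what a query
    // returns; it may only produce an equivalent (ideally cheaper) plan.
    fn rewrite(
        &self,
        plan: LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>> {
        Ok(Transformed::no(plan))
    }
}
```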
datafusion-examples/examples/query_planning/parse_sql_expr.rs
@@ -32,17 +32,15 @@ use datafusion::{
/// The code in this example shows how to:
///
/// 1. [`simple_session_context_parse_sql_expr_demo`]: Parse a simple SQL text into a logical
/// expression using a schema at [`SessionContext`].
/// expression using a schema at [`SessionContext`].
///
/// 2. [`simple_dataframe_parse_sql_expr_demo`]: Parse a simple SQL text into a logical expression
/// using a schema at [`DataFrame`].
/// using a schema at [`DataFrame`].
///
/// 3. [`query_parquet_demo`]: Query a parquet file using the parsed_sql_expr from a DataFrame.
///
/// 4. [`round_trip_parse_sql_expr_demo`]: Parse a SQL text and convert it back to SQL using [`Unparser`].

#[tokio::main]
async fn main() -> Result<()> {
pub async fn parse_sql_expr() -> Result<()> {
// See how to evaluate expressions
simple_session_context_parse_sql_expr_demo()?;
simple_dataframe_parse_sql_expr_demo().await?;
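A compressed version of what the DataFrame-based demo does: parse SQL text into an `Expr` against the frame's schema and reuse it as a filter. The file name and predicate are illustrative, and the sketch assumes the `DataFrame::parse_sql_expr` helper that the demo name refers to.

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

async fn filter_with_parsed_expr() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx.read_csv("example.csv", CsvReadOptions::new()).await?;

    // The SQL text is resolved against the DataFrame's schema, so unknown
    // columns are rejected at parse time rather than at execution time.
    let predicate = df.parse_sql_expr("a > 5 AND b = 'x'")?;
    df.filter(predicate)?.show().await?;
    Ok(())
}
```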
datafusion-examples/examples/query_planning/plan_to_sql.rs
@@ -43,28 +43,26 @@ use std::sync::Arc;
/// The code in this example shows how to:
///
/// 1. [`simple_expr_to_sql_demo`]: Create a simple expression [`Exprs`] with
/// fluent API and convert to sql suitable for passing to another database
/// fluent API and convert to sql suitable for passing to another database
///
/// 2. [`simple_expr_to_pretty_sql_demo`]: Create a simple expression
/// [`Exprs`] with fluent API and convert to sql without extra parentheses,
/// suitable for displaying to humans
/// [`Exprs`] with fluent API and convert to sql without extra parentheses,
/// suitable for displaying to humans
///
/// 3. [`simple_expr_to_sql_demo_escape_mysql_style`]: Create a simple
/// expression [`Exprs`] with fluent API and convert to sql escaping column
/// names in MySQL style.
/// expression [`Exprs`] with fluent API and convert to sql escaping column
/// names in MySQL style.
///
/// 4. [`simple_plan_to_sql_demo`]: Create a simple logical plan using the
/// DataFrames API and convert to sql string.
/// DataFrames API and convert to sql string.
///
/// 5. [`round_trip_plan_to_sql_demo`]: Create a logical plan from a SQL string, modify it using the
/// DataFrames API and convert it back to a sql string.
/// DataFrames API and convert it back to a sql string.
///
/// 6. [`unparse_my_logical_plan_as_statement`]: Create a custom logical plan and unparse it as a statement.
///
/// 7. [`unparse_my_logical_plan_as_subquery`]: Create a custom logical plan and unparse it as a subquery.

#[tokio::main]
async fn main() -> Result<()> {
pub async fn plan_to_sql_examples() -> Result<()> {
// See how to evaluate expressions
simple_expr_to_sql_demo()?;
simple_expr_to_pretty_sql_demo()?;
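The first demo in this file boils down to a round trip like the following: build an `Expr` with the fluent API and unparse it back to SQL text. This sketch assumes the `expr_to_sql` convenience function in `datafusion::sql::unparser`; the column names and constants are arbitrary.

```rust
use datafusion::error::Result;
use datafusion::prelude::{col, lit};
use datafusion::sql::unparser::expr_to_sql;

/// Returns SQL text along the lines of `(price > 100) AND (region = 'EU')`.
fn expr_round_trip() -> Result<String> {
    let expr = col("price").gt(lit(100)).and(col("region").eq(lit("EU")));
    // The unparser produces a sqlparser AST node; its Display impl is the SQL
    // string that can be handed to another database.
    let sql_ast = expr_to_sql(&expr)?;
    Ok(sql_ast.to_string())
}
```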
datafusion-examples/examples/query_planning/planner_api.rs
@@ -32,8 +32,7 @@ use datafusion::prelude::*;
/// physical plan:
/// - Via the combined `create_physical_plan` API.
/// - Utilizing the analyzer, optimizer, and query planner APIs separately.
#[tokio::main]
async fn main() -> Result<()> {
pub async fn planner_api() -> Result<()> {
// Set up a DataFusion context and load a Parquet file
let ctx = SessionContext::new();
let testdata = datafusion::test_util::parquet_test_data();
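The "combined" path that the example contrasts with the step-by-step APIs fits in a few lines; a sketch (the SQL is whatever the caller supplies):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;

// Plan SQL all the way to an ExecutionPlan in one call; planner_api.rs also
// shows the alternative of driving the analyzer, optimizer, and physical
// planner individually.
async fn to_physical_plan(ctx: &SessionContext, sql: &str) -> Result<Arc<dyn ExecutionPlan>> {
    let logical_plan = ctx.sql(sql).await?.into_optimized_plan()?;
    ctx.state().create_physical_plan(&logical_plan).await
}
```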
datafusion-examples/examples/query_planning/pruning.rs
@@ -22,6 +22,7 @@ use arrow::array::{ArrayRef, BooleanArray, Int32Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::common::pruning::PruningStatistics;
use datafusion::common::{DFSchema, ScalarValue};
use datafusion::error::Result;
use datafusion::execution::context::ExecutionProps;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_optimizer::pruning::PruningPredicate;
@@ -40,8 +41,7 @@ use datafusion::prelude::*;
/// one might do as part of a higher level storage engine. See
/// `parquet_index.rs` for an example that uses pruning in the context of an
/// individual query.
#[tokio::main]
async fn main() {
pub async fn pruning() -> Result<()> {
// In this example, we'll use the PruningPredicate to determine if
// the expression `x = 5 AND y = 10` can never be true based on statistics

@@ -69,7 +69,7 @@
let predicate = create_pruning_predicate(expr, &my_catalog.schema);

// Evaluate the predicate for the three files in the catalog
let prune_results = predicate.prune(&my_catalog).unwrap();
let prune_results = predicate.prune(&my_catalog)?;
println!("Pruning results: {prune_results:?}");

// The result is a `Vec` of bool values, one for each file in the catalog
@@ -93,6 +93,8 @@
false
]
);

Ok(())
}

/// A simple model catalog that has information about the three files that store
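For reference, building the `PruningPredicate` evaluated above means lowering a logical `Expr` to a physical expression bound to the table schema; a sketch with a one-column schema (the example's `create_pruning_predicate` helper presumably does something similar):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::DFSchema;
use datafusion::error::Result;
use datafusion::execution::context::ExecutionProps;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::prelude::{col, lit};

fn pruning_predicate_for_x_eq_5() -> Result<PruningPredicate> {
    let schema = Schema::new(vec![Field::new("x", DataType::Int32, true)]);
    let df_schema = DFSchema::try_from(schema.clone())?;
    // Lower the logical expression `x = 5` to a physical expression bound to
    // the schema, then wrap it in a PruningPredicate.
    let physical_expr =
        create_physical_expr(&col("x").eq(lit(5)), &df_schema, &ExecutionProps::new())?;
    PruningPredicate::try_new(physical_expr, Arc::new(schema))
}
```

`prune` can then be called with anything that implements `PruningStatistics`, such as the example's catalog of per-file min/max values.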
datafusion-examples/examples/query_planning/thread_pools.rs
@@ -64,8 +64,7 @@ use url::Url;
/// when using Rust libraries such as `tonic`. Using a separate `Runtime` for
/// CPU bound tasks will often be simpler in larger applications, even though it
/// makes this example slightly more complex.
#[tokio::main]
async fn main() -> Result<()> {
pub async fn thread_pools() -> Result<()> {
// The first two examples read local files. Enabling the URL table feature
// lets us treat filenames as tables in SQL.
let ctx = SessionContext::new().enable_url_table();
@@ -121,7 +120,7 @@ async fn same_runtime(ctx: &SessionContext, sql: &str) -> Result<()> {
// Executing the plan using this pattern intermixes any IO and CPU intensive
// work on same Runtime
while let Some(batch) = stream.next().await {
println!("{}", pretty_format_batches(&[batch?]).unwrap());
println!("{}", pretty_format_batches(&[batch?])?);
}
Ok(())
}
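Stripped of the example's careful stream handling, the core idea is to give DataFusion's CPU-bound work its own Tokio runtime. A much-simplified sketch follows; the runtime sizing and error mapping are illustrative, and because of `block_on` it must be called from outside an async context.

```rust
use arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use datafusion::prelude::*;

fn run_on_dedicated_runtime(sql: &str) -> Result<Vec<RecordBatch>> {
    // A separate multi-threaded runtime reserved for query execution, so the
    // runtime serving network IO (e.g. a tonic server) is never starved.
    let cpu_runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(4)
        .enable_all()
        .build()
        .map_err(|e| DataFusionError::External(Box::new(e)))?;

    let sql = sql.to_string();
    cpu_runtime.block_on(async move {
        let ctx = SessionContext::new().enable_url_table();
        ctx.sql(&sql).await?.collect().await
    })
}
```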