diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 000000000000..2f69e81b8b64 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,103 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +This is a fork of Apache Arrow DataFusion, an extensible query execution framework written in Rust that uses Apache Arrow as its in-memory format. This fork is maintained by Cube and includes custom extensions and optimizations. + +## Key Commands + +### Building +```bash +cargo build # Build the project +cargo build --release # Build with optimizations +cargo build -p datafusion # Build specific package +``` + +### Testing +```bash +# Setup test data (required before first test run) +git submodule init +git submodule update +export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data/ +export ARROW_TEST_DATA=$(pwd)/testing/data/ + +# Run tests +cargo test # Run all tests +cargo test -p datafusion # Test specific package +cargo test test_name # Run specific test +cargo test -- --nocapture # Show println! output during tests +``` + +### Formatting and Linting +```bash +cargo fmt # Format code +cargo fmt --check # Check formatting without changes +cargo clippy # Run linter +``` + +### Benchmarks +```bash +cargo bench # Run benchmarks +cargo bench -p datafusion # Run datafusion benchmarks +``` + +## Architecture Overview + +### Core Components + +1. **Logical Planning** (`datafusion/src/logical_plan/`) + - `LogicalPlan`: Represents logical query plans (SELECT, JOIN, etc.) + - `Expr`: Expression trees for filters, projections, aggregations + - `DFSchema`: Schema representation with field metadata + - SQL parsing and planning in `datafusion/src/sql/` + +2. **Physical Planning** (`datafusion/src/physical_plan/`) + - `ExecutionPlan`: Physical execution operators + - Expression implementations for actual computation + - Aggregate functions with `Accumulator` trait + - Custom operators for hash joins, sorts, aggregations + +3. **Execution** (`datafusion/src/execution/`) + - `ExecutionContext`: Main entry point for query execution + - DataFrame API for programmatic query building + - Manages memory, concurrency, and resource limits + +4. **Optimizer** (`datafusion/src/optimizer/`) + - Rule-based optimizer with passes like: + - Predicate pushdown + - Projection pushdown + - Constant folding + - Join reordering + +5. **Cube Extensions** (`datafusion/src/cube_ext/`) + - Custom operators and functions specific to Cube's fork + - Performance optimizations including: + - `GroupsAccumulator` for efficient grouped aggregation + - `GroupsAccumulatorFlatAdapter` for flattened group values + - Specialized join and aggregation implementations + +### Key Design Patterns + +- **Visitor Pattern**: Used extensively for traversing and transforming plans +- **Async/Await**: All execution is async using Tokio runtime +- **Arrow Arrays**: All data processing uses Arrow columnar format +- **Stream Processing**: Results are produced as async streams of RecordBatches + +### Adding New Functionality + +**Scalar Functions**: Implement in appropriate module under `physical_plan/`, register in `physical_plan/functions.rs` + +**Aggregate Functions**: Create `Accumulator` implementation, register in `physical_plan/aggregates.rs` + +**Optimizer Rules**: Implement `OptimizerRule` trait, add to optimizer pipeline + +**Physical Operators**: Implement `ExecutionPlan` trait with proper partitioning and execution + +## Important Notes + +- This is a Cube fork with custom modifications - Ballista components are disabled +- The `cube_ext` module contains Cube-specific extensions and optimizations +- Performance-critical paths often have specialized implementations for primitive types +- Always run tests with proper test data environment variables set \ No newline at end of file diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 3ef9ac689e0f..fe163aa02643 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -249,12 +249,14 @@ pub trait ExtensionPlanner { /// Default single node physical query planner that converts a /// `LogicalPlan` to an `ExecutionPlan` suitable for execution. pub struct DefaultPhysicalPlanner { + should_evaluate_constants: bool, extension_planners: Vec>, } impl Default for DefaultPhysicalPlanner { fn default() -> Self { Self { + should_evaluate_constants: true, extension_planners: vec![ Arc::new(LogicalAliasPlanner {}), Arc::new(CrossJoinPlanner {}), @@ -265,6 +267,15 @@ impl Default for DefaultPhysicalPlanner { } } +impl DefaultPhysicalPlanner { + pub fn disable_constant_evaluation(self) -> Self { + let mut mv = self; + mv.should_evaluate_constants = false; + + mv + } +} + impl PhysicalPlanner for DefaultPhysicalPlanner { /// Create a physical plan from a logical plan fn create_physical_plan( @@ -334,7 +345,7 @@ impl DefaultPhysicalPlanner { extension_planners.insert(1, Arc::new(CrossJoinPlanner {})); extension_planners.insert(2, Arc::new(CrossJoinAggPlanner {})); extension_planners.insert(3, Arc::new(crate::cube_ext::rolling::Planner {})); - Self { extension_planners } + Self { should_evaluate_constants: true, extension_planners } } /// Create a physical plan from a logical plan @@ -1360,9 +1371,10 @@ impl DefaultPhysicalPlanner { res_expr: Arc, inputs: Vec>, ) -> Result> { - if inputs - .iter() - .all(|i| i.as_any().downcast_ref::().is_some()) + if self.should_evaluate_constants + && inputs + .iter() + .all(|i| i.as_any().downcast_ref::().is_some()) { Ok(evaluate_const(res_expr)?) } else {