103 changes: 103 additions & 0 deletions CLAUDE.md
@@ -0,0 +1,103 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Project Overview

This is a fork of Apache Arrow DataFusion, an extensible query execution framework written in Rust that uses Apache Arrow as its in-memory format. This fork is maintained by Cube and includes custom extensions and optimizations.

## Key Commands

### Building
```bash
cargo build # Build the project
cargo build --release # Build with optimizations
cargo build -p datafusion # Build specific package
```

### Testing
```bash
# Setup test data (required before first test run)
git submodule init
git submodule update
export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data/
export ARROW_TEST_DATA=$(pwd)/testing/data/

# Run tests
cargo test # Run all tests
cargo test -p datafusion # Test specific package
cargo test test_name # Run specific test
cargo test -- --nocapture # Show println! output during tests
```

### Formatting and Linting
```bash
cargo fmt # Format code
cargo fmt --check # Check formatting without changes
cargo clippy # Run linter
```

### Benchmarks
```bash
cargo bench # Run benchmarks
cargo bench -p datafusion # Run datafusion benchmarks
```

## Architecture Overview

### Core Components

1. **Logical Planning** (`datafusion/src/logical_plan/`)
- `LogicalPlan`: Represents logical query plans (SELECT, JOIN, etc.)
- `Expr`: Expression trees for filters, projections, aggregations
- `DFSchema`: Schema representation with field metadata
- SQL parsing and planning in `datafusion/src/sql/`

2. **Physical Planning** (`datafusion/src/physical_plan/`)
- `ExecutionPlan`: Physical execution operators
- Expression implementations for actual computation
- Aggregate functions with `Accumulator` trait
- Custom operators for hash joins, sorts, aggregations

3. **Execution** (`datafusion/src/execution/`)
- `ExecutionContext`: Main entry point for query execution
   - DataFrame API for programmatic query building (see the end-to-end sketch after this list)
- Manages memory, concurrency, and resource limits

4. **Optimizer** (`datafusion/src/optimizer/`)
- Rule-based optimizer with passes like:
- Predicate pushdown
- Projection pushdown
- Constant folding
- Join reordering

5. **Cube Extensions** (`datafusion/src/cube_ext/`)
- Custom operators and functions specific to Cube's fork
- Performance optimizations including:
- `GroupsAccumulator` for efficient grouped aggregation
- `GroupsAccumulatorFlatAdapter` for flattened group values
- Specialized join and aggregation implementations
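
For orientation, the components above compose into a single flow: SQL is parsed and planned into a `LogicalPlan`, the optimizer rewrites it, the physical planner produces an `ExecutionPlan`, and execution yields Arrow record batches. A minimal sketch of that flow through `ExecutionContext` and the DataFrame API follows; the file path and column names are placeholders, and exact signatures (for example whether `register_csv` and `sql` are async) vary between DataFusion versions, so treat it as orientation rather than the precise API of this fork.

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // ExecutionContext drives the whole pipeline: parsing, logical planning,
    // optimization, physical planning, and execution.
    let mut ctx = ExecutionContext::new();

    // Hypothetical input: any CSV with columns `a` and `b`.
    ctx.register_csv("example", "example.csv", CsvReadOptions::new()).await?;

    // SQL -> LogicalPlan -> optimized plan -> ExecutionPlan -> RecordBatches.
    let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a").await?;
    let batches = df.collect().await?;
    println!("{} result batches", batches.len());
    Ok(())
}
```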

### Key Design Patterns

- **Visitor Pattern**: Used extensively for traversing and transforming plans
- **Async/Await**: All execution is async using Tokio runtime
- **Arrow Arrays**: All data processing uses Arrow columnar format
- **Stream Processing**: Results are produced as async streams of RecordBatches (see the sketch below)
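
To make the streaming pattern concrete: executing one partition of an `ExecutionPlan` returns a `SendableRecordBatchStream`, which is drained asynchronously. The sketch below assumes the older `execute(partition)` signature used by forks of this vintage (newer upstream versions also pass a task context):

```rust
use std::sync::Arc;

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;
use futures::StreamExt;

// Drain one partition of a physical plan into a Vec of Arrow RecordBatches.
async fn collect_partition(
    plan: Arc<dyn ExecutionPlan>,
    partition: usize,
) -> Result<Vec<RecordBatch>> {
    // `execute` returns an async stream of batches rather than a materialized result.
    let mut stream = plan.execute(partition).await?;
    let mut batches = Vec::new();
    while let Some(batch) = stream.next().await {
        batches.push(batch?);
    }
    Ok(batches)
}
```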

### Adding New Functionality

**Scalar Functions**: Implement in the appropriate module under `physical_plan/` and register in `physical_plan/functions.rs`
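
Built-in functions are wired up inside the crate, but the same machinery is exposed to users through `create_udf`. The sketch below assumes the older four-argument `create_udf` (newer DataFusion versions also take a volatility argument); the `double` function and its column type are made up for illustration:

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Float64Array};
use datafusion::arrow::datatypes::DataType;
use datafusion::physical_plan::functions::make_scalar_function;
use datafusion::prelude::*;

fn register_double_udf(ctx: &mut ExecutionContext) {
    // Hypothetical scalar UDF that doubles a Float64 column.
    let double = make_scalar_function(|args: &[ArrayRef]| {
        let input = args[0]
            .as_any()
            .downcast_ref::<Float64Array>()
            .expect("double expects a Float64 argument");
        let out: Float64Array = input.iter().map(|v| v.map(|x| x * 2.0)).collect();
        Ok(Arc::new(out) as ArrayRef)
    });

    let udf = create_udf(
        "double",
        vec![DataType::Float64],     // argument types
        Arc::new(DataType::Float64), // return type
        double,
    );
    ctx.register_udf(udf);
}
```

After registration the function is callable from SQL (`SELECT double(a) FROM example`) and the DataFrame API.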

**Aggregate Functions**: Create an `Accumulator` implementation and register it in `physical_plan/aggregates.rs`
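
A sketch of the accumulator side, assuming the older `Accumulator` trait shape from this fork's era (row-wise `update`/`merge` are required and the batch variants have default implementations; newer upstream versions keep only the batch methods). The summing logic is illustrative, not an existing function in the codebase:

```rust
use datafusion::error::Result;
use datafusion::physical_plan::Accumulator;
use datafusion::scalar::ScalarValue;

#[derive(Debug)]
struct SumAccumulator {
    sum: f64,
}

impl Accumulator for SumAccumulator {
    // Intermediate state shipped between partitions before the final merge.
    fn state(&self) -> Result<Vec<ScalarValue>> {
        Ok(vec![ScalarValue::Float64(Some(self.sum))])
    }

    // Fold one row of input values into the running state.
    fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
        if let ScalarValue::Float64(Some(v)) = values[0] {
            self.sum += v;
        }
        Ok(())
    }

    // Combine a partial state produced by another partition.
    fn merge(&mut self, states: &[ScalarValue]) -> Result<()> {
        self.update(states)
    }

    // Produce the final aggregate value.
    fn evaluate(&self) -> Result<ScalarValue> {
        Ok(ScalarValue::Float64(Some(self.sum)))
    }
}
```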

**Optimizer Rules**: Implement the `OptimizerRule` trait and add it to the optimizer pipeline
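
A do-nothing rule skeleton is sketched below; the exact `optimize` signature differs between DataFusion versions (some take an `ExecutionProps`, newer ones an `OptimizerConfig`), so check the trait definition in `src/optimizer/optimizer.rs` before copying:

```rust
use datafusion::error::Result;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_plan::LogicalPlan;
use datafusion::optimizer::optimizer::OptimizerRule;

/// Skeleton rule: real rules pattern-match on `plan`, rewrite the
/// interesting nodes, and recurse into the node's inputs.
struct NoopRule;

impl OptimizerRule for NoopRule {
    fn optimize(
        &self,
        plan: &LogicalPlan,
        _execution_props: &ExecutionProps,
    ) -> Result<LogicalPlan> {
        // Returning a clone leaves the plan unchanged.
        Ok(plan.clone())
    }

    fn name(&self) -> &str {
        "noop"
    }
}
```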

**Physical Operators**: Implement the `ExecutionPlan` trait with proper partitioning and execution
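
The skeleton below shows the general shape of the trait, assuming the older API where `execute(partition)` takes no task context and `with_new_children` takes `&self`. Depending on the exact DataFusion version, additional required methods (for example `statistics` or `output_ordering`) must also be implemented, so treat this as a shape sketch rather than drop-in code:

```rust
use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::{ExecutionPlan, Partitioning, SendableRecordBatchStream};

#[derive(Debug)]
struct MyOperator {
    input: Arc<dyn ExecutionPlan>,
}

#[async_trait]
impl ExecutionPlan for MyOperator {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        // Pass-through operator: same schema as the input.
        self.input.schema()
    }

    fn output_partitioning(&self) -> Partitioning {
        self.input.output_partitioning()
    }

    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![self.input.clone()]
    }

    fn with_new_children(
        &self,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(MyOperator { input: children[0].clone() }))
    }

    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
        // A real operator would wrap `self.input.execute(partition).await?`
        // in its own stream; this skeleton only reports that it is unimplemented.
        let _ = partition;
        Err(DataFusionError::NotImplemented("MyOperator::execute".to_string()))
    }
}
```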

## Important Notes

- This is a Cube fork with custom modifications; Ballista components are disabled
- The `cube_ext` module contains Cube-specific extensions and optimizations
- Performance-critical paths often have specialized implementations for primitive types
- Always run tests with the test data environment variables (`PARQUET_TEST_DATA`, `ARROW_TEST_DATA`) set
20 changes: 16 additions & 4 deletions datafusion/src/physical_plan/planner.rs
@@ -249,12 +249,14 @@ pub trait ExtensionPlanner {
/// Default single node physical query planner that converts a
/// `LogicalPlan` to an `ExecutionPlan` suitable for execution.
pub struct DefaultPhysicalPlanner {
    should_evaluate_constants: bool,
    extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
}

impl Default for DefaultPhysicalPlanner {
    fn default() -> Self {
        Self {
            should_evaluate_constants: true,
            extension_planners: vec![
                Arc::new(LogicalAliasPlanner {}),
                Arc::new(CrossJoinPlanner {}),
@@ -265,6 +267,15 @@ impl Default for DefaultPhysicalPlanner {
    }
}

impl DefaultPhysicalPlanner {
    pub fn disable_constant_evaluation(self) -> Self {
        let mut mv = self;
        mv.should_evaluate_constants = false;

        mv
    }
}

impl PhysicalPlanner for DefaultPhysicalPlanner {
    /// Create a physical plan from a logical plan
    fn create_physical_plan(
@@ -334,7 +345,7 @@ impl DefaultPhysicalPlanner {
        extension_planners.insert(1, Arc::new(CrossJoinPlanner {}));
        extension_planners.insert(2, Arc::new(CrossJoinAggPlanner {}));
        extension_planners.insert(3, Arc::new(crate::cube_ext::rolling::Planner {}));
        Self { should_evaluate_constants: true, extension_planners }
    }

    /// Create a physical plan from a logical plan
@@ -1360,9 +1371,10 @@ impl DefaultPhysicalPlanner {
        res_expr: Arc<dyn PhysicalExpr>,
        inputs: Vec<Arc<dyn PhysicalExpr>>,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        if inputs
            .iter()
            .all(|i| i.as_any().downcast_ref::<Literal>().is_some())
        if self.should_evaluate_constants
            && inputs
                .iter()
                .all(|i| i.as_any().downcast_ref::<Literal>().is_some())
        {
            Ok(evaluate_const(res_expr)?)
        } else {