# Dask Lazy Evaluation and Task Graphs

In this notebook, we'll explore:

- **Lazy Evaluation Strategies**: Building computation graphs vs immediate execution
- **Task Graph Optimization**: How Dask optimizes complex operations before execution  
- **Memory-Efficient Computing**: Processing larger-than-memory datasets through intelligent partitioning
- **Distributed Aggregations**: Scaling groupby and statistical operations across multiple cores
- **Performance Debugging**: Visualizing and optimizing computation graphs

### The Dask Computational Model

Dask's performance comes from how it plans and runs work:

1. **Lazy Evaluation**: Operations build a task graph instead of executing immediately
2. **Graph Optimization**: Dask rewrites the whole pipeline before running it (for example, dropping unused columns and fusing consecutive per-partition steps)
3. **Scheduling**: A scheduler runs independent tasks in parallel across the available cores or workers
4. **Memory Management**: Only the partitions currently being processed need to be in memory
5. **Fault Tolerance**: With the distributed scheduler, tasks lost to a failed worker can be recomputed without restarting the entire workflow
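The lazy-then-execute pattern can be seen without any dataset. This minimal sketch uses `dask.delayed`; the functions `load`, `clean`, and `summarize` are hypothetical stand-ins for real pipeline steps. Calling them only records tasks in a graph, and nothing runs until `.compute()`.

```python
import dask

# Hypothetical pipeline steps: each call is recorded as a task, not executed.
@dask.delayed
def load(n):
    return list(range(n))

@dask.delayed
def clean(values):
    return [v for v in values if v % 2 == 0]

@dask.delayed
def summarize(values):
    return sum(values)

# Building the graph: these three lines do no real work yet.
raw = load(10)
evens = clean(raw)
total = summarize(evens)

print(type(total).__name__)  # Delayed, not a number
print(total.compute())       # runs load -> clean -> summarize; prints 20
```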

### Why This Matters for HPDA

**Traditional pandas approach:**
```python
import pandas as pd

df = pd.read_csv('large_file.csv')      # Load entire dataset
result = df[df.col > 0].groupby('id').mean()  # Process in memory
```

**Dask approach:**
```python
import dask.dataframe as dd

df = dd.read_csv('large_file.csv')      # Create lazy DataFrame
result = df[df.col > 0].groupby('id').mean()  # Build computation graph
final = result.compute()                # Execute optimized graph
```

### Learning Objectives

By the end of this tutorial, you'll understand:
- When to use `.compute()` vs keeping operations lazy
- How to inspect and optimize Dask computation graphs
- Memory-efficient strategies for large-scale aggregations
- Performance debugging techniques for distributed computing

Let's explore these concepts with our meteorological dataset!

In [1]:
import pathlib
import dask.dataframe as dd
import psutil
import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
# print the amount of available RAM (dividing by 1024**3 gives GiB)
print(f"Available RAM: {psutil.virtual_memory().available / (1024 ** 3):.2f} GiB")

Available RAM: 4.24 GiB


## Data Loading

In [3]:
# build the path to the HDF5 file inside the meteo directory and check that it exists
data_raw_path = pathlib.Path("meteo") / "meteo_dask.h5"
data_raw_path.exists()

True

### Loading Distributed Data with HDF5

`dd.read_hdf()` creates a lazy Dask DataFrame:

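**Key Advantages of HDF5 for Dask Computing:**
- **Chunked reads**: `dd.read_hdf` splits the stored table into partitions (rows per partition are set by its `chunksize` argument) that can be processed in parallel
- **Metadata preservation**: Column names, dtypes, and compression settings are stored alongside the data
- **Row-range access**: Data written in PyTables `table` format can be read one row range at a time, which lets Dask load a single partition without reading the whole file
- **Portability**: HDF5 files can be read on different operating systems without conversion

**Memory Efficiency**: This call reads only the table's metadata; no rows are loaded until a computation is triggered, for example with `.compute()`.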

In [4]:
df = dd.read_hdf(data_raw_path, key='df')

## Data Exploration

The `.head()` method is special in Dask - it's one of the few operations that **automatically triggers computation**:

**Why `.head()` is eager:**
- **Development workflow**: Developers need immediate feedback during exploration
- **Small result size**: First few rows fit easily in memory
- **Safety mechanism**: Prevents accidentally loading entire datasets during exploration

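**Performance Insight**: By default `.head()` computes only the first partition, so its cost depends on the partition size rather than on the total dataset size. If the first partition holds fewer rows than requested, Dask returns the rows it found and emits a warning; pass `npartitions=-1` to search all partitions.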

In [5]:
df.head()

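| | Date | Time | TempOut | TempHi | TempLow | HumOut | DewPt | WindSpeed | WindDir | WindRun | ... | CoolDD | TempIn | HumIn | DewPtIn | HeatIn | ET | WindSamp | WindTx | ISSRecept | ArcInt |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 24-12-29 | 0:30 | 0.4 | 0.4 | 0.3 | 98 | 0.1 | 0.0 | NE | 0.0 | ... | 0.0 | 18.4 | 30 | 0.5 | 16.4 | 0.0 | 699.0 | 1.0 | 100.0 | 30.0 |
| 1 | 24-12-29 | 1:00 | 0.1 | 0.3 | -0.1 | 98 | -0.2 | 0.0 | NE | 0.0 | ... | 0.0 | 18.4 | 30 | 0.5 | 16.4 | 0.0 | 699.0 | 1.0 | 100.0 | 30.0 |
| 2 | 24-12-29 | 1:30 | -0.2 | -0.1 | -0.3 | 98 | -0.5 | 0.0 | NE | 0.0 | ... | 0.0 | 18.3 | 30 | 0.4 | 16.4 | 0.0 | 699.0 | 1.0 | 100.0 | 30.0 |
| 3 | 24-12-29 | 2:00 | -0.3 | -0.3 | -0.4 | 98 | -0.6 | 0.0 | NE | 0.0 | ... | 0.0 | 18.3 | 30 | 0.4 | 16.3 | 0.0 | 700.0 | 1.0 | 100.0 | 30.0 |
| 4 | 24-12-29 | 2:30 | -0.5 | -0.4 | -0.6 | 98 | -0.8 | 0.0 | NE | 0.0 | ... | 0.0 | 18.3 | 30 | 0.4 | 16.3 | 0.0 | 701.0 | 1.0 | 100.0 | 30.0 |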


### Dataset Size Information

`len(df)` is one of the operations that runs **immediately** and returns a plain integer:

**Under the hood:** Dask counts the rows in each partition and adds up the counts. For some formats (e.g. Parquet) the counts can come from file metadata; otherwise each partition has to be read, although depending on the reader and Dask version the optimizer may avoid loading columns that are not needed.

**Distributed Challenge**: Unlike pandas, where the length is stored on the object, a partitioned dataset has no single place that knows the total, so `len()` costs a pass over the partitions and should not be called casually on very large data.

In [6]:
len(df)

14492

## Lazy vs Eager Evaluation: Understanding Computation Graphs

This line creates a **computation graph** without executing it:

```python
df[df.HumOut > 0].TempOut.mean()
```

**Task Graph Components:**
1. **Filter Operation**: `df.HumOut > 0` - defines a boolean mask for every partition (nothing is evaluated yet)
2. **Subsetting**: `df[...]` - applies filter to DataFrame  
3. **Column Selection**: `.TempOut` - selects specific column from filtered data
4. **Aggregation**: `.mean()` - computes mean across all partitions

**Memory Advantage**: Until `.compute()` is called, this chain only builds a small expression object; no column data is read or copied.

### The Power of Lazy Evaluation

**Lazy operations return Dask objects:**

In [7]:
df[df.HumOut > 0].TempOut.mean()

<dask_expr.expr.Scalar: expr=((Filter(frame=ArrowStringConversion(frame=FromMapProjectable(a26c2d6)), predicate=ArrowStringConversion(frame=FromMapProjectable(a26c2d6))['HumOut'] > 0))['TempOut']).mean(), dtype=float64>

### Triggering Computation with .compute()

**Now the magic happens!** `.compute()` triggers execution of the entire task graph:

**Optimization Process:**
1. **Graph Optimization**: Dask simplifies the expression before running it, for example by dropping columns the result does not need and fusing consecutive per-partition steps
2. **Task Scheduling**: The resulting tasks are distributed across the available CPU cores
3. **Pushdown**: Column selections (and, for readers such as Parquet, row filters) are pushed toward the data source so that less data is read
4. **Parallel Execution**: Each partition is filtered and reduced independently to a partial sum and count
5. **Result Aggregation**: The partial sums and counts are combined into the final mean

**Performance Comparison:**
- **Pandas equivalent**: Would load entire dataset, then filter, then compute mean
- **Dask advantage**: Reads only the columns it needs and processes partitions in parallel

**Memory Usage**: Only the final scalar (the mean temperature) is returned to the notebook; each partition is held in memory only while it is being processed.

In [8]:
df[df.HumOut > 0].TempOut.mean().compute()

np.float64(12.737954592505695)

## Computation Graph Visualization and Debugging

### Visualizing Task Graphs for Performance Optimization

**Uncomment the line below to see the computation graph visualization:**

```python
df[df.HumOut > 0].TempOut.mean().visualize(engine='cytoscape')
```

**Graph Visualization Benefits:**
- **Identify bottlenecks**: Spot operations that require data shuffling between partitions
- **Optimize workflows**: Reorder operations to minimize memory usage and computation time
- **Debug performance**: Understand why certain operations are slow
- **Educational value**: Visualize how Dask breaks down complex operations

**Visualization Engines:**
- **`'cytoscape'`**: Interactive widget in Jupyter (requires the `ipycytoscape` package)
- **`'graphviz'`** (default): Static PNG/SVG output (requires the `graphviz` Python package and the Graphviz system binaries)

In recent, expression-based Dask versions, `DataFrame.visualize()` draws the high-level expression tree by default; passing `tasks=True` draws the low-level task graph instead.

**HPDA Pro Tip**: Visualize the graphs of complex operations to understand their performance characteristics before running them on production data.

In [9]:
#df[df.HumOut > 0].TempOut.mean().visualize(engine='cytoscape')

## Advanced Aggregation Patterns

GroupBy operations are among the most complex in distributed computing:

**Challenges with Distributed GroupBy:**
- **Data shuffling**: Groups may span multiple partitions, requiring data movement
- **Memory management**: Large groups might not fit in worker memory  
- **Load balancing**: Uneven group sizes can create worker imbalances
- **Combinable aggregations**: Some operations (like mean) cannot be combined by simply averaging per-partition results; each partition must return partial sums and counts

### Humidity-Temperature Correlation Analysis

This operation computes the mean temperature for each humidity level:

```python
df.TempOut.groupby(df.HumOut).mean().compute()
```

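**Behind the Scenes:**
1. **Partial aggregation**: Each partition computes a sum and a count of `TempOut` for every humidity level it contains
2. **Combine**: Partial results for the same humidity level are brought together, either through a tree reduction or, for larger inputs or when `split_out` is greater than 1, a shuffle
3. **Final aggregation**: Each mean is computed as total sum divided by total count
4. **Result collection**: The final result is returned as a pandas Series

**Performance Note**: Humidity is an integer percentage, so there are at most about a hundred groups (78 in this dataset, including a `-1` group whose mean is `NaN`) and the result fits easily in memory. `.compute()` is what turns the lazy result into that pandas Series.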
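The steps above can be reproduced by hand with plain pandas to see why a mean needs sums *and* counts: averaging per-partition means would over-weight small partitions. This is an illustrative sketch of the partial-aggregate/combine/finalize pattern, not Dask's internal code; the "partitions" are just slices of a synthetic frame.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(2)
pdf = pd.DataFrame({
    "HumOut": rng.integers(90, 95, size=300),
    "TempOut": rng.normal(8, 3, size=300),
})

# Pretend the frame is split into three partitions of unequal size.
partitions = [pdf.iloc[:50], pdf.iloc[50:180], pdf.iloc[180:]]

# 1. Partial aggregation: per-partition sum and count for each humidity level.
partials = [p.groupby("HumOut").TempOut.agg(["sum", "count"]) for p in partitions]

# 2. Combine: stack the partial results and add them up per humidity level.
combined = pd.concat(partials).groupby(level=0).sum()

# 3. Finalize: mean = total sum / total count.
manual_mean = combined["sum"] / combined["count"]

expected = pdf.groupby("HumOut").TempOut.mean()
print(np.allclose(manual_mean.sort_index(), expected.sort_index()))  # True
```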

In [10]:
df.TempOut.groupby(df.HumOut).mean().compute()

HumOut
-1            NaN
 23     20.900000
 24     20.550000
 25     20.233333
 26     19.750000
          ...    
 96      6.606855
 97      8.263484
 98      8.850829
 99     10.263918
 100    15.185714
Name: TempOut, Length: 78, dtype: float64

### Complex GroupBy Computation Graph

**Uncomment to visualize the GroupBy task graph:**

```python
df.TempOut.groupby(df.HumOut).mean().visualize(engine='cytoscape')
```

**What you'll see in the visualization:**
- **Multiple stages**: Partial aggregation → combine (tree reduction or shuffle) → final aggregation
- **Data dependencies**: How results from different partitions are combined
- **Communication patterns**: Data movement between workers
- **Optimization opportunities**: Potential bottlenecks in the computation

**Complex GroupBy Pattern**: Notice how this graph is more intricate than simple filtering operations, showing the distributed nature of aggregation computations.

In [11]:
#df.TempOut.groupby(df.HumOut).mean().visualize(engine='cytoscape')

## Advanced Computation Challenges

### 🎯 **Challenge 1: Multi-Stage Computation Optimization**

**Objective**: Build and optimize a complex computation graph involving multiple operations.

**Task**: Create a computation that:
1. Filters data for valid temperature and humidity readings
2. Creates temperature categories (cold, mild, warm, hot)
3. Computes statistics for each category
4. Finds correlations between variables

```python
# TODO: Implement complex multi-stage computation
# Step 1: Filter valid data
# valid_data = df[(df.TempOut > -50) & (df.TempOut < 50) & (df.HumOut > 0) & (df.HumOut <= 100)]

# Step 2: Create temperature categories
# def categorize_temp(temp):
#     if temp < 0: return 'cold'
#     elif temp < 15: return 'mild'  
#     elif temp < 25: return 'warm'
#     else: return 'hot'

# Step 3: Apply categorization and compute statistics
# temp_categories = valid_data.TempOut.apply(categorize_temp, meta=('TempOut', 'object'))
# category_stats = valid_data.groupby(temp_categories).agg({
#     'TempOut': ['mean', 'std', 'count'],
#     'HumOut': ['mean', 'std'], 
#     'WindSpeed': 'mean'
# }).compute()
```

**Learning Objectives**:
- Practice chaining multiple Dask operations
- Understand when to use `.compute()` vs keeping operations lazy
- Learn about the `meta` parameter for custom functions

### 🧮 **Challenge 2: Custom Aggregation Functions**

**Objective**: Implement custom aggregation functions that work efficiently with Dask's distributed computing model.

**Task**: Create a custom function to compute the "weather comfort index" based on temperature and humidity:

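```python
# TODO: Implement custom aggregation
# import numpy as np
#
# def comfort_index(temp_series, humidity_series):
#     """
#     Compute comfort index: higher values = more comfortable
#     Formula: 100 - abs(temp - 22) - abs(humidity - 45)
#     """
#     temp_penalty = np.abs(temp_series - 22)  # Optimal temp: 22°C
#     humidity_penalty = np.abs(humidity_series - 45)  # Optimal humidity: 45%
#     return 100 - temp_penalty - humidity_penalty

# Apply the function, then average per month. Dask's resample needs a
# DatetimeIndex (it has no `on=` argument), so build one from Date and Time
# first (assumes Date is formatted yy-mm-dd):
# df['comfort'] = comfort_index(df.TempOut, df.HumOut)
# df['datetime'] = dd.to_datetime(df.Date + ' ' + df.Time, format='%y-%m-%d %H:%M')
# monthly_comfort = df.set_index('datetime').comfort.resample('ME').mean()  # 'M' on pandas < 2.2
```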

**Advanced**: Implement this as a custom groupby aggregation with `dd.Aggregation`, supplying separate `chunk` (per-partition) and `agg` (combine) functions.

### ⚡ **Challenge 3: Performance Profiling and Optimization**

**Objective**: Profile and optimize Dask computations for maximum performance.

**Tasks**:

1. **Compare operation strategies:**
```python
# TODO: Compare these approaches and measure performance
# Approach 1: Chain operations with intermediate .compute()
# result1 = df[df.HumOut > 50].compute().TempOut.mean()

# Approach 2: Keep everything lazy until final result  
# result2 = df[df.HumOut > 50].TempOut.mean().compute()
```

2. **Partition optimization:**
```python
# TODO: Experiment with different partitioning strategies
# Check current partition sizes: df.map_partitions(len).compute()
# Repartition for optimal performance: df_repart = df.repartition(partition_size="100MB")
```

3. **Memory usage profiling:**
```python
# TODO: Monitor memory usage during computation
# from dask.diagnostics import ProgressBar, ResourceProfiler
# with ResourceProfiler() as rprof:
#     with ProgressBar():
#         result = your_complex_computation.compute()
```

### 🏆 **Master Challenge: Production-Ready Analytics Pipeline**

**Objective**: Build a complete analytics pipeline demonstrating production-ready Dask patterns.

**Requirements**:
- Implement error handling for computation failures
- Create modular functions for different analysis components  
- Add progress monitoring and logging
- Include data validation steps
- Optimize for both memory usage and computation time
- Document performance characteristics and scaling behavior

**Example Pipeline Structure**:
```python
# TODO: Build complete analytics pipeline
# def validate_data(df):
#     """Validate data quality before processing"""
#     pass

# def compute_weather_statistics(df):
#     """Core analytics computation"""  
#     pass

# def export_results(results, format='hdf5'):
#     """Export results in optimized format"""
#     pass

# # Main pipeline
# pipeline = df.pipe(validate_data).pipe(compute_weather_statistics)
# results = pipeline.compute()
# export_results(results)
```

## 🎓 **Key Takeaways**

### **Computational Strategies**
1. **Lazy First**: Build entire computation graphs before triggering execution
2. **Visualization**: Visualize complex graphs to understand their performance characteristics
3. **Memory Planning**: Leave headroom for intermediate computations (a rough rule of thumb is 2-3x the expected result size)
4. **Strategic .compute()**: Only materialize results when necessary for downstream operations

### **Performance Optimization**
- **Predicate Pushdown**: Apply filters as early as possible in computation graphs
- **Partition Awareness**: Understand how operations interact with data partitioning
- **Custom Functions**: Use proper `meta` parameters for user-defined functions
- **Resource Monitoring**: Profile memory and CPU usage to identify bottlenecks