# üöÄ Advanced Data Science Workshop: Intake + Dask Integration

## Modern Python Workflow for Large-Scale Data Processing

Welcome to the advanced workshop! We'll explore how **Intake** and **Dask** work together to create a powerful data science pipeline.

### üéØ Learning Objectives
1. **Data Cataloging**: Unified data access with Intake
2. **Parallel Computing**: High-performance processing with Dask
3. **Integration Workflow**: Seamless Intake + Dask pipeline
4. **Real-world Applications**: Taiwan radar data analysis (553GB dataset)

### üìä Dataset Overview
- **Taiwan Radar Data**: 553GB, 2013-2023, 10-minute intervals
- **MaxDBZ**: Maximum radar reflectivity measurements
- **Format**: Zarr (cloud-optimized arrays)
- **Challenge**: Processing decade-scale meteorological data

## 1. Environment Setup and Data Catalog Loading

First, let's set up our environment and load the data catalogs we'll use throughout this workshop.

### üîß Required Libraries
- **intake**: Data cataloging system
- **xarray**: N-dimensional labeled arrays
- **dask**: Parallel computing library
- **matplotlib**: Visualization

In [20]:
# üöÄ Environment Setup
import intake
import xarray as xr
import dask
import dask.array as da
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pathlib import Path
import zarr
import time



print("üîß Library Versions:")
print("=" * 50)
print(f"xarray: {xr.__version__}")
print(f"dask: {dask.__version__}")
print(f"intake: {intake.__version__}")
print(f"matplotlib: {plt.matplotlib.__version__}")
print(f"zarr: {zarr.__version__}")

üîß Library Versions:
xarray: 2025.7.1
dask: 2025.7.0
intake: 2.0.8
matplotlib: 3.10.5
zarr: 2.18.7


## 2. Data Catalog Integration and Loading

Let's load our data catalogs and explore the available datasets. We'll work with multiple catalogs to demonstrate real-world data integration scenarios.

In [4]:
import intake 
# Load the radar data catalog
catalog = intake.open_catalog('../catalogs/radar_intake_catalog.yaml')

# Explore what's in the catalog
print("üìÅ Available datasets in catalog:")
print("-" * 40)
for name in catalog:
    print(f"  ‚Ä¢ {name}")
    
print(f"\nüìä Total datasets: {len(list(catalog))}")

# Let's look at the catalog object itself
print(f"\nCatalog type: {type(catalog)}")
print(f"Catalog path: {catalog.path}")

üìÅ Available datasets in catalog:
----------------------------------------
  ‚Ä¢ QPSUMS_tw

üìä Total datasets: 1

Catalog type: <class 'intake.catalog.local.YAMLFileCatalog'>
Catalog path: ../catalogs/radar_intake_catalog.yaml


In [5]:
radar_tw_ds = catalog.QPSUMS_tw.to_dask()

print(radar_tw_ds)

radar_tw_ds

<xarray.Dataset> Size: 553GB
Dimensions:    (time: 558420, latitude: 561, longitude: 441)
Coordinates:
  * latitude   (latitude) float64 4kB 20.0 20.01 20.02 ... 26.98 26.99 27.0
  * longitude  (longitude) float64 4kB 118.0 118.0 118.0 ... 123.5 123.5 123.5
  * time       (time) datetime64[ns] 4MB 2013-01-01 ... 2023-08-31T23:50:00
Data variables:
    MaxDBZ     (time, latitude, longitude) float32 553GB dask.array<chunksize=(1, 561, 441), meta=np.ndarray>


Unnamed: 0,Array,Chunk
Bytes,514.66 GiB,0.94 MiB
Shape,"(558420, 561, 441)","(1, 561, 441)"
Dask graph,558420 chunks in 23 graph layers,558420 chunks in 23 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 514.66 GiB 0.94 MiB Shape (558420, 561, 441) (1, 561, 441) Dask graph 558420 chunks in 23 graph layers Data type float32 numpy.ndarray",441  561  558420,

Unnamed: 0,Array,Chunk
Bytes,514.66 GiB,0.94 MiB
Shape,"(558420, 561, 441)","(1, 561, 441)"
Dask graph,558420 chunks in 23 graph layers,558420 chunks in 23 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


## 3. Dask Deep Integration: High-Performance Large Data Processing

Now we'll explore Dask's powerful capabilities for handling large datasets efficiently.

### üß† Key Dask Concepts
- **Lazy Evaluation**: Operations build a computation graph without immediate execution
- **Chunking**: Data is split into manageable pieces for parallel processing
- **Task Scheduling**: Intelligent optimization of computation workflows
- **Memory Management**: Process data larger than available RAM

### üì¶ Chunking Strategy
Dask automatically chunks large arrays, but understanding the chunking strategy is crucial for optimization.

In [6]:
print(radar_tw_ds)

# After loading, explicitly point out the lazy nature
print("\n" + "="*50)
print("üß† NOTICE: The data is NOT loaded into memory yet!")
print("The object above shows the data's structure (dimensions, coordinates),")
print("but the actual data values are represented by Dask arrays.")
print("Look for 'dask.array<chunksize=(...>' in the output.")
print("="*50)

<xarray.Dataset> Size: 553GB
Dimensions:    (time: 558420, latitude: 561, longitude: 441)
Coordinates:
  * latitude   (latitude) float64 4kB 20.0 20.01 20.02 ... 26.98 26.99 27.0
  * longitude  (longitude) float64 4kB 118.0 118.0 118.0 ... 123.5 123.5 123.5
  * time       (time) datetime64[ns] 4MB 2013-01-01 ... 2023-08-31T23:50:00
Data variables:
    MaxDBZ     (time, latitude, longitude) float32 553GB dask.array<chunksize=(1, 561, 441), meta=np.ndarray>

üß† NOTICE: The data is NOT loaded into memory yet!
The object above shows the data's structure (dimensions, coordinates),
but the actual data values are represented by Dask arrays.
Look for 'dask.array<chunksize=(...>' in the output.


In [7]:
# üîç Explore Dask Array Structure
print("üîç Dask Array Analysis:")
print("=" * 50)

# Examine MaxDBZ variable's Dask array
maxdbz_dask = radar_tw_ds.MaxDBZ.data
print(f"üìä Array type: {type(maxdbz_dask)}")
print(f"üìè Array shape: {maxdbz_dask.shape}")
print(f"üß© Chunk structure: {maxdbz_dask.chunks}")
print(f"üíæ Chunk size: {maxdbz_dask.chunksize}")
print(f"üî¢ Total chunks: {maxdbz_dask.npartitions}")

# Calculate theoretical memory usage
total_size_gb = radar_tw_ds.MaxDBZ.nbytes / 1e9
chunk_size_mb = (maxdbz_dask.chunksize[0] * maxdbz_dask.chunksize[1] * maxdbz_dask.chunksize[2] * 4) / 1e6  # 4 bytes per float32

print(f"\nüíæ Memory Information:")
print(f"  ‚Ä¢ Total data size: {total_size_gb:.1f} GB")
print(f"  ‚Ä¢ Single chunk size: ~{chunk_size_mb:.1f} MB")
print(f"  ‚Ä¢ Chunking strategy: Time-based chunking (one chunk per time point)")

print(f"\n‚ö° Dask Advantages:")
print(f"  ‚Ä¢ Only load necessary chunks into memory")
print(f"  ‚Ä¢ Parallel processing across multiple chunks")
print(f"  ‚Ä¢ Memory usage << Total data size")

üîç Dask Array Analysis:
üìä Array type: <class 'dask.array.core.Array'>
üìè Array shape: (558420, 561, 441)
üß© Chunk structure: ((1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 

In [8]:
# üß© Smart Chunking Strategy
print("üß© Chunking Strategy Optimization:")
print("=" * 50)

# Display current chunking strategy
current_chunks = radar_tw_ds.MaxDBZ.chunks
print(f"Current chunking: {current_chunks}")

# Calculate recommended chunk sizes
time_points = radar_tw_ds.MaxDBZ.shape[0]
y_size = radar_tw_ds.MaxDBZ.shape[1] 
x_size = radar_tw_ds.MaxDBZ.shape[2]

print(f"\nüìè Data dimensions:")
print(f"  ‚Ä¢ Time points: {time_points:,}")
print(f"  ‚Ä¢ Y grid: {y_size}")
print(f"  ‚Ä¢ X grid: {x_size}")

# Demonstrate different chunking strategies
print(f"\nüéØ Chunking strategy comparison:")

# Strategy 1: Default chunking (by time)
chunk_1_size_mb = (1 * y_size * x_size * 4) / 1e6
print(f"  ‚Ä¢ Default (1, {y_size}, {x_size}): ~{chunk_1_size_mb:.1f} MB/chunk")

# Strategy 2: Larger time chunks
chunk_10_size_mb = (10 * y_size * x_size * 4) / 1e6  
print(f"  ‚Ä¢ Time batches (10, {y_size}, {x_size}): ~{chunk_10_size_mb:.1f} MB/chunk")

# Strategy 3: Spatial chunking
spatial_chunk_size = y_size // 2
chunk_spatial_size_mb = (1 * spatial_chunk_size * spatial_chunk_size * 4) / 1e6
print(f"  ‚Ä¢ Spatial chunking (1, {spatial_chunk_size}, {spatial_chunk_size}): ~{chunk_spatial_size_mb:.1f} MB/chunk")

print(f"\nüí° Chunking strategy recommendations:")
print(f"  ‚Ä¢ Small chunks (< 10MB): Suitable for memory-constrained environments")
print(f"  ‚Ä¢ Medium chunks (10-100MB): Balance performance and memory")  
print(f"  ‚Ä¢ Large chunks (> 100MB): Suitable for high-memory, high-performance computing")

üß© Chunking Strategy Optimization:
Current chunking: ((1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1

In [29]:
print("Subsetting Data for Interactive Analysis")
print("=" * 50)
print("For a live demo, we don't want to wait for computations on 553GB.")
print("Let's select just one month of data to test our workflow.\n")

daily_ds = radar_tw_ds.sel(time='2023-08-01')

# Now, apply the .where() condition on the result of the first step.
# This keeps all values where MaxDBZ is >= 0, and turns the rest into NaN.
interactive_ds = daily_ds.where(daily_ds.MaxDBZ >= 0)

print(interactive_ds)

Subsetting Data for Interactive Analysis
For a live demo, we don't want to wait for computations on 553GB.
Let's select just one month of data to test our workflow.

<xarray.Dataset> Size: 143MB
Dimensions:    (time: 144, latitude: 561, longitude: 441)
Coordinates:
  * latitude   (latitude) float64 4kB 20.0 20.01 20.02 ... 26.98 26.99 27.0
  * longitude  (longitude) float64 4kB 118.0 118.0 118.0 ... 123.5 123.5 123.5
  * time       (time) datetime64[ns] 1kB 2023-08-01 ... 2023-08-01T23:50:00
Data variables:
    MaxDBZ     (time, latitude, longitude) float32 143MB dask.array<chunksize=(1, 561, 441), meta=np.ndarray>


In [26]:
lazy_mean = interactive_ds.MaxDBZ.mean() # Use interactive_ds
lazy_max = interactive_ds.MaxDBZ.max()  # Use interactive_ds
lazy_std = interactive_ds.MaxDBZ.std()    # Use interactive_ds


compute_start = time.time()
results = da.compute(lazy_mean, lazy_max, lazy_std)
mean_result, max_result, std_result = results

compute_time = time.time() - compute_start

# ======================================================
# ‚ú® Áî®Êõ¥Ê∏ÖÊô∞ÁöÑÊñπÂºèÂëàÁèæÁµêÊûú ‚ú®
# ======================================================
print(f"\n‚úÖ Ë®àÁÆóÂÆåÊàêÔºÅ")
print(f"‚è±Ô∏è ÂØ¶ÈöõË®àÁÆóËÄóÊôÇ: {compute_time:.2f} Áßí")

print("\n" + "="*50)
print("üìä ÂàÜÊûêÁµêÊûúÂ†±Âëä")
print("="*50)
print(f"  ‚Ä¢ Âπ≥ÂùáÈõ∑ÈÅîÂõûÊ≥¢Âº∑Â∫¶ (Mean): {mean_result:.2f} dBZ")
print(f"  ‚Ä¢ ÊúÄÂ§ßÈõ∑ÈÅîÂõûÊ≥¢Âº∑Â∫¶ (Maximum): {max_result:.2f} dBZ") 
print(f"  ‚Ä¢ Èõ∑ÈÅîÂõûÊ≥¢Ê®ôÊ∫ñÂ∑Æ (Std Dev): {std_result:.2f} dBZ")
print("="*50)


‚úÖ Ë®àÁÆóÂÆåÊàêÔºÅ
‚è±Ô∏è ÂØ¶ÈöõË®àÁÆóËÄóÊôÇ: 13.87 Áßí

üìä ÂàÜÊûêÁµêÊûúÂ†±Âëä
  ‚Ä¢ Âπ≥ÂùáÈõ∑ÈÅîÂõûÊ≥¢Âº∑Â∫¶ (Mean): 18.85 dBZ
  ‚Ä¢ ÊúÄÂ§ßÈõ∑ÈÅîÂõûÊ≥¢Âº∑Â∫¶ (Maximum): 76.00 dBZ
  ‚Ä¢ Èõ∑ÈÅîÂõûÊ≥¢Ê®ôÊ∫ñÂ∑Æ (Std Dev): 10.15 dBZ


In [30]:
# --- 2. Method A: With Dask (Parallel and Lazy) ---
print("üöÄ Method A: Running with Dask...")
dask_start_time = time.time()

# Define lazy computations
lazy_mean = interactive_ds.MaxDBZ.mean()
lazy_max = interactive_ds.MaxDBZ.max()
lazy_std = interactive_ds.MaxDBZ.std()

# This single compute call triggers Dask to read data and calculate all stats
dask_results = da.compute(lazy_mean, lazy_max, lazy_std)

dask_total_time = time.time() - dask_start_time
print(f"‚úÖ Dask computation finished.")


# --- 3. Method B: Without Dask (Load into Memory First) ---
print("\nüß† Method B: Running without Dask (In-Memory)...")
total_in_memory_start_time = time.time()

# Step 1: Explicitly load all the data into memory. This is the key difference.
print("   - Step 1: Loading data into memory...")
load_start_time = time.time()
in_memory_data_array = interactive_ds.MaxDBZ.load()
load_time = time.time() - load_start_time
print(f"   - Step 1 finished in {load_time:.2f} seconds.")

# Step 2: Now perform computations using NumPy/Xarray's in-memory functions
print("   - Step 2: Performing computations on in-memory data...")
compute_start_time = time.time()
mean_np = in_memory_data_array.mean()
max_np = in_memory_data_array.max()
std_np = in_memory_data_array.std()
compute_time = time.time() - compute_start_time
print(f"   - Step 2 finished in {compute_time:.2f} seconds.")

in_memory_total_time = time.time() - total_in_memory_start_time
print(f"‚úÖ In-memory computation finished.")


# --- 4. Results and Interpretation ---
print("\n" + "="*60)
print("üìä PERFORMANCE COMPARISON")
print("="*60)
print(f"  ‚Ä¢ With Dask (Total Time):    {dask_total_time:.2f} seconds")
print(f"  ‚Ä¢ Without Dask (Total Time): {in_memory_total_time:.2f} seconds")
print(f"    (Breakdown: Load Data {load_time:.2f}s + Compute {compute_time:.2f}s)")
print("="*60)

üöÄ Method A: Running with Dask...
‚úÖ Dask computation finished.

üß† Method B: Running without Dask (In-Memory)...
   - Step 1: Loading data into memory...
   - Step 1 finished in 11.77 seconds.
   - Step 2: Performing computations on in-memory data...
   - Step 2 finished in 0.41 seconds.
‚úÖ In-memory computation finished.

üìä PERFORMANCE COMPARISON
  ‚Ä¢ With Dask (Total Time):    12.58 seconds
  ‚Ä¢ Without Dask (Total Time): 12.18 seconds
    (Breakdown: Load Data 11.77s + Compute 0.41s)


In [None]:
print("‚úçÔ∏è Adding Metadata and Saving Results")
print("=" * 50)

from datetime import datetime

# We'll use the 'lazy_mean' from the previous step, but calculate it over the whole space
lazy_spatial_mean = interactive_ds.MaxDBZ.mean(dim=['latitude', 'longitude'])

# Add descriptive attributes
lazy_spatial_mean.attrs['long_name'] = 'Domain-Averaged Maximum Radar Reflectivity'
lazy_spatial_mean.attrs['units'] = 'dBZ'
lazy_spatial_mean.attrs['processing_step'] = 'Calculated monthly mean over Taiwan domain.'
lazy_spatial_mean.attrs['history'] = f'Created on {datetime.utcnow().isoformat()}Z'

print("‚úÖ Metadata added to the lazy object.")

# Save the result to a new Zarr store. This is another action that triggers computation!
print("\nüíæ Saving data to new Zarr store... This will trigger the Dask computation.")
output_path = './tw_radar_spatial_mean_2023_08.zarr'
lazy_spatial_mean.to_zarr(output_path, mode='w')

print(f"\nüéâ Success! Results saved to: {output_path}")

# You can now load this small result instantly
saved_result = xr.open_zarr(output_path)
print("\nReloaded result:")
print(saved_result)

‚úçÔ∏è Adding Metadata and Saving Results
‚úÖ Metadata added to the lazy object.

üíæ Saving data to new Zarr store... This will trigger the Dask computation.

üéâ Success! Results saved to: ./tw_radar_spatial_mean_2023_08.zarr

Reloaded result:
<xarray.Dataset> Size: 2kB
Dimensions:  (time: 144)
Coordinates:
  * time     (time) datetime64[ns] 1kB 2023-08-01 ... 2023-08-01T23:50:00
Data variables:
    MaxDBZ   (time) float32 576B dask.array<chunksize=(1,), meta=np.ndarray>


In [31]:
saved_result

Unnamed: 0,Array,Chunk
Bytes,576 B,4 B
Shape,"(144,)","(1,)"
Dask graph,144 chunks in 2 graph layers,144 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 576 B 4 B Shape (144,) (1,) Dask graph 144 chunks in 2 graph layers Data type float32 numpy.ndarray",144  1,

Unnamed: 0,Array,Chunk
Bytes,576 B,4 B
Shape,"(144,)","(1,)"
Dask graph,144 chunks in 2 graph layers,144 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


## 4. Parallel Computing in Action: Dask Cluster Monitoring

Dask's true power lies in parallel computing. Let's create a local cluster and monitor the computation process.

### üñ•Ô∏è Dask Dashboard
Dask provides a real-time web interface to monitor:
- Task execution status
- Memory usage
- Worker node status
- Computation graph visualization

### ‚ö° Parallel Processing Strategy
- **Compute vs Persist**: When to calculate, when to cache
- **Task Scheduling**: How to optimize task distribution
- **Memory Management**: Avoiding out-of-memory errors

In [9]:
# üöÄ Create Local Dask Cluster
from dask.distributed import Client, LocalCluster
import os

print("üèóÔ∏è Creating Dask Local Cluster:")
print("=" * 50)

# Detect system resources
cpu_count = os.cpu_count()
print(f"üñ•Ô∏è System CPU cores: {cpu_count}")

# Create local cluster (use fewer workers to save memory)
n_workers = min(2, cpu_count // 2)  # Conservative number of workers
memory_limit = '2GB'  # Memory limit per worker

print(f"‚öôÔ∏è Cluster configuration:")
print(f"  ‚Ä¢ Number of workers: {n_workers}")
print(f"  ‚Ä¢ Memory limit: {memory_limit}/worker")

# Create cluster
cluster = LocalCluster(
    n_workers=n_workers,
    threads_per_worker=2,
    memory_limit=memory_limit,
    silence_logs=False,
    dashboard_address=':8787'
)

# Connect to cluster
client = Client(cluster)

print(f"\n‚úÖ Cluster created!")
print(f"üìä Dashboard URL: {client.dashboard_link}")
print(f"üîó Connection info: {client}")

# Display cluster status
print(f"\nüìà Cluster status:")
print(f"  ‚Ä¢ Workers: {len(client.scheduler_info()['workers'])}")
print(f"  ‚Ä¢ Total cores: {sum(w['nthreads'] for w in client.scheduler_info()['workers'].values())}")
print(f"  ‚Ä¢ Total memory: {sum(w['memory_limit'] for w in client.scheduler_info()['workers'].values()) / 1e9:.1f} GB")

ImportError: dask.distributed is not installed.

Please either conda or pip install distributed:

  conda install dask distributed             # either conda install
  python -m pip install "dask[distributed]" --upgrade    # or pip install

In [None]:
# ‚ö° Parallel Computing Performance Test
print("‚ö° Parallel vs Serial Computing Comparison:")
print("=" * 50)

import time

# Select subset of data for testing (avoid memory issues)
subset_data = radar_tw_ds.MaxDBZ.isel(time=slice(0, 1000))  # First 1000 time points
print(f"Test data: {subset_data.shape} ({subset_data.nbytes/1e6:.1f} MB)")

# Define a series of computation tasks
def complex_analysis(data):
    """Complex meteorological analysis function"""
    # Time average
    time_mean = data.mean(dim='time')
    # Time standard deviation  
    time_std = data.std(dim='time')
    # Maximum value
    time_max = data.max(dim='time')
    # 90th percentile
    time_90p = data.quantile(0.9, dim='time')
    
    return time_mean, time_std, time_max, time_90p

print(f"\nüîÑ Running complex analysis...")

# Serial computation (using .compute() for immediate execution)
print("Serial computation...")
start_time = time.time()
serial_results = complex_analysis(subset_data.compute())
serial_time = time.time() - start_time

# Parallel computation (using Dask)
print("Parallel computation...")
start_time = time.time()
parallel_results = dask.compute(*complex_analysis(subset_data))
parallel_time = time.time() - start_time

print(f"\nüìä Performance comparison:")
print(f"  ‚Ä¢ Serial computation: {serial_time:.2f} seconds")
print(f"  ‚Ä¢ Parallel computation: {parallel_time:.2f} seconds")
print(f"  ‚Ä¢ Speedup: {serial_time/parallel_time:.1f}x")

# Verify result consistency
mean_diff = abs(float(serial_results[0].mean()) - float(parallel_results[0].mean()))
print(f"  ‚Ä¢ Result difference: {mean_diff:.6f} (should be close to 0)")

print(f"\nüí° Key observations:")
print(f"  ‚Ä¢ Check the Dashboard for task execution graph")
print(f"  ‚Ä¢ Notice memory usage patterns")
print(f"  ‚Ä¢ Observe load distribution across workers")

In [None]:
# üéØ Compute vs Persist Strategy
print("üéØ Compute vs Persist Comparison:")
print("=" * 50)

# Create a scenario where intermediate results are reused
test_data = radar_tw_ds.MaxDBZ.isel(time=slice(0, 100))

print("Scenario: Multiple different analyses on the same intermediate result")

# Method 1: Recompute every time (compute)
print("\nüìä Method 1: Repeated computation")
start_time = time.time()

# Calculate intermediate result: daily average (assume 24 time points per day)
daily_mean_lazy = test_data.coarsen(time=24).mean()

# Use this intermediate result multiple times
result1 = daily_mean_lazy.max().compute()  # Compute once
result2 = daily_mean_lazy.min().compute()  # Compute again
result3 = daily_mean_lazy.std().compute()  # Compute yet again

method1_time = time.time() - start_time

# Method 2: Persist intermediate result (persist)
print("üìå Method 2: Persist strategy")
start_time = time.time()

# Persist intermediate result to memory/cache
daily_mean_persisted = test_data.coarsen(time=24).mean().persist()

# Use persisted result multiple times
result1_p = daily_mean_persisted.max().compute()
result2_p = daily_mean_persisted.min().compute()  
result3_p = daily_mean_persisted.std().compute()

method2_time = time.time() - start_time

print(f"\n‚è±Ô∏è Execution time comparison:")
print(f"  ‚Ä¢ Repeated computation: {method1_time:.2f} seconds")
print(f"  ‚Ä¢ Persist strategy: {method2_time:.2f} seconds")
print(f"  ‚Ä¢ Efficiency gain: {method1_time/method2_time:.1f}x")

print(f"\nüß† Strategy recommendations:")
print(f"  ‚Ä¢ One-time computation ‚Üí use .compute()")
print(f"  ‚Ä¢ Intermediate result reuse ‚Üí use .persist()")
print(f"  ‚Ä¢ Prefer persist when memory is sufficient")
print(f"  ‚Ä¢ Use persist cautiously when memory is limited")

# Check memory usage of persisted data
memory_usage = daily_mean_persisted.nbytes / 1e6
print(f"  ‚Ä¢ Persisted data size: {memory_usage:.1f} MB")

## 5. Advanced Applications: Large Dataset Integration Analysis

Let's combine multiple datasets to showcase the power of Dask + Intake in real research scenarios.

### üîó Multi-Dataset Integration
- Radar observations vs reanalysis data comparison
- Spatiotemporal alignment and interpolation
- Large-scale meteorological model validation

### üìä Interactive Visualization
- Real-time data exploration
- Dynamic time series analysis
- Geospatial visualization

### ‚ö° Performance Optimization Tips
- Memory management strategies
- Chunking optimization
- Computation graph simplification

In [None]:
# üîó Load Multiple Datasets for Comparison
print("üîó Multi-Dataset Integration Analysis:")
print("=" * 50)

try:
    # Load ERA5 dataset
    era5_ds = catalog.era5_reanalysis.to_dask()
    print("‚úÖ ERA5 data loaded successfully")
    
    # Load station dataset  
    station_ds = catalog.station_data.to_dask()
    print("‚úÖ Station data loaded successfully")
    
    print(f"\nüìä Dataset overview:")
    print(f"  ‚Ä¢ Radar data: {radar_tw_ds.MaxDBZ.shape} (Taiwan, 2013-2023)")
    print(f"  ‚Ä¢ ERA5 data: {era5_ds.dims}")
    print(f"  ‚Ä¢ Station data: {station_ds.dims}")
    
    # Check temporal overlap
    radar_time_range = (radar_tw_ds.time.min().compute(), radar_tw_ds.time.max().compute())
    print(f"\n‚è∞ Time ranges:")
    print(f"  ‚Ä¢ Radar: {radar_time_range[0].values} to {radar_time_range[1].values}")
    
    if 'time' in era5_ds.coords:
        era5_time_range = (era5_ds.time.min().compute(), era5_ds.time.max().compute())
        print(f"  ‚Ä¢ ERA5: {era5_time_range[0].values} to {era5_time_range[1].values}")
    
    print(f"\nüéØ Analysis possibilities:")
    print(f"  ‚Ä¢ Multi-scale precipitation event observations")
    print(f"  ‚Ä¢ Model data vs observation validation")
    print(f"  ‚Ä¢ Extreme weather event case studies")
    
except Exception as e:
    print(f"‚ö†Ô∏è Other datasets loading failed: {e}")
    print(f"Continuing with single dataset analysis...")
    
    # Advanced analysis of single dataset
    print(f"\nüéØ Advanced radar data analysis:")
    
    # Seasonal analysis
    radar_grouped = radar_tw_ds.MaxDBZ.groupby('time.season')
    seasonal_mean = radar_grouped.mean()
    
    print(f"  ‚Ä¢ Seasonal averages created (not yet computed)")
    print(f"  ‚Ä¢ Available seasons: {list(seasonal_mean.season.values)}")
    
    # Spatial statistics
    spatial_mean = radar_tw_ds.MaxDBZ.mean(dim=['x', 'y'])
    temporal_trend = spatial_mean.coarsen(time=24*7).mean()  # Weekly average
    
    print(f"  ‚Ä¢ Time series analysis prepared")
    print(f"  ‚Ä¢ Spatial averaging computation set up")

In [None]:
# üìä Large Data Visualization and Analysis
print("üìä Dask Large Data Visualization:")
print("=" * 50)

import matplotlib.pyplot as plt
import numpy as np

# Select representative data for visualization
print("Preparing visualization data...")

# Calculate monthly averages (reduce data volume but maintain trends)
monthly_mean = radar_tw_ds.MaxDBZ.resample(time='1M').mean()
print(f"Monthly average data shape: {monthly_mean.shape}")

# Calculate spatial average time series
spatial_avg_monthly = monthly_mean.mean(dim=['x', 'y'])

print("Executing computation...")
spatial_avg_result = spatial_avg_monthly.compute()

# Create visualization
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))

# 1. Time series plot
ax1.plot(spatial_avg_result.time, spatial_avg_result, 'b-', linewidth=1)
ax1.set_title('üìà Taiwan Radar Reflectivity Monthly Average Time Series (2013-2023)', fontsize=14, pad=20)
ax1.set_ylabel('Average Reflectivity (dBZ)', fontsize=12)
ax1.grid(True, alpha=0.3)
ax1.tick_params(labelrotation=45)

# 2. Seasonal box plot
seasonal_data = []
seasonal_labels = []
for season in ['DJF', 'MAM', 'JJA', 'SON']:
    season_mask = spatial_avg_result.time.dt.season == season
    if season_mask.any():
        seasonal_data.append(spatial_avg_result[season_mask].values)
        seasonal_labels.append(season)

if seasonal_data:
    ax2.boxplot(seasonal_data, labels=seasonal_labels)
    ax2.set_title('üìä Seasonal Variation Distribution', fontsize=14, pad=20)
    ax2.set_ylabel('Reflectivity (dBZ)', fontsize=12)
    ax2.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Statistical summary
print(f"\nüìà Time series statistics:")
print(f"  ‚Ä¢ Data points: {len(spatial_avg_result):,}")
print(f"  ‚Ä¢ Time span: {spatial_avg_result.time.min().values} to {spatial_avg_result.time.max().values}")
print(f"  ‚Ä¢ Mean: {float(spatial_avg_result.mean()):.2f} dBZ")
print(f"  ‚Ä¢ Standard deviation: {float(spatial_avg_result.std()):.2f} dBZ")
print(f"  ‚Ä¢ Maximum: {float(spatial_avg_result.max()):.2f} dBZ")
print(f"  ‚Ä¢ Minimum: {float(spatial_avg_result.min()):.2f} dBZ")

In [None]:
# üéØ Performance Optimization and Resource Cleanup
print("üéØ Dask Performance Optimization Tips:")
print("=" * 50)

# Check current cluster status
print("üìä Cluster resource usage:")
workers = client.scheduler_info()['workers']
for worker_id, worker_info in workers.items():
    memory_used = worker_info.get('memory', 0)
    memory_limit = worker_info.get('memory_limit', 0)
    memory_percent = (memory_used / memory_limit * 100) if memory_limit > 0 else 0
    
    print(f"  ‚Ä¢ Worker {worker_id[-8:]}: "
          f"Memory {memory_used/1e6:.0f}/{memory_limit/1e6:.0f} MB "
          f"({memory_percent:.1f}%)")

print(f"\nüí° Performance optimization recommendations:")
print(f"  1. üìè Appropriate chunk sizes:")
print(f"     ‚Ä¢ Time-intensive computations: Use larger time chunks")
print(f"     ‚Ä¢ Spatial analysis: Consider spatial chunking")
print(f"     ‚Ä¢ Memory constraints: Keep chunk size < 100MB")

print(f"  2. üîÑ Computation order optimization:")
print(f"     ‚Ä¢ Filter first, then compute (temporal/spatial subsets)")
print(f"     ‚Ä¢ Use persist() to cache intermediate results")
print(f"     ‚Ä¢ Avoid unnecessary .compute() calls")

print(f"  3. üß† Memory management:")
print(f"     ‚Ä¢ Regularly clean up unneeded variables")
print(f"     ‚Ä¢ Monitor memory usage")
print(f"     ‚Ä¢ Restart cluster when needed")

print(f"  4. üìä Monitoring tools:")
print(f"     ‚Ä¢ Dask Dashboard: {client.dashboard_link}")
print(f"     ‚Ä¢ Task graph visualization")
print(f"     ‚Ä¢ Real-time performance metrics")

# Cleanup demonstration
print(f"\nüßπ Resource cleanup demonstration:")

# Delete large intermediate variables
vars_to_clean = ['daily_mean_persisted', 'monthly_mean', 'spatial_avg_monthly']
for var_name in vars_to_clean:
    if var_name in locals():
        del locals()[var_name]
        print(f"  ‚úÖ Cleaned variable: {var_name}")

# Force garbage collection
import gc
gc.collect()
print(f"  ‚ôªÔ∏è Executed garbage collection")

print(f"\nüéä Congratulations on completing the Advanced Dask Workshop!")
print(f"üîó Further learning resources:")
print(f"  ‚Ä¢ Dask official documentation: https://docs.dask.org/")
print(f"  ‚Ä¢ Xarray with Dask: https://docs.xarray.dev/en/stable/user-guide/dask.html")
print(f"  ‚Ä¢ Intake advanced usage: https://intake.readthedocs.io/")

# Keep cluster running for further experimentation
print(f"\nüí° Cluster is still running for further experiments!")
print(f"   Use client.close() to close the connection")
print(f"   Use cluster.close() to shut down the cluster")

## üéâ Workshop Summary

### What did we learn?

1. **üìÅ Intake Data Catalog System**
   - Unified data access interface
   - YAML configuration file management
   - Error handling and troubleshooting

2. **üöÄ Dask Distributed Computing**
   - Lazy evaluation (delayed computation)
   - Smart chunking strategies
   - Parallel computing optimization

3. **üîó Integrated Workflow**
   - Seamless Intake + Dask integration
   - Efficient large dataset processing
   - Real-time monitoring and debugging

4. **üí° Practical Skills**
   - Memory management strategies
   - Performance optimization methods
   - Resource cleanup and maintenance

### üéØ Next Steps for Exploration

- **Cloud Computing**: Dask on Kubernetes
- **Machine Learning**: Dask-ML for large-scale machine learning
- **Real-time Processing**: Dask Streaming
- **Advanced Visualization**: Datashader + Holoviews

### ü§ù Community Resources

Join the Python scientific computing community to exchange experiences with other researchers!