# Distributed Processing with Dask

This notebook demonstrates how to use Dask for distributed processing of large oceanographic datasets on the Ocean Data Platform.

**What you'll learn:**
- Set up a Dask client in ODP Workspaces
- Process larger-than-memory datasets using Dask DataFrames
- Parallelize computations across dataset partitions
- Scale aggregations and transformations for large data volumes

**Why Dask for Ocean Data?**

Research vessels can generate 25-30 TB of data per mission from sensors, ROVs, and sampling systems. Dask enables:
- Processing datasets larger than available RAM
- Parallel execution across multiple cores/workers
- Lazy evaluation - build computation graphs before executing
- Familiar pandas-like API

**Prerequisites:**
- An ODP Workspace session (Dask sidecar pre-configured)
- Completed `01_catalog_discovery.ipynb` to understand dataset access

## 1. Setup and Dask Client Initialization

In [None]:
import dask
import dask.dataframe as dd
from dask.distributed import Client, progress
import pandas as pd
import pyarrow as pa

# Check Dask version
print(f"Dask version: {dask.__version__}")

In [None]:
# Initialize Dask client
# Use dashboard_address=':0' to auto-select available port (avoids port conflicts)
client = Client(dashboard_address=':0')

# Build dashboard URL for cloud environment (jupyter-server-proxy)
import os
jupyter_prefix = os.environ.get('JUPYTERHUB_SERVICE_PREFIX', '/')
dashboard_port = client.scheduler_info().get('services', {}).get('dashboard', 8787)

# Construct full clickable URL for cloud environments
if 'JUPYTERHUB_SERVICE_PREFIX' in os.environ:
    proxy_dashboard = f"https://workspace.hubocean.earth{jupyter_prefix}proxy/{dashboard_port}/status"
else:
    proxy_dashboard = f"http://127.0.0.1:{dashboard_port}/status"

print(f"Dashboard: {proxy_dashboard}")
print("\nClick the URL above to open the Dask dashboard.")
client

## 2. Configure the Data Source

In [None]:
# Configure data scale for this tutorial
# Choose based on your VM resources:
#   small  = 100K rows (~10 MB)  - quick demo, minimal resources
#   medium = 1M rows   (~100 MB) - recommended for most environments
#   large  = 10M rows  (~1 GB)   - requires 8GB+ RAM

SCALE = "medium"  # Options: "small", "medium", "large"
USE_SYNTHETIC = True  # True = generate synthetic data, False = use ODP dataset

scales = {
    "small": 100_000,
    "medium": 1_000_000,
    "large": 10_000_000
}
n_rows = scales[SCALE]

print(f"Scale: {SCALE} ({n_rows:,} rows)")
print(f"Data source: {'Synthetic' if USE_SYNTHETIC else 'ODP Dataset'}")

In [None]:
import numpy as np
from datetime import datetime, timedelta

if USE_SYNTHETIC:
    # Generate synthetic oceanographic time series data
    print(f"Generating {n_rows:,} synthetic observations...")
    
    np.random.seed(42)
    
    # Simulate 50 monitoring stations
    stations = [f"STATION-{i:03d}" for i in range(50)]
    
    # Generate data
    synthetic_data = {
        "timestamp": pd.date_range("2020-01-01", periods=n_rows, freq="min"),  # Minute-level ("T" alias is deprecated in recent pandas)
        "station_id": np.random.choice(stations, n_rows),
        "latitude": np.random.uniform(58, 72, n_rows),
        "longitude": np.random.uniform(0, 30, n_rows),
        "temperature_c": np.random.normal(8, 4, n_rows),
        "salinity_psu": np.random.normal(35, 1, n_rows),
        "depth_m": np.random.choice([5, 10, 25, 50, 100, 200], n_rows),
        "dissolved_oxygen": np.random.normal(7, 1, n_rows),
    }
    
    source_df = pd.DataFrame(synthetic_data)
    print(f"Generated {len(source_df):,} rows, {source_df.memory_usage(deep=True).sum() / 1e6:.1f} MB")
    
else:
    # Load from ODP dataset
    from odp.client import Client as ODPClient
    
    odp = ODPClient()
    DATASET_ID = "1d801817-742b-4867-82cf-5597673524eb"  # PGS Biota
    
    dataset = odp.dataset(DATASET_ID)
    stats = dataset.table.stats()
    print(f"ODP Dataset: {stats.num_rows:,} rows" if stats else "Dataset stats unavailable")
    
    # Load into DataFrame
    dfs = []
    for batch in dataset.table.select().batches():
        dfs.append(batch.to_pandas())
    source_df = pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()
    print(f"Loaded {len(source_df):,} rows from ODP")

source_df.head()

## 3. Load Data into Dask

This notebook supports two data sources:

**Synthetic data (default):** Generates oceanographic time series at configurable scale. Best for learning Dask patterns without API dependencies.

**ODP dataset:** Loads real data from Ocean Data Platform. Use this to apply Dask to actual oceanographic datasets.

The same Dask patterns work for both - change `USE_SYNTHETIC` above to switch.

In [None]:
# Convert to Dask DataFrame
# Target ~100K rows per partition, with a floor of 4 partitions so small inputs still run in parallel
partitions = max(4, len(source_df) // 100_000)

print(f"Creating Dask DataFrame with {partitions} partitions...")
ddf = dd.from_pandas(source_df, npartitions=partitions)

print(f"Dask DataFrame: {ddf.npartitions} partitions")
print(f"Columns: {list(ddf.columns)}")
print(f"Estimated size: {source_df.memory_usage(deep=True).sum() / 1e6:.1f} MB")

In [None]:
# Preview the data (head() runs immediately, but reads only the first partition)
ddf.head()

## 4. Lazy Computation with Dask

Dask uses lazy evaluation - operations build a task graph that executes only when you call `.compute()`.

In [None]:
# Define a computation (lazy - not executed yet)
# Group by station and count observations

if USE_SYNTHETIC:
    group_col = 'station_id'
else:
    group_col = 'scientificName' if 'scientificName' in ddf.columns else ddf.columns[0]

group_counts = ddf.groupby(group_col).size()

# Nothing has run yet: group_counts is a lazy Dask collection, not a result
print(f"Task graph created for groupby('{group_col}').size()")
print(f"Type: {type(group_counts)}")
print("(Not yet computed - Dask uses lazy evaluation)")

In [None]:
# Execute the computation
result = group_counts.compute()

print(f"\nObservations by {group_col}:")
print(result.sort_values(ascending=False).head(10))

## 5. Parallel Aggregations

Dask excels at parallel aggregations across partitions.
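
The idea behind a partitioned aggregation can be sketched in plain pandas: each partition produces a small partial result, and the partials are then combined. The example below illustrates the concept on hypothetical values and is not a description of Dask's internal implementation.

```python
import numpy as np
import pandas as pd

# Hypothetical measurements
values = pd.Series([4.0, 6.0, 8.0, 10.0, 12.0, 20.0])

# Split into uneven chunks to mimic partitions
chunks = [values.iloc[:2], values.iloc[2:5], values.iloc[5:]]

# Step 1: each partition reports a partial sum and a row count
partials = [(chunk.sum(), chunk.count()) for chunk in chunks]

# Step 2: combine the partials into the global mean
total_sum = sum(s for s, _ in partials)
total_count = sum(c for _, c in partials)
combined_mean = total_sum / total_count

# Averaging the per-chunk means weights every chunk equally,
# which is wrong when chunks differ in size
naive_mean = np.mean([chunk.mean() for chunk in chunks])

print(f"Combined mean: {combined_mean}")
print(f"Naive mean of means: {naive_mean:.3f}")
```

This is why a custom aggregation should be expressed as partial results that combine correctly, rather than as a statistic computed per partition and then averaged.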

In [None]:
# Multiple aggregations in parallel
# Adjust column names based on your dataset schema

# Check available numeric columns
numeric_cols = ddf.select_dtypes(include=['number']).columns.tolist()
print(f"Numeric columns: {numeric_cols}")

In [None]:
# Parallel aggregations - statistics by group
if USE_SYNTHETIC:
    # Synthetic data: stats by station
    depth_stats = ddf.groupby('station_id').agg({
        'temperature_c': ['mean', 'min', 'max'],
        'depth_m': ['mean', 'count']
    }).compute()
    
    print("Temperature and depth statistics by station:")
    print(depth_stats.head(10))
    
else:
    # ODP data: stats by available columns
    if 'lifeStage' in ddf.columns and 'minimumDepthInMeters' in ddf.columns:
        depth_stats = ddf.groupby('lifeStage').agg({
            'minimumDepthInMeters': ['mean', 'min', 'max', 'count']
        }).compute()
        print("Depth statistics by life stage:")
        print(depth_stats)
    else:
        print(f"Available columns: {list(ddf.columns)}")

## 6. Parallel Apply for Custom Functions

Use `map_partitions` to apply custom functions across partitions in parallel.

In [None]:
def process_partition(df):
    """
    Custom processing function applied to each partition.
    Example: Extract temporal features and compute derived metrics.
    """
    result = df.copy()
    
    if 'timestamp' in df.columns:
        result['timestamp'] = pd.to_datetime(result['timestamp'], errors='coerce')
        result['year'] = result['timestamp'].dt.year
        result['month'] = result['timestamp'].dt.month
        result['hour'] = result['timestamp'].dt.hour
    elif 'eventDate' in df.columns:
        result['eventDate'] = pd.to_datetime(result['eventDate'], errors='coerce')
        result['year'] = result['eventDate'].dt.year
    
    return result

# Apply function across all partitions in parallel
processed_ddf = ddf.map_partitions(process_partition)

# Aggregate by time period
if 'year' in processed_ddf.columns:
    yearly_counts = processed_ddf.groupby('year').size().compute()
    print("Observations by year:")
    print(yearly_counts.sort_index())
    
if 'month' in processed_ddf.columns:
    monthly_counts = processed_ddf.groupby('month').size().compute()
    print("\nObservations by month:")
    print(monthly_counts.sort_index())

## 7. Memory-Efficient Processing Pattern

For very large datasets, process in chunks and aggregate results progressively.
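
One way to aggregate progressively is to fold each chunk's result into a running total rather than keeping every chunk result in a list. The sketch below shows that pattern in plain pandas. `iter_batches` is a hypothetical stand-in for a batch stream, and the species values are made up.

```python
import pandas as pd

def iter_batches():
    """Hypothetical stand-in for a stream of data batches."""
    yield pd.DataFrame({"species": ["cod", "herring", "cod"]})
    yield pd.DataFrame({"species": ["herring", "herring", "saithe"]})
    yield pd.DataFrame({"species": ["cod"]})

# Only the running total stays in memory between batches
running_counts = pd.Series(dtype="int64")

for batch in iter_batches():
    batch_counts = batch["species"].value_counts()
    # fill_value=0 covers species present on only one side of the addition
    running_counts = running_counts.add(batch_counts, fill_value=0).astype("int64")

print(running_counts.sort_values(ascending=False))
```

Memory use then scales with the number of distinct groups, not with the number of batches processed.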

In [None]:
def process_large_dataset_streaming(dataset, chunk_processor, filter_expr=None):
    """
    Process large ODP dataset in streaming fashion with Dask.
    
    Args:
        dataset: ODP dataset
        chunk_processor: Function that takes a DataFrame and returns aggregated result
        filter_expr: Optional filter
    
    Returns:
        Combined results from all chunks
    """
    results = []
    
    select = dataset.table.select(filter_expr) if filter_expr else dataset.table.select()
    
    for i, batch in enumerate(select.batches()):
        df = batch.to_pandas()
        
        # Process chunk with Dask (useful for complex operations)
        ddf_chunk = dd.from_pandas(df, npartitions=2)
        chunk_result = chunk_processor(ddf_chunk)
        results.append(chunk_result)
        
        print(f"Processed batch {i+1}: {len(df)} rows")
    
    return results

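# Example chunk processor
def count_by_species(ddf_chunk):
    if 'scientificName' in ddf_chunk.columns:
        return ddf_chunk.groupby('scientificName').size().compute()
    # Explicit dtype keeps the empty result compatible with the integer counts
    return pd.Series(dtype='int64')

# Process dataset (the ODP `dataset` object exists only when USE_SYNTHETIC is False)
if USE_SYNTHETIC:
    print("Skipping streaming example: set USE_SYNTHETIC = False to load an ODP dataset.")
    chunk_results = []
else:
    print("Processing dataset in streaming mode...")
    chunk_results = process_large_dataset_streaming(dataset, count_by_species)

# Combine results
if chunk_results:
    combined = pd.concat(chunk_results).groupby(level=0).sum()
    print(f"\nCombined species counts ({len(combined)} species):")
    print(combined.sort_values(ascending=False).head(10))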

## 8. Geospatial Processing with Dask

Combine ODP's geospatial filtering with Dask's parallel processing.
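
The parallel part of this pattern can be tried without an ODP connection. The sketch below runs one `dask.delayed` task per region over a small, hypothetical set of observations, using simple bounding boxes in place of ODP's server-side spatial filter. `summarize_region`, `obs`, and `region_bounds` are illustrative names, not SDK features.

```python
import dask
import pandas as pd
from dask import delayed

# Hypothetical observations
obs = pd.DataFrame({
    "longitude": [2.0, 5.0, 10.0, 20.0, 30.0],
    "latitude": [55.0, 60.0, 65.0, 70.0, 75.0],
    "species": ["cod", "cod", "herring", "cod", "capelin"],
})

# (min_lon, min_lat, max_lon, max_lat) for three rectangular regions
region_bounds = {
    "north_sea": (-5, 51, 9, 62),
    "norwegian_sea": (-5, 62, 15, 72),
    "barents_sea": (15, 68, 40, 80),
}

def summarize_region(df, name, bounds):
    """Count observations and distinct species inside one bounding box."""
    min_lon, min_lat, max_lon, max_lat = bounds
    inside = df[
        df["longitude"].between(min_lon, max_lon)
        & df["latitude"].between(min_lat, max_lat)
    ]
    return {
        "region": name,
        "observation_count": len(inside),
        "unique_species": inside["species"].nunique(),
    }

# One lazy task per region; nothing runs until dask.compute
tasks = [delayed(summarize_region)(obs, name, b) for name, b in region_bounds.items()]
region_stats = dask.compute(*tasks, scheduler="threads")

region_df = pd.DataFrame(list(region_stats))
print(region_df)
```

With a distributed client, every argument passed to a delayed function has to be serializable so it can be sent to the workers, which is worth checking before passing a dataset handle this way.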

In [None]:
# Define regions of interest
regions = {
    "north_sea": "POLYGON((-5 51, 9 51, 9 62, -5 62, -5 51))",
    "norwegian_sea": "POLYGON((-5 62, 15 62, 15 72, -5 72, -5 62))",
    "barents_sea": "POLYGON((15 68, 40 68, 40 80, 15 80, 15 68))"
}

def process_region(dataset, region_name, wkt_polygon, geometry_col='footprintWKT'):
    """
    Process data for a specific geographic region.
    """
    try:
        # Use ODP's geospatial filter
        filter_expr = f"{geometry_col} within $area"
        
        dfs = []
        for batch in dataset.table.select(filter_expr, vars={"area": wkt_polygon}).batches():
            dfs.append(batch.to_pandas())
        
        if dfs:
            df = pd.concat(dfs, ignore_index=True)
            return {
                "region": region_name,
                "observation_count": len(df),
                "unique_species": df['scientificName'].nunique() if 'scientificName' in df.columns else 0
            }
    except Exception as e:
        print(f"Error processing {region_name}: {e}")
    
    return {"region": region_name, "observation_count": 0, "unique_species": 0}

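# Process regions in parallel with Dask delayed
from dask import delayed

if USE_SYNTHETIC:
    # The ODP `dataset` object exists only when USE_SYNTHETIC is False
    print("Skipping regional example: set USE_SYNTHETIC = False to load an ODP dataset.")
else:
    # Create delayed tasks for each region
    delayed_results = [
        delayed(process_region)(dataset, name, wkt)
        for name, wkt in regions.items()
    ]

    # Execute in parallel
    print("Processing regions in parallel...")
    region_stats = dask.compute(*delayed_results)

    # Display results (dask.compute returns a tuple, so convert to a list of dicts)
    region_df = pd.DataFrame(list(region_stats))
    print("\nRegion Statistics:")
    print(region_df)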

## 9. Monitoring with the Dask Dashboard

In [None]:
# Display dashboard links (cloud-friendly via jupyter-server-proxy)
import os
jupyter_prefix = os.environ.get('JUPYTERHUB_SERVICE_PREFIX', '/')
dashboard_port = client.scheduler_info().get('services', {}).get('dashboard', 8787)

if 'JUPYTERHUB_SERVICE_PREFIX' in os.environ:
    base = f"https://workspace.hubocean.earth{jupyter_prefix}proxy/{dashboard_port}"
else:
    base = f"http://127.0.0.1:{dashboard_port}"

print("Dask Dashboard Views:")
print(f"  Status:    {base}/status")
print(f"  Tasks:     {base}/tasks")
print(f"  Workers:   {base}/workers")
print(f"  Memory:    {base}/memory")
print(f"  Progress:  {base}/progress")

In [None]:
# Check cluster status
workers = client.scheduler_info()['workers']  # query the scheduler once and reuse the result
print("Cluster Info:")
print(f"  Workers: {len(workers)}")
print(f"  Total threads: {sum(w['nthreads'] for w in workers.values())}")
print(f"  Total memory: {sum(w['memory_limit'] for w in workers.values()) / 1e9:.1f} GB")

## 10. Cleanup

In [None]:
# Close Dask client when done
client.close()
print("Dask client closed.")

## Summary

This notebook demonstrated:

1. **Dask Client Setup** - Connecting to the ODP Workspace Dask cluster
2. **ODP to Dask** - Converting streaming ODP data to Dask DataFrames
3. **Lazy Evaluation** - Building computation graphs before execution
4. **Parallel Aggregations** - Group-by operations across partitions
5. **Custom Processing** - Using `map_partitions` for parallel apply
6. **Streaming Pattern** - Memory-efficient processing for large datasets
7. **Geospatial + Parallel** - Combining ODP spatial filters with Dask parallelism
8. **Monitoring** - Using the Dask dashboard for performance visibility

## Next Steps

- **02_geospatial_analysis.ipynb**: H3 hexagonal aggregation and mapping
- **03_data_pipeline.ipynb**: File ingest workflows
- **04_multi_dataset_join.ipynb**: Cross-dataset analysis

## Resources

- [Dask Documentation](https://docs.dask.org/)
- [Dask DataFrame API](https://docs.dask.org/en/stable/dataframe.html)
- [ODP Python SDK](https://docs.hubocean.earth/python_sdk/intro/)
- [Dask Best Practices](https://docs.dask.org/en/stable/best-practices.html)