# roastcoffea: Comprehensive Performance Monitoring for Coffea

This notebook demonstrates all features of roastcoffea, including:
- Basic metrics collection
- Chunk-level tracking with `@track_metrics`
- Byte tracking for I/O analysis
- Fine-grained profiling with `track_time()` and `track_memory()`
- Complete visualization suite (17 plot types)

In [None]:
from contextlib import contextmanager
from pathlib import Path

import awkward as ak
import matplotlib.pyplot as plt
from coffea import processor
from coffea.nanoevents import NanoAODSchema
from dask.distributed import Client, LocalCluster

from roastcoffea import (
    MetricsCollector,
    track_bytes,
    track_memory,
    track_metrics,
    track_time,
)
from roastcoffea.visualization.plots import (
    plot_compression_ratio_distribution,
    plot_cpu_utilization_timeline,
    plot_data_access_percentage,
    plot_efficiency_summary,
    plot_executing_tasks_timeline,
    plot_memory_utilization_timeline,
    plot_occupancy_timeline,
    plot_per_task_bytes_read,
    plot_per_task_cpu_io,
    plot_per_task_overhead,
    plot_resource_utilization,
    plot_runtime_distribution,
    plot_runtime_vs_events,
    plot_throughput_timeline,
    plot_total_active_tasks_timeline,
    plot_worker_activity_timeline,
    plot_worker_count_timeline,
)

# Configure matplotlib for inline display
%matplotlib inline
plt.style.use('seaborn-v0_8-darkgrid')

In [None]:
@contextmanager
def acquire_client(n_workers=4, threads_per_worker=1):
    """Yield a Dask ``Client`` connected to a temporary ``LocalCluster``.

    A process-based local cluster is started, a client is connected to it,
    and the dashboard link is printed. Both are closed when the ``with``
    block using this context manager exits.

    Args:
        n_workers: Number of workers passed to ``LocalCluster``.
        threads_per_worker: Threads per worker passed to ``LocalCluster``.

    Yields:
        The connected ``Client``.
    """
    # Entering both objects as context managers guarantees the cluster is
    # shut down even when connecting the client (or printing the link) fails;
    # on exit the client is closed first, then the cluster.
    with LocalCluster(
        n_workers=n_workers,
        threads_per_worker=threads_per_worker,
        processes=True,
    ) as cluster, Client(cluster) as client:
        print(f"Dashboard: {client.dashboard_link}")
        yield client

## Dataset Setup

Using CERN Open Data for reproducible demonstrations.

In [None]:
# Single CERN Open Data ttbar NanoAOD file, read over XRootD.
_TTBAR_FILE = (
    "root://eospublic.cern.ch//eos/opendata/cms/mc/RunIISummer20UL16NanoAODv9/"
    "TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/"
    "106X_mcRun2_asymptotic_v17-v1/120000/"
    "08FCB2ED-176B-064B-85AB-37B898773B98.root"
)

# Dataset name -> {"files": {file URL: tree name}}
fileset = {"ttbar": {"files": {_TTBAR_FILE: "Events"}}}

## Full Featured Processor

This processor demonstrates all roastcoffea tracking features:
- `@track_metrics`: Automatic chunk-level metrics (timing, memory, bytes)
- `track_bytes()`: Fine-grained I/O tracking for specific data access
- `track_time()`: Time profiling for code sections
- `track_memory()`: Memory profiling for code sections

In [None]:
class ComprehensiveProcessor(processor.ProcessorABC):
    """Processor with all roastcoffea features enabled."""

    @track_metrics
    def process(self, events):
        """Select jets and muons from one chunk and return simple counts.

        Args:
            events: The chunk of events handed over by the coffea runner.

        Returns:
            dict with the number of events in the chunk (``"nevents"``) and
            the total number of selected jets (``"njets"``) and muons
            (``"nmuons"``) summed over all events in the chunk.
        """
        # Track jet loading I/O
        with track_bytes(self, events, "jet_loading"):
            with track_time(self, "jet_selection"):
                jets = events.Jet
                selected_jets = jets[jets.pt > 30]

        # Track muon loading I/O
        with track_bytes(self, events, "muon_loading"):
            with track_time(self, "muon_selection"):
                muons = events.Muon
                selected_muons = muons[muons.pt > 20]

        # Track memory-intensive computations. The sums are not used in the
        # returned result; they only give the memory tracker work to measure.
        with track_memory(self, "pt_calculations"):
            jet_pt_sum = ak.sum(selected_jets.pt, axis=1)
            muon_pt_sum = ak.sum(selected_muons.pt, axis=1)

        # len() of a jagged per-event collection is the number of events, not
        # the number of objects, so count per event and sum over the chunk.
        return {
            "nevents": len(events),
            "njets": int(ak.sum(ak.num(selected_jets, axis=1))),
            "nmuons": int(ak.sum(ak.num(selected_muons, axis=1))),
        }

    def postprocess(self, accumulator):
        """Return the accumulated output unchanged."""
        return accumulator

## Run Analysis with Metrics Collection

In [None]:
# One processor instance is shared by the collector and the runner.
comprehensive_processor = ComprehensiveProcessor()

with acquire_client(n_workers=4) as client:
    collector_options = dict(
        client=client,
        processor_instance=comprehensive_processor,
        track_workers=True,
        worker_tracking_interval=1.0,
    )
    with MetricsCollector(**collector_options) as collector:
        runner = processor.Runner(
            executor=processor.DaskExecutor(client=client),
            schema=NanoAODSchema,
            chunksize=100_000,
            savemetrics=True,
        )

        # savemetrics=True makes the runner return (output, report)
        output, report = runner(
            fileset,
            processor_instance=comprehensive_processor,
            treename="Events",
        )

        # Hand the chunk metrics and the coffea report to the collector
        collector.extract_metrics_from_output(output)
        collector.set_coffea_report(report)

    # Summary tables are printed after the collector context has exited,
    # while the client is still open
    collector.print_summary()

# Keep the results around for the plotting cells below
metrics = collector.get_metrics()
tracking_data = collector.tracking_data

## Performance Visualizations

roastcoffea provides 17 different plot types organized into categories:

1. Throughput & Data I/O
2. Worker Resource Utilization
3. Chunk-level Analysis
4. Per-Task Analysis
5. Summary Metrics

### Throughput & Data I/O

Track data processing rates, I/O patterns, file compression, and data access efficiency.

In [None]:
# Data Throughput Timeline (requires byte tracking)
# Plot only when chunk info was collected and is non-empty.
if metrics.get("chunk_info"):
    print("📊 Data Throughput Over Time")
    fig, ax = plot_throughput_timeline(
        chunk_info=metrics["chunk_info"],
        tracking_data=tracking_data,
        title="Data Throughput Timeline",
    )
    plt.show()
else:
    print("⚠️  Byte tracking not available. Throughput plot requires @track_metrics decorator.")

In [None]:
# Compression Ratio Distribution
# Plot only when compression ratios were collected and are non-empty.
if metrics.get("compression_ratios"):
    print("📊 Compression Ratio Distribution Across Files")
    print("   Shows compression efficiency (compressed/uncompressed)")
    try:
        fig, ax = plot_compression_ratio_distribution(metrics)
        plt.show()
    except Exception as e:
        # Best-effort demo cell: report the failure and keep the notebook running
        print(f"   Plot unavailable: {e}")
else:
    print("⚠️  Compression ratio data not available")

In [None]:
# Bytes Read Percentage Distribution
# Plot only when per-file read percentages were collected and are non-empty.
if metrics.get("bytes_read_percent_per_file"):
    print("📊 Bytes Read Percentage Distribution")
    print("   Shows what % of each file's bytes were actually read")
    try:
        fig, ax = plot_data_access_percentage(metrics)
        plt.show()
    except Exception as e:
        # Best-effort demo cell: report the failure and keep the notebook running
        print(f"   Plot unavailable: {e}")
else:
    print("⚠️  Bytes read percentage data not available")

### Worker Resource Utilization

Monitor worker activity, task distribution, and resource usage.

In [None]:
# Worker Count Over Time
# Skipped silently when worker tracking produced no "worker_counts" entry.
if tracking_data and "worker_counts" in tracking_data:
    print("📊 Worker Count Over Time")
    fig, ax = plot_worker_count_timeline(tracking_data)
    plt.show()

In [None]:
# Worker Activity Timeline (active tasks per worker)
# Skipped silently when worker tracking produced no "worker_active_tasks" entry.
if tracking_data and "worker_active_tasks" in tracking_data:
    print("📊 Worker Activity Timeline")
    print("   Shows active (processing + queued) tasks per worker")
    fig, ax = plot_worker_activity_timeline(
        tracking_data,
        max_legend_entries=5,  # Hide legend if >5 workers
    )
    plt.show()

In [None]:
# Total Active Tasks Across All Workers
# Skipped silently when worker tracking produced no "worker_active_tasks" entry.
if tracking_data and "worker_active_tasks" in tracking_data:
    print("📊 Total Active Tasks Across All Workers")
    fig, ax = plot_total_active_tasks_timeline(tracking_data)
    plt.show()

In [None]:
# Worker Occupancy (task saturation)
# Skipped silently when worker tracking produced no "worker_occupancy" entry.
if tracking_data and "worker_occupancy" in tracking_data:
    print("📊 Worker Occupancy (Task Saturation)")
    print("   0.0 = idle, higher values = more saturated")
    fig, ax = plot_occupancy_timeline(
        tracking_data,
        max_legend_entries=5,
    )
    plt.show()

In [None]:
# Executing Tasks Per Worker
# Skipped silently when worker tracking produced no "worker_executing" entry.
if tracking_data and "worker_executing" in tracking_data:
    print("📊 Executing Tasks Per Worker")
    print("   Shows tasks actually running (subset of active tasks)")
    fig, ax = plot_executing_tasks_timeline(
        tracking_data,
        max_legend_entries=5,
    )
    plt.show()

In [None]:
# Memory Utilization
# Skipped silently when worker tracking produced no "worker_memory" entry.
if tracking_data and "worker_memory" in tracking_data:
    print("📊 Memory Utilization Per Worker")
    fig, ax = plot_memory_utilization_timeline(tracking_data)
    plt.show()

In [None]:
# CPU Utilization Per Worker
# Skipped silently when worker tracking produced no "worker_cpu" entry.
if tracking_data and "worker_cpu" in tracking_data:
    print("📊 CPU Utilization Per Worker")
    print("   Shows CPU usage percentage (0-100%) for each worker")
    fig, ax = plot_cpu_utilization_timeline(
        tracking_data,
        max_legend_entries=5,
    )
    plt.show()

### Chunk-level Analysis

Performance metrics for individual chunks (runtime, events processed).

In [None]:
# Chunk Runtime Distribution
# Plot only when raw chunk metrics were collected and are non-empty.
if metrics.get("raw_chunk_metrics"):
    print("📊 Chunk Runtime Distribution")
    print("   Histogram of processing time per chunk")
    try:
        fig, ax = plot_runtime_distribution(
            chunk_metrics=metrics["raw_chunk_metrics"]
        )
        plt.show()
    except Exception as e:
        # Best-effort demo cell: report the failure and keep the notebook running
        print(f"   Plot unavailable: {e}")
else:
    print("⚠️  Chunk metrics not available")

In [None]:
# Chunk Runtime vs Number of Events
# Plot only when raw chunk metrics were collected and are non-empty.
if metrics.get("raw_chunk_metrics"):
    print("📊 Chunk Runtime vs Number of Events")
    print("   Scatter plot showing scaling behavior")
    try:
        fig, ax = plot_runtime_vs_events(
            chunk_metrics=metrics["raw_chunk_metrics"]
        )
        plt.show()
    except Exception as e:
        # Best-effort demo cell: report the failure and keep the notebook running
        print(f"   Plot unavailable: {e}")
else:
    print("⚠️  Chunk metrics not available")

### Per-Task Analysis

Fine-grained metrics from Dask Spans for individual task performance.

In [None]:
# The collector may not expose span metrics; fall back to None so the
# per-task cells can skip their plots.
span_metrics = getattr(collector, 'span_metrics', None)

if span_metrics:
    # CPU vs I/O Breakdown
    print("📊 Per-Task CPU vs I/O Time")
    try:
        fig, ax = plot_per_task_cpu_io(span_metrics)
        plt.show()
    except Exception as e:
        # Best-effort demo cell: report the failure and keep the notebook running
        print(f"   Plot unavailable: {e}")
else:
    print("⚠️  Span metrics not available")

In [None]:
# Skipped silently when no span metrics are available.
if span_metrics:
    # Bytes Read Per Task
    print("📊 Per-Task Bytes Read")
    try:
        fig, ax = plot_per_task_bytes_read(span_metrics)
        plt.show()
    except Exception as e:
        # Best-effort demo cell: report the failure and keep the notebook running
        print(f"   Plot unavailable: {e}")

In [None]:
# Skipped silently when no span metrics are available.
if span_metrics:
    # Compression & Serialization Overhead
    print("📊 Per-Task Compression & Serialization Overhead")
    try:
        fig, ax = plot_per_task_overhead(span_metrics)
        plt.show()
    except Exception as e:
        # Best-effort demo cell: report the failure and keep the notebook running
        print(f"   Plot unavailable: {e}")

### Summary Metrics

High-level overview of overall performance.

In [None]:
# Efficiency Summary
# Drawn unconditionally from the aggregated metrics dict.
print("📊 Efficiency Summary")
fig, ax = plot_efficiency_summary(metrics)
plt.show()

In [None]:
# Resource Utilization Summary
# Drawn unconditionally from the aggregated metrics dict.
print("📊 Resource Utilization")
fig, ax = plot_resource_utilization(metrics)
plt.show()

## Accessing Raw Metrics

All metrics are accessible programmatically for custom analysis.

In [None]:
# Headline numbers read from the aggregated metrics dict; any missing key
# falls back to 0.
banner = "=" * 60
print(banner)
print("KEY METRICS")
print(banner)

total_events = metrics.get('total_events', 0)
print(f"Total events: {total_events:,}")

gigabytes_read = metrics.get('total_bytes_read_coffea', 0) / 1e9
print(f"Total bytes read: {gigabytes_read:.2f} GB")

rate_gbps = metrics.get('overall_rate_gbps', 0)
print(f"Overall throughput: {rate_gbps:.2f} Gbps")

wall_seconds = metrics.get('wall_time', 0)
print(f"Wall time: {wall_seconds:.2f}s")

core_efficiency = metrics.get('core_efficiency', 0)
print(f"Core efficiency: {core_efficiency:.1f}%")

chunk_total = metrics.get('num_chunks', 0)
print(f"Number of chunks: {chunk_total}")

In [None]:
# Per-chunk metrics (if available): show the first three chunks in detail
if "raw_chunk_metrics" in metrics:
    rule = "=" * 60
    print("\n" + rule)
    print("CHUNK-LEVEL DETAILS (first 3 chunks)")
    print(rule)

    first_chunks = metrics["raw_chunk_metrics"][:3]
    for number, chunk in enumerate(first_chunks, start=1):
        print(f"\nChunk {number}:")
        print(f"  Events: {chunk.get('num_events', 0):,}")
        print(f"  Duration: {chunk.get('duration', 0):.2f}s")
        print(f"  Bytes read: {chunk.get('bytes_read', 0) / 1e6:.2f} MB")
        print(f"  Memory delta: {chunk.get('mem_delta_mb', 0):+.1f} MB")

        # Fine-grained timing: only shown when present and non-empty
        timing_sections = chunk.get("timing")
        if timing_sections:
            print("  Timing breakdown:")
            for label, seconds in timing_sections.items():
                print(f"    {label}: {seconds:.3f}s")

        # Fine-grained bytes: only shown when present and non-empty
        byte_sections = chunk.get("bytes")
        if byte_sections:
            print("  I/O breakdown:")
            for label, n_bytes in byte_sections.items():
                print(f"    {label}: {n_bytes / 1e6:.2f} MB")

## Saving & Loading Measurements

Persist metrics for later analysis or comparison.

In [None]:
# Save measurement
# The path returned by save_measurement is kept for the load cell below.
output_dir = Path("measurements")
measurement_path = collector.save_measurement(
    output_dir, measurement_name="comprehensive_demo"
)
print(f"✅ Saved to: {measurement_path}")

In [None]:
# Load it back
from roastcoffea.export.measurements import load_measurement

# Read the saved measurement and print two headline numbers from its
# "metrics" entry.
loaded = load_measurement(measurement_path)
print(f"✅ Loaded metrics from {measurement_path}")
print(f"   Events processed: {loaded['metrics']['total_events']:,}")
print(f"   Throughput: {loaded['metrics']['overall_rate_gbps']:.2f} Gbps")