# Coffea-Casa Processor-Based Workflow Test

This notebook demonstrates the UnifiedProcessor workflow with coffea.processor.Runner on Coffea-Casa, including skimming, analysis, histogramming, and statistics steps.

## Workflow Overview

1. Set up the Python path for the intccms package
2. Install dependencies and register modules with cloudpickle
3. Acquire a Dask client from the Coffea-Casa environment
4. Configure analysis parameters
5. Run metadata extraction
6. Initialize UnifiedProcessor
7. Run processor with coffea.processor.Runner
8. Save histograms
9. Run statistical analysis (if enabled)

In [1]:
# Set up the Python path to include the intccms package
import sys
import time
from pathlib import Path

# Add src directory to Python path
repo_root = Path.cwd()
src_dir = repo_root / "src"
examples_dir = repo_root
if str(src_dir) not in sys.path:
    sys.path.insert(0, str(src_dir))
if str(examples_dir) not in sys.path:
    sys.path.insert(0, str(examples_dir))
print(f"✅ Added {src_dir} to Python path")
print(f"✅ Added {examples_dir} to Python path")

✅ Added /home/cms-jovyan/intccms-agc-demo-10/src to Python path
✅ Added /home/cms-jovyan/intccms-agc-demo-10 to Python path
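
The path setup above can be wrapped in a small helper so that re-running the cell never adds duplicate entries. This is a minimal sketch; `prepend_to_sys_path` is a hypothetical name used for illustration and is not part of intccms.

```python
import sys
from pathlib import Path


def prepend_to_sys_path(*directories: Path) -> list:
    """Insert each directory at the front of sys.path unless already present.

    Returns the entries that were actually added (hypothetical helper).
    """
    added = []
    for directory in directories:
        entry = str(directory)
        if entry not in sys.path:
            sys.path.insert(0, entry)
            added.append(entry)
    return added


repo_root = Path.cwd()
first = prepend_to_sys_path(repo_root / "src", repo_root)
# Calling it again is a no-op because both entries are now present.
second = prepend_to_sys_path(repo_root / "src", repo_root)
```

Returning the list of added entries makes it easy to log only what changed.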


In [2]:
COFFEA_VERSION = "2025.10.3.dev17+g2cde65fb6" # 2025.10.2
COFFEA_PIP = "git+https://github.com/scikit-hep/coffea@master"
try:
    import omegaconf
except ImportError:
    print("⚠️ omegaconf not found, installing...")
    ! pip install omegaconf

try:
    import coffea
    print("Coffea version: ", coffea.__version__)
    # Require the exact development build this notebook was run with
    assert coffea.__version__ == "2025.10.3.dev9+g41c84f7a9"
except (ImportError, AssertionError):
    print("⚠️ coffea not found or incorrect version, installing...")
    ! pip install $COFFEA_PIP
print("✅ All dependencies are installed.")

Coffea version:  2025.10.3.dev9+g41c84f7a9
‚úÖ All dependencies are installed.
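
The version check above relies on `assert`, which Python skips when run with `-O`. A possible alternative, sketched here, reads the installed distribution metadata through the standard-library `importlib.metadata` without importing the package. The helper name `needs_install` and the package name in the example are made up for illustration.

```python
from importlib.metadata import PackageNotFoundError, version


def needs_install(package: str, expected_version: str) -> bool:
    """Return True if `package` is missing or its installed version differs."""
    try:
        installed = version(package)
    except PackageNotFoundError:
        return True
    return installed != expected_version


# A distribution name assumed not to exist, used only to exercise the "missing" branch.
missing = needs_install("surely-not-an-installed-package-xyz", "1.0")
```

In the notebook, the result could decide whether the `pip install` line runs.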


In [3]:
# Imports and cloudpickle registration
import copy
import os

os.environ['AWS_ACCESS_KEY_ID'] = ""
os.environ['AWS_SECRET_ACCESS_KEY'] = ""

from dask.distributed import Client, PipInstall
from coffea.processor import DaskExecutor
from coffea.nanoevents import NanoAODSchema

import cloudpickle
import intccms
import example_cms

# Register modules so cloudpickle serializes them by value for the Dask workers
cloudpickle.register_pickle_by_value(intccms)
cloudpickle.register_pickle_by_value(example_cms)

from example_cms.configs.configuration import config as original_config
from intccms.schema import Config, load_config_with_restricted_cli
from intccms.utils.output import OutputDirectoryManager
from intccms.metadata_extractor import DatasetMetadataManager
from intccms.datasets import DatasetManager
from intccms.analysis import run_processor_workflow
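
The reason for registering `intccms` and `example_cms` by value is that pickling normally stores functions by reference: only the module name and function name are serialized, so the receiving process must be able to import the module itself. The sketch below shows that default using only the standard-library `pickle`, with `json.dumps` as a stand-in for a function from a local package.

```python
import json
import pickle

# Pickle a function defined in an importable module.
payload = pickle.dumps(json.dumps)

# The payload holds the names needed to re-import the function,
# not the function's code, which is why it is only a few dozen bytes.
has_module_name = b"json" in payload
has_function_name = b"dumps" in payload
payload_size = len(payload)

# Unpickling works here only because `json` is importable in this process.
restored = pickle.loads(payload)
```

A worker that cannot import the module would fail at the `pickle.loads` step, which is what by-value registration is meant to avoid for modules that are not installed on the workers.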

## Acquire Dask Client

Coffea-Casa provides a shared scheduler. Connect to it and register dependencies.

In [4]:
def acquire_client():
    """Acquire Dask client from Coffea-Casa environment."""
    client = Client("tls://localhost:8786")
    dependencies = [COFFEA_PIP] #["coffea==2025.10.2"]
    client.register_plugin(PipInstall(packages=dependencies))
    cluster = None  # no local cluster in this mode
    return client, cluster

## Configuration Setup

Configure analysis parameters including which processes to run and output settings.

In [5]:
# Configuration setup
config = copy.deepcopy(original_config)

# Optionally cap the number of files for quick tests (None applies no cap)
config["datasets"]["max_files"] = None

# Use local output directory
config["general"]["output_dir"] = "example_cms/outputs/"

# Configuration flags
config["general"]["read_from_cache"] = False
config["general"]["run_metadata_generation"] = False
config["general"]["run_processor"] = True  # Set to False to skip processor and load saved histograms
config["general"]["save_skimmed_output"] = False  # Set to True to save filtered events to disk
config["general"]["run_analysis"] = True
config["general"]["run_histogramming"] = True
config["general"]["run_systematics"] = True
config["general"]["run_statistics"] = False

# Optionally restrict the run to specific processes (uncomment to enable)
#config["general"]["processes"] = ["data"]

cli_args = []
full_config = load_config_with_restricted_cli(config, cli_args)
validated_config = Config(**full_config)

print(f"✅ Configuration loaded with max_files={validated_config.datasets.max_files}")
print(f"   - run_processor: {validated_config.general.run_processor}")
print(f"   - save_skimmed_output: {validated_config.general.save_skimmed_output}")
print(f"   - run_analysis: {validated_config.general.run_analysis}")
print(f"   - run_histogramming: {validated_config.general.run_histogramming}")
print(f"   - run_systematics: {validated_config.general.run_systematics}")
print(f"   - run_statistics: {validated_config.general.run_statistics}")

✅ Configuration loaded with max_files=None
   - run_processor: True
   - save_skimmed_output: False
   - run_analysis: True
   - run_histogramming: True
   - run_systematics: True
   - run_statistics: False
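
The configuration cell starts from `copy.deepcopy(original_config)` rather than a plain copy. The following sketch, using a small stand-in dictionary instead of the real configuration, illustrates why: a shallow copy shares its nested dictionaries with the original, so edits leak back into it.

```python
import copy

# Stand-in for the imported configuration (the real one has many more keys).
original = {"general": {"run_statistics": True}, "datasets": {"max_files": 5}}

shallow = copy.copy(original)
shallow["general"]["run_statistics"] = False  # also mutates `original`
leaked = original["general"]["run_statistics"]

original["general"]["run_statistics"] = True  # reset for the second demonstration
deep = copy.deepcopy(original)
deep["general"]["run_statistics"] = False  # `original` is untouched
preserved = original["general"]["run_statistics"]
```

With the deep copy, the cell can be re-run with different flags without the imported defaults drifting.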


## Run Complete Workflow

Execute the full processor workflow with proper cleanup in a try/finally block.
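
The cleanup pattern can be illustrated in isolation. This is a sketch with a stand-in `FakeClient` class (not the real `dask.distributed.Client`) showing that `finally` closes the client even when the work raises. One detail worth noting: creating the client before entering `try` guarantees the name exists when `finally` runs.

```python
class FakeClient:
    """Stand-in for a Dask client, used only for illustration."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def run_with_cleanup(client, work):
    """Run `work(client)` and close the client whether or not it raises."""
    try:
        return work(client)
    finally:
        client.close()


def failing_work(client):
    raise RuntimeError("simulated failure inside the workflow")


client = FakeClient()  # created before `try`, so cleanup can always reach it
try:
    run_with_cleanup(client, failing_work)
except RuntimeError as error:
    message = str(error)
```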

In [None]:
# Acquire the client before entering `try` so that `client` is always defined in `finally`
client, cluster = acquire_client()
try:
    print("✅ Connected to Dask scheduler")
    print(f"📊 Dashboard: {client.dashboard_link}")
    # Output Manager Setup
    output_manager = OutputDirectoryManager(
        root_output_dir=validated_config.general.output_dir,
        cache_dir=validated_config.general.cache_dir,
        metadata_dir=validated_config.general.metadata_dir,
        skimmed_dir=validated_config.general.skimmed_dir
    )
    print(f"✅ Output directory: {output_manager.root_output_dir}")

    # Step 1: Metadata Extraction
    print("\n📋 Extracting metadata...")
    dataset_manager = DatasetManager(validated_config.datasets)
    metadata_generator = DatasetMetadataManager(
        dataset_manager=dataset_manager,
        output_manager=output_manager,
        executor=DaskExecutor(client=client),
        config=validated_config,
    )
    metadata_generator.run(
        generate_metadata=validated_config.general.run_metadata_generation,
        processes_filter=getattr(validated_config.general, "processes", None),
    )

    metadata_lookup = metadata_generator.build_metadata_lookup()
    workitems = metadata_generator.workitems


    print(f"✅ Generated {len(workitems)} workitems")

    # Show first few workitems
    print("\n🔍 Workitem Details (first 5):")
    for i, wi in enumerate(workitems[:5]):
        print(f"  {i}: dataset='{wi.dataset}' process='{wi.usermeta.get('process', 'N/A')}'")
    if len(workitems) > 5:
        print(f"  ... and {len(workitems) - 5} more")

    # Step 2: Run Processor Workflow (or load saved histograms)
    print("\n🚀 Running processor workflow...")
    t0 = time.perf_counter()
    output, report, metrics = run_processor_workflow(
        config=validated_config,
        output_manager=output_manager,
        metadata_lookup=metadata_lookup,
        workitems=workitems,
        executor=DaskExecutor(client=client, treereduction=8, retries=0),
        schema=NanoAODSchema,
    )
    t1 = time.perf_counter()
    print("✅ Processor workflow complete!")

    # Step 3: Display Results
    print("\n" + "=" * 60)
    print("📊 Results:")
    print("=" * 60)

    if validated_config.general.run_processor:
        print(f"📊 Total events processed: {output.get('processed_events', 0):,}")
        if 'skimmed_events' in output:
            print(f"✂️  Events after skim: {output.get('skimmed_events', 0):,}")

    # Summary
    print("\n" + "=" * 60)
    print("✅ Complete processor workflow finished!")
    print("=" * 60)

finally:
    # Cleanup
    print("\n🧹 Cleaning up...")
    client.close()
    print("✅ Done!")

✅ Connected to Dask scheduler
📊 Dashboard: /user/mohamed.aly@cern.ch/proxy/8787/status

✅ Output directory: /home/cms-jovyan/intccms-agc-demo-10/example_cms/outputs

📋 Extracting metadata...

✅ Generated 39603 workitems

🔍 Workitem Details (first 5):
  0: dataset='signal_0__nominal' process='signal'
  1: dataset='signal_0__nominal' process='signal'
  2: dataset='signal_1__nominal' process='signal'
  3: dataset='signal_1__nominal' process='signal'
  4: dataset='signal_1__nominal' process='signal'
  ... and 39598 more

🚀 Running processor workflow...


This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


Output()
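
With tens of thousands of workitems, printing the first five says little about how the work is distributed. A per-process and per-dataset tally can be built with `collections.Counter`. The sketch below uses `SimpleNamespace` stand-ins that model only the `dataset` and `usermeta` attributes printed above; the last entry is a made-up item without a `process` key.

```python
from collections import Counter
from types import SimpleNamespace

# Stand-ins for workitems: only the two attributes printed above are modeled.
workitems = [
    SimpleNamespace(dataset="signal_0__nominal", usermeta={"process": "signal"}),
    SimpleNamespace(dataset="signal_0__nominal", usermeta={"process": "signal"}),
    SimpleNamespace(dataset="signal_1__nominal", usermeta={"process": "signal"}),
    SimpleNamespace(dataset="other_0__nominal", usermeta={}),  # hypothetical, no process key
]

# Count workitems per process, falling back to "N/A" as in the printout.
per_process = Counter(wi.usermeta.get("process", "N/A") for wi in workitems)
# Count workitems (chunks) per dataset.
per_dataset = Counter(wi.dataset for wi in workitems)
```

`per_process.most_common()` then gives the processes ordered by number of workitems.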

In [None]:
report

In [None]:
# Metrics to report:
# - event size
# - number of files per dataset
# - file sizes
# - number of events per file
# - size of branches (per branch, per % of branches)
# - processor time (split I/O vs. processing)

In [None]:
# Single quotes inside the f-strings keep this valid on Python versions before 3.12
print(f"data read: {report['bytesread'] / 1000**3:.2f} GB in {report['chunks']} chunks")

print(f"core-average event rate using 'processtime': {report['entries'] / 1000 / report['processtime']:.2f} kHz")
print(f"core-average data rate using 'processtime': {report['bytesread'] / 1000**3 * 8 / report['processtime']:.2f} Gbps")

print(f"average event rate using walltime: {report['entries'] / 1000 / (t1 - t0):.2f} kHz")
print(f"average data rate using walltime: {report['bytesread'] / 1000**3 * 8 / (t1 - t0):.2f} Gbps")

print(f"Number of branches read: {len(report['columns'])}")
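
The same arithmetic can be collected in one function, which makes the unit conversions explicit (bytes to gigabits, events to thousands of events). This is a sketch, assuming `report` carries the keys used above and that `processtime` is the processing time summed over all cores. The function name and the input numbers are made up for illustration and are not measured results.

```python
def throughput_summary(report: dict, walltime_s: float) -> dict:
    """Convert raw counters into event and data rates.

    Assumes `report` has "bytesread" (bytes), "entries" (events) and
    "processtime" (seconds summed over cores).
    """
    gigabits = report["bytesread"] * 8 / 1000**3
    kilo_events = report["entries"] / 1000
    return {
        "core_event_rate_khz": kilo_events / report["processtime"],
        "core_data_rate_gbps": gigabits / report["processtime"],
        "wall_event_rate_khz": kilo_events / walltime_s,
        "wall_data_rate_gbps": gigabits / walltime_s,
    }


# Made-up numbers chosen for easy arithmetic.
example = {"bytesread": 2_000_000_000, "entries": 1_000_000, "processtime": 100.0}
rates = throughput_summary(example, walltime_s=10.0)
```

The ratio of the wall-clock rate to the per-core rate gives a rough estimate of how many cores were effectively busy, here a factor of ten.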