# Distributed Data Processing with DaskProcessor

This notebook demonstrates how to use **DaskProcessor** for scalable parallel processing of ROOT data files using Dask's distributed computing framework.

## Overview

DaskProcessor provides a drop-in replacement for the standard Processor that uses Dask for:
- **Local parallelization**: Efficiently use all cores on a single machine
- **Distributed computing**: Connect to Dask clusters for large-scale processing
- **Progress monitoring**: Track processing progress across multiple files
- **Error resilience**: Built-in retry mechanisms for failed tasks

Key advantages:
- Same output format (Awkward Arrays) as standard Processor
- Easy switching between serial, parallel, and distributed modes
- Connection to existing Dask clusters (e.g. on HPC systems) through a scheduler address

## 1. Import Required Libraries

In [None]:
# Standard library imports
import time
import os
from typing import Dict, Optional

# External packages
import awkward as ak

# pyutils imports
from pyutils.pydask import DaskProcessor
from pyutils.pyprocess import Processor
from pyutils.pylogger import Logger

# Initialize logger for notebook demonstrations
logger = Logger(print_prefix="[pydask demo]", verbosity=2)

## 2. Initialize DaskProcessor

Create and configure a DaskProcessor instance. DaskProcessor shares the same constructor parameters as the standard Processor but uses Dask for distributed execution.

In [None]:
# Initialize DaskProcessor with remote settings
dask_proc = DaskProcessor(
    tree_path="EventNtuple/ntuple",
    use_remote=True,           # Access remote persistent datasets
    location="disk",           # Read from disk storage
    verbosity=1,
    worker_verbosity=0
)

logger.log("DaskProcessor initialized with remote settings:", "success")
logger.log(f"  tree_path: {dask_proc._base.tree_path}", "info")
logger.log(f"  use_remote: {dask_proc._base.use_remote}", "info")
logger.log(f"  location: {dask_proc._base.location}", "info")

## 3. Single File Processing

Process a single ROOT file using DaskProcessor. This demonstrates the basic API for specifying branches and output field names.

In [None]:
# Example: Single file processing
# (Replace with actual file path and branches for real data)

branches = {
    "trk.mom": "momentum",  # Map ROOT branch to output field
    "trk.pos": "position"   # Multiple branches can be selected
}

# This would process a single file:
# result = dask_proc.process_data(
#     file_name="/path/to/data.root",
#     branches=branches
# )

logger.log("Single File Processing Example:", "info")
logger.log("When you have a single file, use file_name parameter:", "info")
logger.log("""
result = dask_proc.process_data(
    file_name="/path/to/data.root",
    branches={"trk.mom": "momentum", "trk.pos": "position"},
    show_progress=True
)
The result is an Awkward Array with the selected branches.
""", "info")

## 4. Multi-File Processing with File Lists

Process multiple files from a file list. This is where DaskProcessor pays off: each file is handled as an independent task, and Dask distributes those tasks across the available workers.

In [None]:
# Load the MDS3a.txt file list from the repository
# (os was already imported in the first cell)
notebook_dir = os.getcwd()
file_list_path = os.path.normpath(os.path.join(notebook_dir, "../../MDS3a.txt"))

# Read the file list: one path per line, blank lines skipped
sample_files = []
if os.path.exists(file_list_path):
    with open(file_list_path, "r") as f:
        sample_files = [line.strip() for line in f if line.strip()]

if sample_files:
    logger.log(f"Loaded file list from {file_list_path}", "success")
    logger.log(f"Total files available: {len(sample_files)}", "info")
    
    # Display statistics
    logger.log("\nFile list statistics:", "info")
    logger.log(f"  First file: {os.path.basename(sample_files[0])}", "info")
    logger.log(f"  Last file: {os.path.basename(sample_files[-1])}", "info")
    logger.log("  Dataset: MDC2025-001", "info")
else:
    # Fall back to a single known file if the list is missing or empty
    logger.log("MDS3a.txt not found or empty - using demo file paths", "warning")
    sample_files = [
        "/pnfs/mu2e/persistent/datasets/phy-nts/nts/mu2e/ensembleMDS3aMix1BBTriggered/MDC2025-001/root/01/18/nts.mu2e.ensembleMDS3aMix1BBTriggered.MDC2025-001.001430_00000552.root",
    ]
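Conceptually, multi-file processing is a map over the file list followed by a combine step. The sketch below illustrates that pattern with the standard library's `concurrent.futures` instead of Dask, so it runs anywhere. `process_files` and `fake_worker` are hypothetical names for illustration only; they are not DaskProcessor internals, and dropping `None` results is an assumption that mirrors the "return an Awkward Array or None" worker contract described later in this notebook.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional, Sequence, TypeVar

T = TypeVar("T")


def process_files(
    worker: Callable[[str], Optional[T]],
    file_paths: Sequence[str],
    n_workers: int = 4,
) -> List[T]:
    """Apply `worker` to every file in parallel and keep the non-None results.

    Hypothetical helper illustrating map-then-combine; not part of pyutils or Dask.
    """
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        # pool.map preserves input order, so results line up with file_paths
        results = list(pool.map(worker, file_paths))
    # Files that produced no data (None) are dropped before combining
    return [r for r in results if r is not None]


def fake_worker(path: str) -> Optional[int]:
    """Stand-in worker: returns the file name length, or None for '.txt' files."""
    name = path.rsplit("/", 1)[-1]
    return None if name.endswith(".txt") else len(name)


files = ["/data/a.root", "/data/bb.root", "/data/notes.txt"]
print(process_files(fake_worker, files, n_workers=2))  # [6, 7]
```

In a real run the per-file results would be Awkward Arrays and the combine step would concatenate them rather than return a list.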

## 5. Configure Parallelization Parameters

Explore key parameters for optimizing DaskProcessor performance based on your hardware and workload characteristics.

In [None]:
# Key DaskProcessor.process_data() parameters:

parameter_guide = {
    "n_workers": {
        "description": "Number of parallel workers in the Dask cluster",
        "default": "All available CPU cores",
        "example": "n_workers=4  # Use 4 workers",
        "use_case": "Match the worker count to the available cores"
    },
    "threads_per_worker": {
        "description": "Number of threads per worker",
        "default": 1,
        "example": "threads_per_worker=2  # 2 threads per worker",
        "use_case": "For I/O-bound tasks, increase threads"
    },
    "processes": {
        "description": "Use processes instead of threads",
        "default": False,
        "example": "processes=True  # Use process-based parallelism",
        "use_case": "CPU-bound tasks, GIL avoidance"
    },
    "show_progress": {
        "description": "Display a progress bar during processing",
        "default": True,
        "example": "show_progress=True",
        "use_case": "Monitor long-running jobs"
    },
    "retries": {
        "description": "Number of retries for failed tasks",
        "default": 0,
        "example": "retries=2  # Retry up to 2 times",
        "use_case": "Resilience to transient failures"
    }
}

logger.log("Parallelization Parameter Guide:", "success")
for param, details in parameter_guide.items():
    logger.log(f"\n{param}:", "info")
    logger.log(f"  Description: {details['description']}", "info")
    logger.log(f"  Default: {details['default']}", "info")
    logger.log(f"  Example: {details['example']}", "info")
    logger.log(f"  Use Case: {details['use_case']}", "info")
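In Dask, `retries=N` allows a failed task up to N automatic re-executions, i.e. at most N + 1 attempts in total. The sketch below reproduces those semantics in plain Python for a single call. `run_with_retries` and `FlakyReader` are hypothetical names used only for illustration, and it is an assumption (not confirmed here) that DaskProcessor forwards its `retries` argument with the same meaning.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(func: Callable[[str], T], arg: str, retries: int = 0) -> T:
    """Call func(arg), re-running it up to `retries` extra times on failure.

    Hypothetical helper mirroring Dask's `retries` meaning (N -> at most N + 1
    attempts); it is not DaskProcessor's implementation.
    """
    if retries < 0:
        raise ValueError("retries must be >= 0")
    last_error = None
    for _attempt in range(retries + 1):
        try:
            return func(arg)
        except Exception as exc:  # treat as transient: remember it and retry
            last_error = exc
    # Every attempt failed: surface the last error to the caller
    raise last_error


class FlakyReader:
    """Stand-in for a remote read that fails a fixed number of times first."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.calls = 0

    def __call__(self, path: str) -> str:
        self.calls += 1
        if self.calls <= self.failures:
            raise IOError(f"transient read error on {path}")
        return f"data from {path}"


reader = FlakyReader(failures=2)
print(run_with_retries(reader, "/data/a.root", retries=2))  # data from /data/a.root
print(reader.calls)  # 3
```

Retries only help with transient problems (network hiccups, a busy storage node); a file that is genuinely corrupt will fail on every attempt.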

## 6. Connect to Remote Dask Scheduler

Scale beyond a single machine by connecting to a Dask cluster for large-scale distributed computing across multiple nodes.

In [None]:
# Remote scheduler setup instructions
remote_scheduler_guide = """
STEP 1: Start the Dask Scheduler on the Cluster Head Node
=========================================================
In a tmux session on your Dask cluster's head node:

    dask scheduler --port 8786

(Older Dask releases use the equivalent dask-scheduler command.)
The scheduler will output something like:
    "Scheduler at: tcp://cluster-head.example.com:8786"


STEP 2: Start Workers on the Compute Nodes
==========================================
On each compute node, start workers that register with the scheduler:

    dask worker tcp://cluster-head.example.com:8786


STEP 3: Connect from Your Analysis Script
=========================================
In your analysis notebook/script:

    branches = {"trk.mom": "momentum", "trk.pos": "position"}
    
    result = dask_proc.process_data(
        file_list_path="/path/to/file_list.txt",
        branches=branches,
        scheduler_address="tcp://cluster-head.example.com:8786"
    )

The scheduler distributes tasks across all workers registered with it.
Workers must already be running (STEP 2) before you submit.


ADVANTAGES OF A REMOTE SCHEDULER:
- Distributes work across multiple machines
- Uses all compute resources registered with the cluster
- A persistent scheduler can accept multiple jobs
- Progress can be monitored from your local machine (the Dask dashboard is served on port 8787 by default)
- Job submission is decoupled from execution
"""

logger.log(remote_scheduler_guide, "info")
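Before submitting a long job, it can be worth confirming that the scheduler address is reachable from the analysis machine. The sketch below parses a `tcp://host:port` address with `urllib.parse.urlparse` and attempts a plain TCP connection with `socket.create_connection`. `scheduler_reachable` is a hypothetical helper, not part of pyutils or Dask, and a successful connect only shows that the port is open, not that a healthy Dask scheduler is listening on it.

```python
import socket
from urllib.parse import urlparse


def scheduler_reachable(address: str, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to a tcp://host:port address succeeds.

    Hypothetical pre-flight check; it does not speak the Dask protocol.
    """
    parsed = urlparse(address)
    if parsed.scheme != "tcp" or not parsed.hostname:
        raise ValueError(f"expected tcp://host:port, got {address!r}")
    port = parsed.port or 8786  # 8786 is Dask's default scheduler port
    try:
        with socket.create_connection((parsed.hostname, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, DNS failures, and timeouts
        return False


# Usage: a local listening socket stands in for a running scheduler
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
    server.bind(("127.0.0.1", 0))  # let the OS pick a free port
    server.listen()
    port = server.getsockname()[1]
    print(scheduler_reachable(f"tcp://127.0.0.1:{port}"))  # True
```

Against a real cluster you would pass the address printed in STEP 1, e.g. `scheduler_reachable("tcp://cluster-head.example.com:8786")`, before calling `process_data`.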

## 7. Custom Worker Functions

Define custom processing logic for specialized data transformations, filtering, and manipulations that go beyond simple branch selection.

In [None]:
# Example custom worker function
def custom_physics_worker(file_path: str) -> Optional[ak.Array]:
    """
    Custom worker function that performs specialized physics analysis:
    - Reads specific branches from ROOT file
    - Applies physics cuts (e.g., momentum threshold)
    - Returns filtered Awkward Array
    
    Parameters:
        file_path (str): Path to the ROOT file to process
    
    Returns:
        ak.Array or None: Filtered data as Awkward Array
    """
    try:
        from pyutils.pyread import Reader
        
        # Initialize reader for this file
        reader = Reader(file_path, tree_path="EventNtuple/ntuple")
        
        # Read track data
        data = reader.read(branches=["trk.mom", "trk.pos", "trk.charge"])
        
        if data is None or len(data) == 0:
            return None
        
        # Example physics cuts:
        # - Keep only high-momentum tracks (momentum > 100 MeV/c)
        # - Keep only positively charged tracks
        high_mom_mask = data.trk.mom > 100
        positive_charge_mask = data.trk.charge > 0
        
        filtered = data[high_mom_mask & positive_charge_mask]
        
        return filtered if len(filtered) > 0 else None
        
    except Exception as e:
        print(f"Error processing {file_path}: {e}")
        return None


# Example: Using custom worker function
logger.log("Custom Worker Function Example:", "success")
logger.log("""
# Define your custom worker function

def my_custom_worker(file_path):
    from pyutils.pyread import Reader
    reader = Reader(file_path, tree_path="EventNtuple/ntuple")
    data = reader.read(branches=["trk.mom", "trk.pos"])
    
    # Apply custom cuts
    filtered = data[data.trk.mom > 100]
    
    return filtered

# Use it with DaskProcessor
result = dask_proc.process_data(
    file_list_path="/path/to/file_list.txt",
    custom_worker_func=my_custom_worker,
    n_workers=4,
    show_progress=True
)
""", "info")

logger.log("\nKey Requirements for Custom Worker Functions:", "info")
logger.log("  ✓ Must accept file_path as a string parameter", "info")
logger.log("  ✓ Must return an Awkward Array or None", "info")
logger.log("  ✓ Should handle exceptions gracefully", "info")
logger.log("  ✓ Must not rely on notebook globals; import dependencies inside the function (it may run in a separate process or on another machine)", "info")
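The "handle exceptions gracefully" requirement can be factored out of each individual worker. The sketch below wraps any per-file function so that a failure is logged together with the file name and turned into `None`, which the caller then drops. `safe_worker` and `count_events` are hypothetical names, not part of pyutils; the sketch uses the standard `logging` module rather than the notebook's `Logger` so that the wrapper carries no notebook state into the workers.

```python
import functools
import logging
from typing import Callable, Optional, TypeVar

T = TypeVar("T")
log = logging.getLogger("custom_worker")


def safe_worker(func: Callable[[str], Optional[T]]) -> Callable[[str], Optional[T]]:
    """Wrap a per-file worker so that any exception becomes None instead of failing."""

    @functools.wraps(func)  # keep the original name and docstring
    def wrapper(file_path: str) -> Optional[T]:
        try:
            return func(file_path)
        except Exception:
            # Record which file failed (with traceback), then skip it
            log.exception("Error processing %s", file_path)
            return None

    return wrapper


@safe_worker
def count_events(file_path: str) -> int:
    """Stand-in worker: raises for paths that do not end in '.root'."""
    if not file_path.endswith(".root"):
        raise ValueError("not a ROOT file")
    return 42


results = [count_events(p) for p in ["/data/a.root", "/data/readme.txt"]]
print([r for r in results if r is not None])  # [42]
```

Because `functools.wraps` preserves `__name__`, the wrapped function keeps a readable name wherever task names are derived from it.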

## 8. Compare Standard vs Dask Processors

Compare the standard Processor and DaskProcessor feature by feature, then use the decision guide to choose the right one for a given workload.

In [None]:
# Comparison table
comparison = {
    "Feature": [
        "Parallelization Type",
        "Best For",
        "Scalability",
        "Cluster Support",
        "Progress Tracking",
        "Memory Efficiency",
        "Debugging",
        "Error Resilience"
    ],
    "Standard Processor": [
        "Thread pool (max_workers param)",
        "Small file lists, quick testing",
        "Single machine (limited by cores)",
        "No native support",
        "Basic logging only",
        "Loads all results in memory",
        "Easier (all in-process)",
        "Limited retry mechanism"
    ],
    "DaskProcessor": [
        "Dask distributed framework",
        "Large file lists, HPC clusters",
        "Multi-machine via remote scheduler",
        "Full support (tcp:// addresses)",
        "Progress bar with live updates",
        "Distributed memory management",
        "More complex (separate processes)",
        "Built-in retries parameter"
    ]
}

# Display comparison
logger.log("STANDARD PROCESSOR vs DASK PROCESSOR", "success")
logger.log("=" * 80, "info")

import pandas as pd
df_comparison = pd.DataFrame(comparison)
logger.log(str(df_comparison.to_string(index=False)), "info")

# When to use which
logger.log("\n" + "=" * 80, "info")
logger.log("DECISION GUIDE:", "success")
logger.log("""
Use STANDARD PROCESSOR when:
  • Processing a small number of files (< 10-20)
  • You want simpler debugging
  • Operating on a single machine without HPC resources
  • Throughput is not critical
  • You're doing interactive analysis (faster startup)

Use DASK PROCESSOR when:
  • Processing hundreds or thousands of files
  • You have access to a multi-core machine or HPC cluster
  • You need progress monitoring for long-running jobs
  • You want resilience to transient failures (retries)
  • Complex workflows requiring custom worker functions
  • You want easy scaling without code changes

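KEY ADVANTAGE: Both expose process_data() and return Awkward Arrays, so
downstream analysis code does not change when you switch between them;
only the parallelization arguments differ (e.g. max_workers vs n_workers
or scheduler_address).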
""", "info")

logger.log("\n✓ This concludes the DaskProcessor tutorial!", "success")