# Configuration

In [1]:
# ==================== ANALYSIS CONFIGURATION ====================
from pathlib import Path

# Absolute path of the directory the notebook is started from; the input
# data location below is derived from it.
WORKFLOW_DIR = Path.cwd().resolve()

# ---- Input data ----
SAMPLES_PATH = WORKFLOW_DIR.joinpath("data")  # Path to input ROOT files
TRIGGERS_FILE = 'triggers.json'  # Path to triggers configuration
SAMPLES_READY_FILE = 'samples_ready.json'  # Preprocessed samples metadata

# ---- Dataset selection ----
PROCESS_ALL_DATASETS = True  # True: process every dataset; False: only SUB_DATASET
SUB_DATASET = 'hgg'  # Dataset used when PROCESS_ALL_DATASETS is False (e.g., 'hgg', 'hbb', 'qcd', etc.)

# ---- Analysis parameters ----
ECF_UPPER_BOUND = 3  # Calculate ECFs from n=2 to n=ECF_UPPER_BOUND (max: 6)
STEP_SIZE = 50_000  # Number of events per chunk for preprocessing

# ---- Output ----
OUTPUT_DIR = 'output'  # Base directory for output parquet files

# ---- TaskVine/Dask ----
# The manager name and port range are not configured here: the manager
# initialization cell reads them from the VINE_MANAGER_NAME and
# VINE_MANAGER_PORTS environment variables.
RUN_INFO_PATH = 'vine-run-info'  # Directory for TaskVine logs
STAGING_PATH = './ecf-staging'  # Temporary staging directory

# ---- Processing options ----
PREPROCESS_DATA = True  # Set True to run preprocessing step
# NOTE(review): SHOW_SAMPLES is not read by any cell visible in this notebook.
SHOW_SAMPLES = False  # Set True to display available samples and exit

# ---- Advanced TaskVine tuning (optional) ----
MAX_WORKERS = 30000  # Maximum number of workers
MAX_RETRIEVALS = 10  # Max concurrent result retrievals
TEMP_REPLICA_COUNT = 1  # Replication count for temp files
PRUNE_DEPTH = 0  # Task graph pruning depth

print("Configuration loaded!")
print(f"Dataset: {'ALL' if PROCESS_ALL_DATASETS else SUB_DATASET}")
print(f"ECF range: n=2 to n={ECF_UPPER_BOUND}")

Configuration loaded!
Dataset: ALL
ECF range: n=2 to n=3


In [2]:
import json
import os
import sys
import time
import warnings

import dask
import dask_awkward as dak
import awkward as ak
import numpy as np

from coffea import dataset_tools
from coffea.nanoevents import PFNanoAODSchema
from ndcctools.taskvine.compat import DaskVine

# Import helper functions
from ecf_helpers import (
    preprocess_data,
    filter_existing_files,
    show_available_samples,
    analysis
)

# Suppress warnings
warnings.filterwarnings("ignore", "Found duplicate branch")
warnings.filterwarnings("ignore", "Missing cross-reference index for")
warnings.filterwarnings("ignore", "dcut")
warnings.filterwarnings("ignore", "Please ensure")
warnings.filterwarnings("ignore", "invalid value")

print("Libraries imported successfully!")

Libraries imported successfully!


# Initialize TaskVine Manager

In [3]:
def parse_manager_ports(ports_str):
    """Turn a comma-separated port spec into the value handed to DaskVine.

    Args:
        ports_str: Text such as "9123" or "9123, 9150". Whitespace around
            entries is ignored, and so are empty entries (for example the
            one produced by a trailing comma).

    Returns:
        A single int when exactly one port is given, otherwise a list of
        ints in the order they were written.

    Raises:
        ValueError: If an entry is not an integer, or if the spec contains
            no port at all.
    """
    tokens = [tok.strip() for tok in ports_str.split(",")]
    parsed = [int(tok) for tok in tokens if tok]
    if not parsed:
        raise ValueError(f"No port numbers found in {ports_str!r}")
    return parsed[0] if len(parsed) == 1 else parsed


# Manager name and ports come from the environment. The name may be None
# when VINE_MANAGER_NAME is unset; it is passed on unchanged.
manager_name = os.environ.get("VINE_MANAGER_NAME")
print(manager_name)
ports_str = os.environ.get("VINE_MANAGER_PORTS", "9123, 9150")
ports = parse_manager_ports(ports_str)

print(f"Manager Ports: {ports}")

floability-7f0c88d5-2dea-4717-9dd8-e04cf67058f9
Manager Ports: [9123, 9150]


In [4]:
# Create TaskVine manager for distributed computing
m = DaskVine(
    ports,
    name=manager_name,
    run_info_path=RUN_INFO_PATH,
    staging_path=STAGING_PATH,
)

# TaskVine tunables, applied in the order listed. Values taken from the
# configuration cell are referenced by name; the rest are fixed here.
tune_settings = [
    ("max-workers", MAX_WORKERS),
    ("max-retrievals", MAX_RETRIEVALS),
    ("transient-error-interval", 1),
    ("worker-source-max-transfers", 10000),
    ("transfer-temps-recovery", 0),
    ("attempt-schedule-depth", 100),
    ("watch-library-logfiles", 1),
    ("temp-replica-count", TEMP_REPLICA_COUNT),
]

# m.tune() returned 0 in the recorded run of this cell. Any other non-None
# result is treated as "setting not applied" and reported instead of being
# silently dropped.
# NOTE(review): confirm the return-code convention against the TaskVine docs.
rejected_tunables = []
for tune_name, tune_value in tune_settings:
    tune_status = m.tune(tune_name, tune_value)
    if tune_status not in (0, None):
        rejected_tunables.append(tune_name)

if rejected_tunables:
    print(f"Warning: TaskVine did not accept tunables: {', '.join(rejected_tunables)}")

0

In [5]:
# Report the manager's name, the port it reports, and where its run logs go.
manager_summary = (
    f"TaskVine manager '{m.name}' initialized",
    f"Listening on ports: {m.port}",
    f"Run info path: {RUN_INFO_PATH}",
)
print(*manager_summary, sep="\n")

TaskVine manager 'floability-7f0c88d5-2dea-4717-9dd8-e04cf67058f9' initialized
Listening on ports: 9124
Run info path: vine-run-info


# Data Preprocessing (Optional)
Run this cell if you need to preprocess the data files. This step:

- Scans the input directory structure
- Creates metadata for all ROOT files
- Saves results to samples_ready.json

Skip this (set `PREPROCESS_DATA = False` in the configuration cell) if samples_ready.json already exists.

In [6]:
# Preprocessing is gated by PREPROCESS_DATA from the configuration cell.
if not PREPROCESS_DATA:
    print("Skipping preprocessing (PREPROCESS_DATA=False)")
else:
    print("Starting data preprocessing...")
    start_time = time.time()

    samples_ready = preprocess_data(SAMPLES_PATH, step_size=STEP_SIZE, manager=m)

    # Persist the metadata so the loading cell can pick it up from disk.
    with open(SAMPLES_READY_FILE, 'w') as samples_out:
        json.dump(samples_ready, samples_out)

    # Elapsed wall time in minutes; includes writing the JSON file.
    elapsed = (time.time() - start_time) / 60
    print(f"Preprocessing complete! Time: {elapsed:.2f} minutes")
    print(f"Saved to: {SAMPLES_READY_FILE}")

Starting data preprocessing...
categories = ['hgg_1', 'hgg_2', 'hgg_3', 'hgg_4', 'hgg_5']
Preprocessing samples...
Computing preprocessing tasks...
Preprocessing complete!
Preprocessing complete! Time: 23.20 minutes
Saved to: samples_ready.json


# Load Preprocessed Samples

In [7]:
# Fail fast with a hint when the preprocessing output is not on disk.
samples_file_found = os.path.exists(SAMPLES_READY_FILE)
if not samples_file_found:
    for hint in (
        f"Error: {SAMPLES_READY_FILE} not found!",
        "Please run preprocessing first (set PREPROCESS_DATA=True)",
    ):
        print(hint)
    raise FileNotFoundError(SAMPLES_READY_FILE)

# Read the metadata back from the JSON file.
with open(SAMPLES_READY_FILE, 'r') as samples_in:
    samples_ready = json.load(samples_in)

print(f"Loaded {len(samples_ready)} sample categories")

Loaded 5 sample categories


# Filter and Select Datasets

In [8]:
# Keep only the entries that filter_existing_files() lets through
# (intended: files that actually exist).
filtered_samples = filter_existing_files(samples_ready)

# An empty (falsy) result means there is nothing to analyze; stop here.
nothing_to_process = not filtered_samples
if nothing_to_process:
    print("Error: No valid files found in any dataset.")
    raise ValueError("No valid files")

In [9]:
# Decide which datasets go into the run: everything, or only SUB_DATASET.
if PROCESS_ALL_DATASETS:
    samples_to_process = filtered_samples
    print(f"Processing ALL datasets ({len(samples_to_process)} total)")
elif SUB_DATASET in filtered_samples:
    samples_to_process = {SUB_DATASET: filtered_samples[SUB_DATASET]}
    print(f"Processing dataset: {SUB_DATASET}")
else:
    # Unknown dataset name: list the valid choices before aborting.
    print(f"Error: Dataset '{SUB_DATASET}' not found!")
    print("Available datasets:")
    for available_name in filtered_samples.keys():
        print(f"  - {available_name}")
    raise ValueError(f"Dataset '{SUB_DATASET}' not found")

# Per-dataset file counts for the selection made above.
print("\nSamples to process:")
for dataset_name, dataset_entry in samples_to_process.items():
    print(f"  {dataset_name}: {len(dataset_entry['files'])} files")

Processing ALL datasets (5 total)

Samples to process:
  hgg_1: 800 files
  hgg_2: 800 files
  hgg_3: 800 files
  hgg_4: 800 files
  hgg_5: 800 files


# Create Analysis Tasks

In [10]:
print("Creating analysis tasks...")

# Create a wrapper function with configured parameters
def analysis_wrapper(events):
    """Run `analysis` on one events object with the notebook's configuration.

    Forwards the current values of ECF_UPPER_BOUND and TRIGGERS_FILE from the
    configuration cell, so the callable handed to `apply_to_fileset` only
    needs the `events` argument.

    Args:
        events: The events object supplied by
            `dataset_tools.apply_to_fileset` for a dataset.

    Returns:
        Whatever `analysis` returns for these events.
    """
    return analysis(
        events,
        ecf_upper_bound=ECF_UPPER_BOUND,
        triggers_file=TRIGGERS_FILE
    )

# Apply analysis to all selected datasets
# The recorded output of this cell shows per-dataset messages printed during
# this call, i.e. the wrapper is already invoked while the tasks are built.
# NOTE(review): with allow_read_errors_with_report=False no read-error report
# is requested, so `tasks` is presumably just the per-dataset outputs —
# confirm against the coffea documentation.
tasks = dataset_tools.apply_to_fileset(
    analysis_wrapper,
    samples_to_process,
    uproot_options={"allow_read_errors_with_report": False},
    schemaclass=PFNanoAODSchema,
)

print(f"Analysis tasks created for {len(samples_to_process)} dataset(s)")

Creating analysis tasks...


Issue: coffea.nanoevents.methods.vector will be removed and replaced with scikit-hep vector. Nanoevents schemas internal to coffea will be migrated. Otherwise please consider using that package!.
  from coffea.nanoevents.methods import vector


Processing dataset: hgg_1
Signal: Higgs jets
Processing dataset: hgg_2
Signal: Higgs jets
Processing dataset: hgg_3
Signal: Higgs jets
Processing dataset: hgg_4
Signal: Higgs jets
Processing dataset: hgg_5
Signal: Higgs jets
Analysis tasks created for 5 dataset(s)


# Execute Analysis

This cell runs the distributed computation. Make sure workers are connected to the TaskVine manager.

In [11]:
banner = "=" * 60

print(banner)
print("Starting computation...")
print(banner)

start_time = time.time()

# Options forwarded to dask.compute; the TaskVine manager's `get` is used as
# the scheduler.
compute_options = {
    "scheduler": m.get,
    "resources_mode": None,
    "prune_depth": PRUNE_DEPTH,
    "worker_transfers": True,
    "resources": {"cores": 1},
}

# Execute the analysis
computed = dask.compute(tasks, **compute_options)

execution_time = time.time() - start_time
execution_minutes = execution_time / 60

print(banner)
print("COMPUTATION COMPLETE!")
print(f"Total execution time: {execution_time:.2f} seconds ({execution_minutes:.2f} minutes)")
print(f"Output saved to: {OUTPUT_DIR}/")
print(banner)

Starting computation...


Output()

COMPUTATION COMPLETE!
Total execution time: 635.51 seconds (10.59 minutes)
Output saved to: output/


# Verify Output

In [12]:
# Summarize what is on disk under OUTPUT_DIR: one line per subdirectory with
# the number of '.parquet' files directly inside it.
if os.path.exists(OUTPUT_DIR):
    print(f"\nOutput directory: {OUTPUT_DIR}")
    print("\nDatasets processed:")

    # os.listdir returns entries in arbitrary order; sort so the summary is
    # the same on every run and filesystem.
    dataset_dirs = sorted(
        entry
        for entry in os.listdir(OUTPUT_DIR)
        if os.path.isdir(os.path.join(OUTPUT_DIR, entry))
    )

    for dataset_dir in dataset_dirs:
        parquet_count = sum(
            fname.endswith('.parquet')
            for fname in os.listdir(os.path.join(OUTPUT_DIR, dataset_dir))
        )
        print(f"  {dataset_dir}: {parquet_count} parquet files")

    # Make an empty result explicit instead of printing a bare header.
    if not dataset_dirs:
        print("  (no dataset directories found)")
else:
    print(f"Output directory {OUTPUT_DIR} not found")


Output directory: output

Datasets processed:
  hgg_1: 800 parquet files
  hgg_2: 800 parquet files
  hgg_3: 800 parquet files
  hgg_4: 800 parquet files
  hgg_5: 800 parquet files


In [13]:
from datetime import datetime
from zoneinfo import ZoneInfo
import uuid

# Completion marker: a timestamp in the America/New_York zone plus a short
# random id, printed on one line.
# NOTE(review): presumably consumed by external floability tooling — keep the
# line format unchanged.
finished_at = datetime.now(ZoneInfo("America/New_York"))
now = finished_at.strftime("%Y-%m-%d %I:%M:%S %p")

# First 8 hex characters of a fresh random UUID.
run_uuid = uuid.uuid4()
exec_id = run_uuid.hex[:8]

print(f"__floability_execution_done__::{now}::{exec_id}")

__floability_execution_done__::2026-02-08 08:45:28 PM::1c54d566
