In [1]:
import os
import sys
import time
import gc 
import psutil
import json
from pathlib import Path

import uproot
import awkward as ak
import numpy as np

import vector
vector.register_awkward()

import dask
from dask.distributed import Client

print("All imports added")

All imports added


In [2]:
# Connect to the Dask scheduler listening on the TLS address below.
# Ending the cell with the bare name makes the client the cell's displayed value.
client = Client("tls://localhost:8786")
client

0,1
Connection method: Direct,
Dashboard: /user/anujraghav.physics@gmail.com/proxy/8787/status,

0,1
Comm: tls://192.168.161.139:8786,Workers: 0
Dashboard: /user/anujraghav.physics@gmail.com/proxy/8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [3]:
# --- Project layout -------------------------------------------------------
# Every path is derived from $HOME (with a fixed fallback) so that the rest of
# the notebook only refers to these constants.
HOME_DIR = Path(os.environ.get("HOME", "/home/cms-jovyan"))
PROJECT_NAME = "H-to-WW-NanoAOD-analysis"
PROJECT_DIR = HOME_DIR / PROJECT_NAME

DATASETS_DIR = PROJECT_DIR / "Datasets"
DATA_DIR, MC_DIR = DATASETS_DIR / "DATA", DATASETS_DIR / "MC_samples"

AUX_DIR = PROJECT_DIR / "Auxillary_files"
GOLDEN_JSON_PATH = AUX_DIR / "Cert_271036-284044_13TeV_Legacy2016_Collisions16_JSON.txt"

# Inclusive run-number window for each run period used in the analysis.
RUN_PERIODS_2016 = {
    "Run2016G": dict(run_min=278820, run_max=280385),
    "Run2016H": dict(run_min=280919, run_max=284044),
}

# Echo the resolved configuration; the prefixes carry their own padding.
for _prefix, _shown in (
    ("HOME_DIR:         ", HOME_DIR),
    ("PROJECT_DIR:     ", PROJECT_DIR),
    ("DATA_DIR:        ", DATA_DIR),
    ("MC_DIR:          ", MC_DIR),
    ("AUX_DIR:         ", AUX_DIR),
    ("GOLDEN_JSON:      ", GOLDEN_JSON_PATH),
    ("JSON exists:     ", GOLDEN_JSON_PATH.exists()),
):
    print(f"{_prefix}{_shown}")


HOME_DIR:         /home/cms-jovyan
PROJECT_DIR:     /home/cms-jovyan/H-to-WW-NanoAOD-analysis
DATA_DIR:        /home/cms-jovyan/H-to-WW-NanoAOD-analysis/Datasets/DATA
MC_DIR:          /home/cms-jovyan/H-to-WW-NanoAOD-analysis/Datasets/MC_samples
AUX_DIR:         /home/cms-jovyan/H-to-WW-NanoAOD-analysis/Auxillary_files
GOLDEN_JSON:      /home/cms-jovyan/H-to-WW-NanoAOD-analysis/Auxillary_files/Cert_271036-284044_13TeV_Legacy2016_Collisions16_JSON.txt
JSON exists:     True


In [37]:
# Maps a lower-case substring of a list-file name to the sample label it feeds.
# Entries are tried in insertion order and the first substring match wins.
SAMPLE_MAPPING = dict(
    data="Data",
    dytoll="DY_to_Tau_Tau",
)

def load_urls_from_files(filepath, max_files = None):
    """
    Collect the XRootD URLs listed in a text file.

    Each line is stripped of surrounding whitespace; only lines that start
    with 'root://' are kept, in file order.

    Parameters
    ----------
    filepath : path of the list file. A missing file yields an empty list.
    max_files : optional cap on the number of URLs returned. A falsy value
        (None or 0) means "no cap".

    Returns
    -------
    list of str
    """
    collected = []

    if os.path.exists(filepath):
        with open(filepath, 'r') as handle:
            for raw in handle:
                candidate = raw.strip()
                if not candidate.startswith('root://'):
                    continue
                collected.append(candidate)
                # Stop reading as soon as the requested number is reached.
                if max_files and len(collected) >= max_files:
                    break

    return collected

def load_all_files(data_dir, mc_dir, max_per_sample = None):
    """
    Build a {sample label: [file URLs]} dictionary from the '.txt' list files
    found in the data and MC directories.

    A list file is assigned to the first SAMPLE_MAPPING entry whose key is a
    substring of the lower-cased file name (without '.txt'); files matching no
    entry are reported and skipped. URLs from several list files that map to
    the same label are concatenated.

    Parameters
    ----------
    data_dir, mc_dir : directories to scan. A directory that does not exist is
        skipped silently.
    max_per_sample : optional cap on the number of URLs kept per sample label.
        A falsy value (None or 0) means "no cap". The cap is enforced per
        label, so several list files feeding one label cannot exceed it.

    Returns
    -------
    dict mapping label -> list of URL strings (labels without URLs are absent).
    """
    files_dict = {}

    for directory in [data_dir, mc_dir]:
        if not os.path.exists(directory):
            continue

        # os.listdir() returns entries in arbitrary order; sort so the URL
        # order (and therefore any truncation) is reproducible between runs.
        for filename in sorted(os.listdir(directory)):
            if not filename.endswith(".txt"):
                continue

            filepath = os.path.join(directory, filename)
            filename_lower = filename.lower().replace('.txt', '')

            # First matching pattern wins.
            label = next(
                (sample_label
                 for pattern, sample_label in SAMPLE_MAPPING.items()
                 if pattern in filename_lower),
                None,
            )

            if not label:
                print(f" unknown file: {filename}- skipping")
                continue

            # Work out how many URLs this label may still receive.
            remaining = None
            if max_per_sample:
                remaining = max_per_sample - len(files_dict.get(label, []))
                if remaining <= 0:
                    # Label is already full; a 0 passed below would mean "no cap".
                    continue

            urls = load_urls_from_files(filepath, remaining)

            if urls:
                files_dict.setdefault(label, []).extend(urls)

    return files_dict

# For a quick smoke run, pass max_per_sample=1 to load_all_files instead.
files = load_all_files(DATA_DIR, MC_DIR)

# Per-sample file counts followed by the grand total.
banner = "=" * 70
print("\n" + banner)
print("FILES TO PROCESS")
print(banner)

total = 0
for label, urls in files.items():
    n_urls = len(urls)
    total += n_urls
    print(f"{label:20s}: {n_urls:4d} files")

print("_" * 70)
print(f"{'TOTAL':20s}: {total:4d} files")
print(banner)

 unknown file: VG.txt- skipping
 unknown file: Higgs.txt- skipping
 unknown file: WW.txt- skipping
 unknown file: Fakes.txt- skipping
 unknown file: VZ.txt- skipping
 unknown file: ggWW.txt- skipping
 unknown file: Top.txt- skipping

FILES TO PROCESS
Data                :   48 files
DY_to_Tau_Tau       :   61 files
______________________________________________________________________
TOTAL               :  109 files


In [38]:
def load_golden_json(json_input, run_periods=None):
    """
    Load golden JSON from either a file path or an already-parsed dict.

    Parameters
    ----------
    json_input : str, os.PathLike or dict
        Path to the JSON file (plain string or path-like object such as
        pathlib.Path), or the parsed content as {run string: [[first, last], ...]}.
    run_periods : dict, optional
        {period name: {"run_min": int, "run_max": int}}. When given, only runs
        that fall inside at least one inclusive [run_min, run_max] window are
        kept. None keeps every run.

    Returns
    -------
    dict mapping run number (int) -> list of (first_lumi, last_lumi) tuples.

    Raises
    ------
    TypeError
        If json_input is neither a path nor a dict.
    """
    # Accept path-like objects too: the notebook stores its paths as pathlib.Path.
    if isinstance(json_input, (str, os.PathLike)):
        with open(json_input, 'r') as f:
            golden_json = json.load(f)
    elif isinstance(json_input, dict):
        golden_json = json_input
    else:
        raise TypeError(f"Expected str, os.PathLike or dict, got {type(json_input)}")

    valid_lumis = {}
    for run_str, lumi_ranges in golden_json.items():
        run = int(run_str)

        # Filter by run periods
        if run_periods is not None:
            in_period = any(
                period['run_min'] <= run <= period['run_max']
                for period in run_periods.values()
            )
            if not in_period:
                continue

        valid_lumis[run] = [tuple(lr) for lr in lumi_ranges]

    return valid_lumis


def apply_json_mask(arrays, json_input, run_periods=None):
    """
    Build a per-event boolean mask from the golden JSON.

    An event is kept when its run appears in the (optionally run-period
    filtered) golden JSON and its luminosityBlock lies inside one of that
    run's inclusive [first, last] ranges.

    Parameters
    ----------
    arrays : event record exposing `run` and `luminosityBlock` fields.
    json_input : forwarded unchanged to load_golden_json.
    run_periods : forwarded unchanged to load_golden_json.

    Returns
    -------
    ak.Array of booleans, one entry per event.
    """
    certified = load_golden_json(json_input, run_periods)

    event_runs = ak.to_numpy(arrays.run)
    event_lumis = ak.to_numpy(arrays.luminosityBlock)

    # Everything is rejected until a certified range claims it.
    keep = np.zeros(len(event_runs), dtype=bool)

    for run_number, ranges in certified.items():
        in_run = event_runs == run_number
        if not in_run.any():
            continue

        # Test only this run's events against this run's lumi ranges.
        lumis_in_run = event_lumis[in_run]
        good = np.zeros(len(lumis_in_run), dtype=bool)
        for first, last in ranges:
            good |= (lumis_in_run >= first) & (lumis_in_run <= last)

        keep[in_run] = good

    return ak.Array(keep)

In [39]:
# Not referenced by the code visible in this notebook; load_events carries the
# same value as its own default.
Batch_size = 1_250_000

def load_events(file_url, batch_size=1_250_000, timeout=600, max_retries=3, retry_wait=10, is_data=False):
    """
    Generator yielding the 'Events' tree of a ROOT file in batches.

    Parameters
    ----------
    file_url : str
        Location of the ROOT file, passed to uproot.open.
    batch_size : int
        Passed as step_size to TTree.iterate.
    timeout : int
        Passed as the timeout option to uproot.open.
    max_retries : int
        Total number of attempts made on I/O-type errors.
    retry_wait : int
        Seconds to sleep between attempts.
    is_data : bool
        True additionally reads 'run' and 'luminosityBlock'; False
        additionally reads 'genWeight'.

    Yields
    ------
    One awkward array per batch holding the requested columns.

    Raises
    ------
    OSError (including TimeoutError / ConnectionError) once all attempts are
    used up; any other exception is reported and re-raised immediately.

    Notes
    -----
    A retry resumes from the first entry that has not been yielded yet, so a
    failure in the middle of a file never hands the same events out twice.
    """
    columns = [
        "Electron_pt", "Electron_eta", "Electron_phi", "Electron_mass",
        "Electron_mvaFall17V2Iso_WP90", "Electron_charge",

        "Muon_pt", "Muon_eta", "Muon_phi", "Muon_mass",
        "Muon_tightId", "Muon_charge", "Muon_pfRelIso04_all",
        "PuppiMET_pt", "PuppiMET_phi",

        "Jet_pt", "Jet_eta", "Jet_phi", "Jet_mass",
        "Jet_btagDeepFlavB", "nJet", "Jet_jetId", "Jet_puId",

        "HLT_Mu12_TrkIsoVVL_Ele23_CaloIdL_TrackIdL_IsoVL_DZ",
        "HLT_Mu23_TrkIsoVVL_Ele12_CaloIdL_TrackIdL_IsoVL_DZ"
    ]

    if is_data:
        columns.extend(["run", "luminosityBlock"])
    else:
        columns.append("genWeight")

    file_name = file_url.split('/')[-1]

    # Number of entries already handed to the consumer. It survives across
    # attempts so that a retry continues instead of starting over.
    entries_done = 0

    for attempt in range(max_retries):
        try:
            with uproot.open(file_url, timeout=timeout) as f:
                tree = f['Events']

                for arrays in tree.iterate(
                    columns,
                    step_size=batch_size,
                    library="ak",
                    entry_start=entries_done,
                ):
                    # Record the batch as delivered before yielding it, so a
                    # failure while fetching the next batch resumes after it.
                    entries_done += len(arrays)
                    yield arrays

                return

        # TimeoutError, ConnectionError and IOError are all OSError (sub)classes.
        except OSError as e:
            error_type = type(e).__name__

            if attempt < max_retries - 1:
                print(f"      {error_type} on {file_name}")
                print(f"       Retry {attempt+1}/{max_retries-1} in {retry_wait}s...")
                time.sleep(retry_wait)
            else:
                print(f"     FAILED after {max_retries} attempts: {file_name}")
                print(f"       Error: {str(e)[:100]}")
                raise

        except Exception as e:
            print(f"     Unexpected error on {file_name}: {str(e)[:100]}")
            raise

In [40]:
def lepton_array(arrays):
    """
    Zip the flat Electron_* and Muon_* branches into two record arrays.

    Both collections expose the same fields: pt, eta, phi, mass, charge,
    id_pass and flavor.

    - Electron id_pass: Electron_mvaFall17V2Iso_WP90 equals 1.
    - Muon id_pass: Muon_tightId equals 1 AND Muon_pfRelIso04_all below 0.15.
    - flavor: constant 11 for electrons and 13 for muons, shaped like the pt branch.

    Returns
    -------
    (electrons, muons)
    """
    ele_pt = arrays.Electron_pt
    mu_pt = arrays.Muon_pt

    ele_id = arrays.Electron_mvaFall17V2Iso_WP90 == 1
    mu_id = (arrays.Muon_tightId == 1) & (arrays.Muon_pfRelIso04_all < 0.15)

    electron_fields = {
        "pt": ele_pt,
        "eta": arrays.Electron_eta,
        "phi": arrays.Electron_phi,
        "mass": arrays.Electron_mass,
        "charge": arrays.Electron_charge,
        "id_pass": ele_id,
        "flavor": ak.ones_like(ele_pt) * 11,
    }
    muon_fields = {
        "pt": mu_pt,
        "eta": arrays.Muon_eta,
        "phi": arrays.Muon_phi,
        "mass": arrays.Muon_mass,
        "charge": arrays.Muon_charge,
        "id_pass": mu_id,
        "flavor": ak.ones_like(mu_pt) * 13,
    }

    return ak.zip(electron_fields), ak.zip(muon_fields)

In [60]:
def select_tag_probe_events(leptons, probe_pt_lower = 10, probe_pt_upper = 50, eta_cut = 1.479):
    """
    Ordered Tag & Probe:
    - Tag   = Leading Lepton (Must pass Tight ID)
    - Probe = Subleading Lepton (No ID check yet)

    Only events with exactly two leptons are considered. A pair is kept when
    the two charges are opposite, the tag has pt > 35 and |eta| < eta_cut and
    id_pass set, and the probe has pt > 50 and 1.479 < |eta| < 2.5.

    Parameters
    ----------
    leptons : per-event lepton records with pt, eta, charge and id_pass fields.
    probe_pt_lower, probe_pt_upper : returned unchanged as metadata.
        NOTE(review): they are NOT applied to the probe -- the probe pt cut is
        hard-coded to > 50, so the reported range does not describe the
        selection. Confirm the intended bin before trusting the summary table.
    eta_cut : upper |eta| bound applied to the TAG only.
        NOTE(review): the probe |eta| window is hard-coded to (1.479, 2.5).

    Returns
    -------
    (tags, probes, probe_pt_lower, probe_pt_upper, eta_cut)
        Always a 5-tuple. tags and probes are None when no event has exactly
        two leptons, so callers can unpack five values unconditionally.
    """

    sorted_leptons = leptons[ak.argsort(leptons.pt, ascending=False)]

    mask_2lep = ak.num(sorted_leptons) == 2
    events_2lep = sorted_leptons[mask_2lep]

    if len(events_2lep) == 0:
        # Same arity as the normal return: a 2-tuple here made the caller's
        # 5-way unpacking raise ValueError and fail the whole file.
        return None, None, probe_pt_lower, probe_pt_upper, eta_cut

    tag_candidate = events_2lep[:, 0]   # Leading
    probe_candidate = events_2lep[:, 1] # Subleading

    # Opposite-sign pair
    mask_charge = tag_candidate.charge * probe_candidate.charge < 0

    # Kinematics (Tag > 35, Probe > 50); the probe_pt_* arguments are not used here
    mask_pt = (tag_candidate.pt > 35) & \
              (probe_candidate.pt >50)

    # Tag inside eta_cut, probe between 1.479 and 2.5 in |eta|
    mask_eta = (abs(tag_candidate.eta) < eta_cut) & \
               (abs(probe_candidate.eta) < 2.5) & \
               (abs(probe_candidate.eta) > 1.479)

    # Leading must pass ID
    mask_tag_id = (tag_candidate.id_pass == True)

    # Final Mask
    final_mask = mask_charge & mask_pt & mask_eta & mask_tag_id

    # Return valid pairs
    return (tag_candidate[final_mask],
            probe_candidate[final_mask],
            probe_pt_lower,
            probe_pt_upper,
            eta_cut)

In [61]:
def create_lepton_vector(lepton):
    """Build a vector array from the lepton's pt, eta, phi and mass fields."""
    components = {
        name: getattr(lepton, name)
        for name in ("pt", "eta", "phi", "mass")
    }
    return vector.array(components)

def calculate_mll(lepton_1, lepton_2):
    """Return the mass of the summed four-vectors of the two leptons."""
    pair = create_lepton_vector(lepton_1) + create_lepton_vector(lepton_2)
    return pair.mass



In [62]:
import time
import awkward as ak
import numpy as np
import vector

vector.register_awkward() 

def tag_prob_process(golden_json_data, run_periods):
    """
    Factory function for Tag & Probe Analysis.

    Binds the golden JSON content and the run periods into a per-file worker
    and returns that worker.

    Parameters
    ----------
    golden_json_data : golden JSON content handed to apply_json_mask, or None
        to skip the certification mask.
    run_periods : run-period dictionary handed to apply_json_mask.

    Returns
    -------
    tag_probe_processing(label, file_url, file_idx), which itself returns
    (label, numerator, denominator, metadata, error).
    """

    def tag_probe_processing(label, file_url, file_idx):
        """
        Count passing and total probes in one file.

        Returns (label, n_passing_probes, n_total_probes, meta_info, error).
        On success error is None and meta_info is (pt_low, pt_high, eta_cut)
        as reported by select_tag_probe_events ((0, 0, 0) if no batch reached
        the selection). On failure the counts are 0, meta_info is None and
        error is a short message. file_idx is accepted but not used.
        """
        file_name = file_url.split('/')[-1]
        is_data = (label == 'Data')

        target_flavor = 'electron'

        max_file_retries = 3

        for file_attempt in range(max_file_retries):
            # Start every attempt from zero: a retry re-reads the file from
            # the beginning, so keeping counts from a failed attempt would
            # count the same events twice.
            count_total_probes = 0   # Denominator
            count_passing_probes = 0 # Numerator
            meta_info = (0, 0, 0)

            try:
                # Load Events
                for arrays in load_events(file_url, batch_size=1_250_000, is_data=is_data):

                    # Apply JSON Mask
                    if is_data and golden_json_data is not None:
                        try:
                            json_mask = apply_json_mask(arrays, golden_json_data, run_periods=run_periods)
                            if np.sum(json_mask) == 0: continue
                            arrays = arrays[json_mask]
                        except Exception as e:
                            # Best effort: the batch is dropped, the file continues.
                            print(f"Warning: JSON mask failed for {file_name}: {e}")
                            continue

                    # Create Lepton Objects
                    electrons, muons = lepton_array(arrays)

                    # Select Flavor
                    leptons = electrons if target_flavor == 'electron' else muons

                    # The selector may return fewer than five values when a
                    # batch has no two-lepton events, so index instead of
                    # unpacking a fixed number of items.
                    selection = select_tag_probe_events(leptons)
                    tags, probes = selection[0], selection[1]

                    # Store cuts for return
                    if len(selection) == 5:
                        meta_info = tuple(selection[2:])

                    if tags is None or len(tags) == 0:
                        continue

                    m_ll = calculate_mll(tags, probes)

                    # Keep pairs with 60 < m_ll < 120
                    z_mask = (m_ll > 60) & (m_ll < 120)

                    # Apply Mask
                    valid_tags = tags[z_mask]
                    valid_probes = probes[z_mask]

                    if len(valid_tags) == 0:
                        continue

                    # Count Events
                    count_total_probes += len(valid_probes)
                    count_passing_probes += int(ak.sum(valid_probes.id_pass))

                return label, count_passing_probes, count_total_probes, meta_info, None

            except (OSError, IOError, ValueError) as e:
                if file_attempt < max_file_retries - 1:
                    time.sleep(3)
                    continue
                else:
                    return label, 0, 0, None, f"{file_name}: Retry limit - {str(e)[:100]}"

            except Exception as e:
                return label, 0, 0, None, f"{file_name}: Crash - {str(e)[:100]}"

        return label, 0, 0, None, "Unknown loop exit"

    return tag_probe_processing

In [63]:
# %%
# MAIN PROCESSING 

import time
import json
from collections import defaultdict
from dask.distributed import progress

print(f"\n{'='*70}\nTAG & PROBE PROCESSING START\n{'='*70}")

# Parse the golden JSON once on the client; the parsed dict is bound into the
# task so that workers do not need to read the file themselves.
golden_json_data = None
if GOLDEN_JSON_PATH.exists():
    with open(GOLDEN_JSON_PATH, 'r') as f:
        golden_json_data = json.load(f)
else:
    print(f"WARNING: Golden JSON not found at {GOLDEN_JSON_PATH}")

processing_task = tag_prob_process(
    golden_json_data=golden_json_data,
    run_periods=RUN_PERIODS_2016
)

# Flatten {label: [urls]} into three parallel argument lists, one entry per file.
arg_labels = []
arg_urls = []
arg_indices = []

for label, urls in files.items():
    for file_idx, file_url in enumerate(urls):
        arg_labels.append(label)
        arg_urls.append(str(file_url))
        arg_indices.append(file_idx)

start_time = time.perf_counter()

# One task per file.
futures = client.map(
    processing_task,
    arg_labels,
    arg_urls,
    arg_indices,
    retries=1
)

progress(futures)
results = client.gather(futures)
elapsed = time.perf_counter() - start_time

# label -> [n_pass, n_total, n_files]; only files without an error are counted.
final_stats = defaultdict(lambda: [0, 0, 0])
final_meta = {}
errors = []

for label, n_pass, n_total, meta, error in results:
    if error:
        errors.append((label, error))
    else:
        stats = final_stats[label]
        stats[0] += n_pass
        stats[1] += n_total
        stats[2] += 1

        # Keep the first metadata tuple reported for each label.
        if label not in final_meta and meta is not None:
            final_meta[label] = meta

print(f"\n{'='*110}")
print(f"{'SAMPLE':<20} | {'FILES':<6} | {'PROBES':>12} | {'PASS':>10} | {'EFF':>8} | {'pT RANGE':>15} | {'ETA CUT':>10}")
print("="*110)

tot_pass = tot_total = tot_files = 0

for label, (n_pass, n_total, n_files) in sorted(final_stats.items()):
    eff = (n_pass / n_total * 100) if n_total > 0 else 0.0

    if label in final_meta:
        pt_min, pt_max, eta_cut = final_meta[label]
        pt_str = f"{pt_min}-{pt_max} GeV"
        eta_str = f"|n|<{eta_cut}"
    else:
        pt_str = "N/A"
        eta_str = "N/A"

    print(f"{label:<20} | {n_files:<6} | {n_total:>12,} | {n_pass:>10,} | {eff:>7.2f}% | {pt_str:>15} | {eta_str:>10}")

    tot_pass += n_pass
    tot_total += n_total
    tot_files += n_files

print("_"*110)
tot_eff = (tot_pass / tot_total * 100) if tot_total > 0 else 0.0
print(f"{'TOTAL':<20} | {tot_files:<6} | {tot_total:>12,} | {tot_pass:>10,} | {tot_eff:>7.2f}% | {'-':>15} | {'-':>10}")
print(f"{'='*110}")

if errors:
    print(f"\n[!] Encountered {len(errors)} errors:")
    for label, err in errors[:5]:
        print(f"  - {label}: {err}")
    # Say so when the list was truncated, so failures are not overlooked.
    if len(errors) > 5:
        print(f"  ... and {len(errors) - 5} more")

# Guard the per-file average: with no input files len(arg_urls) is 0.
n_submitted = len(arg_urls)
per_file = (elapsed / n_submitted) if n_submitted else 0.0
print(f"\nDone in {elapsed:.1f}s ({per_file:.2f}s/file)")


TAG & PROBE PROCESSING START

SAMPLE               | FILES  |       PROBES |       PASS |      EFF |        pT RANGE |    ETA CUT
DY_to_Tau_Tau        | 61     |        9,227 |      8,202 |   88.89% |       10-50 GeV |  |n|<1.479
Data                 | 48     |          264 |        188 |   71.21% |       10-50 GeV |  |n|<1.479
______________________________________________________________________________________________________________
TOTAL                | 109    |        9,491 |      8,390 |   88.40% |               - |          -

Done in 183.4s (1.68s/file)
