In [1]:
import argparse
import multiprocessing
from types import SimpleNamespace

#redirector='file:///shared-scratch/cms/store/user/AGC'
redirector='file:///scratch/cms/store/user/AGC/'
#redirector='file:///shared/cms/store/user/AGC/'
sched_port=20012

# Mimic command-line arguments using SimpleNamespace
args = SimpleNamespace(
    n_max_files_per_sample=None,  # None to use all files
    data_cache=None,  # or path to local data
    remote_data_prefix=redirector,
    output="agc-out.root",
    inference=False,
    scheduler="dask-remote",  # choices=["mt", "dask-local", "dask-ssh", "dask-remote"]
    scheduler_address="tcp://127.0.0.1:"+str(sched_port),
    ncores=96, # multiprocessing.cpu_count()
    npartitions=92*3,
    hosts=None,
    verbose=True,
    statistical_validation=False,
    no_fitting=False
)

In [2]:
import argparse
import multiprocessing
from pathlib import Path
from time import time
from typing import Tuple

import ml
import ROOT
from distributed import Client, LocalCluster, SSHCluster, get_worker
from plotting import save_ml_plots, save_plots
from statistical import fit_histograms
from utils import AGCInput, AGCResult, postprocess_results, retrieve_inputs, save_histos

# Using https://atlas-groupdata.web.cern.ch/atlas-groupdata/dev/AnalysisTop/TopDataPreparation/XSection-MC15-13TeV.data
# as a reference. Values are in pb.
XSEC_INFO = {
    "ttbar": 396.87 + 332.97,  # nonallhad + allhad, keep same x-sec for all
    "single_top_s_chan": 2.0268 + 1.2676,
    "single_top_t_chan": (36.993 + 22.175) / 0.252,  # scale from lepton filter to inclusive
    "single_top_tW": 37.936 + 37.906,
    "wjets": 61457 * 0.252,  # e/mu+nu final states
}

def transfer_to_tier(dask_worker):
    import os
    os.popen('xrdcp --force "./agc-out.root" root://eosuser.cern.ch//eos/user/l/lpaciose/agc-out.root;')

    return True

def create_dask_client(scheduler: str, ncores: int, hosts: str, scheduler_address: str) -> Client:
    """Create a Dask distributed client."""
    if scheduler == "dask-local":
        lc = LocalCluster(n_workers=ncores, threads_per_worker=1, processes=True)
        return Client(lc)

    if scheduler == "dask-ssh":
        workers = hosts.split(",")
        print(f"Using worker nodes: {workers=}")
        # The creation of the SSHCluster object might need to be further configured to fit specific use cases.
        # For example, in some clusters the "local_directory" key must be supplied in the worker_options dictionary.
        sshc = SSHCluster(
            workers,
            connect_options={"known_hosts": None},
            worker_options={
                "nprocs": ncores,
                "nthreads": 1,
                "memory_limit": "32GB",
            },
        )
        return Client(sshc)

    if scheduler == "dask-remote":
        return Client(scheduler_address)

    raise ValueError(
        f"Unexpected scheduling mode '{scheduler}'. Valid modes are ['dask-local', 'dask-ssh', 'dask-remote']."
    )


def define_trijet_mass(df: ROOT.RDataFrame) -> ROOT.RDataFrame:
    """Add the trijet_mass observable to the dataframe after applying the appropriate selections."""

    # First, select events with at least 2 b-tagged jets
    df = df.Filter("Sum(Jet_btagCSVV2_cut > 0.5) > 1")

    # Build four-momentum vectors for each jet
    df = df.Define(
        "Jet_p4",
        "ConstructP4(Jet_pt_cut, Jet_eta_cut, Jet_phi_cut, Jet_mass_cut)",
    )

    # Build trijet combinations
    df = df.Define("Trijet_idx", "Combinations(Jet_pt_cut, 3)")

    # Trijet_btag is a helpful array mask indicating whether or not the maximum btag value in Trijet is larger than the 0.5 threshold
    df = df.Define(
        "Trijet_btag",
        """
            auto J1_btagCSVV2 = Take(Jet_btagCSVV2_cut, Trijet_idx[0]);
            auto J2_btagCSVV2 = Take(Jet_btagCSVV2_cut, Trijet_idx[1]);
            auto J3_btagCSVV2 = Take(Jet_btagCSVV2_cut, Trijet_idx[2]);
            return J1_btagCSVV2 > 0.5 || J2_btagCSVV2 > 0.5 || J3_btagCSVV2 > 0.5;
            """,
    )

    # Assign four-momentums to each trijet combination
    df = df.Define(
        "Trijet_p4",
        """
        auto J1 = Take(Jet_p4, Trijet_idx[0]);
        auto J2 = Take(Jet_p4, Trijet_idx[1]);
        auto J3 = Take(Jet_p4, Trijet_idx[2]);
        return (J1+J2+J3)[Trijet_btag];
        """,
    )

    # Get trijet transverse momentum values from four-momentum vectors
    df = df.Define(
        "Trijet_pt",
        "return Map(Trijet_p4, [](const ROOT::Math::PxPyPzMVector &v) { return v.Pt(); })",
    )

    # Evaluate mass of trijet with maximum pt and btag higher than threshold
    df = df.Define("Trijet_mass", "Trijet_p4[ArgMax(Trijet_pt)].M()")

    return df


def book_histos(
    df: ROOT.RDataFrame, process: str, variation: str, nevents: int, inference=False
) -> Tuple[list[AGCResult], list[AGCResult]]:
    """Return the pair of lists of RDataFrame results pertaining to the given process and variation.
    The first list contains histograms of reconstructed HT and trijet masses.
    The second contains ML inference outputs"""
    # Calculate normalization for MC
    x_sec = XSEC_INFO[process]
    lumi = 3378  # /pb
    xsec_weight = x_sec * lumi / nevents
    df = df.Define("Weights", str(xsec_weight))  # default weights

    if variation == "nominal":
        # Jet_pt variations definition
        # pt_scale_up() and pt_res_up(jet_pt) return scaling factors applying to jet_pt
        # pt_scale_up() - jet energy scaly systematic
        # pt_res_up(jet_pt) - jet resolution systematic
        df = df.Vary(
            "Jet_pt",
            "ROOT::RVec<ROOT::RVecF>{Jet_pt*pt_scale_up(), Jet_pt*jet_pt_resolution(Jet_pt)}",
            ["pt_scale_up", "pt_res_up"],
        )

        if process == "wjets":
            # Flat weight variation definition
            df = df.Vary(
                "Weights",
                "Weights*flat_variation()",
                [f"scale_var_{direction}" for direction in ["up", "down"]],
            )

    # Event selection - the core part of the algorithm applied for both regions
    # Selecting events containing at least one lepton and four jets with pT > 30 GeV
    # Applying requirement at least one of them must be b-tagged jet (see details in the specification)
    df = (
        df.Define(
            "Electron_mask",
            "Electron_pt > 30 && abs(Electron_eta) < 2.1 && Electron_sip3d < 4 && Electron_cutBased == 4",
        )
        .Define(
            "Muon_mask",
            "Muon_pt > 30 && abs(Muon_eta) < 2.1 && Muon_sip3d < 4 && Muon_tightId && Muon_pfRelIso04_all < 0.15",
        )
        .Filter("Sum(Electron_mask) + Sum(Muon_mask) == 1")
        .Define("Jet_mask", "Jet_pt > 30 && abs(Jet_eta) < 2.4 && Jet_jetId == 6")
        .Filter("Sum(Jet_mask) >= 4")
    )

    # create columns for "good" jets
    df = (
        df.Define("Jet_pt_cut", "Jet_pt[Jet_mask]")
        .Define("Jet_btagCSVV2_cut", "Jet_btagCSVV2[Jet_mask]")
        .Define("Jet_eta_cut", "Jet_eta[Jet_mask]")
        .Define("Jet_phi_cut", "Jet_phi[Jet_mask]")
        .Define("Jet_mass_cut", "Jet_mass[Jet_mask]")
    )

    # b-tagging variations for nominal samples
    if variation == "nominal":
        df = df.Vary(
            "Weights",
            "ROOT::RVecD{Weights*btag_weight_variation(Jet_pt_cut)}",
            [
                f"{weight_name}_{direction}"
                for weight_name in [f"btag_var_{i}" for i in range(4)]
                for direction in ["up", "down"]
            ],
        )

    # Define HT observable for the 4j1b region
    # Only one b-tagged region required
    # The observable is the total transvesre momentum
    # fmt: off
    df4j1b = df.Filter("Sum(Jet_btagCSVV2_cut > 0.5) == 1").Define("HT", "Sum(Jet_pt_cut)")
    # fmt: on

    # Define trijet_mass observable for the 4j2b region (this one is more complicated)
    df4j2b = define_trijet_mass(df)

    # Book histograms and, if needed, their systematic variations
    results = []
    for df, observable, region in zip([df4j1b, df4j2b], ["HT", "Trijet_mass"], ["4j1b", "4j2b"]):
        histo_model = ROOT.RDF.TH1DModel(
            name=f"{region}_{process}_{variation}",
            title=process,
            nbinsx=25,
            xlow=50,
            xup=550,
        )
        nominal_histo = df.Histo1D(histo_model, observable, "Weights")

        if variation == "nominal":
            results.append(
                AGCResult(
                    nominal_histo,
                    region,
                    process,
                    variation,
                    nominal_histo,
                    should_vary=True,
                )
            )
        else:
            results.append(
                AGCResult(
                    nominal_histo,
                    region,
                    process,
                    variation,
                    nominal_histo,
                    should_vary=False,
                )
            )
        print(f"Booked histogram {histo_model.fName}")

    ml_results: list[AGCResult] = []

    if not inference:
        return (results, ml_results)

    df4j2b = ml.define_features(df4j2b)
    df4j2b = ml.infer_output_ml_features(df4j2b)

    # Book histograms and, if needed, their systematic variations
    for i, feature in enumerate(ml.ml_features_config):
        histo_model = ROOT.RDF.TH1DModel(
            name=f"{feature.name}_{process}_{variation}",
            title=feature.title,
            nbinsx=feature.binning[0],
            xlow=feature.binning[1],
            xup=feature.binning[2],
        )

        nominal_histo = df4j2b.Histo1D(histo_model, f"results{i}", "Weights")

        if variation == "nominal":
            ml_results.append(
                AGCResult(
                    nominal_histo,
                    feature.name,
                    process,
                    variation,
                    nominal_histo,
                    should_vary=True,
                )
            )
        else:
            ml_results.append(
                AGCResult(
                    nominal_histo,
                    feature.name,
                    process,
                    variation,
                    nominal_histo,
                    should_vary=False,
                )
            )
        print(f"Booked histogram {histo_model.fName}")

    # Return the booked results
    # Note that no event loop has run yet at this point (RDataFrame is lazy)
    return (results, ml_results)


def load_cpp():
    """Load C++ helper functions. Works for both local and distributed execution."""
    try:
        # when using distributed RDataFrame 'helpers.cpp' is copied to the local_directory
        # of every worker (via `distribute_unique_paths`)
        localdir = get_worker().local_directory
        cpp_source = Path(localdir) / "helpers.h"
    except ValueError:
        # must be local execution
        cpp_source = "helpers.h"

    ROOT.gInterpreter.Declare(f'#include "{str(cpp_source)}"')


def run_mt(
    program_start: float,
    args: argparse.Namespace,
    inputs: list[AGCInput],
    results: list[AGCResult],
    ml_results: list[AGCResult],
) -> None:
    ROOT.EnableImplicitMT(args.ncores)
    print(f"Number of threads: {ROOT.GetThreadPoolSize()}")
    load_cpp()
    if args.inference:
        ml.load_cpp()

    for input in inputs:
        df = ROOT.RDataFrame("Events", input.paths)
        hist_list, ml_hist_list = book_histos(
            df, input.process, input.variation, input.nevents, inference=args.inference
        )
        results += hist_list
        ml_results += ml_hist_list

    for r in results + ml_results:
        if r.should_vary:
            r.histo = ROOT.RDF.Experimental.VariationsFor(r.histo)

    print(f"Building the computation graphs took {time() - program_start:.2f} seconds")

    # Run the event loops for all processes and variations here
    run_graphs_start = time()
    ROOT.RDF.RunGraphs([r.nominal_histo for r in results + ml_results])

    print(f"Executing the computation graphs took {time() - run_graphs_start:.2f} seconds")


def run_distributed(
    program_start: float,
    args: argparse.Namespace,
    inputs: list[AGCInput],
    results: list[AGCResult],
    ml_results: list[AGCResult],
) -> None:
    if args.inference:

        def ml_init():
            load_cpp()
            ml.load_cpp()

        ROOT.RDF.Distributed.initialize(ml_init)
    else:
        ROOT.RDF.Distributed.initialize(load_cpp)

    scheduler_address = args.scheduler_address if args.scheduler_address else ""
    client = create_dask_client(args.scheduler, args.ncores, args.hosts, scheduler_address) # it was: with ... as client:
    for input in inputs:
        df = ROOT.RDataFrame(
            "Events",
            input.paths,
            executor=client,
            npartitions=args.npartitions,
        )
        df._headnode.backend.distribute_unique_paths(
            [
                "helpers.h",
                "ml_helpers.h",
                "ml.py",
                "models/bdt_even.root",
                "models/bdt_odd.root",
            ]
        )
        hist_list, ml_hist_list = book_histos(
            df, input.process, input.variation, input.nevents, inference=args.inference
        )
        results += hist_list
        ml_results += ml_hist_list

    for r in results + ml_results:
        if r.should_vary:
            r.histo = ROOT.RDF.Distributed.VariationsFor(r.histo)

    print(f"Building the computation graphs took {time() - program_start:.2f} seconds")

    # Run the event loops for all processes and variations here
    run_graphs_start = time()
    # for ROOT <v6.28
    #ROOT.RDF.Distributed.RunGraphs([r.nominal_histo for r in results + ml_results])
    # other ROOT versions
    ROOT.RDF.Distributed.RunGraphs([r.histo for r in results + ml_results])
    for r in results + ml_results:
        h = r.histo
        if "ResultMapProxy" in str(type(h)):
            try:
                keys = h.GetKeys()
                print(f"RResultMap with variations: {keys}")
            except RuntimeError as e:
                print(f"Still untriggered map: {h}\n {e}")
        elif "ResultPtrProxy" in str(type(h)):
            try:
                h.GetValue()  # triggers or accesses the histogram
                print("RResultPtr has value")
            except Exception as e:
                print(f"Error accessing value: {e}")
    
    print(f"Executing the computation graphs took {time() - run_graphs_start:.2f} seconds")

In [3]:
program_start = time()

#args, unknown = parse_known_args()
#args = parse_args()

# Do not add histograms to TDirectories automatically: we'll do it ourselves as needed.
ROOT.TH1.AddDirectory(False)
# Disable interactive graphics: avoids canvases flashing on screen before we save them to file
ROOT.gROOT.SetBatch(True)

if args.verbose:
    # Set higher RDF verbosity for the rest of the program.
    # To only change the verbosity in a given scope, use ROOT.Experimental.RLogScopedVerbosity.
    ROOT.Detail.RDF.RDFLogChannel().SetVerbosity(ROOT.ELogLevel(6))

inputs: list[AGCInput] = retrieve_inputs(
    args.n_max_files_per_sample, args.remote_data_prefix, args.data_cache
)
results: list[AGCResult] = []
ml_results: list[AGCResult] = []

if args.scheduler == "mt":
    run_mt(program_start, args, inputs, results, ml_results)
else:
    if args.scheduler == "dask-remote" and not args.scheduler_address:
        raise ValueError(
            "'dask-remote' option chosen but no address provided for the scheduler. Provide it with `--scheduler-address`."
        )
    if args.scheduler_address and args.scheduler != "dask-remote":
        raise ValueError(
            f"An address of a Dask scheduler was provided but the chosen scheduler is '{args.scheduler}'. The 'dask-remote' scheduler must be chosen if an address is provided."
        )
    run_distributed(program_start, args, inputs, results, ml_results)
                
results = postprocess_results(results)
save_plots(results)
save_histos([r.histo for r in results], output_fname=args.output)
print(f"Result histograms saved in file {args.output}")

if args.inference:
    ml_results = postprocess_results(ml_results)
    save_ml_plots(ml_results)
    output_fname = args.output.split(".root")[0] + "_ml_inference.root"
    save_histos([r.histo for r in ml_results], output_fname=output_fname)
    print(f"Result histograms from ML inference step saved in file {output_fname}")

if not args.no_fitting:
    fit_histograms(filename=args.output)

if args.statistical_validation:
    fit_histograms(filename=args.output)

Booked histogram 4j1b_ttbar_nominal
Booked histogram 4j2b_ttbar_nominal
Booked histogram 4j1b_ttbar_scaledown
Booked histogram 4j2b_ttbar_scaledown
Booked histogram 4j1b_ttbar_scaleup
Booked histogram 4j2b_ttbar_scaleup
Booked histogram 4j1b_ttbar_ME_var
Booked histogram 4j2b_ttbar_ME_var
Booked histogram 4j1b_ttbar_PS_var
Booked histogram 4j2b_ttbar_PS_var
Booked histogram 4j1b_single_top_s_chan_nominal
Booked histogram 4j2b_single_top_s_chan_nominal
Booked histogram 4j1b_single_top_t_chan_nominal
Booked histogram 4j2b_single_top_t_chan_nominal
Booked histogram 4j1b_single_top_tW_nominal
Booked histogram 4j2b_single_top_tW_nominal
Booked histogram 4j1b_wjets_nominal
Booked histogram 4j2b_wjets_nominal
Building the computation graphs took 12.32 seconds
Still untriggered map: <DistRDF.Proxy.ResultMapProxy object at 0x7f11bfed1810>
 
A list of names of systematic variations was requested, but the corresponding map of variations is not
present. The variation names cannot be retrieved unle

RuntimeError: 
A list of names of systematic variations was requested, but the corresponding map of variations is not
present. The variation names cannot be retrieved unless the computation graph has properly run and
finished. Something may have gone wrong in the distributed execution, or no variation values were
explicitly requested. In the future, it will be possible to get the variation names without triggering.


Info in <[ROOT.RDF] Info /root_src/tree/dataframe/src/RLoopManager.cxx:948 in void ROOT::Detail::RDF::RLoopManager::Jit()>: Nothing to jit and execute.
Info in <[ROOT.RDF] Info /root_src/tree/dataframe/src/RLoopManager.cxx:948 in void ROOT::Detail::RDF::RLoopManager::Jit()>: Nothing to jit and execute.
Info in <[ROOT.RDF] Info /root_src/tree/dataframe/src/RLoopManager.cxx:962 in void ROOT::Detail::RDF::RLoopManager::Jit()>: Just-in-time compilation phase completed in 3.687749 seconds.
Info in <[ROOT.RDF] Info /root_src/tree/dataframe/src/RLoopManager.cxx:948 in void ROOT::Detail::RDF::RLoopManager::Jit()>: Nothing to jit and execute.
Info in <[ROOT.RDF] Info /root_src/tree/dataframe/src/RLoopManager.cxx:962 in void ROOT::Detail::RDF::RLoopManager::Jit()>: Just-in-time compilation phase completed in 0.369365 seconds.
Info in <[ROOT.RDF] Info /root_src/tree/dataframe/src/RLoopManager.cxx:948 in void ROOT::Detail::RDF::RLoopManager::Jit()>: Nothing to jit and execute.
Info in <[ROOT.RDF] 

In [None]:
if args.scheduler == "dask-remote":
    client.run(transfer_to_tier) 
    client.close