Kod używa wygenerowanych przekrojów czynnych oraz pliku zawierającego wiązkę strumienia neutrin do obliczenia przewidywanej liczby zdarzeń CC/NC dla neutrin danego zapachu.

In [None]:
import pathlib
from typing import Dict, Tuple, Optional
import numpy as np
import pandas as pd
import uproot

#Stałe potrzebne do obliczeń
NA = 6.02214076e23 #Liczba Avogadra
M_mol_Ar = 39.948 #Masa molowa Argonu
GeV_to_cm = 0.389379e-27 #Przelicza jednostki naturalne przekroju na cm^2

Najpierw przygotowano funkcje do wczytywania danych z pliku ROOT

In [None]:
# Funkcja do pobierania poszczególnych histogramów z pliku ROOT
def _root_get(obj, key: str):
    if key in obj:
        return obj[key]
    if key.endswith(";1") and key[:-2] in obj:
        return obj[key[:-2]]
    alt = key + ";1"
    if alt in obj:
        return obj[alt]
    raise KeyError(f"Brak obiektu '{key}' w pliku ROOT")

# FUnkcja do wczytywania histogramów fluxu neutrin z pliku ROOT. Zwraca środki binów energi oraz wartości fluxu w binie
def load_flux_hist(root_path: str, hist_name: str,
                   emin: Optional[float] = None, emax: Optional[float] = None
                   ) -> Tuple[np.ndarray, np.ndarray]:
  
    with uproot.open(root_path) as f:
        h = _root_get(f, hist_name)
        edges = h.axis().edges()             # krawędzie binów energii
        vals = h.values(flow=False)          # wartości fluxu
    centers = 0.5 * (edges[:-1] + edges[1:]) # środki binów

    # przycięcie do zadanego zakresu energii
    if emin is not None or emax is not None:
        m = np.ones_like(centers, dtype=bool)
        if emin is not None:
            m &= (centers >= emin)
        if emax is not None:
            m &= (centers <= emax)
        centers = centers[m]
        vals = vals[m]
    return centers, vals

# Funkcja wczytująca wiele histogramów fluxów neutrin
def load_fluxes(root_path: str, hist_map: Dict[str, str],
                emin: Optional[float] = None, emax: Optional[float] = None
                ) -> Dict[str, Tuple[np.ndarray, np.ndarray]]:
    out = {}
    for flv, hname in hist_map.items():
        E, F = load_flux_hist(root_path, hname, emin=emin, emax=emax)
        out[flv] = (E, F)
    return out



Następnie przygotowano funkcje wczytujące przekroje czynne neutrin

In [None]:


import xml.etree.ElementTree as ET

# Funkcja wczytuje pojedyńczy element w XML i wyciąga z niego przekrój oraz energię
def _parse_knot_node(knot):
    if "E" in knot.attrib and "xsec" in knot.attrib:
        return float(knot.get("E")), float(knot.get("xsec"))
    if "x" in knot.attrib and "y" in knot.attrib:
        return float(knot.get("x")), float(knot.get("y"))
    E_val, xs_val = None, None
    for ch in knot:
        tag = ch.tag.lower()
        txt = (ch.text or "").strip()
        if not txt:
            continue
        if tag == "e":
            E_val = float(txt)
        elif tag in ("xsec", "xs", "y"):
            xs_val = float(txt)
    if E_val is not None and xs_val is not None:
        return E_val, xs_val
    return None, None

# Funkcja wczytuje cały plik XML i zwraca atrybuty: "meta" - nazwa procesu, E - tablica energii, xsec - tablica przekrojów
def parse_genie_spline_xml(xml_path):
    tree = ET.parse(xml_path)
    root = tree.getroot()
    out = []
    for spl in root.findall(".//spline"):
        meta = spl.attrib.copy()
        Es, Xs = [], []
        for knot in spl.findall(".//knot"):
            E, xs = _parse_knot_node(knot)
            if E is not None:
                Es.append(E)
                Xs.append(xs)
        if not Es:
            E_flat = [float(ch.text) for ch in spl.findall(".//E") if ch.text and ch.text.strip()]
            X_flat = [float(ch.text) for ch in spl.findall(".//xsec") if ch.text and ch.text.strip()]
            if E_flat and X_flat:
                n = min(len(E_flat), len(X_flat))
                Es, Xs = E_flat[:n], X_flat[:n]

        out.append({"meta": meta, "E": np.asarray(Es, float), "xsec": np.asarray(Xs, float)})
    return out

# Określa typ oddziaływania na podstawie nazwy procesu w pliku XML
def _classify_current_from_name(name: str):
    lo = name.lower()
    if "weak[cc+nc" in lo: return "BOTH"
    if "weak[cc]" in lo: return "CC"
    if "weak[nc]" in lo: return "NC"
    if "proc:weak[cc" in lo: return "CC"
    if "proc:weak[nc" in lo: return "NC"
    if "cc]" in lo and "nc]" not in lo: return "CC"
    if "nc]" in lo and "cc]" not in lo: return "NC"
    return None

# Funkcja sumuje przekroje czynne podanej listy i zwraca wspólną tablice z energiami
def _sum_terms(lst):
    if not lst:
        return np.array([]), np.array([])
    E_union = np.unique(np.concatenate([d["E"] for d in lst if d["E"].size]))
    xs_sum = np.zeros_like(E_union)
    for d in lst:
        if not d["E"].size:
            continue
        xs_sum += np.interp(E_union, d["E"], d["xsec"], left=0.0, right=0.0)
    return E_union, xs_sum

# Funkcja ładuje przekroje czynne CC i NC z pliku XML i zwraca przeskalowane przekroje dla wszystkich energi
def load_genie_total_cc_nc(xml_path, scale=1.0, require_one=True):
    spl = parse_genie_spline_xml(xml_path)
    cc_terms, nc_terms = [], []
    for sp in spl:
        name = sp["meta"].get("name", "")
        cur = _classify_current_from_name(name)
        if cur == "CC": cc_terms.append(sp)
        elif cur == "NC": nc_terms.append(sp)
        elif cur == "BOTH": cc_terms.append(sp); nc_terms.append(sp)
    E_cc, xs_cc = _sum_terms(cc_terms)
    E_nc, xs_nc = _sum_terms(nc_terms)
    if require_one and (E_cc.size == 0 and E_nc.size == 0):
        raise RuntimeError(f"Nie znaleziono żadnego spline w {xml_path}")
    if E_cc.size and E_nc.size:
        E_all = np.unique(np.concatenate([E_cc, E_nc]))
    else:
        E_all = E_cc if E_cc.size else E_nc
    xs_cc_i = np.interp(E_all, E_cc, xs_cc, left=0, right=0) if E_cc.size else np.zeros_like(E_all)
    xs_nc_i = np.interp(E_all, E_nc, xs_nc, left=0, right=0) if E_nc.size else np.zeros_like(E_all)
    return E_all, xs_cc_i * scale, xs_nc_i * scale




Następnie zdefiniowano funkcję liczącą całke oraz liczącą liczby zdarzeń neutrinowych dla neutrin elektronowych, mionowych i taonowych.


In [None]:


# Oblicza całke i liczbę zdarzeń dla neutrina danego zapachu
def integrate_events(E: np.ndarray, flux: np.ndarray,
                     xs_cc: np.ndarray, xs_nc: np.ndarray,
                     pot: float, mass_kt: float,
                     xs_units: str = "cm2_per_nucleon") -> Dict[str, float]:
 
    if flux.shape != E.shape:
        raise ValueError("Flux i E muszą mieć tę samą długość (po interpolacji).")

    # konwersja jednostek przekroju
    xs_cc_m2 = xs_cc *GeV_to_cm * 1.0e-4
    xs_nc_m2 = xs_nc *GeV_to_cm * 1.0e-4

    # flux przeskalowany przez POT
    flux_per_m2 = flux * pot

    # Oblicza całki
    cc = np.trapz(flux_per_m2 * xs_cc_m2, E) * mass_kt*NA*1e9/M_mol_Ar
    nc = np.trapz(flux_per_m2 * xs_nc_m2, E) * mass_kt*NA*1e9/M_mol_Ar

    return {"CC": cc, "NC": nc, "TOT": cc + nc}

# Łączy poprzednio zdefiniowane funkcje i oblicza liczbe zdarzeń dla neutrin różnych zapachów
def compute_counts(flux_root: str,
                   flux_hists: Dict[str,str],
                   xs_files: Dict[str,str],
                   pot: float,
                   mass_kt: float,
                   emin: float,
                   emax: float,
                   xs_units: str = "cm2_per_nucleon") -> pd.DataFrame:
    # Wczytanie strumienia neutrin
    flux_cache = load_fluxes(flux_root, flux_hists, emin=emin, emax=emax)
    # Wczytanie przekrojów czynnych
    xs_cache = {}
    for flv, path in xs_files.items():
        if not pathlib.Path(path).exists():
            raise FileNotFoundError(path)
        xs_cache[flv] = load_genie_total_cc_nc(path, require_one=False)
    # Obliczenia liczby zdarzeń dla każdego zapachu neutrina
    rows=[]
    for flv,(E_flux,F_flux) in flux_cache.items():
        if flv not in xs_cache:
            raise KeyError(f"Brak XS dla smaku {flv}")
        E_xs, xs_cc, xs_nc = xs_cache[flv]
        xs_cc_i = np.interp(E_flux, E_xs, xs_cc, left=0.0, right=0.0)
        xs_nc_i = np.interp(E_flux, E_xs, xs_nc, left=0.0, right=0.0)
        c = integrate_events(E_flux, F_flux, xs_cc_i, xs_nc_i, pot, mass_kt, xs_units=xs_units)
        rows.append({"flavor":flv, **c})
    df = pd.DataFrame(rows).set_index("flavor")
    return df


# Wypisuje wyniki
def print_report(df: pd.DataFrame) -> None:
    with pd.option_context("display.float_format","{:.3e}".format):
        print("\n Liczby zdarzeń (CC, NC, TOT) ")
        print(df)

# Pliki potrzebne do analizy
flux_root = "g4lbne_FHC_FD.root"
flux_hists = {"nue": "nue_fluxosc", "numu": "numu_fluxosc", "nutau": "nutau_fluxosc"}
xs_files = {
    "nue": "splines_3_04_02_nuE_Ar40_80.xml",
    "numu": "splines_3_04_02_nuMu_Ar40_80.xml",
    "nutau": "splines_3_04_02_nuTau_Ar40_80.xml",
}
# Zakres energii dla których liczymy liczbę zdarzeń
emin, emax = 0, 20



In [None]:
# Wywołanie funkcji
def main(argv=None):  
    df = compute_counts(
    flux_root="g4lbne_FHC_FD.root",
    flux_hists={"nue": "nue_fluxosc", "numu": "numu_fluxosc", "nutau": "nutau_fluxosc"},
    xs_files={
        "nue": "splines_3_04_02_nuE_Ar40_80.xml",
        "numu": "splines_3_04_02_nuMu_Ar40_80.xml",
        "nutau": "splines_3_04_02_nuTau_Ar40_80.xml"
    },
    pot=1.47e21,
    mass_kt=40,
    emin=0,
    emax=20,
)
    print_report(df)

if __name__ == "__main__":
    main()