## Perturbative footprint study: data creation

### Imports

In [1]:
# Standard imports
import numpy as np
import shutil
from ruamel.yaml import YAML
import os
import matplotlib.pyplot as plt
import pickle
import subprocess
import time
import pandas as pd

# Cern modules
import fillingpatterns as fp
import xtrack as xt

# Improve style
import seaborn as sns

sns.set_theme(style="whitegrid")

### Get machine (built in the script compare_footprints.py)

In [None]:
collider = xt.Multiline.from_json("output/collider_tuned_bb_on.json")
collider.build_trackers()

### Correct bbb schedule

In [None]:
# Get BB schedule
fname = "filling_scheme/8b4e_1972b_1960_1178_1886_224bpi_12inj_800ns_bs200ns.json"
patt = fp.FillingPattern.from_json(fname)

# Compute bb schedule
patt.compute_beam_beam_schedule(n_lr_per_side=25)
bbs_b1 = patt.b1.bb_schedule
bbs_b2 = patt.b2.bb_schedule

# Get list of bunches
bbs_b1

In [None]:
def fix_bb_elements(collider, bunch_nb, reset_collider=True, also_beam_2=False, print_final_result=False):
    # Reset collider
    if reset_collider:
        for i, x in enumerate(collider.lhcb1.element_names):
            if "bb_lr" in x or "bb_ho" in x:
                try:
                    collider.lhcb1.vars[x + '_scale_strength'] = collider.lhcb1.vars['beambeam_scale']
                    collider.lhcb1.element_refs[x].scale_strength = collider.lhcb1.vars[x + '_scale_strength']
                except Exception as e:
                    print(e)
                    print(f"Failed to reset {x}")
                    
                
        for i, x in enumerate(collider.lhcb2.element_names):
            if "bb_lr" in x or "bb_ho" in x:
                try:
                    collider.lhcb2.vars[x + '_scale_strength'] = collider.lhcb2.vars['beambeam_scale']
                    collider.lhcb2.element_refs[x].scale_strength = collider.lhcb2.vars[x + '_scale_strength']
                except Exception as e:
                    print(e)
                    print(f"Failed to reset {x}")

    # Take care of LR elements
    for ip, name_position in zip(
        [1, 2, 5, 8],
        [
            "Positions in ATLAS/CMS",
            "Positions in ALICE",
            "Positions in ATLAS/CMS",
            "Positions in LHCB",
        ],
    ):
        # Deactivate elements that shouldn't be here in beam 1
        idx_elements_b1 = bbs_b1.loc[bunch_nb][name_position]
        for i, x in enumerate(collider.lhcb1.element_names):
            # print(collider.lhcb1[x].to_dict())
            # First look for bb_elements for current IP
            if "bb_lr.l" + str(ip) in x or "bb_lr.r" + str(ip) in x:
                # Then only keep elements corresponding to the current IP
                pos = int(x.split("_")[2])
                if "l" in x.split("lr.")[1]:
                    pos = -pos
                if pos not in idx_elements_b1:
                    collider.lhcb1.element_refs[x].scale_strength = 0

        # Same with beam 2
        if also_beam_2:
            idx_elements_b2 = bbs_b2.loc[bunch_nb][name_position]
            for i, x in enumerate(collider.lhcb2.element_names):
                if "bb_lr.l" + str(ip) in x or "bb_lr.r" + str(ip) in x:
                    pos = int(x.split("_")[2])
                    if "l" in x.split("lr.")[1]:
                        pos = -pos
                    if pos not in idx_elements_b2:
                        collider.lhcb2.vars[x + '_scale_strength'] = 0

    # Take care of HO elements
    for ip, bool_collide in zip(
        [1, 2, 5, 8],
        ["collides in ATLAS/CMS", "collides in ALICE", "collides in ATLAS/CMS", "collides in LHCB"],
    ):
        collide_b1 = bbs_b1.loc[bunch_nb][bool_collide]

        # Deactivate elements that shouldn't be here in beam 1
        for i, x in enumerate(collider.lhcb1.element_names):
            # First look for bb_elements
            if (
                "bb_ho.l" + str(ip) in x or "bb_ho.r" + str(ip) in x or "bb_ho.c" + str(ip) in x
            ) and not collide_b1:
                collider.lhcb1.element_refs[x].scale_strength = 0

        # Same with beam 2
        if also_beam_2:
            collide_b2 = bbs_b2.loc[bunch_nb][bool_collide]
            for i, x in enumerate(collider.lhcb2.element_names):
                if (
                    "bb_ho.l" + str(ip) in x or "bb_ho.r" + str(ip) in x or "bb_ho.c" + str(ip) in x
                ) and not collide_b2:
                    collider.lhcb2.vars[x + '_scale_strength'] = 0

    # Print final result
    if print_final_result:
        # Beam 1
        print("Beam 1")
        for i, x in enumerate(collider.lhcb1.element_names):
            if "bb_ho" in x or "bb_lr" in x:
                print(x, collider.lhcb1.element_refs[x].scale_strength._value)

        # Beam 2
        if also_beam_2:
            print("Beam 2")
            for i, x in enumerate(collider.lhcb2.element_names):
                if "bb_ho" in x or "bb_lr" in x:
                    print(x, collider.lhcb2.element_refs[x].scale_strength._value)

    # Return collider
    return collider

### Get DA for each bunch

In [None]:
# Load dataframe for DA
path = "/afs/cern.ch/work/c/cdroin/private/DA_study/master_study/opt_flathv_75_1500_withBB_chroma15_1p4_all_bunches"
df = pd.read_parquet(f"{path}/da.parquet")

# Round all numbers to 3 decimals
df = df.round(3)

# Get list of bunches and list of DA
l_bunch_nb_from_DA = df["bunch_nb"]
l_DA = df["normalized amplitude in xy-plane"]

Coupling correction due to the lattice (```delta_cmr``` and ```delta_cmi``` in pymask) are ignored as they're not very relevant for the coupling study.

### Get footprint for a selected set bunches

In [None]:
def return_footprint(collider, bunch_nb, n_turns, perturbative=True, mimic_initial_condition = False):
    # Adapt collider for current bunch
    collider = fix_bb_elements(
        collider, bunch_nb, reset_collider=True, also_beam_2=False, print_final_result=False
    )
    # Get footprint
    if mimic_initial_condition:

        # First get initial condition that was built to compute DA
        r_max, r_min = 10, 2
        radial_list_to_mimic = np.linspace(r_min, r_max, 2 * 16 * (r_max - r_min), endpoint=False)
        n_angles = 5
        theta_list_to_mimic = np.linspace(0, 90, n_angles + 2)[1:-1]

        # Then reproduce this initial condition, knowing that the distributions are generated with
        # np.linspace(..., endpoint=True) in the footprint code
        fp_polar_xm = collider["lhcb1"].get_footprint(
            nemitt_x=2.5e-6,
            nemitt_y=2.5e-6,
            n_turns=n_turns,
            mode = 'polar',
            r_range = (radial_list_to_mimic[0],radial_list_to_mimic[-1]),
            n_r = len(radial_list_to_mimic),
            theta_range = (theta_list_to_mimic[0]/180*np.pi, theta_list_to_mimic[-1]/180*np.pi),
            n_theta = len(theta_list_to_mimic),
            linear_rescale_on_knobs=[
                xt.LinearRescale(knob_name="beambeam_scale", v0=0.0, dv=0.05)
            ] if perturbative else None,
        )
    else:
        fp_polar_xm = collider["lhcb1"].get_footprint(
            nemitt_x=2.5e-6,
            nemitt_y=2.5e-6,
            n_turns=n_turns,
            linear_rescale_on_knobs=[
                xt.LinearRescale(knob_name="beambeam_scale", v0=0.0, dv=0.05)
            ] if perturbative else None,
        )        

    return [fp_polar_xm.qx, fp_polar_xm.qy]

In [None]:
# Get bunch nb corresponding to percentile 0, 25, 50, 75, 100 of the DA
l_bunch_nb_sampled = []
l_DA_sampled = []
for p in [0, 25, 50, 75, 100]:
    l_bunch_nb_sampled.append(l_bunch_nb_from_DA[l_DA == l_DA.quantile(p / 100)].iloc[0])
    l_DA_sampled.append(l_DA.quantile(p / 100))

# Get the 4 footprints for each bunch
dic_footprints = {bunch_nb:[] for bunch_nb in l_bunch_nb_sampled}
n_turns = 2000
for bunch_nb in l_bunch_nb_sampled:
    dic_footprints[bunch_nb].append(return_footprint(collider, bunch_nb, n_turns=n_turns, perturbative=False, mimic_initial_condition = False))
    dic_footprints[bunch_nb].append(return_footprint(collider, bunch_nb, n_turns=n_turns, perturbative=True, mimic_initial_condition = False))
    dic_footprints[bunch_nb].append(return_footprint(collider, bunch_nb, n_turns=n_turns, perturbative=False, mimic_initial_condition = True))
    dic_footprints[bunch_nb].append(return_footprint(collider, bunch_nb, n_turns=n_turns, perturbative=True, mimic_initial_condition = True))
    

### Plot sample footprints

In [None]:
# Load l_fp
with open("output_2/l_fp.pkl", "rb") as f:
    l_fp = pickle.load(f)[:100]

# Load list of bunch numbers
with open("output_2/l_bunch_nb.pkl", "rb") as f:
    l_bunch_nb = pickle.load(f)[:100]

# Make a grid of plots for all the footprints
n_cols = 10
n_rows = int(np.ceil(len(l_fp)/n_cols))
fig, axs = plt.subplots(n_rows, n_cols, figsize=(n_cols*2.5, n_rows*2))
for i, fp in enumerate(l_fp):
    try:
        ax = axs[i//n_cols, i%n_cols]
        ax.plot(fp[0], fp[1], color="C0")
        ax.plot(fp[0].T, fp[1].T, color="C0")
        ax.set_title(f"Bunch {l_bunch_nb[i]}")
        ax.set_xlabel(r"$\mathrm{Q_x}$")
        ax.set_ylabel(r"$\mathrm{Q_y}$")
        #ax.set_xlim(0, 1)
        #ax.set_ylim(0, 1)
        ax.grid()
        #ax.set_aspect("equal")
        ax.set_title(f"Bunch {l_bunch_nb[i]}")
    except:
        pass

plt.tight_layout()
plt.show()

