### Imports

In [10]:
# Standard imports
import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.colors import BoundaryNorm
import seaborn as sns
import xtrack as xt
import xpart as xp
import time
from rich.progress import Progress
from rich.progress import (
    Progress,
    BarColumn,
    TextColumn,
    TimeElapsedColumn,
    SpinnerColumn,
    TimeRemainingColumn,
)

# Backend for footprint analysis
import backend_footprint_analysis.InteractionPoint as inp
import backend_footprint_analysis.Detuning as dtune
import backend_footprint_analysis.Footprint as fp
import backend_footprint_analysis.BeamPhysics as BP


import json

# Improve style
sns.set_theme(style="whitegrid")
%config InlineBackend.figure_format='retina'


### Load data

In [11]:
# function to return twiss and survey from a given madx folder
def return_twisse_and_survey(path, b2=False):
    with open(path) as fid:
        dd = json.load(fid)
    line = xt.Line.from_dict(dd)
    line.build_tracker()
    p_co = xp.Particles.from_dict(dd["particle_on_tracker_co"])
    pd_sv = line.survey(reverse=b2).to_pandas()
    pd_tw = line.twiss(particle_co_guess=p_co, reverse=b2).to_pandas()
    return pd_tw, pd_sv


def correct_dataframe(twiss_b1, survey_b1, twiss_b2, survey_b2):
    # Correct indices
    twiss_b1.index = twiss_b1.name
    twiss_b2.index = twiss_b2.name
    survey_b1.index = survey_b1.name
    survey_b2.index = survey_b2.name

    # # Correct column names
    survey_b1.columns = [
        "x",
        "y",
        "z",
        "theta",
        "phi",
        "psi",
        "name",
        "s",
        "drift_length",
        "angle",
        "tilt",
        "element0",
    ]
    survey_b2.columns = [
        "x",
        "y",
        "z",
        "theta",
        "phi",
        "psi",
        "name",
        "s",
        "drift_length",
        "angle",
        "tilt",
    ]

    return twiss_b1, survey_b1, twiss_b2, survey_b2


In [12]:
# Load data
l_arr = []
l_b = [
    1963,
    2116,
]  # 1963, 2116]
l_bb = [
    True,
    True,
]  # False, False]
for idx_b1, idx_b2 in [
    ["000", "002"],
    ["001", "003"],
]:  # ["004", "006"], ["005", "007"]]:
    twiss_b1, survey_b1 = return_twisse_and_survey(
        "../opt_flathv_75_1500_withBB_chroma15_footprint/madx_"
        + idx_b1
        + "/xsuite_lines/line_bb_for_tracking.json",
        b2=False,
    )
    twiss_b2, survey_b2 = return_twisse_and_survey(
        "../opt_flathv_75_1500_withBB_chroma15_footprint/madx_"
        + idx_b2
        + "/xsuite_lines/line_bb_for_tracking.json",
        b2=True,
    )
    twiss_b1, survey_b1, twiss_b2, survey_b2 = correct_dataframe(
        twiss_b1, survey_b1, twiss_b2, survey_b2
    )
    l_arr.append((twiss_b1, survey_b1, twiss_b2, survey_b2))

# Importing twiss and survey
# twiss_b1_original  = pd.read_pickle('data/lhcb1_opticsfile30_twiss.pkl')
# survey_b1_original= pd.read_pickle('data/lhcb1_opticsfile30_survey.pkl')

# twiss_b2_original  = pd.read_pickle('data/lhcb2_opticsfile30_twiss.pkl')
# survey_b2_original = pd.read_pickle('data/lhcb2_opticsfile30_survey.pkl')


Done loading line from dict.           
generating ./2967d3cd915f418683e97f40c0b24fcb.c
the current directory is '/afs/cern.ch/work/c/cdroin/private/DA_study/master_study/analysis'


 6606 | #  define _GNU_SOURCE // enable GNU libc NAN extension if possible
      | 
In file included from 2967d3cd915f418683e97f40c0b24fcb.c:50:
/afs/cern.ch/work/c/cdroin/private/DA_study/miniconda/include/python3.10/pyconfig.h:1621: note: this is the location of the previous definition
 1621 | #define _GNU_SOURCE 1
      | 


Done loading line from dict.           
generating ./e054270b22e643c2897b836dbe108ca3.c
the current directory is '/afs/cern.ch/work/c/cdroin/private/DA_study/master_study/analysis'


 6606 | #  define _GNU_SOURCE // enable GNU libc NAN extension if possible
      | 
In file included from e054270b22e643c2897b836dbe108ca3.c:50:
/afs/cern.ch/work/c/cdroin/private/DA_study/miniconda/include/python3.10/pyconfig.h:1621: note: this is the location of the previous definition
 1621 | #define _GNU_SOURCE 1
      | 


Done loading line from dict.           
generating ./f0ee455229c64c75b361753f48b09203.c
the current directory is '/afs/cern.ch/work/c/cdroin/private/DA_study/master_study/analysis'


 6606 | #  define _GNU_SOURCE // enable GNU libc NAN extension if possible
      | 
In file included from f0ee455229c64c75b361753f48b09203.c:50:
/afs/cern.ch/work/c/cdroin/private/DA_study/miniconda/include/python3.10/pyconfig.h:1621: note: this is the location of the previous definition
 1621 | #define _GNU_SOURCE 1
      | 


Done loading line from dict.           
generating ./99001fb6450e4ea98e8435eda65fe2c6.c
the current directory is '/afs/cern.ch/work/c/cdroin/private/DA_study/master_study/analysis'


 6606 | #  define _GNU_SOURCE // enable GNU libc NAN extension if possible
      | 
In file included from 99001fb6450e4ea98e8435eda65fe2c6.c:50:
/afs/cern.ch/work/c/cdroin/private/DA_study/miniconda/include/python3.10/pyconfig.h:1621: note: this is the location of the previous definition
 1621 | #define _GNU_SOURCE 1
      | 


In [4]:
def clean_twiss_and_survey(twiss_b1, survey_b1, twiss_b2, survey_b2, b, bb):
    try:
        # Edit df to look like original
        twiss_b2_cleaned = twiss_b2.drop("_end_point")
        twiss_b2_cleaned.loc["ip3"], twiss_b2_cleaned.loc["lhcb2ip3_p_"] = (
            twiss_b2_cleaned.loc["lhcb2ip3_p_"].copy(),
            twiss_b2_cleaned.loc["ip3"].copy(),
        )
        twiss_b2_cleaned.index = twiss_b2_cleaned.name
        s_ip3 = twiss_b2_cleaned.loc["ip3"].s
        s_lhcb2ip3_p_ = twiss_b2_cleaned.loc["lhcb2ip3_p_"].s
        twiss_b2_cleaned.loc["ip3", "s"] = s_lhcb2ip3_p_
        twiss_b2_cleaned.loc["lhcb2ip3_p_", "s"] = s_ip3
        twiss_b2 = twiss_b2_cleaned

        survey_b2_cleaned = survey_b2.drop("_end_point")
        survey_b2_cleaned.loc["ip3"], survey_b2_cleaned.loc["lhcb2ip3_p_"] = (
            survey_b2_cleaned.loc["lhcb2ip3_p_"].copy(),
            survey_b2_cleaned.loc["ip3"].copy(),
        )
        survey_b2_cleaned.index = survey_b2_cleaned.name
        s_ip3 = survey_b2_cleaned.loc["ip3"].s
        s_lhcb2ip3_p_ = survey_b2_cleaned.loc["lhcb2ip3_p_"].s
        survey_b2_cleaned.loc["ip3", "s"] = s_lhcb2ip3_p_
        survey_b2_cleaned.loc["lhcb2ip3_p_", "s"] = s_ip3
        survey_b2 = survey_b2_cleaned

        twiss_b1 = twiss_b1.drop("_end_point")
        survey_b1 = survey_b1.drop("_end_point")
    except:
        print("Bug in cleaning", b, bb)
    return twiss_b1, survey_b1, twiss_b2, survey_b2


# Clean data
l_arr_cleaned = []
for (twiss_b1, survey_b1, twiss_b2, survey_b2), b, bb in zip(l_arr, l_b, l_bb):
    l_arr_cleaned.append(clean_twiss_and_survey(twiss_b1, survey_b1, twiss_b2, survey_b2, b, bb))


In [5]:
def compute_footprint(twiss_b1, survey_b1, twiss_b2, survey_b2, b, bb):
    # Define beams
    B1 = inp.Beam(
        "b1", twiss_b1, survey_b1, Nb=1.4e11, E=7.0e12, emittx_n=2.5e-6, emitty_n=2.5e-6, dp_p0=0
    )

    B2 = inp.Beam(
        "b2", twiss_b2, survey_b2, Nb=1.4e11, E=7.0e12, emittx_n=2.5e-6, emitty_n=2.5e-6, dp_p0=0
    )

    IP1 = inp.InteractionPoint("ip1", B1, B2)
    IP5 = inp.InteractionPoint("ip5", B1, B2)

    # Generating Coord grid
    # =========================================================
    # coordinates = fp.generate_coordGrid([0.05,10],[0.01*np.pi/2,0.99*np.pi/2],labels = ['r_n','theta_n'],nPoints=500)
    coordinates = fp.generate_coordGrid(
        np.logspace(np.log10(0.5), np.log10(10), 10),
        np.linspace(0.01 * np.pi / 2, 0.99 * np.pi / 2, 7),
        labels=["r_n", "theta_n"],
    )

    coordinates.insert(0, "x_n", coordinates["r_n"] * np.cos(coordinates["theta_n"]))
    coordinates.insert(1, "y_n", coordinates["r_n"] * np.sin(coordinates["theta_n"]))

    coordinates.insert(0, "J_x", (coordinates["x_n"] ** 2) * B1.emittx / 2)
    coordinates.insert(1, "J_y", (coordinates["y_n"] ** 2) * B1.emitty / 2)

    # coordinates.sort_values(by=['r_n'],inplace=True)
    # =========================================================

    def compute_lr_ho_footprint(coord):
        DQx_oct, DQy_oct = np.zeros(len(coord["J_x"])), np.zeros(len(coord["J_x"]))
        DQx_ho, DQy_ho = np.zeros(len(coord["J_x"])), np.zeros(len(coord["J_x"]))

        with Progress(
            "{task.description}",
            SpinnerColumn(),
            BarColumn(bar_width=40),
            TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
            TimeElapsedColumn(),
            TimeRemainingColumn(),
        ) as progress:
            task1 = progress.add_task("[red]Both IPs...", total=2)
            task2 = progress.add_task("[green]Each BBLR...", total=len(IP1.lr))
            task3 = progress.add_task("[green]Remaining time", total=len(IP1.lr))

            for _IP in [IP1, IP5]:
                # Computing octupolar effect
                # ------------------------------
                # Iterating over LR only
                progress.update(task2, completed=0, refresh=True)
                progress.update(task3, completed=0, refresh=True)
                for index, _bb in _IP.lr.iterrows():
                    _DQx, _DQy = dtune.DQx_DQy(
                        ax=coord["x_n"],
                        ay=coord["y_n"],
                        r=_bb["r"],
                        dx_n=_bb["dx_n"],
                        dy_n=_bb["dy_n"],
                        A_w_s=_bb["A_w_s"],
                        B_w_s=_bb["B_w_s"],
                        xi=_IP.b2.xi,
                    )

                    DQx_oct += _DQx
                    DQy_oct += _DQy

                    progress.reset(task2, completed=progress.tasks[task2].completed)
                    progress.update(task2, advance=1, refresh=True)
                    progress.update(task3, advance=1, refresh=True)

                # Computing Head-on component (ex: bb_ho.c1b1_00):
                main_ho = _IP.ho.loc[f"bb_ho.c{_IP.name[-1]}b1_00"]
                # ------------------------------
                _DQx, _DQy = dtune.DQx_DQy(
                    ax=coord["x_n"],
                    ay=coord["y_n"],
                    r=main_ho["r"],
                    dx_n=main_ho["dx_n"],
                    dy_n=main_ho["dy_n"],
                    A_w_s=main_ho["A_w_s"],
                    B_w_s=main_ho["B_w_s"],
                    xi=_IP.b2.xi,
                )
                DQx_ho += _DQx
                DQy_ho += _DQy

                progress.update(task1, advance=1, refresh=True)

        return (DQx_oct, DQy_oct), (DQx_ho, DQy_ho)

    # COMPUTING
    # =========================
    s_time = time.time()
    (DQx_lr, DQy_lr), (DQx_ho, DQy_ho) = compute_lr_ho_footprint(coordinates)
    e_time = time.time()

    # Saving data
    coordinates.insert(1, "DQx_lr", DQx_lr)
    coordinates.insert(2, "DQy_lr", DQy_lr)
    coordinates.insert(3, "DQx_ho", DQx_ho)
    coordinates.insert(4, "DQy_ho", DQy_ho)

    coordinates.to_pickle(f"data/Bessel_footprint_" + str(b) + "_" + str(bb) + ".pkl")

    return coordinates


l_coordinates = []
for (twiss_b1, survey_b1, twiss_b2, survey_b2), b, bb in zip(l_arr_cleaned, l_b, l_bb):
    l_coordinates.append(compute_footprint(twiss_b1, survey_b1, twiss_b2, survey_b2, b, bb))


Output()

Output()

Output()

KeyError: 'bb_ho.c1b1_00'

In [19]:
def plot_footprint(coordinates, b, bb):
    # PLOTTING
    # ================================================================
    Qx_0, Qy_0 = 0.316, 0.321

    cmap = "viridis"
    # fig, axes = plt.subplots(2, 2, figsize=(12, 8))
    # fig.suptitle(f'Analytical footprint for WP [62.316, 60.321], chroma 15, bunch {b}, beam-beam: {bb}\n')

    # # Plotting coordinates
    # # -----------------------
    # plt.sca(axes[0, 0])
    # axes[0, 0].set_title("Coordinates")
    # plt.scatter(
    #     coordinates["x_n"],
    #     coordinates["y_n"],
    #     s=5,
    #     c=coordinates["r_n"],
    #     alpha=0.8,
    #     norm=BoundaryNorm(boundaries=np.linspace(0, 10, 11), ncolors=int(0.9 * 256)),
    # )
    # plt.xlabel(r"$x_n$", fontsize=16)
    # plt.ylabel(r"$y_n$", fontsize=16)
    # plt.axis("equal")
    # cbar = plt.colorbar()
    # plt.set_cmap(cmap)
    # cbar.ax.set_ylim([0, np.max(coordinates["r_n"])])
    # cbar.ax.set_ylabel(r"$\sqrt{(2J_x + 2J_y)/\varepsilon}$ [$\sigma$]", fontsize=12)

    # # Plotting LR only
    # # -----------------------
    # plt.sca(axes[0, 1])
    # axes[0, 1].set_title("LR only")
    # BP.plotWorkingDiagram(
    #     order=12,
    #     QxRange=np.array([0.25, 0.35]),
    #     QyRange=np.array([0.25, 0.35]),
    #     alpha=0.2,
    #     zorder=-1000,
    # )
    # plt.plot([Qx_0], [Qy_0], "P", markersize=5, color="C3", alpha=0.5, label="Unperturbed W.P.")

    # plt.scatter(
    #     Qx_0 + coordinates["DQx_lr"],
    #     Qy_0 + coordinates["DQy_lr"],
    #     s=5,
    #     c=coordinates["r_n"],
    #     alpha=0.8,
    #     norm=BoundaryNorm(boundaries=np.linspace(0, 10, 11), ncolors=int(0.9 * 256)),
    # )
    # plt.xlabel(r"$Q_x$", fontsize=16)
    # plt.ylabel(r"$Q_y$", fontsize=16)
    # plt.axis("equal")
    # window = 0.02
    # plt.xlim([Qx_0 - window, Qx_0 + window])
    # plt.ylim([Qy_0 - window, Qy_0 + window])
    # cbar = plt.colorbar()
    # plt.set_cmap(cmap)
    # cbar.ax.set_ylim([0, np.max(coordinates["r_n"])])
    # cbar.ax.set_ylabel(r"$\sqrt{(2J_x + 2J_y)/\varepsilon}$ [$\sigma$]", fontsize=12)

    # # Plotting head-on only
    # # -----------------------
    # plt.sca(axes[1, 0])
    # axes[1, 0].set_title("Head-on only")
    # BP.plotWorkingDiagram(
    #     order=12,
    #     QxRange=np.array([0.25, 0.35]),
    #     QyRange=np.array([0.25, 0.35]),
    #     alpha=0.2,
    #     zorder=-1000,
    # )
    # plt.plot([Qx_0], [Qy_0], "P", markersize=5, color="C3", alpha=0.5, label="Unperturbed W.P.")

    # plt.scatter(
    #     Qx_0 + coordinates["DQx_ho"],
    #     Qy_0 + coordinates["DQy_ho"],
    #     s=5,
    #     c=coordinates["r_n"],
    #     alpha=0.8,
    #     norm=BoundaryNorm(boundaries=np.linspace(0, 10, 11), ncolors=int(0.9 * 256)),
    # )
    # plt.xlabel(r"$Q_x$", fontsize=16)
    # plt.ylabel(r"$Q_y$", fontsize=16)
    # plt.axis("equal")
    # window = 0.02
    # plt.xlim([Qx_0 - window, Qx_0 + window])
    # plt.ylim([Qy_0 - window, Qy_0 + window])
    # cbar = plt.colorbar()
    # plt.set_cmap(cmap)
    # cbar.ax.set_ylim([0, np.max(coordinates["r_n"])])
    # cbar.ax.set_ylabel(r"$\sqrt{(2J_x + 2J_y)/\varepsilon}$ [$\sigma$]", fontsize=12)

    # Plotting all together
    # -----------------------
    # plt.sca(axes[0, 0])
    # axes[1, 1].set_title("HO + LR")
    BP.plotWorkingDiagram(
        order=12,
        QxRange=np.array([0.25, 0.35]),
        QyRange=np.array([0.25, 0.35]),
        alpha=0.2,
        zorder=-1000,
    )
    #plt.plot([Qx_0], [Qy_0], "P", markersize=5, color="C3", alpha=0.5, label="Unperturbed W.P.")

    scatter = plt.scatter(
        Qx_0 + coordinates["DQx_lr"] + coordinates["DQx_ho"],
        Qy_0 + coordinates["DQy_lr"] + coordinates["DQy_ho"],
        s=5,
        c=coordinates["r_n"],
        alpha=0.8,
        norm=BoundaryNorm(boundaries=np.linspace(0, 10, 11), ncolors=int(0.9 * 256)),
    )
    plt.xlabel(r"$Q_x$", fontsize=16)
    plt.ylabel(r"$Q_y$", fontsize=16)
    plt.axis("equal")
    window = 0.02
    plt.xlim([Qx_0 - window, Qx_0 + window])
    plt.ylim([Qy_0 - window, Qy_0 + window])
    cbar = plt.colorbar()
    plt.set_cmap(cmap)
    cbar.ax.set_ylim([0, np.max(coordinates["r_n"])])
    cbar.ax.set_ylabel(r"$\sqrt{(2J_x + 2J_y)/\varepsilon}$ [$\sigma$]", fontsize=12)

    plt.title(f'Analytical footprint for WP [62.316, 60.321], bunch {b}, beam-beam: {bb}\n', fontsize = 10)
    plt.tight_layout()
    plt.savefig("data/footprint_" + str(b) + "_" + str(bb) + ".pdf", format="pdf")
    plt.show()


# Load coordinates from pickle
for b, bb in zip(l_b, l_bb):
    coordinates = pd.read_pickle(f"data/Bessel_footprint_" + str(b) + "_" + str(bb) + ".pkl")
    plot_footprint(coordinates, b, bb)
