### Inci's Preliminary Dispersion Correction Code

In [1]:
# Imports
import numpy as np
import matplotlib.pyplot as plt
from g4beam import *
from scan import *
from scipy.optimize import differential_evolution

import math
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib import cm
import numpy as np
import pandas as pd
from tqdm import *
import pandas as pd
import pickle
import itertools
from tabulate import tabulate
import tempfile
import glob
import json

In [None]:
# Make Sure g4bl is here
import os
os.environ["PATH"] += os.pathsep + "/home/incik/G4beamline-3.08/bin"
import shutil
print(shutil.which("g4bl"))

In [None]:
# Load data POST WEDGE as a Dataframe so that you can use Daniel Fu's functions
filename = "particles_after.txt"

# Skip the first two header lines that start with '#'
with open(filename) as f:
    # Read until the line containing column names
    for line in f:
        if line.startswith("#x "):
            columns = line.strip().lstrip("#").split()
            break

# Now load the data into a DataFrame
df = pd.read_csv(filename, comment="#", delim_whitespace=True, names=columns)

x_params, y_params, z_emit = calc_all_params(df) # _params are tuples of the form (emittance, beta, gamma, alpha, D, D')
print(f"D_x: {x_params[4]}, D'_x: {x_params[5]}, D_y: {y_params[4]}, D'_y: {y_params[5]}")

In [None]:
# ---------------- USER CONFIG ----------------
G4BEAMLINE_CMD = "g4bl"
TEMPLATE_FILE = "Achromat.g4bl"
OUTPUT_DIR = "achromat_runs"
VD_FILENAME = "vd_achromat.txt"   # virtual detector writes this file (ascii)
N_PARTICLES = 5000             # increase for lower noise
G4BLFILE = f"/home/incik/Cooling_4D/AchromatTest/{OUTPUT_DIR}/run.g4bl"
G4BLOUTPUT =f"/home/incik/Cooling_4D/AchromatTest/{OUTPUT_DIR}/{VD_FILENAME}"
E0 = 1e4                       # beam energy (GeV) - for a muon collider

# {B1_field}, {B1_width}, {B1_height}, {B1_length}, {B1_z}, {Q1_gradient}, {Q1_length}, {radius_q}
# {B2_field}, {B2_width}, {B2_height}, {B2_length}
# {Drift1_width}, {Drift1_height}, {Drift1_length}, 
# {Drift2_width}, {Drift2_height}, {Drift2_length}

var_names = ["N_PARTICLES", "B1_field", "B1_width", "B1_height", "B1_length", "B1_z",
            "Q1_gradient", "Q1_length", "radius_q", "B2_field", "B2_width", "B2_height", "B2_length",
            "Drift1_width", "Drift1_height", "Drift1_length", "Drift2_width", "Drift2_height", "Drift2_length", "VD_FILENAME"]
xvec = np.array([int(N_PARTICLES), 1.0, 100.0, 100.0, 30.0, 50.0, 12.0, 250.0, 10.0, 4.0, 100.0, 100.0, 200.0,
                100.0, 100.0, 250.0, 100.0, 100.0, 250.0, VD_FILENAME])
calcparams = {name: val for name, val in zip(var_names, xvec)}

GAP = 0.1  # in mm (can be up to 1.0 safely)
B1_z_val = float(calcparams["B1_z"])
L_B1 = float(calcparams["B1_length"])
L_D1 = float(calcparams["Drift1_length"])
L_Q1 = float(calcparams["Q1_length"])
L_D2 = float(calcparams["Drift2_length"])
L_B2 = float(calcparams["B2_length"])

Drift1_z = B1_z_val + (L_B1/2) + (L_D1/2) + GAP
Q1_z     = Drift1_z + (L_D1/2) + (L_Q1/2) + GAP
Drift2_z = Q1_z + (L_Q1/2) + (L_D2/2)+ GAP
B2_z     = Drift2_z + (L_D2/2) + (L_B2/2) + GAP
VD_z     = B2_z + (L_B2/2) + 10.0 + GAP

print(B1_z_val, Drift1_z-(L_D1/2), B1_z_val + (L_B1/2))
all_var_names = ["N_PARTICLES", "B1_field", "B1_width", "B1_height", "B1_length", "B1_z", "Drift1_z", "Q1_z", "Drift2_z", "B2_z", "VD_z",
            "Q1_gradient", "Q1_length", "radius_q", "B2_field", "B2_width", "B2_height", "B2_length",
            "Drift1_width", "Drift1_height", "Drift1_length", "Drift2_width", "Drift2_height", "Drift2_length","VD_FILENAME"]
all_xvec = np.array([int(N_PARTICLES), 1.0, 100.0, 100.0, 30.0, 50.0, Drift1_z, Q1_z, Drift2_z, B2_z, VD_z, 12.0, 250.0, 10.0, 4.0, 100.0, 100.0, 200.0, 100.0, 100.0, 250.0, 100.0, 100.0, 250.0, VD_FILENAME])
params = {name: val for name, val in zip(all_var_names, all_xvec)}
os.makedirs(OUTPUT_DIR, exist_ok=True)

In [None]:
# Write values to the prepared G4BL template
def write_input_from_template(template_path, out_path, replacements):
    with open(template_path, 'r') as f:
        txt = f.read()
    try:
        txt = txt.format(**replacements)
    except KeyError as e:
        raise RuntimeError(f"Template substitution failed; missing placeholder: {e}")
    with open(out_path, 'w') as f:
        f.write(txt)

# How to Use?
params = {name: val for name, val in zip(all_var_names, all_xvec)}
print(params)
write_input_from_template(TEMPLATE_FILE, G4BLFILE, params)

In [None]:
# Run g4bl and get the Output file

result = subprocess.run(["g4bl", G4BLFILE], capture_output=True, text=True, check=True)
print(result)

In [7]:
# Make the results a dataframe and calculate the Courant-Snyder Parameters
df = read_trackfile(G4BLOUTPUT)
x_params, y_params, z_emit = calc_all_params(df)

### Start Achromat Optimization
The idea is to first get the post-wedge distribution, generate a basic achromat setup, and then run g4bl to have the post-wedge particle distribution go through the achromat, getting a final particle distribution and optimizing the parameters accordingly.

In [None]:
# ---------------- USER CONFIG ----------------
G4BEAMLINE_CMD = "g4bl"
TEMPLATE_FILE = "Achromat.g4bl"
OUTPUT_DIR = "achromat_runs"
VD_FILENAME = "vd_achromat.txt"   # virtual detector writes this file (ascii)
N_PARTICLES = 5000             # increase for lower noise
G4BLFILE = f"/home/incik/Cooling_4D/AchromatTest/{OUTPUT_DIR}/run.g4bl"
G4BLOUTPUT =f"/home/incik/Cooling_4D/AchromatTest/{OUTPUT_DIR}/{VD_FILENAME}"
E0 = 1e4                       # beam energy (GeV) - for a muon collider

# ----------------------------------------------
# Functions to Use
# ----------------------------------------------
def write_input_from_template(template_path, out_path, replacements):
    with open(template_path, 'r') as f:
        txt = f.read()
    try:
        txt = txt.format(**replacements)
    except KeyError as e:
        raise RuntimeError(f"Template substitution failed; missing placeholder: {e}")
    with open(out_path, 'w') as f:
        f.write(txt)


# ----------------------------------------------
# Insert/Read Parameters from a File
# ----------------------------------------------

def make_params_for_g4bl(achromat_params):
    
    GAP = 0.1  # in mm (can be up to 1.0 safely)
    B1_z_val = float(achromat_params["B1_z"])
    L_B1 = float(achromat_params["B1_length"])
    L_D1 = float(achromat_params["Drift1_length"])
    L_Q1 = float(achromat_params["Q1_length"])
    L_D2 = float(achromat_params["Drift2_length"])
    L_B2 = float(achromat_params["B2_length"])

    Drift1_z = B1_z_val + (L_B1/2) + (L_D1/2) + GAP
    Q1_z     = Drift1_z + (L_D1/2) + (L_Q1/2) + GAP
    Drift2_z = Q1_z + (L_Q1/2) + (L_D2/2)+ GAP
    B2_z     = Drift2_z + (L_D2/2) + (L_B2/2) + GAP
    VD_z     = B2_z + (L_B2/2) + 10.0 + GAP

    add_params = {
        "Drift1_z": Drift1_z, 
        "Q1_z": Q1_z, 
        "Drift2_z": Drift2_z, 
        "B2_z": B2_z, 
        "VD_z": VD_z, 
    }
    achromat_params.update(add_params)
    
    not_upt = {"N_PARTICLES": N_PARTICLES, "VD_FILENAME": VD_FILENAME}
    merged_for_g4bl_insert = achromat_params | not_upt
    
    return achromat_params, merged_for_g4bl_insert
    
# ----------------------------------------------
# RUN G4BL and make Dispersion Calculations from parameters
# ----------------------------------------------
def run_g4beamline(achromat_preopt_params):
    
    # Calculate all params to put in g4bl
    achromat_params, merged_for_g4bl_insert = make_params_for_g4bl(achromat_preopt_params)
    
    # Put inside g4bl
    write_input_from_template(TEMPLATE_FILE, G4BLFILE, merged_for_g4bl_insert)
    
    # Run g4bl
    subprocess.run(["g4bl", G4BLFILE], capture_output=True, text=True, check=True)

def calculate_D_for_df(output = G4BLOUTPUT):
    
    df = read_trackfile(output)
    x_params, y_params, z_emit = calc_all_params(df)
    D_dict = {"D_x": x_params[4], "D'_x": x_params[5], "D_y": y_params[4], "D'_y": y_params[5]} # , "eps_z": z_emit[0]
    
    return D_dict


def cost_fxn():
    """
    Calculates the cost based on the provided parameters and data.
    """
    try:
        D_dict = calculate_D_for_df(G4BLFILE)
    except Exception as e:
        print("Run failed:", e)
        return 1e10  # penalize failure

    # extract dispersion terms
    Dx, Dpx = D_dict["D_x"], D_dict["D'_x"]
    Dy, Dpy = D_dict["D_y"], D_dict["D'_y"]
    
    # Weights
    w_x = 1.0
    w_y = 1.0
    
    cost = w_x*(Dx*Dx + Dpx*Dpx) + w_y*(Dy*Dy + Dpy*Dpy)

    """print(f"[Trial] B1={achromat_upt_params["B1_field"]:.2f} Q={achromat_upt_params["Q1_grad"]:.1f} B2={achromat_upt_params["B2_field"]:.2f}"
        f"L_B1={achromat_upt_params["B1_length"]:.0f} L_Q1={achromat_upt_params["Q1_length"]:.0f} L_B2={achromat_upt_params["B2_length"]:.0f} "
        f"D1={achromat_upt_params["Drift1_length"]:.0f} D2={achromat_upt_params["Drift2_length"]:.0f} "
        f"=> cost={cost:.3e}")"""

    return cost

def differentialOptimizer():
    """
    Optimize a 2-bend + central quadrupole cell so that
    D_x = D'_x = D_y = D'_y = 0 at a virtual detector location.

    Approach:
    - For each trial set of magnet params, create a g4bl input that produces a single
    multi-particle run with a momentum spread and writes an ASCII zntuple with columns:
        x xp y yp delta
    - Parse the file and compute dispersions and slopes via covariance (no ± runs).
    - Minimize cost = w_x*(D_x^2 + D'_x^2) + w_y*(D_y^2 + D'_y^2).
    """
    
    bounds = [(0.0, 5.0), (0.0, 200.0), (0.0, 200.0), (0.0, 50.0), (0.0, 70.0), (0.0, 20.0), 
    (0.0, 50.0), (0.0, 20.0), (0.0, 5.0), (0.0, 200.0), (0.0, 200.0), (0.0, 50.0),
    (0.0, 200.0), (0.0, 200.0), (0.0, 300.0), (0.0, 200.0), (0.0, 200.0), (0.0, 300.0)]
    
    # coarse global search (DE)
    print("Starting global optimization (differential_evolution)...")
    de_res = differential_evolution(cost_fxn, bounds, maxiter=12, popsize=6, polish=False, disp=True)
    print("DE best:", de_res.x, "cost:", de_res.fun)
    
    # local refine from DE result with Nelder-Mead (no bounds)
    print("Refining locally (Nelder-Mead)...")
    nm_res = minimize(cost_fxn, de_res.x, method='Nelder-Mead', options={'maxiter':50, 'disp':True})
    
    print("Local refine result:", nm_res.x, "cost:", nm_res.fun)
    
    # final verification run (keeps run dir)
    run_g4beamline(nm_res.x, keep_run=True, verbose=True)
    final_meas = calculate_D_for_df()
    
    print("\nFINAL MEASUREMENT (kept run dir):")
    print(f"Run dir: {final_meas['run_dir']}")
    print("Dx = {:.6e}, Dpx = {:.6e}, Dy = {:.6e}, Dpy = {:.6e}".format(final_meas['D_x'], final_meas['Dpx'], final_meas['D_y'], final_meas['Dpy']))
    return nm_res, final_meas

In [None]:
if __name__ == "__main__":
    # Create a dummy parameter file
    # First, we need to make all the params for the g4bl
    achromat_preupt_params = {
        "B1_field": 2.0,
        "B1_width": 100.0,
        "B1_height": 100.0,
        "B1_length": 30.0,
        "B1_z": 50.0,
        "Q1_gradient": 12.0,
        "Q1_length": 30.0,
        "radius_q": 10.0,
        "B2_field": 3.0,
        "B2_width": 100.0,
        "B2_height": 100.0,
        "B2_length": 30.0,
        "Drift1_width": 100.0,
        "Drift1_height": 100.0,
        "Drift1_length": 250.0,
        "Drift2_width": 100.0,
        "Drift2_height": 100.0,
        "Drift2_length": 250.0,
        }       
    
    with open('parameters.json', 'w') as f:
        json.dump(initial_params, f, indent=4)

    # Dummy data
    sample_data = {'inputs': [1, 2, 3], 'targets': [2, 4, 6]}

    final_parameters = optimize_with_file_parameters('parameters.json', sample_data)
    print(f"\nFinal optimized parameters: {final_parameters}")

# Trial Code

In [None]:
# ---------------- USER CONFIG ----------------
G4BEAMLINE_CMD = "g4bl"
TEMPLATE_FILE = "Achromat.g4bl"
OUTPUT_DIR = "achromat_runs"
VD_FILENAME = "vd_achromat.txt"   # virtual detector writes this file (ascii)
N_PARTICLES = 5000             # increase for lower noise
G4BLFILE = f"/home/incik/Cooling_4D/AchromatTest/{OUTPUT_DIR}/run.g4bl"
G4BLOUTPUT =f"/home/incik/Cooling_4D/AchromatTest/{OUTPUT_DIR}/{VD_FILENAME}"
E0 = 1e4                       # beam energy (GeV) - for a muon collider


def build_g4bl_file(xvec):
    var_names = ["N_PARTICLES", "B1_field", "B1_width", "B1_height", "B1_length", "B1_z",
                "Q1_gradient", "Q1_length", "radius_q", "B2_field", "B2_width", "B2_height", "B2_length",
                "Drift1_width", "Drift1_height", "Drift1_length", "Drift2_width", "Drift2_height", "Drift2_length", "VD_FILENAME"]
    calcparams = {name: val for name, val in zip(var_names, xvec)}
    
    GAP = 0.1  # in mm (can be up to 1.0 safely)
    B1_z_val = float(calcparams["B1_z"])
    L_B1 = float(calcparams["B1_length"])
    L_D1 = float(calcparams["Drift1_length"])
    L_Q1 = float(calcparams["Q1_length"])
    L_D2 = float(calcparams["Drift2_length"])
    L_B2 = float(calcparams["B2_length"])

    Drift1_z = B1_z_val + (L_B1/2) + (L_D1/2) + GAP
    Q1_z     = Drift1_z + (L_D1/2) + (L_Q1/2) + GAP
    Drift2_z = Q1_z + (L_Q1/2) + (L_D2/2)+ GAP
    B2_z     = Drift2_z + (L_D2/2) + (L_B2/2) + GAP
    VD_z     = B2_z + (L_B2/2) + 10.0 + GAP
    
    guess_var_names = ["N_PARTICLES", "B1_field", "B1_width", "B1_height", "B1_length", "B1_z", "Drift1_z", "Q1_z", "Drift2_z", "B2_z", "VD_z",
                "Q1_gradient", "Q1_length", "radius_q", "B2_field", "B2_width", "B2_height", "B2_length",
                "Drift1_width", "Drift1_height", "Drift1_length", "Drift2_width", "Drift2_height", "Drift2_length","VD_FILENAME"]
    guess_xvec = np.array([int(N_PARTICLES), 1.0, 100.0, 100.0, 30.0, 50.0, Drift1_z, Q1_z, Drift2_z, B2_z, VD_z, 12.0, 250.0, 10.0, 4.0, 100.0, 100.0, 200.0, 100.0, 100.0, 250.0, 100.0, 100.0, 250.0, VD_FILENAME])
    guessparams = {name: val for name, val in zip(guess_var_names, guess_xvec)}
    
    write_input_from_template(TEMPLATE_FILE, G4BLFILE, guessparams)
    return guessparams

def calculate_D_for_df(file):
    subprocess.run(["g4bl", file], capture_output=True, text=True, check=True)
    df = read_trackfile(G4BLOUTPUT)
    x_params, y_params, z_emit = calc_all_params(df)
    D_dict = {"D_x": x_params[4], "D'_x": x_params[5], "D_y": y_params[4], "D'_y": y_params[5]}
    return D_dict

In [None]:
# Optimization -- Differential Evolution
def cost_fn(guessparams):
    # 1. unpack xvec into a dictionary
    # 2. build .g4bl file
    # 3. run G4beamline and read output
    # 4. compute cost = D² + D'²
    # 5. return cost (scalar)
    
    
    # unpack optimizer vector
    (B1_field, Q1_grad, B2_field,
    L_B1, L_Q1, L_B2,
    Drift1_len, Drift2_len) = xvec

    # fill all required template params (even fixed ones)
    preopt_params = {
        "N_PARTICLES": N_PARTICLES,
        "B1_field": B1_field,
        "B1_width": 100.0,
        "B1_height": 100.0,
        "B1_length": L_B1,
        "B1_z": 50.0,
        "Q1_gradient": Q1_grad,
        "Q1_length": L_Q1,
        "radius_q": 10.0,
        "B2_field": B2_field,
        "B2_width": 100.0,
        "B2_height": 100.0,
        "B2_length": L_B2,
        "Drift1_width": 100.0,
        "Drift1_height": 100.0,
        "Drift1_length": Drift1_len,
        "Drift2_width": 100.0,
        "Drift2_height": 100.0,
        "Drift2_length": Drift2_len,
        "VD_FILENAME": VD_FILENAME
    }

    # build .g4bl
    build_g4bl_file(preopt_params)

    try:
        D_dict = calculate_D_for_df(G4BLFILE)
    except Exception as e:
        print("Run failed:", e)
        return 1e10  # penalize failure

    # extract dispersion terms
    Dx, Dpx = D_dict["D_x"], D_dict["D'_x"]
    Dy, Dpy = D_dict["D_y"], D_dict["D'_y"]

    # compute weighted cost
    cost = Dx**2 + Dpx**2 + Dy**2 + Dpy**2

    print(f"[Trial] B1={B1_field:.2f} Q={Q1_grad:.1f} B2={B2_field:.2f} "
          f"L_B1={L_B1:.0f} L_Q1={L_Q1:.0f} L_B2={L_B2:.0f} "
          f"D1={Drift1_len:.0f} D2={Drift2_len:.0f} "
          f"=> cost={cost:.3e}")

    return cost

def cost_fn(dict):
    Dx = dict["D_x"] 
    Dpx = dict["D'_x"]
    Dy = dict["D_y"]
    Dpy = dict["D'y"]
    
    # Weights
    w_x = 1.0
    w_y = 1.0
    
    return (w_x*(Dx*Dx + Dpx*Dpx) + w_y*(Dy*Dy + Dpy*Dpy))

def differentialOptimizer():
    """
    Optimize a 2-bend + central quadrupole cell so that
    D_x = D'_x = D_y = D'_y = 0 at a virtual detector location.

    Approach:
    - For each trial set of magnet params, create a g4bl input that produces a single
    multi-particle run with a momentum spread and writes an ASCII zntuple with columns:
        x xp y yp delta
    - Parse the file and compute dispersions and slopes via covariance (no ± runs).
    - Minimize cost = w_x*(D_x^2 + D'_x^2) + w_y*(D_y^2 + D'_y^2).
    """
    
    # variable bounds: [B1_field, Q1_grad, B2_field, Drift1_mm, Drift2_mm]
    bounds = [(-2.0, 2.0), (-120.0, 120.0), (-2.0, 2.0), (50.0, 1000.0), (50.0, 1000.0)]
    
    # coarse global search (DE)
    print("Starting global optimization (differential_evolution)...")
    de_res = differential_evolution(cost_fn, bounds, maxiter=12, popsize=6, polish=False, disp=True)
    print("DE best:", de_res.x, "cost:", de_res.fun)
    
    # local refine from DE result with Nelder-Mead (no bounds)
    print("Refining locally (Nelder-Mead)...")
    nm_res = minimize(cost_fn, de_res.x, method='Nelder-Mead', options={'maxiter':50, 'disp':True})
    
    print("Local refine result:", nm_res.x, "cost:", nm_res.fun)
    # final verification run (keeps run dir)
    final_meas = run_once(nm_res.x, keep_run=True, verbose=True)
    
    print("\nFINAL MEASUREMENT (kept run dir):")
    print(f"Run dir: {final_meas['run_dir']}")
    print("Dx = {:.6e}, Dpx = {:.6e}, Dy = {:.6e}, Dpy = {:.6e}".format(final_meas['D_x'], final_meas['Dpx'], final_meas['D_y'], final_meas['Dpy']))
    print("Twiss/emit (x,y):", final_meas['beta_x'], final_meas['emit_x'], final_meas['beta_y'], final_meas['emit_y'])
    return nm_res, final_meas
