In [7]:
import sys
import os
import csv
import time
import matplotlib.pyplot as plt
from numpy import zeros, array
from numpy.linalg import inv
sys.path.append('/usr/local/facet/tools/python/toolbox/orbit/')
sys.path.append('/usr/local/facet/tools/python/F2_live_model')
from orbit import FacetOrbit, BaseOrbit
from orbit import orbit_plotter
from bmad import BmadLiveModel

In [13]:
# Define both upstream sets and spectrometers 
DL10_BPMs = ['BPM10511','BPM10525','BPM10631','BPM10651','BPM10731']
BC11_BPMs = ['BPM11301','BPM11312','BPM11333','BPM11358','BPM11362']
BC14_BPMs = ['BPM14701','BPM14715','BPM14801','BPM14891','BPM14901']

SPECTROMETER_BPMs = {'DL10':'BPM10731','BC11':'BPM11333','BC14':'BPM14801'}

In [14]:
def load_orbit(filepath):
    """ use orbit module to ingest saved file given by <filepath> """
    b = BaseOrbit()
    orbit = b.from_MATLAB_file(filepath)
    return orbit

In [33]:
def orbit_fit(f2m, bpmnames, x_raw, y_raw):
    """
    fits BPM data using linear maps, returns orbit at single position s0
    args:
        f2m: BmadLiveModel
        bpmnames: numpy array of BPM names
        x_raw, y_raw: numpy arrays of X and Y BPM readings
    return:
        x0, y0: fitted orbit parameters (x,x',delta), (y,y')
        x_fit, y_fit: numpy arrays (Nx3) of reconstructed orbits
    """
    N_bpms = len(bpmnames)
    bpm0 = bpmnames[0]
    bpm_s = array([f2m.S[f2m.ix[bpm]] for bpm in bpmnames])
    devnames = array([f2m.device_names[f2m.ix[bpm]] for bpm in bpmnames])

    Mx = zeros((N_bpms, 3))
    My = zeros((N_bpms, 2))
    
    # iterate over all bpms, for each one get the R matrix from bpm0 to that bpm
    # the construct rows of "M matrix" as defined in eqn 4
    for i, (dev,ele) in enumerate(zip(devnames,bpmnames)):
        
        # skip BPMs with problematic names/nonexistant
        ds = dev.split(':')
        dev2 = f'{ds[1]}:{ds[0]}:{ds[2]}'
        if (dev not in devnames) and (dev2 not in devnames):
            continue
        
        r = f2m.tao.matrix(bpm0, ele)['mat6']
    
        # matrix rows are made of r11,r12,r16 & r33,r34
        Mx[i] = array([r[0][0], r[0][1], r[0][5]])
        My[i] = array([r[2][2], r[2][3]])

    # calculate x0, y0 vectors using eqn 5 from docs
    x0 = inv(Mx.T @ Mx) @ Mx.T @ x_raw
    y0 = inv(My.T @ My) @ My.T @ y_raw

    return x0, y0, Mx @ x0, My @ y0

In [16]:
# def plot_orbit(s_positions, x_raw, y_raw, x_fit, y_fit):
#     """ takes in raw BPM data & fitted trajectory, generates plot 
#     arg:
#         s_positions = numpy array of BPM in a longtiudnal posiotions 
#         x_bpm, y_bpm = raw BPM readings 
#         x_fit, y_fit = fitted orbit values 
#     """
#     # code to setup axes, figure etc goes here
#     #fig, axs = plt.subplots(1, 1, figsize=(10, 6))  
#     plt.figure(figsize=(10, 5))
#     plt.plot(s_positions, x_raw, label='Raw X')
#     plt.plot(s_positions, x_fit, 'x--', label='X fit', linewidth=2)
#     plt.xlabel('BPM')
#     plt.ylabel('X Position')
#     plt.title('Measured(Raw X) vs Fitted X Orbit')
#     plt.legend()
#     plt.grid(True)
#     plt.show()
#     plt.tight_layout()
    
    
#     plt.figure(figsize=(10, 5))
#     plt.plot(s_positions, y_raw, 'o-', label='Measured Y (BPM)')
#     plt.plot(s_positions, y_fit, 'y--', label='Fitted Y (Model)', linewidth=2)
#     plt.xlabel('BPMs')
#     plt.ylabel('Y Position')
#     plt.title('Measured vs Fitted Y Orbit')
#     plt.legend()
#     plt.grid(True)
#     plt.show()
#     plt.tight_layout()
    
#     return

In [17]:
# def test():
#     # test modular version of code
#     # get model
#     f2m = BmadLiveModel(design_only=True)
#     i_bpm = f2m.ix['BPMS']
#     bpmnames = f2m.elements[i_bpm]

#     bpmnames = bpmnames[:10] # limit it for the test purpouse 

#     # get orbit
#     fp = 'orbit_ex.mat'
#     # orbit = load_orbit(fp)
    
#     devnames = f2m.device_names[i_bpm][:10]
#     devnames_alt = [f"{d.split(':')[1]}:{d.split(':')[0]}:{d.split(':')[2]}" for d in devnames]
#     N_bpms = len(bpmnames)


#     # pack X/Y readings into arrays and make them to much s_position size which is 110
#     # x_raw = array([bpm.x for bpm in orbit.bpms])[:10]
#     # y_raw = array([bpm.y for bpm in orbit.bpms])[:10]


#     # x_fit, y_fit = orbit_fit(f2m, bpmnames, x_raw, y_raw)
#     s_positions = f2m.S[i_bpm][:10]
#     #Get S-positions of those BPMs 
#     #s_positions = f2m.S[i_bpm]

#     # print(f"s_positions: {s_positions.shape}")
#     # print(s_positions)
#     # print(f"x_raw:       {x_raw.shape}")
#     # print(f"X_fit:       {x_fit.shape}")
#     # plot_orbit(s_positions, x_raw, y_raw, x_fit, y_fit)
#     # print("orbit_fit()executed.")

#     #plot stuff!

#     # to plots all .mat from the folder_path containing all .mat files
#     folder_path = '/home/fphysics/dyerdea/orbit-examples'
#     mat_files = sorted([f for f in os.listdir(folder_path) if f.endswith('.mat')])
#     print(f"\n total number of plots we are going to see : {len(mat_files)} (1 measured vs X_fitted and 1 measured vs Y_fitted per .mat)\n")
#     # Loop through each file
#     for i, filename in enumerate(mat_files):
#         file_path = os.path.join(folder_path, filename)
#         print(f"[{i}] Processing: {filename}")

#         try:
#             # Load orbit from file
#             orbit = load_orbit(file_path)

#             # Extract X and Y raw data from BPMs
#             x_raw, y_raw = zeros((N_bpms,)), zeros((N_bpms),)
#             for i, bpm in enumerate(orbit.bpms):
#                 if bpm.name not in devnames and bpm.name not in devnames_alt: continue
#                 x_raw[i], y_raw[i] = bpm.x, bpm.y

#             # x_raw = array([bpm.x for bpm in orbit.bpms if bpm.name in bpmnames])
#             # y_raw = array([bpm.y for bpm in orbit.bpms if bpm.name in bpmnames])

#             # Fit X and Y using your orbit_fit()
#             x0, y0, x_fit, y_fit = orbit_fit(f2m, bpmnames, x_raw, y_raw)
            
#             plot_orbit(s_positions, x_raw, y_raw, x_fit, y_fit)
#             print(f"orbit_fit() and plot completed for {filename}")

#         except Exception as e:
#             print(f"Failed on {filename}: {e}")
#             raise e



# if __name__ == '__main__':
#     test()


In [18]:
def convert_to_meters(arr):
    # Convert mm to  m once.
    arr = np.asarray(arr, dtype=float)
    if np.nanmedian(np.abs(arr)) > 0.01:
        return arr / 1000.0, True
    return arr, False


In [19]:
def measure_delta(f2m, bpm0, x0, spec_bpm, x_meas, E_design=13500, eta_min=1e-9, delta_warn=0.2):
    """
    Calculates energy deviation at a given spectrometer BPM.
    Parameters:
        f2m       : BmadLiveModel
        bpm0      : first upstream BPM used in the fit
        x0        : np.array [x0, x0', delta0] from upstream fit
        spec_bpm  : spectrometer BPM name
        x_meas    : float, measured x at spec_bpm
        E_design  : design energy in MeV
        eta_min   : dispersion threshold for safe division
        delta_warn: warning threshold for large delta
    Returns:
        delta, delta_E, residual, x_fit, eta_x, percent_error
    """
    # Get transport matrix bpm0 to spec_bpm
    r = f2m.tao.matrix(bpm0, spec_bpm)['mat6']
    eta_x = float(r[0][5])  # R16 term

    # Guard against near-zero dispersion
    if abs(eta_x) < eta_min:
        raise ValueError(f"etaâ‰ˆ0 at {spec_bpm} (eta={eta_x:.3e}); cannot compute delta safely.")

    # Predict x position at spectrometer
    x_fit = float(r[0][0]) * float(x0[0]) + float(r[0][1]) * float(x0[1]) + float(r[0][5]) * float(x0[2])

    # Compute residual and energy deviation
    residual = float(x_meas) - x_fit
    delta = residual / eta_x
    delta_E = delta * E_design
    percent_error = 100 * delta

    # Diagnostics
    print(f"Design Energy: {E_design:.2f} MeV")
    print(f"Dispersion eta at {spec_bpm}: {eta_x:.4e}")
    print(f"Residual = {residual:.4e} m")
    print(f"Debug at {spec_bpm}: x_meas = {x_meas:.4e}, x_model = {x_fit:.4e}, eta = {eta_x:.4e}, delta = {delta:.4e}")
    print(f"delta_E = {delta_E:.2f} MeV ({percent_error:.1f}%)")
    # safty check
    if abs(delta) > delta_warn:
        print(" Warning: Large energy deviation detected!")

    return delta, delta_E, residual, x_fit, eta_x, percent_error


In [20]:
def process_energy_errors():
    """
     process orbit files to compute energy diviation at speciefic BPM spectrometer.

     This functio0n loops through a given specific .mat files and load BPM data, fit upstream orbits, and calculate energy
     deviation <delta>. The results are collected into a structured outputs.
         
    """
    f2m = BmadLiveModel(design_only=True)
    folder_path = '/home/fphysics/dyerdea/orbit-examples'
    mat_files = sorted([f for f in os.listdir(folder_path) if f.endswith('.mat')])
    mat_files = mat_files[:2]  # limit for test runs

    print("\nMeasuring Energy deviation at spectrometer BPMs ...\n")

    
    # CSV setup
    with open("the_energy_deviation_results.csv", "w", newline="") as csv_file:
        csv_writer = csv.writer(csv_file)
        csv_writer.writerow([
            "Filename", "Region", "BPM",
            "x_meas(from x_raw)(m)", "x_model(from x_fit) (m)", "eta_x (m)",
            "delta", "delta_E (MeV)", "% Error"
        ])

        for i, filename in enumerate(mat_files):
            file_path = os.path.join(folder_path, filename)
            orbit = load_orbit(file_path)
            print(f"[{i}] Processing: {filename}")

            # Model BPM names
            bpmnames = f2m.elements[f2m.ix['BPMS']]
            bpm0 = bpmnames[0]
            devnames = f2m.device_names[f2m.ix['BPMS']]
            devnames_alt = [f"{d.split(':')[1]}:{d.split(':')[0]}:{d.split(':')[2]}" for d in devnames]
            N_bpms = len(bpmnames)

            # Gather raw measurements that aligned to model BPMs
            
            x_raw = np.zeros(N_bpms,)
            y_raw = np.zeros(N_bpms,)
            j = 0
            for bpm in orbit.bpms:
                if bpm.name in devnames or bpm.name in devnames_alt:
                    if j >= N_bpms:
                        break
                    x_raw[j] = bpm.x
                    y_raw[j] = bpm.y
                    j += 1

            if j == 0:
                print("  No matching BPM data found in this file; skipping.\n")
                continue

            # trim to the matched count
            x_raw = x_raw[:j]
            y_raw = y_raw[:j]
            bpmnames = bpmnames[:j]

            # auto unit check
            x_raw, check_x = convert_to_meters(x_raw)
            y_raw, check_y = convert_to_meters(y_raw)
            if check_x or check_y:
                print("  Converted BPM readings from mm to m")

            # choose up to 10 upstream BPMs to use to balance acuracy and stability
            z = min(10, j)

            # fit upstream
            try:
                x0, y0, x_fit, y_fit = orbit_fit(f2m, bpmnames[:z], x_raw[:z], y_raw[:z])
            except Exception as e:
                print(f"  orbit_fit failed: {e}\n")
                continue

            # measure at the three spectrometer BPMs DL10,BC11 and BC14
            for region, spec_bpm in SPECTROMETER_BPMs.items():
                try:
                    idx = list(bpmnames).index(spec_bpm)
                except ValueError:
                    print(f"  {region}/{spec_bpm} not present in this file; skipping.")
                    continue

                # measured x at spectrometer and compute energy deviation using the fit's first upstream bpm as a reference
                x_meas = x_raw[idx]

                try:
                   
                    delta, delta_E, residual, x_model, eta_x, percent_error = measure_delta(
                        f2m, bpm0=bpmnames[0], x0=x0, spec_bpm=spec_bpm, x_meas=x_meas,
                        E_design=getattr(f2m, "design_energy", 13500.0) )
                    

                    # print summary line
                    print(f" {region}: delta = {delta:.4e}, delta_E = {delta_E:.2f} MeV at {spec_bpm}")

                    # rows
                    csv_writer.writerow([
                        filename, region, spec_bpm,
                        x_meas, x_model, eta_x,
                        delta, delta_E, percent_error
                    ])

                except Exception as e:
                    print(f"  Could not compute delta at {spec_bpm}: {e}")

            print()  

if __name__ == '__main__':
    process_energy_errors()


2025-08-19 11:18:26.872 [BmadLiveModel] INFO: Building FACET2E model with Tao ... 
2025-08-19 11:18:28.368 [BmadLiveModel] INFO: Building data structures ... 



Measuring Energy deviation at spectrometer BPMs ...

[0] Processing: 05-04-2025_good_for_users.mat.mat
  Converted BPM readings from mm to m
Design Energy: 13500.00 MeV
Dispersion eta at BPM10731: -1.0223e-02
Residual = 2.6251e-17 m
Debug at BPM10731: x_meas = -7.4144e-05, x_model = -7.4144e-05, eta = -1.0223e-02, delta = -2.5678e-15
delta_E = -0.00 MeV (-0.0%)
 DL10: delta = -2.5678e-15, delta_E = -0.00 MeV at BPM10731
Design Energy: 13500.00 MeV
Dispersion eta at BPM11333: 9.1900e-03
Residual = 5.4348e-05 m
Debug at BPM11333: x_meas = 9.3595e-05, x_model = 3.9247e-05, eta = 9.1900e-03, delta = 5.9138e-03
delta_E = 79.84 MeV (0.6%)
 BC11: delta = 5.9138e-03, delta_E = 79.84 MeV at BPM11333
Design Energy: 13500.00 MeV
Dispersion eta at BPM14801: 3.6911e-02
Residual = -1.2988e-04 m
Debug at BPM14801: x_meas = 1.5239e-04, x_model = 2.8227e-04, eta = 3.6911e-02, delta = -3.5188e-03
delta_E = -47.50 MeV (-0.4%)
 BC14: delta = -3.5188e-03, delta_E = -47.50 MeV at BPM14801

[1] Processing: 

In [21]:
# def compute_energy_offset(delta, beam_energy_eV):
#     """
#     Calculate absolute energy deviation in eV.
    
#     Parameters:
#         delta0 : Relative energy deviation
#         beam_energy_eV : Design be20029358565807343am energy in eV
    
#     Returns:
#         Energy offset in eV
#     """
#     E_design = 13500 # design energy in MeV
#     beam_energy_eV =  E_design * 1e6 #convert to eV
#     #delta = 5.9e-3 # relative deviation 
#     delta_E = compute_energy_offset(delta, beam_energy_eV)

#     print(f"delta_E = {delta_E/1e6:.2f} MeV ({delta*100:.2f}%)")
#     if beam_energy_eV <= 0:
#        raise ValuError("beam energy value must be postive.") 
    
#     return delta * beam_energy_eV


In [22]:
# def performance_test(num_trials=100):
#     """
    
#     """

    
#     start = time.time()
#     for _ in range(num_trials):
#         x_meas = np.random.normal(0, 1e-2)
#         x_fit = np.random.normal(0, 1e-2)
#         eta = 
#         delta = measure_delta(x_meas, x_fit, eta)
#     end = time.time()
#     avg_time = (end - start) / num_trials
#     print(f"Avg. time per delta computation: {avg_time*1e6:.2f} mirosecond")

# if __name__ == '__main__':
#     performance_test()


In [40]:
import numpy as np
from time import time
fn = 'bsa_203206_2779_10.0hz.csv'
fp = f'/home/fphysics/dyerdea/{fn}'
with open(fp) as f:
    pvnames = f.readlines()[0].split(',')

# load model (for getting rmats)
f2m = BmadLiveModel(design_only=True)

# grab data from CSV files (table of ~2800 shots from 350 PVs)
r = np.genfromtxt(fp, skip_header=1, delimiter=',')
DL10_BPMs = [
    'BPM10511',
    'BPM10525',
    'BPM10581',
    'BPM10631',
    'BPM10651',
    'BPM10731',
    ]

# indices are the CSV columns where we'll find the BPMs shown above
# y index is just x+1 since they're neighbors in the table
idx_x = [19, 22, 25, 28, 31, 34]
idx_y = [j+1 for j in idx_x]
Ntrials = r.shape[0]

# run trials, calc average time to execute orbit_fit & store result
t_total = 0.
for i in range(Ntrials):
    print(f'running trial {i}/{Ntrials}', end='\r')
    xdata = [r[i,j] for j in idx_x]
    ydata = [r[i,j] for j in idx_y]
    ti = time()
    x0, y0, xfit, yfit = orbit_fit(f2m, DL10_BPMs, xdata, ydata)
    t_total += time() - ti

t_avg = t_total / Ntrials
print(f'avg exec time = {t_avg*1e3:.3f} ms')

2025-08-19 12:29:46.242 [BmadLiveModel] INFO: Building FACET2E model with Tao ... 
2025-08-19 12:29:47.721 [BmadLiveModel] INFO: Building data structures ... 


avg exec time = 1.580ms


In [24]:
#print(r[19])