In [None]:
from IPython.display import display, Markdown, HTML
display(HTML("<style>.container { width:90% !important; }</style>"))

In [None]:
import pathlib
import os
import datetime
import logging

In [None]:
logger = logging.getLogger("bba")

In [None]:
from databroker import catalog
import xarray as xr
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

In [None]:
import bact_analysis.utils.preprocess
import bact_analysis_bessyii.bba.preprocess_data

import bact_archiver_bessyii
from bact_math_utils import linear_fit

# Tune advances: Model to measurement comparison

## Model data

Model data to be loaded from the database 
It would be sufficient to load the lattice information ... quadrupole response matrix calculation only needs half a minute (without parallalisation)

In [None]:
path = pathlib.Path("/home/mfp/Devel/github/tslib-dev")
t_dir = path / "python" / "examples" / "use_cases"

In [None]:
os.listdir(t_dir)

Quadrupole model with dependence data

In [None]:
model_data = xr.open_dataset( t_dir / 'quadrupole_response_matrix2.nc')
model_data

### Inspection of model data

In [None]:
q1s = [quad for quad in model_data.quadrupole.values if quad[:2] == "q1"]
q1s.sort()
q1s
q2s = [quad for quad in model_data.quadrupole.values if quad[:2] == "q1"]
q2s.sort()
q1s, q2s;

In [None]:
fig, axes = plt.subplots(2,1)
ax_x, ax_y = axes
ax_x.plot(
    model_data.phase_advance.sel(dep="K", quadrupole=q1s, plane="x"), '-'
)
ax_y.plot(
    model_data.phase_advance.sel(dep="K", quadrupole=q1s, plane="y"), '-'
)


## Measurement data

Measurement from 3rd of April

In [None]:
uid = '260198d3-835d-42ad-a668-c66ad10cb34a'

In [None]:
db = catalog['heavy']

In [None]:
run = db[uid]
run;

In [None]:
start = datetime.datetime.now()
data = run.primary.to_dask()
end = datetime.datetime.now()
dt = end - start
dt

Load required data

In [None]:
data.dt_mux_power_converter_setpoint.load()
data.dt_mux_selector_selected.load()
data.dt_mr_tune_fb_vert_readback.load()
data.dt_mr_tune_fb_hor_readback.load()

### preparing measurement data

In [None]:
preprocessed_, dt_configuration = bact_analysis_bessyii.bba.preprocess_data.load_and_check_data(run)

First of repetition not that save ... 

In [None]:
idx = preprocessed_.dt_cs_setpoint >= 1
preprocessed = preprocessed_.isel(time=idx)

In [None]:
rearranged = xr.concat(
    bact_analysis.utils.preprocess.reorder_by_groups(
        preprocessed,
        preprocessed.groupby(preprocessed.dt_mux_selector_selected),
        reordered_dim="name",
        dim_sel="time",
        new_indices_dim="step",
    ),
    dim="name",
)
rearranged

## Comparison for one magnet

| Magnet family | Length | 
|---------------|--------|
| Q1            |  0.25  |
| Q2D           |  0.2   |
| Q2T           |  0.2	 |
| Q3D           |  0.25	 |
| Q3T           |  0.25	 |
| Q4D           |  0.5	 |
| Q4T           |  0.5	 |
| Q5T           |  0.2   |


In [None]:
magnet_name =  "Q1M1D1R"
magnet_length = 0.25

In [None]:
magnet_name =  "Q2M2T1R"
magnet_length = 0.2

In [None]:
magnet_name =  "Q3M1D5R"
magnet_length = 0.25

In [None]:
magnet_name =  "Q4M2T2R"
magnet_length = 0.5

In [None]:
magnet_name_lc = magnet_name.lower()
magnet_name, magnet_name_lc, magnet_length

In [None]:
sel_meas = rearranged.sel(name=magnet_name)
sel_meas;

In [None]:
sel_model = model_data.sel(quadrupole=magnet_name_lc)
sel_model

In [None]:
measurement_time_stamps = data.time.isel(time=data.dt_mux_selector_selected == magnet_name)
t0, t1 =  [datetime.datetime.fromtimestamp(t)  for t in (measurement_time_stamps.values.min(), measurement_time_stamps.values.max())]
t0, t1

In [None]:
main_rf_frequency = bact_archiver_bessyii.BESSY.getData("MCLKHX251C:freq", t0=t0, t1=t1).values.mean()
n_bunches = 400
rv_f = main_rf_frequency / n_bunches
main_rf_frequency, rv_f

In [None]:
def fractional_tune(tune_f, rv_f=rv_f):
    return tune_f / rv_f

In [None]:
mu0 = 4 * np.pi * 1e-7

def calculate_gradient(*, r, N, I):
    G = 2 * mu0 * (N * I) / r**2
    return G


BESSY II's $B \cdot \rho$ according to Peter's hand note

In [None]:
# BESSY II's 
Brho = 5.67044

In [None]:
def dI2dK(I, *, magnet_length=1.0):
    """convert current change to dK change
    """
    # BESSY II pole radius 
    r = 35e-3
    # muxer windings on quadrupoles
    N = 75
    # central gradient
    G = calculate_gradient(r=r, N=N, I=I)
    # central Kc
    Kc = G / Brho
    K = Kc * magnet_length
    return K

dI2dK(1, magnet_length=.5)

In [None]:
# select and prepare measurement data 
current = sel_meas.dt_mux_power_converter_setpoint
tune_khz =   sel_meas.dt_mr_tune_fb_hor_readback
tune = fractional_tune(tune_khz)
dtune_x = tune - tune.isel(step=np.absolute(current)<0.1).mean()
tune_model_dep = sel_model.phase_advance.sel(dep="K", plane="x")
tune_model_x = dI2dK(current) * tune_model_dep

# model data ... 
tune_khz = sel_meas.dt_mr_tune_fb_vert_readback
tune = fractional_tune(tune_khz)
dtune_y = tune - tune.isel(step=np.absolute(current)<0.1).mean()
tune_model_dep = sel_model.phase_advance.sel(dep="K", plane="y")
tune_model_y = dI2dK(current) * tune_model_dep

# why this cooking factor?
scale = 1.0

fig, axes = plt.subplots(2,1)
ax_x, ax_y = axes

ax_x.plot(current, dtune_x * scale, 'x', label='measurement')
ax_x.plot(current, tune_model_x, '-', label='model')
ax_x.set_xlabel("I [A]")
ax_x.set_ylabel(r"$\nu_x$ [rel]")
ax_x.legend()


ax_y.plot(current, dtune_y * scale * -1, 'x', label='measurement')
ax_y.plot(current, tune_model_y, '-', label='model')
ax_y.set_xlabel("I [A]")
ax_y.set_ylabel(r"$\nu_y$ [rel]")
ax_y.legend()

In [None]:
tune_model_y;

In [None]:
dtune_y;

In [None]:
(dtune_y / tune_model_y)

In [None]:
(dtune_x / tune_model_x)

## All magnets

Model provides the derivative 

$$
    \delta Q = \frac{\partial Q}{\partial K}
$$   

with 

$$
    K = \frac{G}{B \rho}
$$


The quadrupole gradient $G$ is given in $T/m$ and $B\rho$ in $Tm$, Thus the dimension of $K$ is $1/m^2$

The dimension of $\delta Q$ is thus $m^2$ as $Q$ is dimensionless

In [None]:
def calculate_fractional_tune_change(tune_khz, indices_for_ref_tune):
    """Calculate factional tune change
    
    Indices are selected to determine refernce tune
    """
    tune = fractional_tune(tune_khz)
    dtune = tune - tune.isel(step=indices_for_ref_tune).mean()
    return dtune

def calculate_tune_slope(dK, tune_khz, indices_for_ref_tune):
    """Caclulates fractional tune derivative depending on central K strength 
    """
    dtune = calculate_fractional_tune_change(tune_khz, indices_for_ref_tune)
    slope, intercept =  linear_fit.linear_fit_1d(dK, dtune)
    
    try:
        # intercept
        # not far off from middle 
        assert np.absolute(intercept[0]) < 2e-2
        # not too large error
        assert np.absolute(intercept[1]) < 2e-3
                       
        # not too large slope error should be all pretty linear
        assert np.absolute(slope[1]) < 2e-3
        
    except:
        logger.error(f"Check on fit failed {slope=} {intercept=}")
        raise 
        
    return slope[0]

def calculate_tune_slopes(sel_meas, use_diagnostic_tune=False):
    current = sel_meas.dt_mux_power_converter_setpoint
                             
    indices_for_ref_tune = np.absolute(current) < 0.1
    # Delta K for the magnet strengh for rectangular approximation
    # that's what the model provides ..
    dK_c = dI2dK(current, magnet_length=1)

    if use_diagnostic_tune:
        # use diagnostic data
        tune_khz_x = sel_meas.dt_mr_tune_fb_hor_readback
        tune_khz_y = sel_meas.dt_mr_tune_fb_vert_readback
    else:
        # use bunch by bunch feedback data
        tune_khz_x = sel_meas.dt_bbfb_tune_hor_readback_val
        tune_khz_y = sel_meas.dt_bbfb_tune_vert_readback_val
        
    slope_x = calculate_tune_slope(dK_c, tune_khz_x, indices_for_ref_tune)
    slope_y = calculate_tune_slope(dK_c, tune_khz_y, indices_for_ref_tune)
    
    return xr.DataArray(data=[slope_x, slope_y], dims=["plane"], coords=[["x", "y"]])
    
    
# calculate_tune_slopes(sel_meas)

Prepare it for all magnets

In [None]:
tmp = {name: calculate_tune_slopes(rearranged.sel(name=name), use_diagnostic_tune=True) for name in rearranged.name.values}

Sort it by family ... use magnet name convention for that 

In [None]:
names = list(tmp.keys())
names.sort()
names;

Measurement uses the names using upper case letters, model uses lower case letters

In [None]:
names_lc = [name.lower() for name in names]

In [None]:
dtunes = xr.concat([tmp[name] for name in names], pd.Index(names, name="name"))

Besides family 1 and 4 all magnet families are using positive sign

In [None]:
polarities = xr.DataArray(data=np.ones(len(dtunes), dtype=int), dims=["name"], coords=[dtunes.coords["name"].values])
q1s = np.array([name[:2] == "Q1" for name in dtunes.coords["name"].values])
polarities[q1s]= -1
q4s = np.array([name[:2] == "Q4" for name in dtunes.coords["name"].values])
polarities[q4s]= -1
polarities

In [None]:
ks_x = np.array([model_data.phase_advance.sel(plane="x", dep="K", quadrupole=name) for name in names_lc])
ks_y = np.array([model_data.phase_advance.sel(plane="y", dep="K", quadrupole=name) for name in names_lc])

In [None]:
fig, axes = plt.subplots(2,2, figsize=[16, 12], sharex=True)
ax_x, ax_y = axes[0]
ax_x.plot(names_lc, dtunes.sel(plane="x"))
ax_y.plot(names_lc, dtunes.sel(plane="y"))
ax_x, ax_y = axes[1]
ax_x.plot(names_lc, - ks_x * polarities)
# minus 1 for the convention
ax_y.plot(names_lc, - ks_y * polarities * -1)

In [None]:
rx = dtunes.sel(plane="x") / ks_x
ry = dtunes.sel(plane="y") / ks_y

In [None]:
fig, axes = plt.subplots(2,1, figsize=[16, 12], sharex=True)
ax_x, ax_y = axes

ax_x.clear()
ax_y.clear()

ax_x.plot(names_lc, rx * polarities)
# The -1 for the convention
ax_y.plot(names_lc, ry * -1 * polarities)

ax_x.set_ylabel(r"$\Delta\nu_{meas} / \Delta\nu_{mod}$")
ax_y.set_ylabel(r"$\Delta\nu_{meas} / \Delta\nu_{mod}$")
[tick.set_rotation('vertical') for tick in ax_x.get_xmajorticklabels() + ax_y.get_xmajorticklabels()]

plt.xticks(fontsize='x-small');
plt.savefig(f"bba_tune_advance_ratio_model_measurement.pdf")

Check if the uncertainty of the gradient explains the observed difference 

Not really

In [None]:
fig, ax = plt.subplots(1,1, figsize=[16, 12], sharex=True)
ax.plot(names_lc, rx /ry*-1)

[tick.set_rotation('vertical') for tick in ax.get_xmajorticklabels()]

plt.xticks(fontsize='x-small');