In [None]:
import sys
import os
import importlib

import numpy as np
import pandas as pd
from scipy import optimize as opt
from matplotlib import pyplot as plt
from matplotlib import animation
from matplotlib.lines import Line2D
from matplotlib.patches import Ellipse
import proplot as plot
import seaborn as sns
from tqdm import tqdm
from tqdm import trange

sys.path.append('../../')
from tools import animation as myanim
from tools import beam_analysis as ba
from tools import plotting as myplt
from tools import utils

sys.path.append('..')
from data_analysis import to_vec, to_mat, reconstruct
from data_vis import reconstruction_lines

In [None]:
plot.rc['figure.facecolor'] = 'white'
plot.rc['savefig.dpi'] = 'figure'
plot.rc['animation.html'] = 'jshtml'
plot.rc['grid.alpha'] = 0.04
plot.rc['axes.grid'] = False

In [None]:
save_figures = False

def save(figname):
    if save_figures:
        filename = os.path.join('_output/figures', figname + '.png')
        plt.savefig(filename, facecolor='white', dpi=250)

# 4D emittance measurement in the RTBT
> This notebook reconstructs the beam covariance matrix at the entrance of the Ring to Target Beam Transport (RTBT) section of the Spallation Neutron Source (SNS).

<img src="_input/rtbt.png" width=800>

## Method summary

The goal is to reconstruct the transverse beam covariance matrix at position $s = s_0$:

$$
\Sigma_{0} = \begin{bmatrix}
    \langle{x^2}\rangle & \langle{xx'}\rangle & \langle{xy}\rangle & \langle{xy'}\rangle \\
    \langle{xx'}\rangle & \langle{{x'}^2}\rangle & \langle{yx'}\rangle & \langle{x'y'}\rangle \\
    \langle{xy}\rangle & \langle{yx'}\rangle & \langle{y^2}\rangle & \langle{yy'}\rangle \\
    \langle{xy'}\rangle & \langle{x'y'}\rangle & \langle{yy'}\rangle & \langle{{y'}^2}\rangle
\end{bmatrix}.
$$

To do this, a set of $n$ wire-scanners can be placed at positions $\{s_i\} > s_0$ with $i = 1, ..., n$. A single measurement from wire-scanner $i$ will produce the real-space moments of the beam at $s_i$: $\langle{x^2}\rangle_{i}$, $\langle{y^2}\rangle_{i}$, and $\langle{xy}\rangle_{i}$. Without space charge, the transfer matrix $M_{s_0 \rightarrow s_i} = M_i$ is known. The moments at $s_0$ are then directly related to those at $s_i$ by

$$\Sigma_i = M_i \Sigma_{0} {M_i}^T.$$ This gives <br>

$$
\begin{align}
    \langle{x^2}\rangle_i &= 
        m_{11}^2\langle{x^2}\rangle_{0} 
      + m_{12}^2\langle{x'^2}\rangle_{0} 
      + 2m_{11}m_{22}\langle{xx'}\rangle_{0} ,\\
    \langle{y^2}\rangle_i &= 
        m_{33}^2\langle{y^2}\rangle_{0} 
      + m_{34}^2\langle{y'^2}\rangle_{0} 
      + 2m_{33}m_{34}\langle{yy'}\rangle_{0} ,\\
    \langle{xy}\rangle_i &= 
        m_{11}m_{33}\langle{xy}\rangle_{0} 
      + m_{12}m_{33}\langle{yx'}\rangle_{0} 
      + m_{11}m_{34}\langle{xy'}\rangle_{0} 
      + m_{12}m_{34}\langle{x'y'}\rangle_{0} ,
\end{align}
$$

where $m_{lm}$ are the elements of the transfer matrix. Taking 3 measurements with different optics settings between $s_0$ and $s_i$ (and therefore different transfer matrices) gives the 10 equations necessary to solve for $\Sigma_0$; however, real measurements will be noisy, so it is better to take more measurements if possible. Given $N$ measurements, we can form a $3N \times 1$ observation array $b$ from the measured moments and a $3N \times 10$ coefficient array $A$ from the transfer matrix such that

$$\begin{align} \mathbf{A \sigma}_0 = \mathbf{b},\end{align}$$ 

where $\mathbf{\sigma}_0$ is a $10 \times 1$ vector of the moments at $s_0$. There are 5 wire-scanners in the RTBT which operate simultaneously, so if all these are used the coefficient array will be $15N \times 10$. We then choose $\mathbf{\sigma}_0$ such that $|\mathbf{A\sigma}_0 - \mathbf{b}|^2$ is minimized:

$$ \mathbf{\sigma}_0 = (\mathbf{A}^T\mathbf{A})^{-1}\mathbf{A}^T\mathbf{b} $$

## RTBT lattice functions 

In [None]:
twiss = pd.read_csv('_output/data/twiss.dat')
ws_positions = np.loadtxt('_output/data/ws_positions.dat')

In [None]:
fig, ax = plot.subplots(figsize=(7, 2))
twiss[['s','bx','by']].plot('s', ax=ax, legend=False)
ax.format(xlabel='Position [m]', ylabel=r'$\beta$ [m]', toplabels='RTBT lattice functions')
for ws_position in ws_positions:
    ax.axvline(ws_position, color='grey', ls='--', lw=0.5, zorder=0)
ax.format(xlim=(0, twiss['s'].max()))
ax.legend(labels=[r'$\beta_x$', r'$\beta_y$', 'WS'], 
          ncols=1, loc=(1.01, 0), handlelength=1.5, fontsize='small')
plt.savefig('_output/figures/beta.png', facecolor='white', dpi=250)

## Phase scan

In [None]:
ws_names = ['ws02', 'ws20', 'ws21', 'ws23', 'ws24']

In [None]:
def load(filename, ws_name):
    path = '_output/data/{}/{}'.format(ws_name, filename)
    return np.load(path)

phases_dict, moments_dict, transfer_mats_dict = dict(), dict(), dict()
for ws_name in ws_names:
    transfer_mats_dict[ws_name] = load('transfer_mats.npy', ws_name)
    moments_dict[ws_name] = 1e6 * load('moments.npy', ws_name)
    phases_dict[ws_name] = load('phases.npy', ws_name)

In [None]:
Sigma_true = np.loadtxt('_output/data/Sigma0.dat')
Sigma_true *= 1e6

X_true = np.loadtxt('_output/data/X0.dat')
X_true *= 1e3

Observe the beam at the wire-scanners. Any greyed-out wire-scanners are not used in the reconstruction.

In [None]:
plt_kws = dict(marker='.')
fig, axes = plot.subplots(nrows=2, ncols=5, figsize=(8, 3.5), spany=False)
for ax, ws_name in zip(axes[0, :], ws_names):
    ax.plot(phases_dict[ws_name] % 1, **plt_kws)
    ax.set_title(ws_name, fontsize='large')
for ax, ws_name in zip(axes[1, :], ws_names):
    ax.plot(moments_dict[ws_name][:, 0], **plt_kws)
    ax.plot(moments_dict[ws_name][:, 1], **plt_kws)
    ax.plot(moments_dict[ws_name][:, 2], **plt_kws)
axes[0, 0].legend(labels=[r'$\nu_x$', r'$\nu_y$'], ncols=3);
axes[1, 0].format(ylabel='[mm$^2$]')
axes[1, 0].legend(labels=[r'$\langle{x^2}\rangle$', r'$\langle{y^2}\rangle$', r'$\langle{xy}\rangle$'], fontsize='small', ncols=2);
axes[0, 0].format(ylabel='Frac. phase / ($2\pi$)', xlabel='Scan index', xlabel_kw={'size':'large'}, ylabel_kw={'size':'large'})
plt.savefig('_output/figures/ws_phase_adv.png', facecolor='white', dpi=350)

In [None]:
myplt.corner(X_true, moments=True, pad=0, samples=20000, text='Initial bunch',
             kind='hist', cmap='fire_r', diag_kws=dict(color='k'), env_kws=dict(color='k'))
save('initial_dist')

## Reconstruction

In [None]:
active_ws_names = ws_names[1:]
max_n_meas = 100

In [None]:
moments_list, transfer_mats_list = [], []
for ws_name in active_ws_names:
    transfer_mats_list.extend(transfer_mats_dict[ws_name][:max_n_meas])
    moments_list.extend(moments_dict[ws_name][:max_n_meas])

In [None]:
Sigma = reconstruct(transfer_mats_list, moments_list, verbose=2)

In [None]:
print('Sigma_true')
print(Sigma_true)
eps_1_true, eps_2_true = ba.apparent_emittances(Sigma_true)
eps_x_true, eps_y_true = ba.intrinsic_emittances(Sigma_true)
alpha_x_true, alpha_y_true, beta_x_true, beta_y_true, _, _ = ba.get_twiss2D(Sigma_true)
print()
print('  eps_1, eps_2 = {}, {} [mm mrad]'.format(eps_1_true, eps_2_true))
print('  eps_x, eps_y = {}, {} [mm mrad]'.format(eps_x_true, eps_y_true))
print('  alpha_x, alpha_y = {}, {} [rad]'.format(alpha_x_true, alpha_y_true))
print('  beta_x, beta_y = {}, {} [m/rad]'.format(beta_x_true, beta_y_true))
print()
print('Sigma =')
print(Sigma)
eps_1, eps_2 = ba.apparent_emittances(Sigma)
eps_x, eps_y = ba.intrinsic_emittances(Sigma)
alpha_x, alpha_y, beta_x, beta_y, _, _ = ba.get_twiss2D(Sigma)
print()
print('  eps_1, eps_2 = {}, {} [mm mrad]'.format(eps_1, eps_2))
print('  eps_x, eps_y = {}, {} [mm mrad]'.format(eps_x, eps_y))
print('  alpha_x, alpha_y = {}, {} [rad]'.format(alpha_x, alpha_y))
print('  beta_x, beta_y = {}, {} [m/rad]'.format(beta_x, beta_y))

In [None]:
axes = myplt.rms_ellipses(Sigma_true, color='lightsteelblue', fill=True, lw=0);
axes = myplt.rms_ellipses(Sigma, axes=axes, color='red8', lw=1)
axes[1, 1].legend(labels=['True', 'Reconstructed'], loc=(0, 1.1))
save('projection_default')

## Visualization using lines

In [None]:
axes = myplt.rms_ellipses(Sigma_true, color='black', alpha=0.15, fill=True, lw=0)
myplt.rms_ellipses(Sigma, axes=axes, color='black',)

_transfer_mats_dict = dict()
for ws_name, transfer_mats in transfer_mats_dict.items():
    if ws_name in active_ws_names:
        _transfer_mats_dict[ws_name] = transfer_mats[:max_n_meas]
    
_moments_dict = dict()
for ws_name, moments in moments_dict.items():
    if ws_name in active_ws_names:
        _moments_dict[ws_name] = moments[:max_n_meas]

reconstruction_lines(axes[2, 2], _transfer_mats_dict, _moments_dict, plane='y-yp')
reconstruction_lines(axes[0, 0], _transfer_mats_dict, _moments_dict, plane='x-xp',
                     legend=True, legend_kws=dict(loc=(1.15, 0)))

## Errors 

In [None]:
def calc_errors(n_trials, n_meas, active_ws_names, sig_xy_err):
    # Reform lists.
    moments_list_original = []
    transfer_mats_list = []
    for ws_name in active_ws_names:
        moments_list_original.extend(moments_dict[ws_name][:n_meas])
        transfer_mats_list.extend(transfer_mats_dict[ws_name][:n_meas])
    # Calculate emittances a bunch of times.
    emittances = []
    for _ in range(n_trials):
        moments_list_err = np.copy(moments_list_original)
        moments_list_err[:, 2] += np.random.uniform(-sig_xy_err, sig_xy_err, size=len(moments_list_err))
        Sigma = reconstruct(transfer_mats_list, moments_list_err)
        eps_1, eps_2 = ba.intrinsic_emittances(Sigma)
        eps_x, eps_y = ba.apparent_emittances(Sigma)
        emittances.append([eps_x, eps_y, eps_1, eps_2])
    return np.array(emittances)

In [None]:
n_trials = 10000
n_meas = 1
active_ws_names = ['ws20', 'ws21', 'ws23', 'ws24']
sig_xy_err = 100.0 # [mm^2]
    
emittances = calc_errors(n_trials, max_n_meas, active_ws_names, sig_xy_err)
print('means =', np.mean(emittances, axis=0))
print('stds  =', np.std(emittances, axis=0))