# Checkpoint 3

Implement here your algorithm to estimate the parametric field from timings recorded in the 20 electrodes of the mapping catheter.

The algorithm should output the following information:

* your estimates of parametric field

In [None]:
# imports
import numpy as np
import matplotlib.pyplot as plt
import chaospy as cp

from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from scipy.optimize import minimize

In [None]:
# loading of the dataset
CP3data = np.load("CP3data.npz")['arr_0']

In [None]:
# set seed for reproducibility
np.random.seed(42)

## Display one recording

In [None]:
# Select an example recording for visualization
ind_disp = 75  # Index of the sample to display
x_meas = CP3data[ind_disp][0]  # X-coordinates of electrode measurements
y_meas = CP3data[ind_disp][1]  # Y-coordinates of electrode measurements
t_meas = CP3data[ind_disp][2]  # Activation times at each electrode

In [None]:
# Scatter plot of activation times at measurement points
plt.figure(1)
plt.scatter(x_meas, y_meas, c=t_meas, vmin= 0, vmax=np.max(t_meas) )
plt.gca().set_aspect(1)
plt.colorbar(label='Time [s]')
plt.title("Measured Activation Times")
plt.show()

In [None]:
# loading of the estimate speed field data
CP3estimate = np.load("CP3field.npz")['arr_0']

# extract the speed field
speed_field = CP3estimate[ind_disp][0]

In [None]:
# Define grid for visualization
X, Y = np.meshgrid(np.linspace(-1.5,1.5,151), np.linspace(-1.5,1.5,151))

# Contour plot of the estimated speed field
contour_plot = plt.contour(X, Y, speed_field, 10) 
plt.gca().set_aspect(1)
plt.colorbar(contour_plot, label="Speed")
plt.title("Speed Field Contour Plot")
plt.xlabel("X-axis")
plt.ylabel("Y-axis")
plt.show() 

## Physics based model

Let us consider the Eikonal model: given the parametric field c(x,y), find T(x,y)

\begin{aligned}
  & c(x,y) \sqrt{ \nabla T(x,y) D \nabla T(x,y)} = 1  \,, \quad (x,y) \in [-1.5,1.5] \times [-1.5,1.5]\,,\\
  & u(x,y) = 0  \,, \qquad \qquad \qquad \qquad \qquad x=x_0,y=y_0 \,, \\
\end{aligned}

where 
\begin{aligned}
D = 1\ \vec{b} \otimes \vec{b} + \frac{1}{a_{ratio}}  \ \vec{a} \otimes \vec{a},
\end{aligned}
being $ \vec{b} = \vec{b}(\theta) $ the fiber orientation and $ \vec{a} = \vec{a}(\theta) $ the cross-fiber orientation (orthogonal to $ \vec{b}$).

The following parameters are unknown, but they take the same value for all samples:
- the fiber angle $\theta$, 
- anisotropy ratio $a_{ratio}$,
- the starting point position $y_0$.

**Parameters estimates**:
*    Fibre angle: 0.24049703461770913
*    Anisotropy ratio: 5.297275741477055
*    Starting point: -0.641635348523786

In [None]:
# Reshape dataset for PCA analysis
CP3new = np.transpose(CP3data, axes=(0,2,1))
print(CP3new.shape)

# Reshape speed field data for PCA
reshaped_data = np.array(CP3estimate).reshape(100, -1)    # 100rows x 151^2 columns

In [None]:
# Perform PCA decomposition
def pca_function(data, num_components):
    """
    Performs Principal Component Analysis to reduce dimensionality and extract dominant modes from the dataset.

    Parameters:
    data: ndarray - Input dataset (reshaped speed field)
    num_components: int - Number of principal components to retain

    Returns:
    M_pca: ndarray - Transformed dataset in reduced PCA space
    pc: ndarray - Principal component vectors (eigenvectors)
    scaler: StandardScaler - Scaler object used for normalization
    """
    scaler = StandardScaler()
    scaled_data = scaler.fit_transform(data)

    pca = PCA(n_components=num_components)
    pca.fit(scaled_data)

    M_pca = pca.transform(scaled_data)  # Project data onto principal components
    pc = pca.components_  # Principal components

    return M_pca, pc, scaler

In [None]:
num_components = 5  # Number of PCA components to retain
M_pca, pc ,scaler = pca_function(reshaped_data, num_components=num_components)
print(M_pca.shape)

(100, 5)


In [None]:
# Prepare dataset for estimation
estimate_cxyt = np.empty((CP3new.shape[0]*CP3new.shape[1], 8))

# Construct augmented dataset containing PCA coefficients and measured data
for i in range(CP3new.shape[0]):  # 100 rows (samples)
    repeated_block = np.tile(M_pca[i, :], (20, 1))  # Repeat PCA components for 20 electrodes
    estimate_cxyt[i * 20:(i + 1) * 20, 0:5] = repeated_block  # Store PCA coefficients
    estimate_cxyt[i * 20:(i + 1) * 20, 5:] = CP3new[i, :, :]  # Append x, y, t data

In [None]:
# Function to reconstruct velocity field using PCA components
def create_velocity_pca2(w_i):
    """
    Reconstructs the velocity field from PCA coefficients using learned principal components.

    Parameters:
    w_i: ndarray - Optimal PCA coefficients obtained from optimization

    Returns:
    vel_reconstructed: ndarray - Reconstructed 2D velocity field
    """
    w_i = w_i.reshape(1, -1)
    vel_reconstructed = np.dot(w_i, pc)
    vel_reconstructed = scaler.inverse_transform(vel_reconstructed)
    N_h2 = int(np.sqrt(vel_reconstructed.shape[1]))
    vel_reconstructed = vel_reconstructed.reshape((N_h2, N_h2))

    return vel_reconstructed

In [None]:
# Compute loss function comparing measured and estimated activation times
def compute_loss(activation_time, t_meas):
    """
    Computes the MSE loss between estimated and measured activation times.

    Parameters:
    activation_time: ndarray - Estimated activation times
    t_meas: ndarray - True activation times at measurement points

    Returns:
    loss: float - Computed loss value (mean squared error)
    """
    loss = 0
    num_measurements = 20   # Number of electrodes
    scaling_factor = 1/num_measurements

    for i in range(num_measurements):
        loss += ((t_meas[i]-activation_time[i])**2)

    return loss*scaling_factor

In [None]:
# Mean and variance estimates of PCA components
means = [-0.51716781 , -1.3199457 ,  0.72969701 , -0.29904542 , -0.60003835]
variances = [6588.60693328, 5542.2534667 ,  5117.04410217, 1895.26031172, 1037.73057578]

# Minimum values of PCA components
minim = np.min(M_pca, axis=0)
print(minim)

[-127.00938281 -154.72448291 -194.34681165 -131.17053126  -72.25754164]


In [None]:
# Define probability distributions for PCA coefficients
np.random.seed(4)
degree = (2,2,2,2,2,2,2)  # Degree of polynomial expansion

# Define probability distributions based on means and variances
w1=cp.Gamma(means[0]**2/variances[0], variances[0]/means[0], shift=minim[0])
w2=cp.Normal(means[1], np.sqrt(variances[1]))
w3=cp.Normal(means[2], np.sqrt(variances[2]))
w4=cp.Normal(means[3], np.sqrt(variances[3]))
w5=cp.Gamma(means[4]**2/variances[4], variances[4]/means[4], shift=minim[4])

x=cp.Uniform(-1.5,1.5)
y=cp.Uniform(-1.5,1.5)

variables=[w1,w2,w3,w4,w5,x,y]
joint_dist = cp.J(*variables)

# Compute the coefficients for the polynomial chaos expansion (PCE)
expansion = cp.expansion.stieltjes(degree, joint_dist)
PCE_regression = cp.fit_regression(expansion, estimate_cxyt[:,0:7].T, estimate_cxyt[:,7])

In [None]:
# Alternative version used in the following checkpoint3_solution_r
degree = (2,3,3,3,3,3,3)  # Degree of polynomial expansion

# Compute the coefficients for the polynomial chaos expansion (PCE)
expansion = cp.expansion.stieltjes(degree, joint_dist)
PCE_regression_r = cp.fit_regression(expansion, estimate_cxyt[:,0:7].T, estimate_cxyt[:,7])

In [None]:
def checkpoint3_solution_r(x_meas, y_meas, t_meas,algo):
    """
    Alternative optimization function to estimate the parametric speed field.

    Parameters:
    x_meas, y_meas: ndarray - Measurement coordinates of electrodes.
    t_meas: ndarray - Measured activation times.
    algo: str - Optimization algorithm to be used.

    Returns:
    speed_field: ndarray - The optimized estimated speed field.
    """
    def loss_function(pesi):
        """
        Parameters:
        pesi: ndarray - Candidate PCA coefficient values being optimized.

        Returns:
        loss: float - Computed loss value.
        """
        activation_time = PCE_regression_r(pesi[0],pesi[1],pesi[2],pesi[3],pesi[4], x_meas, y_meas)
        loss = compute_loss(activation_time,t_meas)
        print("Current weights:", pesi)

        return loss

    print("Looking for an optimal solution...")

    # Optimize the PCA coefficients using the chosen algorithm
    optimal_w = minimize(loss_function, np.ones((num_components,)), method= algo, options={'maxiter': 500, 'disp': True})
    print(optimal_w.x)
    
    # Compute the speed field using the optimized coefficients
    speed_field = create_velocity_pca2(optimal_w.x)

    return speed_field

In [None]:
def checkpoint3_solution(x_meas, y_meas, t_meas, algo):
    """
    Estimates the speed field by optimizing PCA coefficients using a regression-based Polynomial Chaos Expansion (PCE) model.

    Parameters:
    x_meas, y_meas: ndarray - Spatial coordinates of electrode measurements
    t_meas: ndarray - Corresponding measured activation times
    algo: str - Optimization algorithm 

    Returns:
    speed_field: ndarray - Estimated 2D speed field
    """
    def loss_function(pesi):
        """
        Parameters:
        pesi: ndarray - Candidate PCA coefficient values being optimized.

        Returns:
        loss: float - Computed loss value.
        """
        activation_time = PCE_regression(pesi[0],pesi[1],pesi[2],pesi[3],pesi[4], x_meas, y_meas)
        loss = compute_loss(activation_time,t_meas)
        print("Current weights:", pesi)

        return loss

    print("Looking for an optimal solution...")

    # Optimize the PCA coefficients using the chosen algorithm
    optimal_w = minimize(loss_function, np.ones((num_components,)), method= algo, options={'maxiter': 500, 'disp': True})
    print(optimal_w.x)
    
    # Compute the speed field using the optimized coefficients
    speed_field_n = create_velocity_pca2(optimal_w.x)
    
    # Compute an alternative speed field solution
    speed_field_r = checkpoint3_solution_r(x_meas, y_meas, t_meas, "Powell")
    
    # Weighted combination of both solutions
    q = 0.5
    speed_field = q*speed_field_n + (1-q)*speed_field_r

    return speed_field

In [None]:
# Run the solution for a specific sample

# Extract measurement data for a specific sample
ind_disp = 25
x_meas = CP3data[ind_disp][0]
y_meas = CP3data[ind_disp][1]
t_meas = CP3data[ind_disp][2]

speed_field = checkpoint3_solution(x_meas, y_meas, t_meas, "Powell")

Looking for an optimal solution...
Current w's: [1. 1. 1. 1. 1.]
Current w's: [1. 1. 1. 1. 1.]
Current w's: [2. 1. 1. 1. 1.]
Current w's: [3.618034 1.       1.       1.       1.      ]
Current w's: [11.18419492  1.          1.          1.          1.        ]
Current w's: [23.42650054  1.          1.          1.          1.        ]
Current w's: [15.86033943  1.          1.          1.          1.        ]
Current w's: [8.2941787 1.        1.        1.        1.       ]
Current w's: [11.01228784  1.          1.          1.          1.        ]
Current w's: [10.91216496  1.          1.          1.          1.        ]
Current w's: [11.01228784  1.          1.          1.          1.        ]
Current w's: [11.01228784  2.          1.          1.          1.        ]
Current w's: [11.01228784  3.618034    1.          1.          1.        ]
Current w's: [11.01228784  6.79727168  1.          1.          1.        ]
Current w's: [11.01228784 11.94138633  1.          1.          1.        ]
