# CA3 model

This notebook is for building out the CA3 model as previous implementation doesn't work.

Formula is:
$$
min_{\vec{w}_{x_1}, \vec{w}_{x_2}, \vec{w}_a, \vec{w}_b} \left( -\vec{w}_{x_1}^T S_{xa} \vec{w}_a - \vec{w}_{x_2}^T S_{xb} \vec{w}_b + \sum_{i \in \{x_1, x_2, a, b\}} \frac{1}{2} \lambda_i \left( \vec{w}_i^T S_{ii} \vec{w}_i - 1 \right) + \theta_{rr}(\vec{w}_{x_1}, \vec{w}_{x_2}) \right)
$$


In [1]:
from gemmr.generative_model import GEMMR
from sklearn.cross_decomposition import CCA
from gemmr.estimators import SVDCCA
import numpy as np
from scipy.optimize import minimize

  from pkg_resources import resource_filename


## Generate some data

In [2]:
model_definition = GEMMR('cca', wx=1, wy=1, r_between=0.3)
behavioural_data_study1, imging_data_study1 = model_definition.generate_data(n=2000)
behavioural_data_study2, imging_data_study2 = model_definition.generate_data(n=190)
study1 = (imging_data_study1, behavioural_data_study1) 
study2 = (imging_data_study2, behavioural_data_study2)

In [3]:
test= SVDCCA().fit(imging_data_study1, behavioural_data_study1)
test.corrs_

array([0.29587629])

In [71]:
optimum = test.transform(imging_data_study1, behavioural_data_study1)

## Step 1: Covariance matrix

In [27]:
def mean_center(data: np.ndarray) -> np.ndarray:
    """
    Function to demean data.

    Parmeteres
    ----------
    data: np.ndarray
        data to demean

    Returns
    -------
    np.ndarray: array
        demeaned data
    """
    return data - data.mean(axis=0)

In [28]:
def cross_cov(matrix_1: np.ndarray, matrix_2: np.ndarray) -> np.ndarray:
    """
    Function to calculate 
    covariance matrix

    Parameters
    ----------
    matrix_1: np.ndarray
        A matrix tht should 
        correspond to subject by 
        features
    matrix_2: np.ndarray
        A matrix that should 
        correspond to features by
        feautres 

    Returns
    -------
    np.ndarray: array
        array of cross covariance matrix
    """
    return (matrix_1.T @ matrix_2) / matrix_1.shape[0] 

In [None]:
def data_able_to_process(study_pair: tuple, behav_data: np.ndarray, X_dat: np.ndarray) -> bool:
    """
    Function to check that data
    is in correct format to be processed

    Parameters
    ----------
     study_pair: tuple, 
         tuple of behavioural data 
         and imging data
     behav_data: np.ndarray
         array of behav_data 
     X_dat: np.ndarray
         array of X_dat
    
    Returns
    -------
    bool: boolean
        bool of if failed or not
    """
    if not isinstance(study_pair, (tuple, list)) or len(study_pair) != 2:
        print("Given argument isn't a pair of datasets")
        return False
    if not isinstance(behav_data, np.ndarray) or not isinstance(X_dat, np.ndarray):
        print("Data provided isn't a numpy array")
        return False
    if behav_data.shape[0] == 0 or X_dat.shape[0] == 0 or behav_data.shape[0] != X_dat.shape[0]:
        print(f"Mismatch between ({behav_data.shape[0]} and {X_dat.shape[0]})")

    return True

In [None]:
def calculate_covariance_matricies(*study_pairs) -> dict:
    """
    Calculates covariance matrices and auto covariance
    matricies

    Parameters
    ----------
    study_pairs: tuple
        a tuple or list containing two numpy arrays:
        (behavioural_data, imaging_data).
        Assumes data is (subjects x features).

    Returns
    -------
    covariance_results: dict
        dictionary of covariance and auto-covariance matrices

    """
    covariance_results = {}
    for idx, study_pair in enumerate(study_pairs):
        X_dat, behav_data  = study_pair
        if not data_able_to_process(study_pair, behav_data, X_dat):
            continue
        behav_data = mean_center(behav_data)
        X_dat = mean_center(X_dat)
        study_num = idx + 1
        try:
            covariance_results[f"s_behav{study_num}_behav{study_num}"] = cross_cov(behav_data, behav_data)
            covariance_results[f"s_img{study_num}_img{study_num}"] = cross_cov(X_dat, X_dat)
            covariance_results[f"s_img{study_num}_behav{study_num}"] = cross_cov(X_dat, behav_data)

        except Exception as e:
            print(f"Error calculating covariances for Study {study_num}: {e}")
            return None
    return covariance_results

In [31]:
covariance_mat = calculate_covariance_matricies(study1, study2)

In [32]:
s_x1a = cross_cov(behavioural_data_study1, imging_data_study1)
s_x2b = cross_cov(behavioural_data_study2, imging_data_study2)
s_x1x1 = cross_cov(behavioural_data_study1, behavioural_data_study1)
s_x2x2 = cross_cov(behavioural_data_study2, behavioural_data_study2)
s_aa = cross_cov(imging_data_study1, imging_data_study1)
s_bb = cross_cov(imging_data_study2, imging_data_study2)

## Step 2. Intialization of weights 

In [33]:
def weight_intialization(*weights) -> np.ndarray:
    """
    Define a set of random starting 
    weights
    
    Parameters
    ----------
    weights: tuple(int)
        tuple of set amount
        of int values
    
    Returns
    -------
    np.ndarrray
        array of numpy values
    """
    return np.random.randn(sum(weights))

In [34]:
dim = {
  '1': behavioural_data_study1.shape[1], 
   '2': behavioural_data_study2.shape[1],
    '3': imging_data_study1.shape[1],
    '4': imging_data_study2.shape[1]
 }

In [35]:
weights_0 = weight_intialization(
    behavioural_data_study1.shape[1], 
    behavioural_data_study2.shape[1],
    imging_data_study1.shape[1],
    imging_data_study2.shape[1]
    )

## Step 3. Objective function

In [36]:
dx1_shape = s_x1x1.shape[0]
dx2_shape = s_x2x2.shape[0]
da_shape = s_aa.shape[0]
db_shape = s_bb.shape[0]
dx_shape = dx1_shape + dx2_shape
dac_shape =  dx_shape + da_shape

def get_dimensions(s_x1x1, s_x2x2, s_aa):
    dx1_shape = s_x1x1.shape[0]
    dx2_shape = s_x2x2.shape[0]
    da_shape = s_aa.shape[0]
    dx_shape =  dx1_shape + dx2_shape
    return {
        'dx1_shape': dx1_shape,
        'dx_shape' : dx_shape,
        'dac_shape':  dx_shape + da_shape
    }

def get_weights(weight_array, dx1_shape, dx_shape, dac_shape):
    return {
        "wx1": weight_array[:dx1_shape],
        "wx2": weight_array[dx1_shape:dx_shape],
        "wa": weight_array[dx_shape:dac_shape],
        "wb": weight_array[dac_shape:]
    }


In [37]:
def cross_cov_term(weight_beh, cov_mat, weight_img):
    return -weight_img.T @ (cov_mat @ weight_beh)

In [38]:
def regularization_term(weight, cov_mat):
    return 0.5 * 1.0 * (weight.T @ (cov_mat @ weight) - 1)

In [39]:
def dissimilarity_penality(theta_r, img_weight1, img_weight2):
    return theta_r * 0.5 * np.sum((img_weight1 - img_weight2) ** 2)


In [40]:
def objective_function(weights, s_x1a, s_x2b, s_x1x1, s_x2x2, s_aa, s_bb, theta_r):
    dimensions = get_dimensions(s_x1x1, s_x2x2, s_aa)
    weights = get_weights(
            weights, 
            dimensions['dx1_shape'], 
            dimensions['dx_shape'], 
            dimensions['dac_shape'])
    term1 = cross_cov_term(weights['wx1'],s_x1a, weights['wa'])
    term2 = cross_cov_term(weights['wx2'],s_x2b, weights['wb'])
    reg_x1 = regularization_term(weights['wx1'], s_x1x1)
    reg_x2 = regularization_term(weights['wx2'], s_x2x2)
    reg_a = regularization_term(weights['wa'], s_aa)
    reg_b = regularization_term(weights['wb'], s_bb)
    theta_r = dissimilarity_penality(theta_r, weights['wa'], weights['wb'])
    return term1 + term2 + reg_x1 + reg_x2 + reg_a + reg_b + theta_r


## Step 4: Minimise

In [None]:
best_loss = float('inf')
optimal_theta_r = None
optimium_model = None

for theta_r in np.logspace(-3, 2, 10):
    weights_0 = weight_intialization(
        behavioural_data_study1.shape[1], 
        behavioural_data_study2.shape[1],
        imging_data_study1.shape[1],
        imging_data_study2.shape[1])  # re-init each time
    res = minimize(
        objective_function,
        weights_0,
        args=(s_x1a, s_x2b, s_x1x1, s_x2x2, s_aa, s_bb, theta_r),
        method='L-BFGS-B'
    )
    if res.status !=0:
        print(res.status)
        continue


    if res.fun < best_loss:
        best_loss = res.fun
        best_theta_r = theta_r
        optimium_model = res

print(f"Best θ_r: {best_theta_r}")
print(f"Best loss: {best_loss}")

In [None]:
dimensions = get_dimensions(s_x1x1, s_x2x2, s_aa)
weights = get_weights(optimium_model.x,
            dimensions['dx1_shape'], 
            dimensions['dx_shape'], 
            dimensions['dac_shape'])

In [None]:
# Get projections (scores)
scores_x1 = behavioural_data_study1 @ weights['wx1']
scores_x2 = behavioural_data_study2 @ weights['wx2']
scores_a  = imging_data_study1 @ weights['wa']
scores_b  = imging_data_study2 @ weights['wb']


In [None]:
display(np.corrcoef(scores_x1, scores_a)[0, 1])
display(np.corrcoef(scores_x2, scores_b)[0, 1])
display(np.linalg.norm(weights['wa'] - weights['wb']))

np.float64(0.11538425261898907)

np.float64(0.07889484464506155)

np.float64(0.00014508777456870848)

## Putting it all together

In [4]:

class C3A:
    """
    C3A class.
    A class to do C3A

    Usage
    -----
    c3a = C3A(l2=0.5, theta=1)
    c3a.fit(study1, study2)
    transformed = c3a.transform(study1, study2)
    """

    def __init__(
        self,
        l2: float = 1,
        theta: float = 0,
        tol=1e-6,
        maxiter=500,
    ):
        self.l2_ = l2
        self.theta_ = theta
        self.intial_weights_ = None
        self.dims_ = []
        self.best_loss = float("inf")
        self.weights_ = None
        self.covariances_ = {}
        self.tol_ = tol
        self.maxiter_ = maxiter
        self.canonical_correlations_ = None
        self.projections_ = None

    def fit(self, *data_sets: tuple) -> None:
        """
        Method to fit the CA3 model to a given
        set of datasets

        Parameters
        ----------
        data_sets: tuple
            a tuple of X, Y data
            from an arbituray number of
            datasets

        Returns
        -------
        None
        """
        data_sets = self._normalise_input_data(*data_sets)
        self._calculate_covariance_matricies(*data_sets)
        self._get_dimensions(*data_sets)
        self._weight_intialization()
        self._optimise()

    def transform(self, *data_sets: tuple) -> list[np.ndarray]:
        """
        Methods to transform data sets into canonical
        projects.

        Parameters
        ----------
        data_sets: tuple
            a tuple of X, Y data
            from an arbituray number of
            datasets

        Returns
        --------
        projects: list[np.ndarray]
            conatins a list of the
            projections of each dataset in
            ndarry of n_components by n_samples
        """
        assert (
            self.weights_ is not None
        ), "Model must be fitted before transform can be called."
        assert len(data_sets) == len(
            self.dims_
        ), "Model fitted with different number of datasets."
        data_sets = self._normalise_input_data(*data_sets)
        self.projections_ = [
            np.stack(
                [
                    self._normalise(X_data @ wx),
                    self._normalise(Y_data @ wb),
                ],
                axis=0,
            )
            for (X_data, Y_data), (wx, wb) in zip(data_sets, self.weights_)
        ]

        self.canonical_correlations_ = [
            np.corrcoef(data_sets[0], data_sets[1])[0, 1]
            for data_sets in self.projections_
        ]
        return self.projections_

    def fit_transform(self, *data_sets) -> list[np.ndarray]:
        """
        Methods to fit a CA3 model and then transform
        the data.

        Parameters
        ----------
        data_sets: tuple
            a tuple of X, Y data
            from an arbituray number of
            datasets

        Returns
        --------
        projects: list[np.ndarray]
            conatins a list of the
            projections of each dataset in
            ndarry of n_components by n_samples.
        """
        self.fit(*data_sets)
        return self.transform(*data_sets)

    def calculate_canonical_correlations(self) -> list[float]:
        """
        Method to obtain the canonical correlations.
        Model must have been fitted and transfomed
        before.

        Parameters
        ----------
        None

        Returns
        -------
        canonical_correlations: list[float]
            list of canonical correlations
        """
        assert (
            self.canonical_correlations_ is not None
        ), "Model must be fitted and transfomed before correlations can be returned"
        return self.canonical_correlations_

    def compute_loadings(
        self, *data_sets: tuple
    ) -> list[tuple[np.ndarray, np.ndarray]]:
        """
        Computes canonical loadings for each study.

        Parameters
        ----------
        data_sets: tuple
            List of (X, Y) pairs.

        Returns
        -------
        loadings: list of tuples
            Each tuple contains (X, Y), i.e., correlations between
            original features and their respective canonical variates.
        """
        assert (
            self.projections_ is not None
        ), "Model must be fitted and transfomed before computing loadings."
        data_sets = self._normalise_input_data(*data_sets)
        return [
            (
                np.corrcoef(X_data.T, x_proj, rowvar=True)[:-1, -1],
                np.corrcoef(Y_data.T, y_proj, rowvar=True)[:-1, -1],
            )
            for (X_data, Y_data), (x_proj, y_proj) in zip(data_sets, self.projections_)
        ]
    def _normalise_input_data(self, *data_sets) -> tuple:
        """
        Normalise input data.

        Parameters
        ----------
        data_sets: tuple
            List of (X, Y) pairs.
        
        Returns
        -------
        data_set: tuple
            tuple of normalised data
        """
        return tuple((self._normalise(X), self._normalise(Y)) for X, Y in data_sets)
    
    def _weight_intialization(self) -> np.ndarray:
        """
        Method to define a set of random starting
        weights

        Parameters
        ----------
        weights: tuple(int)
            tuple of set amount
            of int values

        Returns
        -------
        np.ndarrray
            array of numpy values
        """
        init_weights = []

        for idx, _ in enumerate(self.dims_):
            s_xb = self.covariances_[f"s_X{idx+1}_Y{idx+1}"]
            # Perform SVD on the cross-covariance matrix
            try:
                U, _, Vt = np.linalg.svd(s_xb, full_matrices=False)
            except np.linalg.LinAlgError as e:
                raise RuntimeError(f"SVD failed for dataset {idx+1}: {e}")

            wx = U[:, 0]
            wb = Vt.T[:, 0]
            s_xx = self.covariances_[f"s_X{idx+1}_X{idx+1}"]
            s_bb = self.covariances_[f"s_Y{idx+1}_Y{idx+1}"]

            wx = wx / np.sqrt(wx.T @ s_xx @ wx + 1e-8)
            wb = wb / np.sqrt(wb.T @ s_bb @ wb + 1e-8)

            init_weights.extend(wx)
            init_weights.extend(wb)

        self.intial_weights_ = np.array(init_weights)

    def _calculate_covariance_matricies(self, *data_sets) -> dict:
        """
        Calculates covariance and auto covariance
        matricies

        Parameters
        ----------
        study_pairs: tuple
            a tuple or list containing two numpy arrays:
            (behavioural_data, imaging_data).
            Assumes data is (subjects x features).

        Returns
        -------
        covariance_results: dict
            dictionary of covariance and auto-covariance matrices

        """
        for idx, study_pair in enumerate(data_sets):
            X_data, Y_data = study_pair
            self._data_able_to_process(study_pair)
            study_num = idx + 1
            try:
                self.covariances_[f"s_Y{study_num}_Y{study_num}"] = (
                    self._create_covariance_matrix(Y_data, Y_data)
                )
                self.covariances_[f"s_X{study_num}_X{study_num}"] = (
                    self._create_covariance_matrix(X_data, X_data)
                )
                self.covariances_[f"s_X{study_num}_Y{study_num}"] = (
                    self._create_covariance_matrix(X_data, Y_data)
                )

            except Exception as e:
                print(f"Error calculating covariances for Study {study_num}: {e}")

    def _data_able_to_process(self, study_pair: tuple) -> bool:
        """
        Method to check that data
        is in correct format to be processed

        Parameters
        ----------
         study_pair: tuple,
             tuple of behavioural data
             and imging data


        Returns
        -------
        bool: boolean
            bool of if failed or not
        """
        assert (
            isinstance(study_pair, (tuple, list)) and len(study_pair) == 2
        ), "Given argument isn't a pair of datasets"
        assert isinstance(study_pair[0], np.ndarray) or not isinstance(
            study_pair[1], np.ndarray
        ), "Data provided ins't numpy array"
        assert (study_pair[0].shape[0] != 0) and (
            study_pair[1].shape[0] != 0
        ), "Study pairs contains not data"
        assert (
            study_pair[0].shape[0] == study_pair[1].shape[0]
        ), f"Mismatch between ({study_pair[0].shape[0]} and {study_pair[1].shape[0]})"

    def _optimise(self) -> None:
        """
        Method to minimise the
        objective function

        Parameters
        ----------
        None

        Returns
        --------
        None
        """
        model = minimize(
            self._objective_function,
            self.intial_weights_,
            options={"gtol": self.tol_, "maxiter": self.maxiter_},
            args=(self.covariances_, self.theta_, self.l2_),
        )
        self.best_loss = model.fun
        self.weights_ = self._split_weights(model.x)

    def _get_dimensions(self, *data_sets) -> None:
        """
        Method to get the dimensions
        of the data

        Parameters
        ----------
        *data_sets: tuple
            tuple of datasets

        Returns
        -------
        None
        """
        self.dims_ = [(X.shape[1], Y.shape[1]) for X, Y in data_sets]

    def _split_weights(self, weights: np.ndarray) -> list[np.ndarray]:
        """
        Splits the flat weight vector weights into individual vectors
        for each x and b dataset.

        Parameters
        ----------
        weights: np.ndarray
            flatten numpy array

        Returns
        -------
        split_weights: list[np.ndarray]
            list of weights split
            wx and wb

        """
        offset = 0
        split_weights = []
        for X_dim, Y_dim in self.dims_:
            wx = weights[offset : offset + X_dim]
            offset += X_dim
            wb = weights[offset : offset + Y_dim]
            offset += Y_dim
            split_weights.append((wx, wb))
        return split_weights

    def _objective_function(
        self, weights: np.ndarray, covariances: dict, theta: float, l2: float
    ) -> float:
        """
        Objective function of the CA3 class

        Parameters
        ----------
        weights: np.ndarray
            weights
        covariances: dict
            dict of cross/auto covariance
            matricies
        theta: float
            theta penality
        l2: float
            regularization penailty

        Returns
        -------
        total_loss: float
           total loss of the objective function
        """
        total_loss = 0
        weights_ = self._split_weights(weights)
        for idx, (wx, wb) in enumerate(weights_):
            s_xb = covariances[f"s_X{idx+1}_Y{idx+1}"]
            s_xx = covariances[f"s_X{idx+1}_X{idx+1}"]
            s_bb = covariances[f"s_Y{idx+1}_Y{idx+1}"]
            total_loss += self._cross_cov_term(wb, s_xb, wx)
            total_loss += self._regularization_term(wx, s_xx, l2)
            total_loss += self._regularization_term(wb, s_bb, l2)
            
        # Similarity penalty across imaging weights
        if theta > 0 and len(weights_) > 1:
            total_loss += sum(
                self._dissimilarity_penality(theta, w1[0], w2[0])
                for w1, w2 in combinations(weights_, 2)
            )

        return total_loss

    def _create_covariance_matrix(
        self, matrix_1: np.ndarray, matrix_2: np.ndarray
    ) -> np.ndarray:
        """
        Function to calculate cross-auto
        covariance matrix

        Parameters
        ----------
        matrix_1: np.ndarray
            A matrix tht should
            correspond to subject by
            features
        matrix_2: np.ndarray
            A matrix that should
            correspond to features by
            feautres

        Returns
        -------
        np.ndarray: array
            array of covariance matrix
        """
        return (matrix_1.T @ matrix_2) / matrix_1.shape[0]

    def _normalise(self, data: np.ndarray) -> np.ndarray:
        """
        Function to normalise data.

        Parmeteres
        ----------
        data: np.ndarray
            data to demean

        Returns
        -------
        np.ndarray: array
            demeaned data
        """
        dmean = data - data.mean(axis=0)
        std = data.std(axis=0, ddof=1)
        std = np.where(std == 0.0, 1.0, std)
        return dmean / std

    def _cross_cov_term(
        self, weight_Y: np.ndarray, cov_mat: np.ndarray, weight_X: np.ndarray
    ) -> np.ndarray:
        """
        Method to calculate the cross covarance term
        in the objective function

        Parameters
        ----------
        weight_Y: np.ndarray
            set of weights for wb
        cov_mat: np.ndarray
             covariance matrix for
             wx wb
        weight_X: np.ndarray
            set of weights for wx

        Returns
        -------
        np.ndarray: np.array
            cross covariance term
        """
        return -weight_X.T @ (cov_mat @ weight_Y)

    def _regularization_term(
        self, weight: np.ndarray, cov_mat: np.ndarray, lambda_i: float
    ) -> float:
        """
        Method to calculate the regularization term
        in the objective function

        Parameters
        ----------
        weight: np.ndarray
            set of weights
        cov_mat: np.ndarray
            auto covariance matrix
        lambda_i: float
            regularization parameter

        Returns
        -------
        float: float
            regularization term of the objective function
        """
        return 0.5 * lambda_i * (weight.T @ (cov_mat @ weight) -1)

    def _dissimilarity_penality(
        self, theta_r: float, X_weight1: np.ndarray, X_weight2: np.ndarray
    ) -> float:
        """
        Method to return dissimilarity penality

        Parameters
        -----------
        theta_r: float
           theta penality.
        img_weight1: np.ndarray
            weights of imaging data
        img_weight2: np.ndarray
            weights of second imaging
            data

        Returns
        -------
        float: float
            dissimilarity penality
        """
        return theta_r * 0.5 * np.sum((X_weight1 - X_weight2) ** 2)

    def _score(self, *data_sets: tuple) -> float:
        """
        Method used to evaluate model performance.

        Parameters
        -----------
        data_sets: tuple
            a tuple of X, Y data
            from an arbituray number of
            datasets

        Returns
        -------
        float: float
            mean of correlation
            values across datasets

        """
        if self.weights_ is None:
            raise ValueError("Model must be fitted before scoring.")

        self.transform(*data_sets)
        correlations = self.calculate_canonical_correlations()
        return np.mean(correlations)


In [69]:
ca3 = C3A(l2=0.0)
ca3.fit(study1)
transfomed = ca3.fit_transform(study1)

In [72]:
ca3.calculate_canonical_correlations()

[np.float64(0.3239853848670129)]

In [70]:
ca3.compute_loadings(study1)

[(array([ 6.50947842e-02,  3.45557537e-01,  5.66321994e-01,  3.83312821e-01,
         -1.81847950e-01,  2.62204233e-01, -5.05192906e-02, -4.02943852e-02,
         -2.99698561e-02,  4.20505671e-02,  1.26363061e-01,  1.42736771e-01,
         -8.92349301e-02,  2.41565788e-02,  9.41206529e-02,  8.35883746e-02,
          6.84212524e-02,  5.83734116e-02, -5.86919099e-02, -1.82156739e-01,
          9.92557349e-02,  1.34787598e-01, -5.19479026e-04,  1.16005295e-01,
         -1.37646563e-01,  7.99917716e-02, -7.39660771e-02,  1.47775193e-01,
          1.09178108e-01, -5.99257009e-02, -8.81859588e-02, -1.60744524e-01,
         -9.58841613e-02, -4.96039626e-02, -3.10735979e-02,  8.29625651e-02,
          1.75327875e-01, -1.31592730e-03, -1.12143690e-02, -1.27730866e-01,
         -5.07038437e-02, -2.00497007e-01,  1.02188325e-01,  8.78384925e-03,
         -2.13156732e-02, -1.12533137e-01,  3.32505881e-02, -5.55617049e-02,
         -1.50266855e-02,  1.06955485e-01]),
  array([1.]))]

In [63]:

# Assuming `data` is a list of tuples: [(img1, beh1), (img2, beh2), ...]
# Define a custom scorer if needed, but the `score` method should suffice

search = GridSearchCA3(l2_values=[0.01, 0.1, 1.0, 10.0, 1000000], theta=0.5, verbose=True)
search.fit(study1, study2)

best_model = search.get_best_model()
print("Best L2:", search.get_best_l2())
print("Best score:", search.best_score_)

l2: 0.01, score: 0.32219472953669887
l2: 0.1, score: 0.32219472953669887
l2: 1.0, score: -0.32219472953669887
l2: 10.0, score: 0.3221947295366989
l2: 1000000, score: 0.32219472953669887
Best l2: 10.0, Best score: 0.3221947295366989
Best L2: 10.0
Best score: 0.3221947295366989


In [94]:
from scipy.linalg import pinv as pinv2
for (x, y), (x_p, y_p) in zip(ca3.data, ca3.projects_):
    x = x.squeeze()
    y = y.squeeze()
    x_loadings = np.dot(x, x_p) / np.dot(x_p, x_p)
    y_loadings = np.dot(y, y_p) / np.dot(y_p, y_p)
    x_rotations = np.dot(x, pinv2(np.dot(x_loadings.T, x )))
    print(x_loadings)
    print(y_loadings)
    #print(y.shape[0])
    #print(x_p.shape[0])


ValueError: expected matrix

In [162]:
import matplotlib.pyplot as plt
import scipy.stats 
X1_proj_ca3, Y1_proj_ca3 = transfomed['projections']['study0']
display(scipy.stats.ttest_ind(X1_proj_ca3.flatten(), X1_proj_cca.flatten() ))
plt.figure(figsize=(12, 5))


plt.subplot(1, 2, 1)
plt.scatter(X1_proj_ca3, Y1_proj_ca3, c='teal')
m, b = np.polyfit(X1_proj_ca3.flatten(), Y1_proj_ca3.flatten(), 1)
plt.plot(X1_proj_ca3, m*X1_proj_ca3 + b, color='black', linestyle='--')
plt.text(0.05, 0.95, f"r = {transfomed['correlations']['study0'][0]:.2f}", 
         transform=plt.gca().transAxes, va='top', ha='left')
plt.title("CA3 projections")
plt.xlabel("Imaging")
plt.ylabel("Behavior")

plt.subplot(1, 2, 2)
plt.scatter(X1_proj_cca, Y1_proj_cca, c='orange', label=f"r = {sklearn_corr:.2f}")
m, b = np.polyfit(X1_proj_cca.flatten(), Y1_proj_cca.flatten(), 1)
plt.plot(X1_proj_cca, m*X1_proj_cca + b, color='black', linestyle='--')
plt.text(0.05, 0.95, f"r = {sklearn_corr:.2f}", 
         transform=plt.gca().transAxes, va='top', ha='left')
plt.title("sklearn CCA projections")
plt.xlabel("Imaging")
plt.ylabel("Behavior")
plt.legend()
plt.tight_layout()
plt.show()

TypeError: list indices must be integers or slices, not str

In [None]:

class C3A_single_study:
    """
    C3A class.
    A class to do C3A

    Usage
    -----
    c3a = C3A(l2=0.5, theta=1)
    c3a.fit(study1, study2)
    transformed = c3a.transform(study1, study2)
    """

    def __init__(
        self,
        l2: float = 1,
        theta: float = 0,
        tol=1e-6,
        maxiter=500,
        normalise_weights=True,
    ):
        self.l2_ = l2
        self.theta_ = theta
        self.intial_weights_ = None
        self.dims_ = []
        self.best_loss = float("inf")
        self.weights_ = None
        self.covariances_ = {}
        self.tol_ = tol
        self.maxiter_ = maxiter
        self.normalise_weights = normalise_weights
        self.canonical_correlations_ = None
        self.projections_ = None

    def fit(self, X, Y) -> None:
        """
        Method to fit the CA3 model to a given
        set of datasets

        Parameters
        ----------
        data_sets: tuple
            a tuple of X, Y data
            from an arbituray number of
            datasets

        Returns
        -------
        None
        """
        self._calculate_covariance_matricies(X, Y)
        self._get_dimensions(X, Y)
        self._weight_intialization()
        self._optimise()

    def transform(self, X, Y: tuple) -> list[np.ndarray]:
        """
        Methods to transform data sets into canonical
        projects.

        Parameters
        ----------
        data_sets: tuple
            a tuple of X, Y data
            from an arbituray number of
            datasets

        Returns
        --------
        projects: list[np.ndarray]
            conatins a list of the
            projections of each dataset in
            ndarry of n_components by n_samples
        """
        assert (
            self.weights_ is not None
        ), "Model must be fitted before transform can be called."

        x_projections = self._normalise(self._normalise(X) @ self.weights_[0])
        y_projections = self._normalise(self._normalise(Y) @ self.weights_[1])
        self.projections_ = np.stack([x_projections, y_projections])
        self.canonical_correlations_ = np.corrcoef(x_projections, y_projections)[0, 1] 
        return self.projections_

    def fit_transform(self, X, Y) -> list[np.ndarray]:
        """
        Methods to fit a CA3 model and then transform
        the data.

        Parameters
        ----------
        data_sets: tuple
            a tuple of X, Y data
            from an arbituray number of
            datasets

        Returns
        --------
        projects: list[np.ndarray]
            conatins a list of the
            projections of each dataset in
            ndarry of n_components by n_samples.
        """
        self.fit(X, Y)
        return self.transform(X, Y)

    def calculate_canonical_correlations(self) -> list[float]:
        """
        Method to obtain the canonical correlations.
        Model must have been fitted and transfomed
        before.

        Parameters
        ----------
        None

        Returns
        -------
        canonical_correlations: list[float]
            list of canonical correlations
        """
        assert (
            self.canonical_correlations_ is not None
        ), "Model must be fitted and transfomed before correlations can be returned"
        return self.canonical_correlations_
        
    def compute_loadings(self, X, Y) -> list[tuple[np.ndarray, np.ndarray]]:
        """
        Computes canonical loadings for each study.
        
        Parameters
        ----------
        data_sets: tuple
            List of (img_data, beh_data) pairs.
        
        Returns
        -------
        loadings: list of tuples
            Each tuple contains (img_loadings, beh_loadings), i.e., correlations between
            original features and their respective canonical variates.
        """
        assert self.projections_ is not None, "Model must be fitted and transfomed before computing loadings."
        return [
            np.corrcoef(self._normalise(X).T, self.projections_[0], rowvar=True)[:-1, -1],
            np.corrcoef(self._normalise(Y).T, self.projections_[1], rowvar=True)[:-1, -1]
            ]
    
    def _weight_intialization(self) -> np.ndarray:
        """
        Method to define a set of random starting
        weights

        Parameters
        ----------
        weights: tuple(int)
            tuple of set amount
            of int values

        Returns
        -------
        np.ndarrray
            array of numpy values
        """

        s_xb = self.covariances_[f"s_X_Y"]
        try:
            U, _, Vt = np.linalg.svd(s_xb, full_matrices=False)
        except np.linalg.LinAlgError as e:
            raise RuntimeError(f"SVD failed due to: {e}")

        wx = U[:, 0]
        wb = Vt.T[:, 0]
        s_xx = self.covariances_[f"s_X_X"]
        s_bb = self.covariances_[f"s_Y_Y"]

        wx = wx / np.sqrt(wx.T @ s_xx @ wx)
        wb = wb / np.sqrt(wb.T @ s_bb @ wb)
        self.intial_weights_ = np.concat([wx, wb])

    def _calculate_covariance_matricies(self, X_data, Y_data) -> dict:
        """
        Calculates covariance and auto covariance
        matricies

        Parameters
        ----------
        study_pairs: tuple
            a tuple or list containing two numpy arrays:
            (behavioural_data, imaging_data).
            Assumes data is (subjects x features).

        Returns
        -------
        covariance_results: dict
            dictionary of covariance and auto-covariance matrices

        """
        self._data_able_to_process(X_data, Y_data)
        X_data = self._normalise(X_data)
        Y_data = self._normalise(Y_data)

        try:
            self.covariances_["s_Y_Y"] = (
                self._create_covariance_matrix(Y_data, Y_data)
            )
            self.covariances_[f"s_X_X"] = (
                self._create_covariance_matrix(X_data, X_data)
            )
            self.covariances_[f"s_X_Y"] = (
                self._create_covariance_matrix(X_data, Y_data)
            )
        except Exception as e:
            print(f"Error calculating covariances due to: {e}")

    def _data_able_to_process(self, X_data, Y_data) -> bool:
        """
        Method to check that data
        is in correct format to be processed

        Parameters
        ----------
         study_pair: tuple,
             tuple of behavioural data
             and imging data


        Returns
        -------
        bool: boolean
            bool of if failed or not
        """
        assert isinstance(Y_data, np.ndarray) or not isinstance(
            X_data, np.ndarray
        ), "Data provided ins't numpy array"
        assert (X_data.shape[0] != 0) and (
            Y_data.shape[0] != 0
        ), "Study pairs contains not data"
        assert (
            X_data.shape[0] == Y_data.shape[0]
        ), f"Mismatch between ({X_data.shape[0]} and {Y_data.shape[0]})"

    def _optimise(self) -> None:
        """
        Method to minimise the
        objective function

        Parameters
        ----------
        None

        Returns
        --------
        None
        """
        model = minimize(
            self._objective_function,
            self.intial_weights_,
            options={"gtol": self.tol_, "maxiter": self.maxiter_},
            args=(self.covariances_, self.theta_, self.l2_),
        )
        self.best_loss = model.fun
        self.weights_ = self._split_weights(model.x)

    def _get_dimensions(self, X, Y) -> None:
        """
        Method to get the dimensions
        of the data

        Parameters
        ----------
        *data_sets: tuple
            tuple of datasets
        
        Returns
        -------
        None
        """
        self.dims_ = [X.shape[1], Y.shape[1]]

    def _split_weights(self, weights: np.ndarray) -> list[np.ndarray]:
        """
        Splits the flat weight vector weights into individual vectors
        for each x and b dataset.

        Parameters
        ----------
        weights: np.ndarray
            flatten numpy array

        Returns
        -------
        split_weights: list[np.ndarray]
            list of weights split
            wx and wb

        """
        wx = weights[0 : self.dims_[0]]
        wb = weights[self.dims_[0]:  self.dims_[1]+1]
        return [wx, wb]

    def _objective_function(
        self, weights: np.ndarray, covariances: dict, theta: float, l2: float
    ) -> float:
        """
        Objective function of the CA3 class

        Parameters
        ----------
        weights: np.ndarray
            weights
        covariances: dict
            dict of cross/auto covariance
            matricies
        theta: float
            theta penality
        l2: float
            regularization penailty

        Returns
        -------
        total_loss: float
           total loss of the objective function
        """
        total_loss = 0
        wx, wb = self._split_weights(weights)
        s_xb = covariances["s_X_Y"]
        s_xx = covariances["s_X_X"]
        s_bb = covariances["s_Y_Y"]
        total_loss += self._cross_cov_term(wb, s_xb, wx)
        total_loss += self._regularization_term(wx, s_xx, l2)
        total_loss += self._regularization_term(wb, s_bb, l2)

        ## Similarity penalty across imaging weights
        #if theta > 0 and len(weights_) > 1:
        #    total_loss += sum(
        #        self._dissimilarity_penality(theta, w1[0], w2[0])
        #        for w1, w2 in combinations(weights_, 2)
        #    )

        return total_loss

    def _create_covariance_matrix(
        self, matrix_1: np.ndarray, matrix_2: np.ndarray
    ) -> np.ndarray:
        """
        Function to calculate cross-auto
        covariance matrix

        Parameters
        ----------
        matrix_1: np.ndarray
            A matrix tht should
            correspond to subject by
            features
        matrix_2: np.ndarray
            A matrix that should
            correspond to features by
            feautres

        Returns
        -------
        np.ndarray: array
            array of covariance matrix
        """
        return (matrix_1.T @ matrix_2) / matrix_1.shape[0]

    def _normalise(self, data: np.ndarray) -> np.ndarray:
        """
        Function to normalise data.

        Parmeteres
        ----------
        data: np.ndarray
            data to demean

        Returns
        -------
        np.ndarray: array
            demeaned data
        """
        dmean = data - data.mean(axis=0)
        std = data.std(axis=0, ddof=1)
        std = np.where(std == 0.0, 1.0, std)
        return dmean / std

    def _cross_cov_term(
        self, weight_Y: np.ndarray, cov_mat: np.ndarray, weight_X: np.ndarray
    ) -> np.ndarray:
        """
        Method to calculate the cross covarance term
        in the objective function

        Parameters
        ----------
        weight_Y: np.ndarray
            set of weights for wb
        cov_mat: np.ndarray
             covariance matrix for
             wx wb
        weight_X: np.ndarray
            set of weights for wx

        Returns
        -------
        np.ndarray: np.array
            cross covariance term
        """
        return -weight_X.T @ (cov_mat @ weight_Y)

    def _regularization_term(
        self, weight: np.ndarray, cov_mat: np.ndarray, lambda_i: float
    ) -> float:
        """
        Method to calculate the regularization term
        in the objective function

        Parameters
        ----------
        weight: np.ndarray
            set of weights
        cov_mat: np.ndarray
            auto covariance matrix
        lambda_i: float
            regularization parameter

        Returns
        -------
        float: float
            regularization term of the objective function
        """
        return 0.5 * lambda_i * (weight.T @ (cov_mat @ weight) - 1)

    def _dissimilarity_penality(
        self, theta_r: float, X_weight1: np.ndarray, X_weight2: np.ndarray
    ) -> float:
        """
        Method to return dissimilarity penality

        Parameters
        -----------
        theta_r: float
           theta penality.
        img_weight1: np.ndarray
            weights of imaging data
        img_weight2: np.ndarray
            weights of second imaging
            data

        Returns
        -------
        float: float
            dissimilarity penality
        """
        return theta_r * 0.5 * np.sum((X_weight1 - X_weight2) ** 2)

    def _score(self, X, Y) -> float:
        """
        Method used to evaluate model performance.

        Parameters
        -----------
        data_sets: tuple
            a tuple of X, Y data
            from an arbituray number of
            datasets

        Returns
        -------
        float: float
            mean of correlation
            values across datasets

        """
        if self.weights_ is None:
            raise ValueError("Model must be fitted before scoring.")

        self.transform(X, Y)
        correlations = self.calculate_canonical_correlations()
        return np.mean(correlations)
