In [2]:
from sys import stdout

import numpy as np
from numpy import *
from numpy.linalg import eig, pinv
from ica_lib import *

In [None]:
def jadeR(mixed_signal_matrix, num_components=None, verbose=True):
    """
    Estimate a separating (unmixing) matrix with the JADE ICA algorithm.

    The work is delegated to helpers that are not defined in this block
    (check_input, perform_PCA_and_whitening,
    initialize_cumulant_matrices_storage, compute_cumulant_matrix,
    joint_diagonalization, sort_separating_matrix, fix_matrix_signs);
    this function only wires the stages together.

    Parameters:

        mixed_signal_matrix -- an nxT data matrix (n sensors, T samples). May be a numpy array or
             matrix.

        num_components -- output matrix B has size mxn so that only m sources are
             extracted.  This is done by restricting the operation of jadeR
             to the m first principal components. Defaults to None, in which
             case m=n.

        verbose -- print info on progress. Default is True.

    Returns:

        An m*n matrix B (NumPy matrix type), such that Y=B*X are separated
        sources extracted from the n*T data matrix X. If num_components is
        omitted, B is a square n*n matrix (as many sources as sensors). The
        rows of B are ordered such that the columns of pinv(B) are in order
        of decreasing norm; this has the effect that the `most energetically
        significant` components appear first in the rows of Y=B*X.
    """

    # GB: we do some checking of the input arguments and copy data to new
    # variables to avoid messing with the original input. We also require double
    # precision (float64) and a numpy matrix type for preprocessed_data.
    # (The reference implementation named these X, origtype, m, n, T.)
    preprocessed_data, input_data_type, num_components, num_samples = check_input(
        mixed_signal_matrix, num_components, verbose
    )

    # Whitening & PCA
    principal_components, sorted_eigenvalues = perform_PCA_and_whitening(
        preprocessed_data, num_components, num_samples, verbose
    )

    # Scaling
    # Each retained principal component is divided by the square root of its
    # eigenvalue so that it ends up with unit variance.
    scaling_factors = np.sqrt(sorted_eigenvalues[:num_components])
    # NOTE(review): `*` is only a matrix product here if principal_components
    # is a numpy matrix -- confirm against perform_PCA_and_whitening.
    whitening_matrix = np.diag(1. / scaling_factors) * principal_components.T

    # Sphering (Whitening)
    # Project the data onto the whitened space: the resulting components are
    # uncorrelated with unit variance.
    sphered_data = whitening_matrix * preprocessed_data

    # Clean up by deleting variables that are no longer needed to free up memory
    del sorted_eigenvalues, principal_components, scaling_factors

    if verbose:
        print("jade -> Estimating cumulant matrices")

    # Initialize the storage for cumulant matrices
    cumulant_matrices_storage, num_cumulant_matrices = initialize_cumulant_matrices_storage(num_components)

    # The cumulants must be estimated from the *whitened* data. The previous
    # code passed the un-whitened preprocessed_data here and never used
    # sphered_data, which is both algorithmically wrong and has the wrong
    # width whenever num_components < number of sensors.
    whitened_samples = sphered_data.T

    # Compute and store cumulant matrices, one num_components-wide column
    # block per component.
    for component_index in range(num_components):
        cumulant_matrix = compute_cumulant_matrix(
            whitened_samples, num_samples, component_index, num_cumulant_matrices
        )
        storage_start_index = component_index * num_components
        storage_end_index = storage_start_index + num_components
        cumulant_matrices_storage[:, storage_start_index:storage_end_index] = cumulant_matrix

    rotation_matrix = joint_diagonalization(
        cumulant_matrices_storage, num_components, num_cumulant_matrices, num_samples
    )

    # Separating matrix = rotation found by joint diagonalization applied on
    # top of the whitening transform.
    separating_matrix = rotation_matrix.T * whitening_matrix

    # Apply the sorting and sign fixing
    if verbose:
        print("jade -> Sorting the components")
    separating_matrix = sort_separating_matrix(separating_matrix)

    if verbose:
        print("jade -> Fixing the signs")
    separating_matrix = fix_matrix_signs(separating_matrix)

    # Hand the result back in the dtype reported by check_input.
    return separating_matrix.astype(input_data_type)

In [None]:
class JADE:
    """
    Thin estimator-style wrapper around jadeR.

    Attributes:
        num_components: number of components requested from jadeR.
        unmixing_matrix: matrix returned by jadeR after sign adjustment;
            None until fit() has been called.
        ica_jade_loadings: initialised to None; not assigned by any method
            of this class.
        ica_jade_corr: correlation table stored by correlate_loadings().
        ica_jade_ids: list of label strings stored by correlate_loadings().
    """

    def __init__(self, num_components=4):
        self.num_components = num_components
        self.unmixing_matrix = None
        self.ica_jade_loadings = None
        self.ica_jade_corr = None
        self.ica_jade_ids = None

    def fit(self, mixed_signal_matrix):
        """
        Fit the JADE model to the data.

        Parameters:
        mixed_signal_matrix (numpy.ndarray): The mixed signal data matrix.
            It is converted with np.array and passed to jadeR unchanged.

        Returns:
        numpy.ndarray: The unmixing matrix after applying JADE.
        """
        mixed_signal_matrix = np.array(mixed_signal_matrix)
        # jadeR's keyword is `num_components`; the former `m=` keyword does
        # not exist on jadeR and raised a TypeError on every call.
        unmixing_matrix = jadeR(mixed_signal_matrix, num_components=self.num_components)

        # Adjust the sign of each row for better interpretability: flip a row
        # when its minimum has a larger magnitude than its maximum.
        for i in range(unmixing_matrix.shape[0]):
            row = unmixing_matrix[i, :]
            if np.abs(np.max(row)) < np.abs(np.min(row)):
                unmixing_matrix[i, :] *= -1

        self.unmixing_matrix = unmixing_matrix
        return unmixing_matrix

    def transform(self, mixed_signal_matrix):
        """
        Transform the data using the learned JADE model.

        Computes dot(unmixing_matrix, X.T).T, so the input is transposed
        before the unmixing matrix is applied and the result is transposed
        back.

        Parameters:
        mixed_signal_matrix (numpy.ndarray): The mixed signal data matrix.
            Any array-like is accepted.

        Returns:
        numpy.ndarray: The separated signals.

        Raises:
        ValueError: If fit() has not been called yet.
        """
        if self.unmixing_matrix is None:
            raise ValueError("Model has not been fit yet. Call 'fit' with training data.")

        # Coerce like fit() does so lists and other array-likes (which have
        # no .T attribute) are accepted too.
        mixed_signal_matrix = np.asarray(mixed_signal_matrix)
        return np.dot(self.unmixing_matrix, mixed_signal_matrix.T).T

    def correlate_loadings(self, df, corrcols, icacols):
        """
        Find the correlation between loadings and a set of columns.

        Parameters:
        df (pandas.DataFrame): The DataFrame containing data.
        corrcols (list): List of columns to correlate.
        icacols (list): List of ICA columns.

        Updates:
        self.ica_jade_corr: DataFrame of correlations.
        self.ica_jade_ids: Identifiers for the correlated loadings.

        Raises:
        ValueError: If fit() has not been called yet.
        """
        if self.unmixing_matrix is None:
            raise ValueError("Model has not been fit yet. Call 'fit' with training data.")

        # Keep rows for the ICA columns and columns for everything else.
        corrdf = df.corr().drop(icacols, axis=1).drop(corrcols, axis=0)
        ica_jade_ids = []
        for i in corrdf.loc['ICA-JADE'].index:
            tmp = corrdf.loc[('ICA-JADE', i)]
            max_corr = np.max(tmp)
            match = tmp.values == max_corr
            # The position of the best match is used to index corrcols, so
            # this assumes the remaining columns of corrdf are exactly
            # corrcols, in the same order -- verify against the caller.
            matched_col = corrcols[np.where(match)[0][0]]
            ica_jade_ids.append(f"{matched_col} (r={np.round(max_corr, 1)})")

        self.ica_jade_corr = corrdf
        self.ica_jade_ids = ica_jade_ids