In [6]:
import numpy as np
from scipy.linalg import sqrtm, logm

class Data:
    def __init__(self, data):
        self.data = data
    
    def get_data(self):
        return self.data


class CovarianceMatrix:
    def __init__(self, data):
        self.data = data
        self.cov_matrix = None

    def calculate_covariance(self):
        self.cov_matrix = np.cov(self.data, rowvar=False)
        return self.cov_matrix



class Transformation:
    def __init__(self):
        pass

    def project_to_tangent(self, cov_matrix_current, cov_matrix_next):
        sqrt_B = sqrtm(cov_matrix_next)
        sqrt_inv_B = np.linalg.inv(sqrt_B)
        transformed_A = np.dot(np.dot(sqrt_inv_B, cov_matrix_current), sqrt_inv_B)
        log_transformed_A = logm(transformed_A)
        return log_transformed_A

    def transport_to_tangent(self, delta_matrix, tangent_plane_start, tangent_plane_end):
        inv_B = np.linalg.inv(tangent_plane_end)
        A_invB = np.dot(tangent_plane_start, inv_B)
        E = sqrtm(A_invB)
        transported_Delta = np.dot(np.dot(E, delta_matrix), E.T)
        return transported_Delta


class CovariancePipeline:
    def __init__(self, data, p):
        self.data = Data(data)
        self.cov_matrices = []
        self.trajectory = []
        self.transformation = Transformation()
        self.p = p

    def compute_covariances(self):
        data = self.data.get_data()
        for i in range(len(data)):
            cov_matrix = CovarianceMatrix(data[i]).calculate_covariance()
            self.cov_matrices.append(cov_matrix)

    def compute_trajectory(self):
        for i in range(len(self.cov_matrices) - 1):
            cov_matrix_current = self.cov_matrices[i]
            cov_matrix_next = self.cov_matrices[i + 1]
            
            # Projection de la matrice actuelle vers le plan tangent de la matrice suivante
            projected_matrix = self.transformation.project_to_tangent(cov_matrix_current, cov_matrix_next)
            
            # Si c'est la première projection, elle est ajoutée directement
            if not self.trajectory:
                self.trajectory.append(projected_matrix)
            else:
                # Transport de toutes les projections actuelles vers le plan tangent de la matrice suivante
                for j in range(len(self.trajectory)):
                    self.trajectory[j] = self.transformation.transport_to_tangent(self.trajectory[j], cov_matrix_current, cov_matrix_next)  + projected_matrix
                
                # Limiter la taille de la trajectoire à p
                if len(self.trajectory) > self.p:
                    self.trajectory.pop(0)

        return self.trajectory

# Exemple d'utilisation
data = [
    np.random.rand(100, 3),
    np.random.rand(100, 3),
    np.random.rand(100, 3)
]

pipeline = CovariancePipeline(data, p=2)
pipeline.compute_covariances()
trajectory = pipeline.compute_trajectory()

print(trajectory)






[array([[-0.20999732, -0.215462  , -0.18987865],
       [-0.215462  ,  0.19653449, -0.13926258],
       [-0.18987865, -0.13926258, -0.05596022]])]


In [44]:
import numpy as np
from scipy.linalg import sqrtm, logm

class Data:
    def __init__(self, data):
        self.data = data
    
    def get_data(self):
        return self.data

class CovarianceMatrix:
    def __init__(self, data):
        self.data = data
        self.cov_matrix = None

    def calculate_covariance(self):
        self.cov_matrix = np.cov(self.data, rowvar=True)
        return self.cov_matrix

class Transformation:
    def __init__(self):
        pass

    def project_to_tangent(self, cov_matrix_current, cov_matrix_next):
        sqrt_B = sqrtm(cov_matrix_next)
        sqrt_inv_B = np.linalg.inv(sqrt_B)
        transformed_A = np.dot(np.dot(sqrt_inv_B, cov_matrix_current), sqrt_inv_B)
        log_transformed_A = logm(transformed_A)
        return log_transformed_A

    def transport_to_tangent(self, delta_matrix, tangent_plane_start, tangent_plane_end):
        inv_B = np.linalg.inv(tangent_plane_end)
        A_invB = np.dot(tangent_plane_start, inv_B)
        E = sqrtm(A_invB)
        transported_Delta = np.dot(np.dot(E, delta_matrix), E.T)
        return transported_Delta

class CovariancePipeline:
    def __init__(self, data, oublie):
        self.data = Data(data)
        self.cov_matrices = []
        self.trajectory = []
        self.transformation = Transformation()
        self.oublie = oublie

    def compute_covariances(self):
        data = self.data.get_data()
        for i in range(len(data)):
            cov_matrix = CovarianceMatrix(data[i]).calculate_covariance()
            self.cov_matrices.append(cov_matrix)

    def compute_trajectory(self):
        for i in range(len(self.cov_matrices) - 1):
            cov_matrix_current = self.cov_matrices[i]
            cov_matrix_next = self.cov_matrices[i + 1]
            
            # Projection de la matrice actuelle vers le plan tangent de la matrice suivante
            projected_matrix = self.transformation.project_to_tangent(cov_matrix_current, cov_matrix_next)
            self.trajectory.append(projected_matrix)
            # Si c'est la première projection, elle est ajoutée directement
            if not self.trajectory:
                self.trajectory.append(projected_matrix)
            else:
                # Nouvelle trajectoire à ajouter
                new_trajectory = projected_matrix
                
                # Transport de toutes les projections actuelles vers le plan tangent de la matrice suivante
                for j in range(len(self.trajectory)):
                    self.trajectory[j] = self.transformation.transport_to_tangent(self.trajectory[j], cov_matrix_current, cov_matrix_next) + new_trajectory
                    # Additionner la matrice transportée à la matrice projetée
                    new_trajectory += self.trajectory[j]
                
                # Limiter la taille de la trajectoire à p
                if len(self.trajectory) > self.oublie:
                    self.trajectory.pop(0)

        return self.trajectory

# Exemple d'utilisation
data = [
    np.random.rand(3, 100),
    np.random.rand(3, 100)
    
]

pipeline = CovariancePipeline(matrices_covariance_initiales, oublie=5)
pipeline.compute_covariances()
trajectory = pipeline.compute_trajectory()

print(trajectory)


UFuncTypeError: Cannot cast ufunc 'add' output from dtype('complex128') to dtype('float64') with casting rule 'same_kind'

In [36]:
import mne
class GenerateurDonnees:
    def __init__(self, channels=3, sfreq=256.0, duration=10, change=5.0, A1=2.0e-6, A2=0.0e-6, sd=2.0e-6):
        self.channels = channels
        self.sfreq = sfreq
        self.duration = duration
        self.change = change
        self.frequencies = 10.0 + 2.0 * np.random.rand(channels)
        self.phases = 2.0 * np.pi * np.random.rand(channels)
        self.A1 = A1
        self.A2 = A2
        self.sd = sd
        self.info = mne.create_info([f'EEG{n:03}' for n in range(1, channels + 1)],
                                    ch_types=['eeg'] * channels, sfreq=sfreq)

    def generer_donnees(self):
        samples = int(self.sfreq * self.duration)
        t = np.linspace(0, self.duration, samples, endpoint=False)
        A = self.A1 + (t > self.change).astype(float) * (self.A2 - self.A1)
        data = A * np.cos(np.outer(self.frequencies, t) + np.outer(self.phases, np.ones(samples)))
        data += np.random.normal(0, self.sd, size=(self.channels, samples))
        # Vérification des valeurs infinies ou NaN
        if not np.isfinite(data).all():
            raise ValueError("Les données générées contiennent des valeurs infinies ou NaN")
        raw = mne.io.RawArray(data, self.info)
        return raw

    def exporter_donnees(self, filename='dummy.edf'):
        raw = self.generer_donnees()
        mne.export.export_raw(filename, raw, fmt='edf', overwrite=True)


class CalculCovariance:
    def __init__(self, p_points, stride):
        self.p_points = p_points
        self.stride = stride
    
    def calculer_matrices_covariance(self, donnees):
        k, n = donnees.shape
        matrices_covariance = []
        
        for start_index in range(0, n - self.p_points + 1, self.stride):
            end_index = start_index + self.p_points
            segment = donnees[:, start_index:end_index]
            covariance_matrix = np.cov(segment) + np.eye(k) * 1e-6  # Régularisation ajoutée
            matrices_covariance.append(covariance_matrix)
        
        return matrices_covariance

In [37]:
# Générer des données initiales
generateur = GenerateurDonnees()
raw_data_initiale = generateur.generer_donnees()
donnees_initiales = raw_data_initiale.get_data()

# Calculer les matrices de covariance initiales
calcul_cov = CalculCovariance(p_points=256, stride=128)
matrices_covariance_initiales = calcul_cov.calculer_matrices_covariance(donnees_initiales)

Creating RawArray with float64 data, n_channels=3, n_times=2560
    Range : 0 ... 2559 =      0.000 ...     9.996 secs
Ready.
