# Representation
- Multifractal analysis (1ere approche)

- Discrete Fourier Transform (DFT) $\checkmark$
- Spectrogram
- Autoregression $\checkmark$
- Shannon encoding $\checkmark$
- Wavelets (en cours)

- Local symbolic features
- SAX representation
- Approximate entropy

ML

- Autoencoder

- RNN
- LSTM



In [None]:
# %pip install import_ipynb
# %pip install  --user git+https://github.com/neurospin/pymultifracs

In [28]:
import torch
import numpy as np
import pywt
from statsmodels.tsa.ar_model import AutoReg, ar_select_order
import pymultifracs.mfa as mfa
from pymultifracs.utils import build_q_log
from sklearn.decomposition import PCA

from scipy.interpolate import interp1d

In [29]:
def reshape_to_batches(array, m):
    """
    Transform an array of shape (n, p) into a list of arrays of shape (m, p).
    """
    array = np.asarray(array)
    p = array.shape[-1]
    num_batches = p // m
    return [array[i * m:(i + 1) * m] for i in range(num_batches)]

In [1]:
class IdentityTransform:
    @staticmethod
    def transform(X, **kwargs):
        return X

class FourierTransform:
    @staticmethod
    def transform(X, new_dimension=None, **kwargs):
        """
        Return the fourier transform of size 'new_dimension'.
        """
        fourier_transform = np.fft.fft(X, n=new_dimension,axis=-1)
        modulus = np.abs(fourier_transform)
        return modulus

class LowFourierTransform:
    @staticmethod
    def transform(X, fs=250, cutoff_ratio=.3, n=None, **kwargs):
        """
        Fourier transform with a cutoff_ratio.

        fs: sampling frequency
        cutoff_ratio: frequency over which we do not consider the value
        n: new_dimension before cutoff

        Return the cutoff fourier transform.
        """
        fourier_transform = np.fft.fft(X,n=n,axis=-1)
        if n is None:
            n = X.shape[1]
        frequencies = np.fft.fftfreq(n, d=1/fs)
        mask = frequencies < cutoff_ratio
        fourier_transform = fourier_transform[:, mask]
        modulus = np.abs(fourier_transform)
        return modulus

class LowPsdTransform:
    @staticmethod
    def transform(X, fs=250, cutoff_ratio=.3, n=None, **kwargs):
        """
        Power Spectrum Density with a cutoff_ratio.

        fs: sampling frequency
        cutoff_ratio: frequency over which we do not consider the value
        n: new_dimension before cutoff

        Return the cutoff power spectrum density (the fourier transform of the autocorrelation).
        """
        fourier_transform = np.fft.fft(X, n=n, axis=-1)
        if n is None:
            n = X.shape[1]
        frequencies = np.fft.fftfreq(n, d=1/fs)
        mask = frequencies < cutoff_ratio
        fourier_transform = fourier_transform[:, mask]
        psd = fourier_transform * np.conj(fourier_transform)
        return psd.real

class WaveDecTransform:
    @staticmethod
    def transform(X, level=4, wavelet='db1', mode='symmetric', **kwargs):
        """
        Multilevel decomposition
        """
        array = np.array(X)
        coeffs = pywt.wavedec(array, wavelet, mode=mode, level=level)
        coeffs_torch = [torch.tensor(c) for c in coeffs[:1]]
        return torch.cat(coeffs_torch, dim=-1)

class DwtTransform:
    @staticmethod
    def transform(X, wavelet='db1', mode='symmetric', **kwargs):
        """
        Single level decomposition (discrete)
        """
        array = np.array(X)
        coeffs = pywt.dwt(array, wavelet, mode=mode)
        coeffs_torch = [torch.tensor(c) for c in coeffs]
        return torch.cat(coeffs_torch, dim=-1)
    
class CwtTransform:
    @staticmethod
    def transform(X,wavelet='gaus1',mode='symmetric', **kwargs):
        """
        Single level decomposition (continuous)
        """
        array = np.array(X)
        coeffs = pywt.cwt(array, wavelet, mode=mode)
        coeffs_torch = [torch.tensor(c) for c in coeffs]
        return torch.cat(coeffs_torch, dim=-1)
    
class AutoRegTransform:
    @staticmethod
    def get_ar_coefficients(X, k=3, **kwargs):
        """
        Perform an autoregression on each example, the coefficient of the autoregression are used in the representation of the signal.
        
        k: the number of coefficients computed
        """
        n, p = X.shape
        X = np.array(X)
        ar_coefficients = np.zeros((n, k))
        for i in range(n):
            model = AutoReg(X[i], lags=k).fit()
            ar_coefficients[i] = model.params[1:k+1]
        return ar_coefficients

    @staticmethod
    def transform(X, k=3, **kwargs):
        return AutoRegTransform.get_ar_coefficients(X, k)

class ShannonEncodingTransform:
    @staticmethod
    def transform(X, level=4, wavelet='db1', mode='symmetric', **kwargs):
        '''
        Inspired by https://fr.mathworks.com/help/wavelet/ug/ecg-classification-using-wavelet-features.html
        '''
        def compute_shannon_entropy(signal):
            return -np.nansum(signal**2 * np.log(signal**2))
        
        n_examples = X.shape[0]
        wp = pywt.WaveletPacket(X[0, :], wavelet="sym8", maxlevel=3)
        packet_names = [node.path for node in wp.get_level(3, "natural")]
        
        feature_matrix_wav_packet_entropy = np.full((n_examples, 8), np.nan)
        for i in range(len(X)):
            wp = pywt.WaveletPacket(X[i, :], wavelet="sym8", maxlevel=3)
            for j in range(len(packet_names)):
                new_wp = pywt.WaveletPacket(data=None, wavelet="sym8", maxlevel=3)
                new_wp[packet_names[j]] = wp[packet_names[j]].data
                reconstructed_signal = new_wp.reconstruct(update=False)
                feature_matrix_wav_packet_entropy[i, j] = compute_shannon_entropy(reconstructed_signal)
        return feature_matrix_wav_packet_entropy

class WaveletLeadersTransform:
    @staticmethod
    def transform(X, j1=2, j2=4, **kwargs):
        '''
        Inspired by https://fr.mathworks.com/help/wavelet/ug/ecg-classification-using-wavelet-features.html
        '''
        n = X.shape[0] if X.ndim > 1 else 1
        transformed_X = -np.ones((n, 2))
        for i in range(X.shape[0]):
            dwt, lwt = mfa.mf_analysis_full(
                X[i],
                scaling_ranges=[(j1, j2)],
                q=build_q_log(1, 10, 20),
                n_cumul=2,
                p_exp=np.inf,
                gamint=0.0
            )
            sf, cumul, mfs, hmin = lwt
            transformed_X[i, :] = sf.H.item(), cumul.log_cumulants[1].item()
        return transformed_X

class MultiFracsTransform:
    @staticmethod
    def transform(X, j1=2, j2=4, **kwargs):
        '''
        Use the multifractal framework.
        The representation obtained is inspired by HRVMultiScaling_ivanov1999:
        [degree of multifractality, tau(3), X[i].std()]
        where tau(3) is characterizing the scaling of the third moment Z_3(a).
        '''
        n = X.shape[0] if X.ndim > 1 else 1
        transformed_X = -np.ones((n, 3))
        for i in range(X.shape[0]):
            dwt, lwt = mfa.mf_analysis_full(
                X[i],
                scaling_ranges=[(j1, j2)],
                q=build_q_log(1, 10, 20),
                n_cumul=2,
                p_exp=np.inf,
                gamint=0.0
            )
            sf, cumul, mfs, hmin = lwt
            tau = interp1d(sf.q, sf.zeta[:,0,0], kind='linear')
            degree_of_multifractality = np.max(mfs.hq) - np.min(mfs.hq)
            transformed_X[i, :] = degree_of_multifractality, tau(3).item(), X[i].std()
        return transformed_X

class CrossCorTransform:
    @staticmethod
    def transform(X, new_dim=10, **kwargs):
        """
        Compute the cross covariance between examples and then do PCA.
        """
        crosscor = np.corrcoef(X)

        pca = PCA(n_components=new_dim)
        transformed_data = pca.fit_transform(crosscor)
        return transformed_data# , pca.components_
        
        # eigen_values, eigen_vectors = np.linalg.eigh(crosscor)
        
        # return transformed_X

class AutoCorTransform:
    @staticmethod
    def transform(X, k=4, m=10, **kwargs):
        """
        Weird improvisation, does not work
        """
        
        n = X.shape[0] if X.ndim > 1 else 1
        transformed_X = list()
        for i in range(X.shape[0]):
            batches = reshape_to_batches(X[i],m)
            
            autocor = np.corrcoef(batches)
            # print(autocor.shape)
            pca = PCA(n_components=k)
            transformed_data = pca.fit_transform(autocor)
            transformed_X.append(pca.components_.flatten())
            
        return np.array(transformed_X)


In [31]:
class TransformationRegistry:
    def __init__(self):
        self.transformations = {}

    def register(self, name, transform_class):
        self.transformations[name] = transform_class

    def get(self, name):
        transform_class = self.transformations.get(name)
        if transform_class is None:
            raise ValueError(f"Transformation {name} not recognized.")
        return transform_class


In [32]:
class DataTransform:
    def __init__(self, registry):
        self.registry = registry

    def apply_transformation(self, X, transformation_names, **kwargs):
        if not isinstance(transformation_names, list):
            transformation_names = [transformation_names]

        transformed_parts = []
        for name in transformation_names:
            transform_class = self.registry.get(name)
            transformed_part = transform_class.transform(X, **kwargs)
            transformed_parts.append(transformed_part)

        return np.concatenate(transformed_parts, axis=1)


In [33]:
if __name__ == "__main__":
    # Initialize the registry
    registry = TransformationRegistry()

    # Register transformations
    registry.register('identity', IdentityTransform)
    registry.register('fourier', FourierTransform)
    registry.register('low_fourier', LowFourierTransform)
    registry.register('low_psd', LowPsdTransform)
    registry.register('wavedec', WaveDecTransform)
    registry.register('dwt', DwtTransform)
    registry.register('autoreg', AutoRegTransform)
    registry.register('shannon_encoding', ShannonEncodingTransform)
    registry.register('wavelet_leaders', WaveletLeadersTransform)
    registry.register('multifracs', MultiFracsTransform)
    registry.register('crosscor', CrossCorTransform)
    registry.register('autocor', AutoCorTransform)


In [34]:
if __name__ == "__main__":
    # Initialize the data transformer
    data_transformer = DataTransform(registry)

    # Example input data
    X = np.random.randn(100, 150)  # Example input data
    y = np.random.randint(0, 2, 100)  # Example labels
    
    for trans_names in registry.transformations.keys():
        trans_names_str = [str(name) for name in trans_names]
        trans_name_str = '+'.join(trans_names_str) if isinstance(trans_names, list) else trans_names
        kwargs = trans_names[1] if isinstance(trans_names, list) and len(trans_names) > 1 else {}
        trans_names = trans_names[0] if isinstance(trans_names, list) else trans_names
        
        # Apply transformation
        transformed_X = data_transformer.apply_transformation(X, trans_names, **kwargs)
        
        print(f"Transformation: {trans_name_str}, Shape: {transformed_X.shape}")
        

Transformation: identity, Shape: (100, 150)
Transformation: fourier, Shape: (100, 150)
Transformation: low_fourier, Shape: (100, 77)
Transformation: low_psd, Shape: (100, 77)
Transformation: wavedec, Shape: (100, 10)
Transformation: dwt, Shape: (100, 150)
Transformation: autoreg, Shape: (100, 3)
Transformation: shannon_encoding, Shape: (100, 8)
Transformation: wavelet_leaders, Shape: (100, 2)
Transformation: crosscor, Shape: (100, 10)
Transformation: autocor, Shape: (100, 60)


In [35]:
if __name__ == "__main__":
    from sklearn.svm import SVC
    from sklearn.tree import DecisionTreeClassifier
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import cross_val_score
    from sklearn.preprocessing import StandardScaler

    

    # Define the classifiers to be tested
    classifiers = {
        'SVM': SVC(),
        'DecisionTree': DecisionTreeClassifier(),
        'RandomForest': RandomForestClassifier()
    }

    # Define the transformations to be tested
    transformations = [
        ['identity'],
        ['crosscor'],
        ['autocor',{'m':4,'k':4}],
        ['fourier'],
        ['wavedec'],
        ['autoreg', {'k': 3}]
    ]

    

    # Function to evaluate a classifier using cross-validation
    def evaluate_classifier_cv(classifier, X, y):
        scores = cross_val_score(classifier, X, y, cv=5)  # 5-fold cross-validation
        return np.mean(scores), np.std(scores)

    # Loop over each transformation and each classifier
    results = {}

    for trans_names in transformations:
        trans_names_str = [str(name) for name in trans_names]
        trans_name_str = '+'.join(trans_names_str) if isinstance(trans_names, list) else trans_names
        kwargs = trans_names[1] if isinstance(trans_names, list) and len(trans_names) > 1 else {}
        trans_names = trans_names[0] if isinstance(trans_names, list) else trans_names
        
        # Apply transformation
        transformed_X = data_transformer.apply_transformation(X, trans_names, **kwargs)
        
        # Standardize the data (important for some classifiers like SVM)
        scaler = StandardScaler()
        transformed_X = scaler.fit_transform(transformed_X)
        
        results[trans_name_str] = {}
        for clf_name, clf in classifiers.items():
            # Evaluate the classifier with cross-validation
            mean_accuracy, std_accuracy = evaluate_classifier_cv(clf, transformed_X, y)
            results[trans_name_str][clf_name] = (mean_accuracy, std_accuracy)
            print(f"Transformation: {trans_name_str}, Classifier: {clf_name}, Mean Accuracy: {mean_accuracy:.3f}, Std Dev: {std_accuracy:.3f}")

    # Print the results
    for trans_name, clf_results in results.items():
        for clf_name, (mean_accuracy, std_accuracy) in clf_results.items():
            print(f"Transformation: {trans_name}, Classifier: {clf_name}, Mean Accuracy: {mean_accuracy:.3f}, Std Dev: {std_accuracy:.3f}")


Transformation: identity, Classifier: SVM, Mean Accuracy: 0.470, Std Dev: 0.051
Transformation: identity, Classifier: DecisionTree, Mean Accuracy: 0.450, Std Dev: 0.100
Transformation: identity, Classifier: RandomForest, Mean Accuracy: 0.550, Std Dev: 0.045
Transformation: crosscor, Classifier: SVM, Mean Accuracy: 0.530, Std Dev: 0.121
Transformation: crosscor, Classifier: DecisionTree, Mean Accuracy: 0.540, Std Dev: 0.092
Transformation: crosscor, Classifier: RandomForest, Mean Accuracy: 0.530, Std Dev: 0.068
Transformation: autocor+{'m': 4, 'k': 4}, Classifier: SVM, Mean Accuracy: 0.580, Std Dev: 0.051
Transformation: autocor+{'m': 4, 'k': 4}, Classifier: DecisionTree, Mean Accuracy: 0.640, Std Dev: 0.058
Transformation: autocor+{'m': 4, 'k': 4}, Classifier: RandomForest, Mean Accuracy: 0.580, Std Dev: 0.040
Transformation: fourier, Classifier: SVM, Mean Accuracy: 0.440, Std Dev: 0.073
Transformation: fourier, Classifier: DecisionTree, Mean Accuracy: 0.410, Std Dev: 0.116
Transformat