In [1]:
!pip install PyWavelets
!pip install pyts
!pip install aeon




[notice] A new release of pip is available: 24.3.1 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip





[notice] A new release of pip is available: 24.3.1 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip





[notice] A new release of pip is available: 24.3.1 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
import time
import pywt
import numpy as np
import pandas as pd

from pyts.image import MarkovTransitionField
from pyts.image import GramianAngularField
from pyts.image import RecurrencePlot

from aeon.transformations.collection.convolution_based import Rocket, MiniRocket
from aeon.datasets.tsc_datasets import multivariate
from sklearn.linear_model import RidgeClassifierCV

import config 
from config import DATASETS_FOLDER, logger
from utils import load_dataset


#### Configurações

In [15]:
# Name of the CSV file (inside config.RESULTS_FOLDER) where results are accumulated.
RESULTS_FILENAME = 'results_final.csv'

# Image representations evaluated for every series (see transform_series).
reps = ['RP', 'MTF', 'GASF', 'GADF', 'FIRTS', 'CWT']
# Operations used to fuse the per-dimension images (see dimensions_fusion).
operations = ["sum", "subtraction", "dot_product", "element_wise"]


#### Funções utilitárias

In [4]:
def znorm(x):
    """
    Z-normalize a series so that it has mean ~0 and standard deviation ~1.

    Parameters
    ----------
    x : array-like
        Numeric values of the series.

    Returns
    -------
    numpy.ndarray
        ``(x - mean(x)) / std(x)``. A constant series (standard deviation
        that is zero or indistinguishable from floating-point rounding noise)
        is returned as all zeros instead of being divided by ~0, which would
        yield NaN or meaningless +/-1 values.
    """

    values = np.asarray(x)
    centered = values - np.mean(values)
    std = np.std(values)

    # A spread at or below machine epsilon relative to the magnitude of the
    # data is only rounding noise (e.g. [0.1, 0.1, 0.1]): treat it as constant.
    scale = np.max(np.abs(values)) if values.size else 0.0
    if std <= np.finfo(float).eps * scale:
        return np.zeros_like(centered)

    return centered / std


def transform_series(series, representation):
    """
    Transform a one-dimensional series into a 2-D image.

    The series is z-normalized first and then encoded with the requested
    representation. Background on the pyts encoders:
    https://pyts.readthedocs.io/en/stable/modules/image.html

    Parameters
    ----------
    series : array-like
        Values of a single (univariate) series.
    representation : str
        One of "CWT", "MTF", "GADF", "GASF", "RP" or "FIRTS".
        "FIRTS" is the element-wise sum of an MTF (4 uniform bins), a GADF
        and an RP image of the same series.

    Returns
    -------
    numpy.ndarray
        The image produced by the selected representation.

    Raises
    ------
    ValueError
        If ``representation`` is not one of the supported names.
    """

    supported = ("CWT", "MTF", "GADF", "GASF", "RP", "FIRTS")
    # Fail early with a clear message; otherwise an unknown name would fall
    # through every branch and surface as an UnboundLocalError.
    if representation not in supported:
        raise ValueError(
            f"Unknown representation {representation!r}; expected one of {supported}"
        )

    series = np.array(znorm(series))

    if representation == "CWT":
        # The wavelet transform works on the 1-D series, with one scale per time step.
        coeffs, _freqs = pywt.cwt(series, scales=np.arange(1, len(series) + 1), wavelet='morl')
        return coeffs

    # The pyts transformers are fed a single-sample batch of shape (1, n_timestamps);
    # element [0] of their output is the image of that sample.
    series = series.reshape(1, len(series))

    if representation == "MTF":
        mtf = MarkovTransitionField(strategy='normal')
        im_final = mtf.fit_transform(series)[0]
    elif representation == "GADF":
        gaf = GramianAngularField(method='difference')
        im_final = gaf.fit_transform(series)[0]
    elif representation == "GASF":
        gaf = GramianAngularField(method='summation')
        im_final = gaf.fit_transform(series)[0]
    elif representation == "RP":
        rp = RecurrencePlot(threshold='distance')
        im_final = rp.fit_transform(series)[0]
    else:  # "FIRTS"
        mtf = MarkovTransitionField(n_bins=4, strategy='uniform')
        X_mtf = mtf.fit_transform(series)
        gaf = GramianAngularField(method='difference')
        X_gaf = gaf.fit_transform(series)
        rp = RecurrencePlot(threshold='distance')
        X_rp = rp.fit_transform(series)
        im_final = (X_mtf[0] + X_gaf[0] + X_rp[0])
    return im_final


def dimensions_fusion(img_dataset, operation):
    """
    Fuse the per-dimension images of every sample into one flattened vector.

    For each sample, the LAST image is the starting value and the remaining
    images are combined into it, in order, with the chosen operation.

    Parameters
    ----------
    img_dataset : sequence
        One entry per sample; each entry is a sequence of 2-D numpy arrays
        (one image per dimension of the series).
    operation : str
        "sum", "subtraction", "dot_product" (matrix product via ``np.dot``)
        or "element_wise" (element-wise product).

    Returns
    -------
    numpy.ndarray
        Array with one flattened fused image per sample.

    Raises
    ------
    ValueError
        If ``operation`` is not one of the supported names.

    Notes
    -----
    The input images are never modified. Fusing in place (``+=`` / ``-=``)
    on the popped image would alter the caller's arrays, corrupting the data
    for every subsequent operation applied to the same ``img_dataset``.
    """

    fusion_ops = {
        'sum': np.add,
        'subtraction': np.subtract,
        'dot_product': np.dot,
        'element_wise': np.multiply,
    }
    if operation not in fusion_ops:
        raise ValueError(
            f"Unknown operation {operation!r}; expected one of {tuple(fusion_ops)}"
        )
    combine = fusion_ops[operation]

    new_data = []
    for dataset in img_dataset:
        imgs = list(dataset)
        img_final = np.asarray(imgs[-1])
        for img in imgs[:-1]:
            # Out-of-place on purpose: always binds a new array, so the
            # caller's images stay intact.
            img_final = combine(img_final, img)

        new_data.append(img_final.flatten())

    return np.array(new_data)


#### Gerando resultados com apenas o classificador Ridge, sem nenhuma transformação ou convolução

In [17]:
# Load previously saved results so that an interrupted run can resume where it
# stopped; start from an empty table when no results file exists yet.
results_path = f"{config.RESULTS_FOLDER}/{RESULTS_FILENAME}"
try:
    df_results = pd.read_csv(results_path)
except FileNotFoundError:
    df_results = pd.DataFrame(columns=[
        "dataset",
        "representation",
        "operation",
        "accuracy",
        "convolution_algorithm",
        "classification_algorithm",
    ])


for dataset_name in multivariate:
    # Baseline rows are the ones with neither a representation nor a fusion
    # operation. `.any()` (rather than "exactly one row") keeps a dataset from
    # being re-processed if the file ever contains a duplicated baseline row.
    baseline_done = (
        (df_results["dataset"] == dataset_name)
        & (df_results["representation"].isnull())
        & (df_results["operation"].isnull())
    ).any()
    if baseline_done:
        logger.info(f"Dataset {dataset_name} já processado.")
        continue

    try:
        dataset = load_dataset(dataset_name, DATASETS_FOLDER)
        X_train = dataset["X_train"]
        y_train = dataset["y_train"]
        X_test = dataset["X_test"]
        y_test = dataset["y_test"]
    except Exception as e:
        logger.error(f"Problema ao carregar dataset {dataset_name}: {e}")
        continue

    try:
        # Collapse every example by summing along axis 0.
        # NOTE(review): presumably axis 0 is the dimension/channel axis, which
        # would make this a per-timestep sum over channels - confirm against
        # load_dataset.
        X_train_transformed = np.array([np.sum(serie, axis=0) for serie in X_train])
        X_test_transformed = np.array([np.sum(serie, axis=0) for serie in X_test])

        classifier = RidgeClassifierCV(alphas=np.logspace(-3, 3, 10))
        classifier.fit(X_train_transformed, y_train)

        accuracy = classifier.score(X_test_transformed, y_test)

        new_result_line = {
            "dataset": dataset_name,
            "representation": None,
            "operation": None,
            "accuracy": accuracy,
            "convolution_algorithm": None,
            "classification_algorithm": "Ridge",
        }
        df_results.loc[len(df_results)] = new_result_line
        # Persist after every dataset so progress survives an interruption.
        df_results.to_csv(results_path, index=False)

        logger.info("Processamento finalizado com sucesso.")
    except Exception as e:
        logger.error(f"Problema com o dataset {dataset_name}: {e}")


#### Gerando resultados com o classificador Ridge, transformações e convoluções

In [20]:
# Build a fresh list instead of aliasing `multivariate`: calling .remove() on
# the alias would mutate the imported list itself and make this cell raise
# ValueError when it is run a second time in the same kernel.
datasets = [name for name in multivariate if name not in config.SKIP_DATASETS]

results_path = f"{config.RESULTS_FOLDER}/{RESULTS_FILENAME}"

# Every (representation, operation) pair is evaluated in three variants:
# without convolution, with Rocket and with MiniRocket.
n_variants = 3

for dataset_name in datasets:
    try:
        # Count only rows that have a representation: the baseline row written
        # by the Ridge-only cell has none and must not inflate the total,
        # otherwise the count could never match the expected number of rows.
        if df_results[
            (df_results["dataset"] == dataset_name)
            & (df_results["representation"].notnull())
        ].shape[0] == len(reps) * len(operations) * n_variants:
            logger.info(f"Dataset {dataset_name} já processado.")
            continue

        dataset = load_dataset(dataset_name, config.DATASETS_FOLDER)
        X_train = dataset["X_train"]
        y_train = dataset["y_train"]
        X_test = dataset["X_test"]
        y_test = dataset["y_test"]

        for representation in reps:
            if df_results[
                (df_results["dataset"] == dataset_name)
                & (df_results["representation"] == representation)
            ].shape[0] == len(operations) * n_variants:
                logger.info(f"Dataset {dataset_name} com representação {representation} já processado.")
                continue

            logger.info(f"Iniciando o processo de transformação das dimensões na representação {representation}")

            # One list of images (one per dimension of the series) for every example.
            transformed_train_series = [
                [transform_series(series, representation) for series in example]
                for example in X_train
            ]
            transformed_test_series = [
                [transform_series(series, representation) for series in example]
                for example in X_test
            ]

            logger.info("Finalizado processo de transformação das dimensões com sucesso")

            for operation in operations:
                if df_results[
                    (df_results["dataset"] == dataset_name)
                    & (df_results["representation"] == representation)
                    & (df_results["operation"] == operation)
                ].shape[0] == n_variants:
                    logger.info(f"Dataset {dataset_name}, representação {representation} e operação {operation} todos as variações já processadas.")
                    continue

                logger.info(f"Iniciando processo de fusão das dimensões na operação {operation}")
                X_train_transformed = dimensions_fusion(transformed_train_series, operation)
                X_test_transformed = dimensions_fusion(transformed_test_series, operation)
                logger.info("Finalizado processo de fusão")

                # --- Variant 1: Ridge directly on the fused images -------------
                try:
                    if df_results[
                        (df_results["dataset"] == dataset_name)
                        & (df_results["representation"] == representation)
                        & (df_results["operation"] == operation)
                        & (df_results["convolution_algorithm"].isnull())
                    ].shape[0] == 0:
                        logger.info("Iniciando processo de treinamento apenas com o classificador Ridge")
                        classifier = RidgeClassifierCV(alphas=np.logspace(-3, 3, 10))
                        classifier.fit(X_train_transformed, y_train)

                        accuracy = classifier.score(X_test_transformed, y_test)

                        new_result_line = {
                            "dataset": dataset_name,
                            "representation": representation,
                            "operation": operation,
                            "accuracy": accuracy,
                            "convolution_algorithm": None,
                            "classification_algorithm": "Ridge",
                        }
                        df_results.loc[len(df_results)] = new_result_line
                        df_results.to_csv(results_path, index=False)
                    else:
                        logger.info(f"Dataset {dataset_name} com representação {representation}, operação {operation} e sem convolução já processado.")
                except Exception as e:
                    logger.error(f"Problema com o dataset {dataset_name} com o classificador Ridge: {e}")

                # --- Variant 2: Rocket features + Ridge ------------------------
                try:
                    if df_results[
                        (df_results["dataset"] == dataset_name)
                        & (df_results["representation"] == representation)
                        & (df_results["operation"] == operation)
                        & (df_results["convolution_algorithm"] == "Rocket")
                    ].shape[0] == 0:
                        logger.info("Iniciando processo de treinamento com o classificador Ridge e convolução Rocket")

                        algorithm = Rocket(n_kernels=10000, n_jobs=-1, random_state=6)
                        algorithm.fit(X_train_transformed)

                        # Keep the Rocket features in their own variables: the
                        # fused images must stay untouched so that MiniRocket
                        # below is fitted on them, not on the Rocket output.
                        X_train_rocket = algorithm.transform(X_train_transformed)
                        X_test_rocket = algorithm.transform(X_test_transformed)

                        classifier = RidgeClassifierCV(alphas=np.logspace(-3, 3, 10))
                        classifier.fit(X_train_rocket, y_train)

                        accuracy = classifier.score(X_test_rocket, y_test)

                        new_result_line = {
                            "dataset": dataset_name,
                            "representation": representation,
                            "operation": operation,
                            "accuracy": accuracy,
                            "convolution_algorithm": "Rocket",
                            "classification_algorithm": "Ridge",
                        }
                        df_results.loc[len(df_results)] = new_result_line
                        df_results.to_csv(results_path, index=False)
                    else:
                        logger.info(f"Dataset {dataset_name} com representação {representation}, operação {operation} e com convolução Rocket já processado.")
                except Exception as e:
                    logger.error(f"Problema com o dataset {dataset_name} usando Rocket: {e}")

                # --- Variant 3: MiniRocket features + Ridge --------------------
                try:
                    if df_results[
                        (df_results["dataset"] == dataset_name)
                        & (df_results["representation"] == representation)
                        & (df_results["operation"] == operation)
                        & (df_results["convolution_algorithm"] == "MiniRocket")
                    ].shape[0] == 0:
                        logger.info("Iniciando processo de treinamento com o classificador Ridge e convolução MiniRocket")

                        algorithm = MiniRocket(n_kernels=10000, n_jobs=-1, random_state=6)
                        algorithm.fit(X_train_transformed)

                        X_train_minirocket = algorithm.transform(X_train_transformed)
                        X_test_minirocket = algorithm.transform(X_test_transformed)

                        classifier = RidgeClassifierCV(alphas=np.logspace(-3, 3, 10))
                        classifier.fit(X_train_minirocket, y_train)

                        accuracy = classifier.score(X_test_minirocket, y_test)

                        new_result_line = {
                            "dataset": dataset_name,
                            "representation": representation,
                            "operation": operation,
                            "accuracy": accuracy,
                            "convolution_algorithm": "MiniRocket",
                            "classification_algorithm": "Ridge",
                        }
                        df_results.loc[len(df_results)] = new_result_line
                        df_results.to_csv(results_path, index=False)
                    else:
                        logger.info(f"Dataset {dataset_name} com representação {representation}, operação {operation} e com convolução MiniRocket já processado.")
                except Exception as e:
                    logger.error(f"Problema com o dataset {dataset_name} usando MiniRocket: {e}")

        logger.info(f"Finalizado o processamento do dataset {dataset_name}.")
    except Exception as e:
        logger.error(f"Problema ao carregar dataset {dataset_name}: {e}")

logger.info("Finalizado o processamento de todos os datasets.")


KeyboardInterrupt: 