In [17]:
from aeon.datasets import (
    load_anomaly_detection,
    load_classification,
    load_forecasting,
    load_regression,
)
from sklearn.metrics import accuracy_score
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.linear_model import Ridge, RidgeCV, RidgeClassifierCV, RidgeClassifier
import pywt
from pyts.image import MarkovTransitionField
from pyts.image import GramianAngularField
from pyts.image import RecurrencePlot
from aeon.transformations.detrend import ConditionalDeseasonalizer

# from all_functions import *

def znorm(x):
    """Z-normalise a series: subtract its mean and divide by its standard deviation.

    Parameters
    ----------
    x : array-like
        Values to normalise. The mean and standard deviation are taken over
        all elements (``np.mean`` / ``np.std`` with their default axis).

    Returns
    -------
    The centred and scaled values. When the standard deviation is exactly
    zero (a constant series) the centred values are returned unscaled, which
    are zeros, instead of the NaNs that a 0/0 division would produce.
    """
    centered = x - np.mean(x)
    sigma = np.std(x)
    if sigma == 0:
        # Constant series: every deviation from the mean is already zero, so
        # dividing would only turn the zeros into NaN and emit a RuntimeWarning.
        return centered
    return centered / sigma


def normalize_series_znorm(X, representation="STFT", wavelet="bior2.2", level=2):
    """Turn every series in ``X`` into a flat feature vector via ``transform_series2``.

    Despite the name, no z-normalisation (and no deseasonalising) is applied
    here: each row is passed to ``transform_series2`` unchanged.

    Parameters
    ----------
    X : iterable of 1-D arrays
        One series per row, e.g. the output of ``flatten_series``.
    representation : str, default "STFT"
        Representation name forwarded to ``transform_series2``.
    wavelet : str, default "bior2.2"
        Wavelet name forwarded to ``transform_series2``.
    level : int, default 2
        Decomposition level forwarded to ``transform_series2``.

    Returns
    -------
    numpy.ndarray
        2-D array with one row per input series; each row is the series'
        representation flattened to one dimension.
    """
    features = np.array(
        [transform_series2(series, representation, wavelet, level) for series in X]
    )
    # Collapse whatever shape the representation has (vector or image) so that
    # every case becomes a single feature row.
    return features.reshape(features.shape[0], -1)

def flatten_series(X):
    """Collapse every axis after the first so that each case becomes one row."""
    n_cases = X.shape[0]
    return X.reshape((n_cases, -1))
  
def flatten_labels(y):
    """Return the labels as a one-dimensional array."""
    flat_labels = np.ravel(y, order="C")
    return flat_labels

def transform_series2(series, representation, wavelet, level):
    """Convert a single 1-D series into the requested representation.

    Parameters
    ----------
    series : numpy.ndarray
        One-dimensional series. The image-based representations call
        ``series.reshape``, so an ndarray is required for those.
    representation : str
        One of "CWT", "DWT", "SWT", "WPT", "STFT", "MTF", "GADF", "GASF",
        "RP" or "FIRTS".
    wavelet : str
        Wavelet name; only used by "CWT", "DWT", "SWT" and "WPT".
    level : int
        Decomposition level; only used by "DWT" and "SWT" ("WPT" always
        decomposes to level 4).

    Returns
    -------
    numpy.ndarray
        The representation: a 1-D coefficient vector for "DWT", "SWT" and
        "WPT", and a 2-D array for the other options.

    Raises
    ------
    ValueError
        If ``representation`` is not one of the supported names. Previously
        this case failed with an UnboundLocalError on the return statement.
    """
    if representation == "CWT":
        # One scale per sample: 1 .. len(series).
        coeffs, freqs = pywt.cwt(series, scales=np.arange(1, len(series) + 1), wavelet=wavelet)
        im_final = coeffs
    elif representation == "DWT":
        coeffs = pywt.wavedec(series, wavelet=wavelet, level=level)
        im_final = np.concatenate(coeffs, axis=0)
    elif representation == "SWT":
        coeffs_swt = pywt.swt(series, wavelet, level=level)
        # Each entry is a pair of coefficient arrays; all first elements are
        # laid out first, followed by all second elements.
        first_parts = [pair[0] for pair in coeffs_swt]
        second_parts = [pair[1] for pair in coeffs_swt]
        im_final = np.concatenate(first_parts + second_parts, axis=0)
    elif representation == "WPT":
        wp = pywt.WaveletPacket(data=series, wavelet=wavelet, maxlevel=4, mode='symmetric')

        # Take the nodes of the 4th decomposition level, in frequency order.
        nodes = wp.get_level(4, order='freq')
        coeffs_wpt = np.array([n.data for n in nodes])

        # Join the per-node coefficients into a single vector.
        im_final = np.concatenate(coeffs_wpt, axis=0)
    elif representation == "STFT":
        from scipy.signal import stft
        f, t, Zxx = stft(series, window='hann', nperseg=64)

        # Keep only the magnitude of the complex STFT coefficients.
        im_final = np.abs(Zxx)
    elif representation == "MTF":
        series = series.reshape(1, len(series))
        mtf = MarkovTransitionField(strategy='normal')
        X_mtf = mtf.fit_transform(series)
        im_final = X_mtf[0]
    elif representation == "GADF":
        series = series.reshape(1, len(series))
        gaf = GramianAngularField(method='difference')
        X_gaf = gaf.fit_transform(series)
        im_final = X_gaf[0]
    elif representation == "GASF":
        series = series.reshape(1, len(series))
        gaf = GramianAngularField(method='summation')
        X_gaf = gaf.fit_transform(series)
        im_final = X_gaf[0]
    elif representation == "RP":
        series = series.reshape(1, len(series))
        rp = RecurrencePlot(threshold='distance')
        X_rp = rp.fit_transform(series)
        im_final = X_rp[0]
    elif representation == "FIRTS":
        series = series.reshape(1, len(series))
        mtf = MarkovTransitionField(n_bins=4, strategy='uniform')
        X_mtf = mtf.fit_transform(series)
        gaf = GramianAngularField(method='difference')
        X_gaf = gaf.fit_transform(series)
        rp = RecurrencePlot(threshold='distance')
        X_rp = rp.fit_transform(series)
        # FIRTS is the fusion (element-wise sum) of the MTF, GADF and RP
        # images (see the paper shared with the team).
        im_final = (X_mtf[0] + X_gaf[0] + X_rp[0])
    else:
        raise ValueError(
            f"Unknown representation {representation!r}; expected one of "
            "'CWT', 'DWT', 'SWT', 'WPT', 'STFT', 'MTF', 'GADF', 'GASF', 'RP', 'FIRTS'."
        )
    return im_final


In [18]:
# (dataset name, reference "Rocket" accuracy) pairs, kept side by side so a
# name and its score cannot drift out of alignment. The provenance of the
# reference accuracies is not recorded in this notebook.
_ROCKET_BASELINE = [
    ('Adiac', 0.7834),
    ('ArrowHead', 0.8143),
    ('BeetleFly', 0.9000),
    ('HouseTwenty', 0.9639),
    ('Computers', 0.7612),
    ('FaceAll', 0.9465),
    ('FaceFour', 0.9773),
    ('FacesUCR', 0.9614),
    ('ShapesAll', 0.9068),
    ('Ham', 0.7257),
    ('HandOutlines', 0.9424),
    ('InlineSkate', 0.4569),
    ('Lightning2', 0.7590),
    ('Mallat', 0.9559),
    ('Meat', 0.9483),
    ('MoteStrain', 0.9146),
    ('Symbols', 0.9743),
    ('Lightning7', 0.8233),
    ('MedicalImages', 0.7995),
    ('Wine', 0.8130),
    ('WordSynonyms', 0.7534),
    ('Worms', 0.7403),
    ('Yoga', 0.9104),
    ('Chinatown', 0.9825),
    ('Crop', 0.7513),
    ('EthanolLevel', 0.5828),
    ('GestureMidAirD3', 0.4146),
    ('Rock', 0.9000),
]

# Parallel lists consumed by the experiment and reporting cells.
datasets_ucr = [_name for _name, _acc in _ROCKET_BASELINE]
rocket_acc = [_acc for _name, _acc in _ROCKET_BASELINE]
from scipy import stats
def z_test_accuracy(acc1, acc2, n1, n2, alpha=0.05):
    """
    Two-sided z-test comparing the accuracies of two classifiers.

    Each accuracy is treated as a proportion and the statistic uses the
    unpooled standard error ``sqrt(p1*(1-p1)/n1 + p2*(1-p2)/n2)``.

    Parameters:
    acc1 (float): Accuracy of classifier 1 (between 0 and 1).
    acc2 (float): Accuracy of classifier 2 (between 0 and 1).
    n1 (int): Total number of predictions made by classifier 1.
    n2 (int): Total number of predictions made by classifier 2.
    alpha (float): Significance level of the test (default: 0.05).

    Returns:
    tuple: (z statistic, two-sided p-value, significance label), where the
    label is "significativa" when p-value < alpha and "não_significativa"
    otherwise.
    """
    # Proportions
    p1 = acc1
    p2 = acc2

    # Variance of each estimated proportion
    var1 = p1 * (1 - p1) / n1
    var2 = p2 * (1 - p2) / n2

    diff = p1 - p2
    std_error = np.sqrt(var1 + var2)

    if std_error == 0:
        # Both accuracies are exactly 0 or 1, so the statistic is 0/0 or x/0.
        # Resolve it explicitly instead of returning NaN with a RuntimeWarning.
        if diff == 0:
            z, p_value = 0.0, 1.0
        else:
            z = float("inf") if diff > 0 else float("-inf")
            p_value = 0.0
    else:
        z = float(diff / std_error)
        # Survival function instead of 1 - cdf: the latter rounds to exactly
        # 0 once |z| is large, losing the tail probability.
        p_value = float(2 * stats.norm.sf(abs(z)))

    # Significance label
    if p_value < alpha:
        significance = "significativa"
    else:
        significance = "não_significativa"

    return z, p_value, significance


In [19]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
# Per-dataset results, in the same order as `datasets_ucr`:
# `accuracies` holds our test accuracy (rounded to 4 decimals) and `sigs` the
# significance label returned by `z_test_accuracy` against the Rocket score.
accuracies = []
sigs = []

# zip() stops at the shorter input, so a mismatch between the two lists would
# silently drop datasets; fail loudly instead.
if len(datasets_ucr) != len(rocket_acc):
    raise ValueError(
        f"datasets_ucr has {len(datasets_ucr)} entries but rocket_acc has {len(rocket_acc)}"
    )

# NOTE(review): this evaluates on a random 70/30 split of the full dataset;
# confirm the reference Rocket accuracies were obtained on a comparable split.
for d, rocket_accuracy in zip(datasets_ucr, rocket_acc):
    X, y, meta = load_classification(d, return_metadata=True)

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
    X_train_flattened = flatten_series(X_train)
    X_test_flattened = flatten_series(X_test)

    # Feature extraction (one flat feature row per series).
    X_train_normalized = normalize_series_znorm(X_train_flattened)
    X_test_normalized = normalize_series_znorm(X_test_flattened)
    y_train_flattened = flatten_labels(y_train)
    y_test_flattened = flatten_labels(y_test)

    clf = RandomForestClassifier(n_estimators=200, random_state=42)
    # clf = LogisticRegression(penalty='elasticnet', solver='saga', l1_ratio=0.3, max_iter=100)

    # Use the flattened 1-D labels consistently for fitting and scoring.
    clf.fit(X_train_normalized, y_train_flattened)

    y_pred = clf.predict(X_test_normalized)
    accuracy = round(accuracy_score(y_test_flattened, y_pred), 4)
    accuracies.append(accuracy)

    # Both accuracies are compared as if measured on this test set's size.
    n_test = len(y_test_flattened)
    z, p_value, significance = z_test_accuracy(accuracy, rocket_accuracy, n_test, n_test)
    sigs.append(significance)

  freqs, time, Zxx = _spectral_helper(x, x, fs, window, nperseg, noverlap,
  freqs, time, Zxx = _spectral_helper(x, x, fs, window, nperseg, noverlap,


In [20]:
# Side-by-side summary: our accuracy, the Rocket reference accuracy and the
# significance label of their difference, one row per dataset.
comparison_columns = {
    'Classificador': accuracies,
    'Rocket': rocket_acc,
    'Significativo': sigs,
}
df = pd.DataFrame(data=comparison_columns)
df

Unnamed: 0,Classificador,Rocket,Significativo
0,0.7319,0.7834,não_significativa
1,0.9375,0.8143,significativa
2,0.8333,0.9,não_significativa
3,0.9792,0.9639,não_significativa
4,0.84,0.7612,não_significativa
5,0.9585,0.9465,não_significativa
6,0.9118,0.9773,não_significativa
7,0.9644,0.9614,não_significativa
8,0.7194,0.9068,significativa
9,0.6462,0.7257,não_significativa


In [21]:
def contar_classificador_melhor(df):
    """Count the rows where our classifier ties with or beats Rocket.

    A row counts when either
    - 'Significativo' is 'não_significativa' (treated as a tie), or
    - 'Significativo' is 'significativa' and 'Classificador' > 'Rocket'.

    Rows with any other 'Significativo' value are not counted.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the columns 'Classificador', 'Rocket' and 'Significativo'.

    Returns
    -------
    int
        Number of rows satisfying one of the conditions above.
    """
    tied = df['Significativo'] == 'não_significativa'
    significantly_better = (
        (df['Significativo'] == 'significativa')
        & (df['Classificador'] > df['Rocket'])
    )
    # Boolean masks replace the row-by-row iterrows loop; int() keeps the
    # return value a plain Python integer.
    return int((tied | significantly_better).sum())

# Count the datasets where our model ties with (non-significant difference) or
# significantly beats Rocket, and report the tally out of all evaluated datasets.
contagem_resultado = contar_classificador_melhor(df)
print(f"Nosso modelo foi igual/melhor que o Rocket {contagem_resultado}/{len(accuracies)}.")


Nosso modelo foi igual/melhor que o Rocket 24/28.
