Este notebook pretende analisar a preservação estrutural de series temporais após redução de dimensionalidade. Para isso serão selecionados datasets com alta dimensionalidade da biblioteca AEON e aplicados variações do algoritimo do PAA (Piecewise Aggregate Approximation) para redução de dimensionalidade aplicando diferentes taxas perceptuais de redução.

Para medir essa preservação da estrutura iremos calcular uma matriz de distâncias de cada série temporal antes e depois da redução de dimensionalidade utilizando a distância euclidiana. Assim poderemos comparar se após a redução de dimensionalidade os K vizinhos mais próximos de cada série temporal permanecem os mesmos.

A partir da quantidade de vizinhos preservados após a redução de dimensionalidade, será calculada uma métrica de preservação estrutural definida como um valor entre 0 e 1, onde 1 indica que todos os K vizinhos mais próximos permaneceram os mesmos e 0 indica que nenhum vizinho próximo permaneceu o mesmo.


In [1]:
# Importa algumas bibliotecas necessárias
import numpy as np
import pandas as pd
import os

from aeon.datasets import load_classification
from aeon.distances import euclidean_distance


In [35]:
# Começamos definindo a lista de datasets que serão utilizados para os experimentos
# Estes são datasets de alta dimensionalidade da biblioteca AEON
high_dim_datasets = [
  'ACSF1',
  'CinCECGTorso',
  'EOGHorizontalSignal',
  'EOGVerticalSignal',
  'EthanolLevel',
  'HandOutlines',
  'Haptics',
  'HouseTwenty',
  'InlineSkate',
  'Mallat',
  'MixedShapesRegularTrain',
  'MixedShapesSmallTrain',
  'Phoneme',
  'PigAirwayPressure',
  'PigArtPressure',
  'PigCVP',
  'Rock',
  'SemgHandGenderCh2',
  'SemgHandMovementCh2',
  'SemgHandSubjectCh2',
  'StarLightCurves',
]

In [36]:
# Para facilitar a comparação de séries em diferentes escalas, 
# podemos aplicar uma função de normalização nas séries.
# Assim todas as séries ficarão com média 0 e desvio padrão 1.
def znorm(x):
  x_znorm = (x - np.mean(x)) / np.std(x)
  return x_znorm

In [4]:
# Define as funções de agregação que podem ser usadas para reduzir a dimensionalidade no PAA
aggregations = {
    'average': lambda x: np.mean(x),
    'median': lambda x: np.median(x),
    'max': lambda x: np.max(x),
    'min': lambda x: np.min(x),
    'sum': lambda x: np.sum(x),
    'variance': lambda x: np.var(x),
    'std': lambda x: np.std(x),
    'iqr': lambda x: np.subtract(*np.percentile(x, [75, 25])),
    'first': lambda x: x[0],
    'central': lambda x: x[len(x)//2],
    'last': lambda x: x[-1],
    'max-min': lambda x: np.max(x) - np.min(x),
    'avg-max': lambda x: np.abs((np.mean(x) - np.max(x))),
    'avg-min': lambda x: np.abs((np.mean(x) - np.min(x))),
    'random': lambda x: np.random.choice(x)
}

In [5]:
# Implementa o algoritmo PAA (Piecewise Aggregate Approximation)
# Recebe uma série temporal 's', o tamanho desejado 'w' e a função de agregação 'agg'
def PAA(s, w, agg='average'):
    if agg not in aggregations:
        raise ValueError(f"Função de agregação '{agg}' é inválida ou não suportada.")

    n = len(s)
    s = np.array(s)

    # Aqui criamos n valores uniformemente espaçados entre 0 e w (exclusivo).
    # Por exemplo: n=6 e w=2 será [0,0,0,1,1,1]
    idx = np.floor(np.linspace(0, w, n, endpoint=False)).astype(int)

    # Aqui iteramos sobre o tamanho desejado 'w' e aplicamos uma máscara para selecionar os pontos da série que pertencem a cada segmento.
    # Por exemplo: n=6 e w=2, o idx resultante será: 
    #              [True, True, True, False, False, False] para w = 0
    #              Assim podemos selecionar os pontos da série que pertencem a cada segmento.
    res = [aggregations[agg](s[idx == i]) for i in range(w)]
    # Normalizamos a série reduzida utilizando Z-Norm
    res = znorm(res)

    return np.array(res)

In [6]:
# Define as taxas de redução de dimensionalidade a serem testadas
reduction_rates = [
    0.9,
    0.8,
    0.7,
    0.6,
    0.5,
    0.4,
    0.3,
    0.2,
    0.1
]

In [30]:
# Importa a função para calcular distâncias de forma eficiente
from scipy.spatial.distance import pdist, squareform

# Função para calcular a matriz de distância entre todas as séries temporais em um conjunto de dados X
def get_distance_matrix(X):
    # 1. Cria uma cópia de X para evitar modificar o original
    X_cp = np.array(X)

    # 2. Remove dimensões unitárias para garantir que X seja 2D (N x M)
    # Por se tratar de séries temporais univariadas, o formato original é 
    # (N x 1 x M) e sera convertido para (N x M)
    X_2d = X_cp.squeeze()

    # 3. pdist calcula as N*(N-1)/2 distâncias únicas
    condensed_dist = pdist(X_2d, metric='euclidean')

    # 4. squareform monta a matriz N x N simétrica completa
    distance_matrix = squareform(condensed_dist)
    
    return distance_matrix

In [22]:
# Função para obter os K vizinhos mais proximos de uma serie (s) por meio de uma matriz de distâncias
def get_k_nearest_neighbors(dist_matrix, point_idx, k):    
    # 1. Pega a linha de distâncias para o nosso ponto
    distances_from_point = dist_matrix[point_idx]
    
    # 2. Obtém os índices ordenados pela distância
    #    Ex: sorted_indices[0] será o próprio point_idx (dist 0)
    sorted_indices = np.argsort(distances_from_point)
    
    # 3. Pega os K vizinhos mais próximos.
    #    Pulamos o primeiro índice (posição 0), pois é o próprio ponto.
    #    Pegamos da posição 1 (o vizinho mais próximo) até k+1.
    k_nearest_indices = sorted_indices[1 : k + 1]
    
    # 4. Pega as distâncias correspondentes a esses vizinhos
    k_nearest_distances = distances_from_point[k_nearest_indices]
    
    return k_nearest_indices, k_nearest_distances

In [42]:
# Escolhe um dataset de alta dimensionalidade para testar
d = 'HouseTwenty'

# Carrega o dataset completo (train + test)
X, _ = load_classification(d, split=None)

# Exibe informações sobre o shape do dataset carregado
print(f"Dataset '{d}' carregado com {X.shape[0]} séries temporais, cada uma com dimensão de {X.shape[2]}.")

Dataset 'HouseTwenty' carregado com 159 séries temporais, cada uma com dimensão de 2000.


In [43]:
# Normaliza as séries com Z-Norm
X_norm = np.array([[znorm(s[0])] for s in X]) # Para cada série s em X, aplica znorm em s[0] (a série em si)

# Calcula a matriz de distância entre todas as séries normalizadas
dist_matrix = get_distance_matrix(X_norm)

# Exibe uma prévia da matriz de distância calculada com apenas os primeiros 5x5 elementos
print(f"Matriz de Distância (Início):")
print(dist_matrix[:5, :5])

Matriz de Distância (Início):
[[ 0.         56.13328025 54.34043454 54.15500871 67.07087115]
 [56.13328025  0.         61.12841113 62.47950359 63.62144155]
 [54.34043454 61.12841113  0.         46.69631666 66.82907255]
 [54.15500871 62.47950359 46.69631666  0.         67.17988739]
 [67.07087115 63.62144155 66.82907255 67.17988739  0.        ]]


In [44]:
# Testa a função de obtenção dos K vizinhos mais próximos
s_idx = 5  # Índice da série que queremos analisar
k = 3 # Número de vizinhos mais próximos a serem encontrados

# Obtém os K vizinhos mais próximos
nearest_indices, nearest_distances = get_k_nearest_neighbors(dist_matrix, s_idx, k)
print(f'Séries mais próximas da série {s_idx}: {nearest_indices} com distâncias {nearest_distances}')

Séries mais próximas da série 5: [139 107   3] com distâncias [44.33162092 46.05375307 47.36015159]


Na sequencia iremos reduzir a dimensionalidade das séries temporais utilizando o PAA e calcular novamente a matriz de distâncias e os K vizinhos mais próximos para comparar com os resultados obtidos anteriormente com a matriz de distâncias original.

In [47]:
# Carrega o dataset novamente para aplicar a redução de dimensionalidade
X_2, _ = load_classification(d, split=None)
# Obtem o tamnho (dimensão) das séries temporais
original_length = len(X_2[0][0])

agg = 'median' # Definie a função de agregação
reduction_rate = 0.5 # Define a taxa de redução %
w = int(original_length * (1 - reduction_rate)) # Calcula o tamnaho w da seria apos reduzir pela taxa definida

# Reduz a dimensionalidade das séries utilizando PAA com os parâmetros definidos
X_reduced = np.array([[PAA(s[0], w, agg=agg)] for s in X_2])

# Calcula a nova matriz de distâncias com as séries reduzidas
dist_matrix_reduced = get_distance_matrix(X_reduced)

# Exibe a nova matriz de distâncias primeiro 5x5 elementos
print(f"Matriz de Distância Reduzida (Início):")
print(dist_matrix_reduced[:5, :5])

# Exibe as k=3 series mais próximas de uma series qualquer do dataset reduzido
reduced_s_idx = 5  # Índice da série que queremos analisar

reduced_nearest_indices, reduced_nearest_distances = get_k_nearest_neighbors(dist_matrix_reduced, reduced_s_idx, k)
print(f'Séries mais próximas da série {reduced_s_idx}: {reduced_nearest_indices} com distâncias {reduced_nearest_distances}')

Matriz de Distância Reduzida (Início):
[[ 0.         39.33083987 37.74750816 38.00517818 47.60069307]
 [39.33083987  0.         43.17980888 44.15346848 45.00745588]
 [37.74750816 43.17980888  0.         32.04276567 47.45108226]
 [38.00517818 44.15346848 32.04276567  0.         47.59783525]
 [47.60069307 45.00745588 47.45108226 47.59783525  0.        ]]
Séries mais próximas da série 5: [139 107   3] com distâncias [29.95126277 32.07611833 33.22822385]


In [46]:
# Agora definimos uma função que calcula a métrica de semelhança da serie reduzida
# Essa métrica levara em conta a porcentagem de K vizinhos que permaneceram os mesmos apos a redução da dimensionalidade

def calculate_k_similarity(original_neighbors, reduced_neighbors):
    # Converte os arrays de índices em conjuntos para facilitar a comparação
    set_original = set(original_neighbors)
    set_reduced = set(reduced_neighbors)
    
    # Calcula a interseção dos dois conjuntos para encontrar os vizinhos comuns
    common_neighbors = set_original.intersection(set_reduced)
    
    # Calcula a similaridade como a proporção de vizinhos comuns em relação a K
    similarity = len(common_neighbors) / len(original_neighbors)
    
    return similarity

In [49]:
# Usamos a função para calcular a similaridade dos K vizinhos para as series testadas
similarity = calculate_k_similarity(nearest_indices, reduced_nearest_indices)

print(f'Similaridade dos K vizinhos entre a série original e a reduzida: {similarity:.2f}')

Similaridade dos K vizinhos entre a série original e a reduzida: 1.00
