En este notebook se pretende crear un modelo de reduccion de la dimensionalidad basado en técnicas clasicas, más concretamente una PCA. Se pretende seguir la misma estructura que un autoencoder, es decir, generar un método de codificación de la señal para comprimir la informacion, lo cual se realizará mediante un analisis de componetes principales (PCA), y, posteriormente, analizar la capacidad de reconstrucción del modelo. \
En primer lugar importamos el conjunto de bibliotecas necesarias para realizar los cálculos.

In [None]:
###Importamos las librerias necesarias:
##Importamos Pandas para trabajar con DataFrames
import pandas as pd

##Importamos Numpy para incluir mas funciones matematicas
import numpy as np

##Importamos MatPlotLib para realizar representaciones
import matplotlib.pyplot as plt
from matplotlib import gridspec #Es un paquete de matplotlib que permite realizar figuras de varias subfiguras

##Importamos la funcion display de la libreria IPython.display, que permite mostrar contenido de manera mas enriquecida. Tambien importamos Image para poder mostrar gráficos
from IPython.display import display

##Importamos varias funciones de las librerias sklearn
from sklearn.model_selection import train_test_split #Es una función que se utiliza para dividir un conjunto de datos en conjuntos de entrenamiento y prueba de forma aleatoria.
from sklearn.decomposition import PCA #Importamos la clase PCA para poder realizare analisis de componetes principales a los datos

#Importamos la librería joblib para poder exportar modelos de sklearn
import joblib

##Immportamos el generador de números aleatorios
import random

Definimos una serie de funciones útiles a lo largo del codigo, entre las que se encuentra: función de normalizacion min_max, funcion para convertir un datafraem en un tensor y funcion de representacion de pulsos.

In [None]:
##Función para normalizar las señales. Proporciona dos opciones: 1 para normalizar a [0,1] y 2 para normalizar respecto al valor máximo
#Por defecto normaliza a escala [0,1]
def normalizacion(df, tipo = 1):
    #Normalización [0,1]
    if tipo == 1:
        maximos_fila = df.max(axis = 1).to_numpy() #Extraemos el valor máximo de las filas
        minimos_fila = df.min(axis = 1).to_numpy() #Extraemos el valor mínimo de las filas
        df = (df - minimos_fila[:, None])/(maximos_fila[:, None] - minimos_fila[:, None]) #Normalizamos los valores del Dataframe
        return df
    
    #Normalización respecto al máximo
    if tipo == 2:
        maximos_fila = df.max(axis = 1).to_numpy() #Extraemos el valor máximos de las filas
        df = df/maximos_fila[:, None] #Dividimos cada fila por su máximo
        return df
    
##Funcion para cambiar el formato del input del autoencoder y que sea compatible con papas convolucionales 1D
def convert_to_tensor(df):
    #Convertimos el dataframe a un tensor
    tensor = tf.convert_to_tensor(df, dtype = tf.float32)
    #Añadimos la dimension de canal 
    tensor = tf.expand_dims(tensor, axis = 2)
    return tensor

##Definimos una funcion para representar de manera comoda dos señales diferentes aleatorias en un mismo grafico, que provengan del mismo conjunto de datos
def plot_pulsos(data):
    #Generar dos números enteros aleatorios entre 1 y el número total de muestras
    n_random = random.randint(1, data.shape[0])
    m_random = random.randint(1, data.shape[0])
    
    #Representacion de las señales en un plot
    plt.figure(figsize = (10, 5))
    plt.plot(data.iloc[n_random], label = "Señal 1", color = "blue")
    plt.plot(data.iloc[m_random], label = "Señal 2", color = "red")
    plt.title('Señal')
    plt.xticks([])
    plt.ylabel('Valor')
    plt.show()

Cargamos el conjunto de datos con el que vamos a trabajar y realizamos el preprocesamiento de los datos necerario antes de aplicar el analisis de componentes principales.

In [None]:
##Cargamos el conjunto de datos con el que vamos a trabajar, que en este caso son un conjunto de señales del core de AGATA
url = "./Data/Core_signal.dat"
core_signals = pd.read_csv(url, sep = "\t", header = "infer", dtype = np.float64, chunksize = None)

##Normalizamos las señales de core en [0,1] para realizar el entrenamiento del autoencoder
core_norm = normalizacion(core_signals.copy())

##Dividimos las señales del core de AGATA en un conjunto de entrenamiento, un conjunto de validación y un conjunto test, con una proporcion del 80% para el conjunto de entrenamiento
#y un 10% para el conjunto de validación y otro 10% para el conjuto test
X_train, X_valid = train_test_split(core_norm, train_size = 0.80)
X_valid, X_test = train_test_split(X_valid, train_size = 0.50)

Pasamos a definir la PCA para el conjunto de entrenamiento, lo que nos va a definir una transformacion de reduccion de la dimensión que podremos aplicar al conjunto de validación y test para comprobar la valided de la transformación.

In [None]:
##Instanciamos la clase asociada con la PCA, definiendo los parametros que nos interesen. Al especificar el argumento n_components = Noen indicamos que se conserven todas las componetes principales
#Al indicar el argumento svd_solver = "covariance_eigh" indicamos que el metodo de obtencion sea mediante la descomposicion en valores singulares de la matriz de convarianzas.
pca = PCA(n_components = 10, svd_solver = "covariance_eigh")

#Una vez instanciada la clase ajustamos la transformacion a partir del conjunto de entrenamiento
pca.fit(X_train)

#Guardamos en un fichero el modelo de PCA ajustado para los datos mediante la biblioteca joblib
joblib.dump(pca, "/home/jupyter-manuel/Digiopt/Models/PCA_model.joblib")
#Cargamos el modelo de PCA guardado
pca = joblib.load("/home/jupyter-manuel/Digiopt/Models/PCA_model.joblib")


#Una vez ajustada la transformación la aplicamos al conjunto de entrenamiento, de validacion y test, con los pesos obtenidos para el conjunto de entrenamiento
X_train_reduced = pca.transform(X_train)
X_valid_reduced = pca.transform(X_valid)
X_test_reduced = pca.transform(X_test)

In [None]:
##Evaluamos el error de reconstruccion dado por la PCA, manteniendo 10 componentes principales.
X_valid_reconstructed = pca.inverse_transform(X_valid_reduced)

#Calculamos el error de reconstruccion promedio dado por la PCA mediante el indicador MSE
rmse = np.sqrt(np.mean((X_valid - X_valid_reconstructed) ** 2))
print(f"Error de reconstrucción (RMSE): {rmse}")

In [None]:
#Convertimos los resultados dados por la PCA para cada conjunto de datos a un DataFrame, para operar con ellos más adelante
X_train_reduced_df = pd.DataFrame(X_train_reduced)
X_valid_reduced_df = pd.DataFrame(X_valid_reduced)
X_test_reduced_df = pd.DataFrame(X_test_reduced)

#Estraemos el porcentaje de varianza explicada por cada una de las componetes principales
explained_variance = pca.explained_variance_ratio_
#Extraemos el autovalor correspondientre a cada componete principal
eagenvalues = pca.singular_values_
#Exrtaemos las coordenadas de cada componete principal en el espacio de caracteristicas original, y lo convertimos en un DataFrame
components = pd.DataFrame(pca.components_)

print(explained_variance)
print(eagenvalues)
display(components)

# Generamos un gráfico ScreePlot para visualizar el porcentaje de varianza absorvida por cada una de las componetes principales
plt.figure(figsize=(10, 7))
plt.plot(range(1, len(explained_variance) + 1), explained_variance, marker='o', linestyle='--')
plt.title('Scree Plot')
plt.xlabel('Componentes Principales')
plt.ylabel('Proporción de Varianza Explicada')
plt.xticks(range(1, len(explained_variance) + 1))
plt.grid()
plt.show()