In [None]:
import multirex as mrex
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import sys 
import pandas as pd
from tqdm import tqdm

import matplotlib.pyplot as plt
import matplotlib.ticker as ticker


from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import  ConfusionMatrixDisplay
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score, f1_score, recall_score, precision_score

%matplotlib inline
# Load the contents of "waves.txt" as a NumPy array (presumably the wavelength
# grid shared by all spectra — TODO confirm against the data files).
waves=np.loadtxt("waves.txt")
# Number of entries in `waves`; the cells below use it to slice the trailing
# `n_points` columns out of each table.
n_points = len(waves)

In [None]:
def normalize_min_max_by_row(df):
    """Min-max scale every row of ``df`` independently.

    Each row is shifted by its own minimum and divided by its own range
    (max - min). Rows whose range is zero are filled with 0.

    Parameters:
    - df: DataFrame whose rows are scaled one by one.

    Returns a new DataFrame; ``df`` itself is not modified.
    """
    row_low = df.min(axis=1)
    row_span = df.max(axis=1) - row_low
    scaled = df.sub(row_low, axis=0).div(row_span, axis=0)
    # Constant rows would otherwise keep the result of a division by zero.
    scaled[row_span == 0] = 0
    return scaled

def plot_confusion_matrix(y_test, y_pred, labels, display_labels, title):
    """
    Compute a row-normalized confusion matrix and show it as a percentage heat map.

    Parameters:
    - y_test: Array with the true values.
    - y_pred: Array with the model predictions.
    - labels: List of class labels, in the order in which they must be treated.
    - display_labels: List of descriptive class names used on the plot axes.
    - title: String used as the title of the confusion-matrix figure.
    """
    # normalize='true' divides every row by its total, so each cell is a
    # fraction of the corresponding true class.
    matrix = confusion_matrix(y_test, y_pred, labels=labels, normalize='true')

    fig, ax = plt.subplots(figsize=(8, 8))
    cm_display = ConfusionMatrixDisplay(confusion_matrix=matrix, display_labels=display_labels)
    cm_display.plot(values_format=".0%", colorbar=True, ax=ax)

    # Drop the automatically created colorbar and pin the colour scale to the
    # full [0, 1] range before building a replacement bar.
    heatmap = cm_display.im_
    heatmap.colorbar.remove()
    heatmap.set_clim(0, 1)

    # Replacement colorbar whose ticks are rendered as percentages.
    percent_bar = fig.colorbar(heatmap, ax=ax)
    percent_bar.ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1))

    # Layout and title.
    fig.subplots_adjust(left=0.1, right=0.9, top=0.9, bottom=0.1)
    ax.set_title(title, fontsize=20)
    plt.show()

# Ten logarithmically spaced values from 1e-10 to 1e-1 (one per decade); used
# as the x-axis of the score plot at the end of the notebook.
mix_ratios = np.logspace(-10,-1,10)


## load data

In [None]:
import ast
def string_to_list(string):
    """Evaluate ``string`` as a Python literal and return the resulting object.

    Used below to turn the text stored in the "label" column back into a list.
    Parsing is delegated to ``ast.literal_eval``, so only literal syntax is
    accepted.
    """
    parsed = ast.literal_eval(string)
    return parsed


def _load_labelled_spectra(csv_path):
    """Read one spectra CSV and prepare it for the noise-generation cells.

    Parameters:
    - csv_path: Path of the CSV file to read.

    Returns the loaded DataFrame with:
    - the "label" column reduced to the third element of the list literal
      stored in each cell, and
    - two plain attributes attached: ``data`` (the last ``n_points`` columns)
      and ``params`` (every column before those).
    """
    table = pd.read_csv(csv_path)
    # Each "label" cell holds a list written as text; parse it and keep only
    # the entry at index 2 (single pass instead of two separate .apply calls).
    table["label"] = table["label"].apply(lambda raw: string_to_list(raw)[2])
    # These are attributes, not columns (pandas may emit a UserWarning about
    # that). Presumably mrex.generate_df_SNR_noise reads `.data` / `.params`
    # from the frame it receives — TODO confirm against the multirex docs.
    table.data = table.iloc[:, -n_points:]
    table.params = table.iloc[:, :-n_points]
    return table


H2O_data = _load_labelled_spectra("spec_data/H2O_data.csv")
CO2_data = _load_labelled_spectra("spec_data/CO2_data.csv")


## train data

### SNR = 1

In [None]:
# Training spectra at SNR = 1: each source table goes through
# mrex.generate_df_SNR_noise with its own repeat count, and the two results
# are stacked into one frame with a fresh 0..N-1 index.
SNR1_CO2 = mrex.generate_df_SNR_noise(df=CO2_data, n_repeat=10000, SNR=1)
SNR1_H2O = mrex.generate_df_SNR_noise(df=H2O_data, n_repeat=500, SNR=1)
SNR1 = pd.concat((SNR1_CO2, SNR1_H2O), ignore_index=True)

### SNR = 3

In [None]:
# Training spectra at SNR = 3, built the same way: one noisy table per source
# frame, then both stacked with a renumbered index.
SNR3_CO2 = mrex.generate_df_SNR_noise(df=CO2_data, n_repeat=10000, SNR=3)
SNR3_H2O = mrex.generate_df_SNR_noise(df=H2O_data, n_repeat=500, SNR=3)
SNR3 = pd.concat((SNR3_CO2, SNR3_H2O), ignore_index=True)

### SNR = 6

In [None]:
# Training spectra at SNR = 6: one noisy table per source frame, then both
# stacked with a renumbered index.
SNR6_CO2 = mrex.generate_df_SNR_noise(df=CO2_data, n_repeat=10000, SNR=6)
SNR6_H2O = mrex.generate_df_SNR_noise(df=H2O_data, n_repeat=500, SNR=6)
SNR6 = pd.concat((SNR6_CO2, SNR6_H2O), ignore_index=True)

### SNR = 10

In [None]:
# Training spectra at SNR = 10: one noisy table per source frame, then both
# stacked with a renumbered index.
SNR10_CO2 = mrex.generate_df_SNR_noise(df=CO2_data, n_repeat=10000, SNR=10)
SNR10_H2O = mrex.generate_df_SNR_noise(df=H2O_data, n_repeat=500, SNR=10)
SNR10 = pd.concat((SNR10_CO2, SNR10_H2O), ignore_index=True)

### SNR = 20

In [None]:
# Training spectra at SNR = 20: one noisy table per source frame, then both
# stacked with a renumbered index.
SNR20_CO2 = mrex.generate_df_SNR_noise(df=CO2_data, n_repeat=10000, SNR=20)
SNR20_H2O = mrex.generate_df_SNR_noise(df=H2O_data, n_repeat=500, SNR=20)
SNR20 = pd.concat((SNR20_CO2, SNR20_H2O), ignore_index=True)

### SNR = NaN (approximated in code with a very high SNR of 1e4)

In [None]:
# "SNR = NaN" training spectra: the generator is called with SNR=1e4 rather
# than an actual NaN.
# NOTE(review): H2O uses n_repeat=1000 here, versus 500 in the other SNR
# cells — confirm this is intentional.
SNRnan_CO2 = mrex.generate_df_SNR_noise(df=CO2_data, n_repeat=10000, SNR=1e4)
SNRnan_H2O = mrex.generate_df_SNR_noise(df=H2O_data, n_repeat=1000, SNR=1e4)
SNRnan = pd.concat((SNRnan_CO2, SNRnan_H2O), ignore_index=True)

## Random forest (RF)

In [None]:
# Pool the tables generated for every SNR level into one frame with a
# renumbered index.
SNRall = pd.concat([SNR1, SNR3, SNR6, SNR10, SNR20, SNRnan], ignore_index=True)

# Class balance of the pooled table.
print(SNRall["label"].value_counts())

# 80/20 split. A fixed random_state makes the shuffle identical on every run,
# so the same rows end up in the training set each time the notebook is
# re-executed on the same pooled table.
SNRall_train, SNRall_test = train_test_split(SNRall, test_size=0.2, random_state=42)

# Row-wise min-max scaling of the trailing `n_points` columns, written back
# into the training frame. SNRall_test is left un-normalised and is not used
# in the cells visible here.
SNRall_train.iloc[:, -n_points:] = normalize_min_max_by_row(SNRall_train.iloc[:, -n_points:])

# Features are the trailing `n_points` columns; the target is the integer label.
SNRall_X_train = SNRall_train.iloc[:, -n_points:]
SNRall_y_train = SNRall_train["label"].astype(int)


In [None]:
# Random forest trained on the pooled, normalised spectra.
# random_state pins the bootstrap sampling and feature sub-sampling, so
# refitting on the same training data yields the same forest.
SNRall_rf = RandomForestClassifier(
    n_estimators=400,
    max_depth=200,
    criterion="entropy",
    min_samples_leaf=3,
    n_jobs=-1,  # use every available core
    random_state=42,
)

SNRall_rf.fit(SNRall_X_train, SNRall_y_train)


In [None]:
import joblib

# Persist the fitted forest to "H2O_rf.joblib" (relative path, so it is
# written to the current working directory).
joblib.dump(SNRall_rf, "H2O_rf.joblib")

### Test

In [None]:
# SNR level used for this evaluation run; also shown in the plot titles below.
SNR = 1

# Freshly generated noisy spectra for evaluation, stacked into one frame with
# a renumbered index.
SNR_val_CO2 = mrex.generate_df_SNR_noise(df=CO2_data, n_repeat=10000, SNR=SNR)
SNR_val_H2O = mrex.generate_df_SNR_noise(df=H2O_data, n_repeat=1000, SNR=SNR)
SNR_val = pd.concat((SNR_val_CO2, SNR_val_H2O), ignore_index=True)

# Alias, not a copy: every write to SNR_val_test below also changes SNR_val.
SNR_val_test = SNR_val

# Row-wise min-max scaling of the trailing `n_points` columns, written back
# in place.
SNR_val_test.iloc[:, -n_points:] = normalize_min_max_by_row(
    SNR_val_test.iloc[:, -n_points:]
)

# Features are the trailing `n_points` columns; the target is the integer label.
SNR_val_X_test = SNR_val_test.iloc[:, -n_points:]
SNR_val_y_test = SNR_val_test["label"].astype(int)


In [None]:
# Decision threshold applied to the forest's score.
threshold = 0.36

# Column 1 of predict_proba is taken as the score of the positive class
# (assumes the fitted classes are ordered [0, 1] — verify via SNRall_rf.classes_).
SNR_val_predictions = SNRall_rf.predict_proba(SNR_val_X_test)[:, 1]
SNR_val_y_pred = (SNR_val_predictions > threshold).astype(int)

# Store the results next to the inputs. These two columns are appended at the
# end of the frame, so `.iloc[:, -n_points:]` on SNR_val_test no longer
# selects the same columns after this point.
SNR_val_test["pred"] = SNR_val_y_pred
SNR_val_test["score"] = SNR_val_predictions

# Text summaries.
print(confusion_matrix(SNR_val_y_test, SNR_val_y_pred))
print(classification_report(SNR_val_y_test, SNR_val_y_pred))

# Row-normalised confusion matrix with the positive class listed first.
plot_confusion_matrix(
    y_test=SNR_val_y_test,
    y_pred=SNR_val_y_pred,
    labels=[1, 0],
    display_labels=["$H_2O$", "No $H_2O$"],
    title=f"Confusion Matrix for $H_2O$ Detection\n(SNR={SNR})",
)

In [None]:
# Per-abundance evaluation: one subset per value of the "atm H2O" column,
# each padded with a random sample of label-0 rows, then scored separately.

# Integer values matched against "atm H2O" (presumably log10 of the mixing
# ratio — TODO confirm). They must line up one-to-one with `mix_ratios`,
# which provides the x-axis of the plot.
SNR_val_exponents = range(-10, 0)
assert len(SNR_val_exponents) == len(mix_ratios), "one subset is needed per mix ratio"

# The label-0 rows are the same for every subset, so filter them once instead
# of twice per loop iteration.
SNR_val_negatives = SNR_val_test[SNR_val_test["label"] == 0]
# Each subset receives an equal share of the label-0 rows.
co2 = len(SNR_val_negatives) // len(SNR_val_exponents)

SNR_val_by_atmH2O = [
    pd.concat([
        SNR_val_test[SNR_val_test["atm H2O"] == exponent],
        SNR_val_negatives.sample(n=co2),  # unseeded: a new draw on every run
    ])
    for exponent in SNR_val_exponents
]

SNR_val_f1 = np.zeros(len(mix_ratios))
SNR_val_recall = np.zeros(len(mix_ratios))
SNR_val_precision = np.zeros(len(mix_ratios))
# Allocated for compatibility with the original cell; it is not filled here.
SNR_val_score = np.zeros(len(mix_ratios))

for j, subset in enumerate(SNR_val_by_atmH2O):
    # Convert once per subset rather than once per metric.
    y_true = subset["label"].astype(int)
    y_hat = subset["pred"].astype(int)

    SNR_val_f1[j] = f1_score(y_true, y_hat)
    SNR_val_recall[j] = recall_score(y_true, y_hat)
    SNR_val_precision[j] = precision_score(y_true, y_hat)

# Plot the three scores against the mixing ratio on a logarithmic x-axis.
fig, ax = plt.subplots()
ax.plot(mix_ratios, SNR_val_f1, label="f1")
ax.plot(mix_ratios, SNR_val_recall, label="recall")
ax.plot(mix_ratios, SNR_val_precision, label="precision")

ax.set_xlabel("mix ratio")
ax.set_xscale("log")
ax.set_ylabel("score")

ax.legend()
ax.set_title(f"Scores for $H_2O$ Detection\n(SNR={SNR})")
# Explicit show() so the cell output is the figure, not a Text repr.
plt.show()