# Import

In [None]:
# Main project paths, all rooted at the current working directory
# (adjust as needed).
import os

ROOT_DIR = os.getcwd()
DATASET_DIR, HISTORY_DIR, MODELS_DIR, RESULTS_DIR = (
    os.path.join(ROOT_DIR, subdir)
    for subdir in ('datasets', 'history', 'models', 'results')
)

In [None]:
# Mount Google Drive when running on Colab. Outside Colab the `google.colab`
# package is not installed, so the mount is skipped instead of aborting the
# whole notebook on the import.
try:
    from google.colab import drive
except ImportError:
    drive = None

if drive is not None:
    drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
import re
from time import sleep
from datetime import datetime
import pandas as pd
import requests
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from packaging import version
from IPython import display
import math
from sklearn.preprocessing import StandardScaler
import pickle
from sklearn.model_selection import train_test_split
from sklearn import metrics
import tensorflow as tf
from tensorflow.keras import layers, losses, Model, Sequential
from tensorflow.keras.layers import Dense, Dropout, LSTM, GRU, Conv2D, MaxPooling2D, AvgPool2D, Flatten
# Apply seaborn's "whitegrid" style to the figures created in this notebook.
sns.set(style="whitegrid")
# IPython magic: render matplotlib figures inline in the cell output.
%matplotlib inline
# Functional groups mapped to the SMARTS pattern that identifies each one.
# The key order fixes the order of the label columns in `column_names`.
func_grp_smarts = {
    'alkane': '[CX4;H0,H1,H2,H4]',
    'methyl': '[CH3]',
    'alkene': '[CX3]=[CX3]',
    'alkyne': '[CX2]#C',
    'alcohols': '[#6][OX2H]',
    'amines': '[NX3;H2,H1;!$(NC=O)]',
    'nitriles': '[NX1]#[CX2]',
    'aromatics': '[$([cX3](:*):*),$([cX2+](:*):*)]',
    'alkyl halides': '[#6][F,Cl,Br,I]',
    'esters': '[#6][CX3](=O)[OX2H0][#6]',
    'ketones': '[#6][CX3](=O)[#6]',
    'aldehydes': '[CX3H1](=O)[#6]',
    'carboxylic acids': '[CX3](=O)[OX2H1]',
    'ether': '[OD2]([#6])[#6]',
    'acyl halides': '[CX3](=[OX1])[F,Cl,Br,I]',
    'amides': '[NX3][CX3](=[OX1])[#6]',
    'nitro': '[$([NX3](=O)=O),$([NX3+](=O)[O-])][!#8]',
}

# Label column names, in dictionary order.
column_names = [*func_grp_smarts]

In [None]:
# Load the enriched compound table and index a copy of it by its `CAS`
# column, keeping the column itself available as regular data.
df_enrich = pd.read_csv(os.path.join(DATASET_DIR, 'df_enrich.csv'))
dataset_y = df_enrich.set_index('CAS', drop=False)
# Number of distinct CAS values (a missing value, if any, counts once).
dataset_y['CAS'].nunique(dropna=False)

8241

In [None]:
# df_spectra_all.to_parquet(os.path.join(DATASET_DIR, 'df_spectra_all_mixture_interpolate.parquet'), index=False)

In [None]:
# df_spectra_all = pd.read_csv(os.path.join(DATASET_DIR, 'df_spectra_all_mixture_interpolate.csv'))
df_spectra_all = pd.read_parquet(
    os.path.join(DATASET_DIR, 'df_spectra_all_mixture_interpolate.parquet')
)

# Group the column names by the aggregation tag contained in each name.
mean_cols, min_cols, max_cols = (
    [col for col in df_spectra_all.columns if tag in col]
    for tag in ('mean', 'min', 'max')
)
all_cols = [*mean_cols, *min_cols, *max_cols]
df_spectra_all.shape

(1030, 24636)

# Functions

In [None]:
def turn_spectra_into_image(data, ratio = None):
  """Rasterise a 1-D spectrum into a binary (level x bin) image.

  Each value is mapped to a row by floor division by 0.01, and the cell at
  (row, position) is set to 1. The canvas is 100 rows by 1030 columns.

  Out-of-range values are clamped instead of corrupting the image: values
  below 0 go to row 0 (previously a negative index wrapped to the top rows),
  values beyond the last level go to row 99 (previously an IndexError), and
  NaN is treated as 0.

  Args:
      data: 1-D sequence of numeric values (pandas Series or array-like),
          with at most 1030 entries.
      ratio: optional column step; when given, only every `ratio`-th column
          of the image is kept.

  Returns:
      A float array of shape (100, 1030), or (100, ceil(1030 / ratio)) when
      `ratio` is given.

  Raises:
      ValueError: if `data` is not one-dimensional.
      IndexError: if `data` has more than 1030 entries.
  """
  n_levels, n_bins = 100, 1030

  values = np.asarray(data, dtype=float)
  if values.ndim != 1:
    raise ValueError(f'expected a 1-D spectrum, got an array with shape {values.shape}')

  # Same binning rule as before (floor division by 0.01); NaN/inf are made
  # finite and the result is clamped to valid row indices before the int cast.
  levels = np.clip(np.nan_to_num(values // 0.01, nan=0.0), 0, n_levels - 1).astype(int)

  matrix_result = np.zeros((n_levels, n_bins))
  matrix_result[levels, np.arange(levels.size)] = 1

  if ratio is not None:
    matrix_result = matrix_result[:, 0:n_bins:ratio]

  return matrix_result

def get_dataset(df_spectra_all, dataset_y, agg_func):
  """Join the compound table with the spectra of one aggregation type.

  The spectra columns whose name contains `agg_func` are taken from the
  `df_spectra_all` argument itself (not from module-level column lists),
  transposed so that each spectrum becomes a row with `bin_<n>` columns, and
  indexed by the part of the original column name before the first
  underscore. Only `dataset_y` rows whose `yunits` is 'ABSORBANCE' are kept,
  and the two tables are inner-joined on their indexes.

  Args:
      df_spectra_all: DataFrame with one column per spectrum; column names
          are strings containing the aggregation tag.
      dataset_y: DataFrame with a `yunits` column, indexed by the same
          identifier used as prefix of the spectra column names.
      agg_func: one of 'mean', 'min' or 'max'.

  Returns:
      DataFrame with the `dataset_y` columns, the original spectrum column
      name in `index`, and the `bin_<n>` columns.

  Raises:
      ValueError: if `agg_func` is not one of the supported tags.
  """
  supported = ('mean', 'min', 'max')
  if agg_func not in supported:
    raise ValueError(f'agg_func must be one of {supported}, got {agg_func!r}')

  # Same substring rule that builds mean_cols/min_cols/max_cols, applied to
  # the frame that was actually passed in.
  selected_cols = [col for col in df_spectra_all.columns if agg_func in col]

  dataset_x = df_spectra_all[selected_cols].T
  dataset_x.columns = ['bin_' + str(x) for x in dataset_x.columns]
  dataset_x = dataset_x.reset_index()
  # The text before the first underscore of the spectrum column name is the join key.
  dataset_x.index = dataset_x['index'].apply(lambda x: x.split('_')[0])

  absorbance_y = dataset_y[dataset_y['yunits'] == 'ABSORBANCE']

  dataset_final = pd.merge(absorbance_y, dataset_x, left_index = True, right_index = True, how='inner')

  return dataset_final

def find_best_epoch(history):
    """
    Finds the epoch with the lowest validation loss.

    Args:
        history: Training history object from Keras model.fit(); its
            `.history` dict is expected to hold a 'val_loss' list.

    Returns:
        A tuple (best_epoch, best_val_loss), where best_epoch is the
        zero-based position of the minimum in the 'val_loss' list (a plain
        int) and best_val_loss is the value at that position.
        Returns None if the history object is missing, has no 'val_loss'
        entry, or the 'val_loss' list is empty.
    """
    if not history or 'val_loss' not in history.history:
        return None

    val_losses = history.history['val_loss']
    # np.argmin raises on an empty sequence, so honour the documented
    # "None when empty" contract explicitly.
    if len(val_losses) == 0:
        return None

    best_epoch = int(np.argmin(val_losses))  # Index of the minimum validation loss
    best_val_loss = val_losses[best_epoch]

    return best_epoch, best_val_loss

In [None]:
def _plot_training_history(history_dict, best_epoch, best_val_loss, figure_path):
    """Plot train/validation curves for three metrics, save the figure as a file and show it."""
    panels = [
        ('accuracy', 'val_accuracy', 'Acurácia'),
        ('loss', 'val_loss', 'Loss'),
        ('binary_accuracy', 'val_binary_accuracy', 'Acurácia Binarizada'),
    ]
    fig = plt.figure(figsize=(16, 10))
    plt.suptitle(f'Métricas Treinamento - Época Escolhida {best_epoch} - {best_val_loss:.3f} Validação Loss Function')
    for position, (train_key, val_key, y_label) in enumerate(panels, start=1):
        plt.subplot(len(panels), 1, position)
        plt.plot(history_dict[train_key])
        plt.plot(history_dict[val_key])
        # Dashed red line marks the epoch with the lowest validation loss.
        plt.axvline(x=best_epoch, color='red', linestyle='--')
        plt.ylabel(y_label)
        plt.xlabel('Epoch')
        plt.legend(['Train', 'Validation'], loc='upper left')
        plt.gca().yaxis.set_major_formatter(plt.FuncFormatter(lambda x, loc: '{:.3f}'.format(x)))
    plt.savefig(figure_path)
    plt.show()
    # Release the figure so repeated training runs do not accumulate open figures.
    plt.close(fig)


def compute_model_analysis(agg_func, data_prep, current_date, batch_size=380, epochs=100, callbacks=False, save_history=True, save_figure=False, ratio=10):
    """Train the Conv2D multi-label classifier on one spectra aggregation and save the artefacts.

    Builds the dataset with `get_dataset`, splits it (20% held out, of which
    30% is test and the rest validation, both stratified on the number of
    positive labels per sample, random_state=42), converts the spectra to
    images with `turn_spectra_into_image`, trains the network and saves the
    model under MODELS_DIR. Output folders are created before training so a
    finished run cannot fail at save time because a directory is missing.

    Args:
        agg_func: aggregation tag passed to `get_dataset` ('mean', 'min' or 'max').
        data_prep: free-form tag used only in the output file names.
        current_date: date string used only in the output file names.
        batch_size: batch size passed to `model.fit`.
        epochs: maximum number of epochs passed to `model.fit`.
        callbacks: when truthy, train with EarlyStopping and ReduceLROnPlateau
            (both monitoring 'val_loss'); the flag is also part of the file names.
        save_history: when True, pickle `history.history` under HISTORY_DIR.
        save_figure: when True, save and show the training curves under RESULTS_DIR.
        ratio: column step forwarded to `turn_spectra_into_image`; defaults to
            10, the value that used to be hard-coded here.

    Returns:
        The Keras History object returned by `model.fit`.
    """
    # Create only the folders this call will write to.
    os.makedirs(MODELS_DIR, exist_ok=True)
    if save_history:
        os.makedirs(HISTORY_DIR, exist_ok=True)
    if save_figure:
        os.makedirs(RESULTS_DIR, exist_ok=True)

    dataset_final = get_dataset(df_spectra_all, dataset_y, agg_func)
    bin_cols = [col for col in dataset_final.columns if 'bin' in col]
    X = dataset_final[bin_cols].apply(pd.to_numeric, errors='coerce').fillna(0)
    Y = dataset_final[column_names].astype(int)

    X_train, X_test_temp, Y_train, Y_test_temp = train_test_split(X, Y, test_size=0.20, random_state=42, stratify=Y[column_names].sum(axis=1))
    # The test part of this split is not used here; the split is still made so
    # the validation subset is exactly the one defined by these parameters.
    X_validation, X_test, Y_validation, Y_test = train_test_split(X_test_temp, Y_test_temp, test_size=0.30, random_state=42, stratify=Y_test_temp[column_names].sum(axis=1))

    def to_images(frame):
        # iterrows walks the rows by position, so repeated index labels still
        # yield exactly one spectrum per row (label lookup could return several).
        images = np.array([turn_spectra_into_image(row, ratio=ratio) for _, row in frame.iterrows()])
        return images[..., np.newaxis]  # trailing channel axis for Conv2D

    X_train_image = to_images(X_train)
    X_validation_image = to_images(X_validation)

    model = Sequential([
        layers.Input(shape=(X_train_image.shape[1], X_train_image.shape[2], 1)),
        layers.Conv2D(64, (4, 4), activation='relu'),
        layers.MaxPooling2D((4, 4)),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(128, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Flatten(),
        layers.Dense(64, activation='relu'),
        # One sigmoid unit per label column (multi-label output).
        layers.Dense(Y_train.shape[1], activation='sigmoid')
    ])
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy', 'binary_accuracy', 'precision', 'recall'])

    early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, verbose=0, mode='min', start_from_epoch=25, restore_best_weights=True)
    reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=3, min_lr=1e-8, verbose=0, mode='min')
    callbacks_list = [early_stopping, reduce_lr] if callbacks else []

    history = model.fit(X_train_image, Y_train, validation_data=(X_validation_image, Y_validation), callbacks=callbacks_list, epochs=epochs, verbose=1, shuffle=True, batch_size=batch_size)

    compose_name = f'conv2d_{agg_func}_{data_prep}_{current_date}_call_{callbacks}'
    model.save(os.path.join(MODELS_DIR, f'{compose_name}.keras'))
    if save_history:
        with open(os.path.join(HISTORY_DIR, f'{compose_name}_history.pkl'), 'wb') as f:
            pickle.dump(history.history, f)

    # find_best_epoch returns None when there is no validation loss to rank
    # (e.g. zero epochs); skip the figure instead of failing on the unpack.
    best = find_best_epoch(history)
    if save_figure and best is not None:
        best_epoch, best_val_loss = best
        _plot_training_history(history.history, best_epoch, best_val_loss, os.path.join(RESULTS_DIR, f'{compose_name}.png'))

    return history

In [None]:
# Training configuration shared by every run below.
callbacks = False
epochs = 100
batch_size = 600
current_date = datetime.now().strftime('%Y_%m_%d')

# Train one model per spectra aggregation, in the same order as before
# (min, max, mean). The returned History objects are not kept.
for agg_func in ('min', 'max', 'mean'):
    compute_model_analysis(agg_func, 'normal', current_date, batch_size=batch_size, epochs=epochs, callbacks=callbacks)

Epoch 1/100
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m61s[0m 5s/step - accuracy: 0.0922 - binary_accuracy: 0.7115 - binary_crossentropy: 0.6290 - f1_score: 0.0575 - loss: 0.6290 - precision: 0.3227 - recall: 0.3324 - val_accuracy: 0.7041 - val_binary_accuracy: 0.8502 - val_binary_crossentropy: 0.4148 - val_f1_score: 0.0485 - val_loss: 0.4148 - val_precision: 0.6381 - val_recall: 0.5683
Epoch 2/100
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m53s[0m 5s/step - accuracy: 0.5362 - binary_accuracy: 0.8535 - binary_crossentropy: 0.3970 - f1_score: 0.0674 - loss: 0.3970 - precision: 0.6632 - recall: 0.5417 - val_accuracy: 0.4804 - val_binary_accuracy: 0.8534 - val_binary_crossentropy: 0.3589 - val_f1_score: 0.0876 - val_loss: 0.3589 - val_precision: 0.6591 - val_recall: 0.5438
Epoch 3/100
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m81s[0m 5s/step - accuracy: 0.4080 - binary_accuracy: 0.8523 - binary_crossentropy: 0.3567 - f1_score: 0.0868 - loss: 0.3

<keras.src.callbacks.history.History at 0x79167f336e40>

# Comparações

In [None]:
def compute_general_metrics(y_real,y_pred,i=None):
  """Compute classification metrics for all labels together or for a single label.

  Args:
      y_real: DataFrame of true binary labels, one column per label.
      y_pred: 2-D array of predicted binary labels with the same column order.
      i: None to score all labels at once (weighted averages), or the
          position of the single label column to score.

  Returns:
      A list [name, accuracy, f1, precision, recall, jaccard, hamming,
      log_loss], where name is 'general' or `column_names[i]`.
  """
  if i is None:
    names = 'general'
    accur = metrics.accuracy_score(y_real, y_pred)
    # zero_division=0 yields the same values as the default but without a
    # warning for every label that has no positive prediction.
    f1 = metrics.f1_score(y_real, y_pred, average='weighted', zero_division=0)
    prec = metrics.precision_score(y_real, y_pred, average='weighted', zero_division=0)
    rec = metrics.recall_score(y_real, y_pred, average='weighted', zero_division=0)
    jacc = float(metrics.jaccard_score(y_real, y_pred, average='weighted', zero_division=0))
    hamm = metrics.hamming_loss(y_real, y_pred)
    # NOTE(review): callers in this notebook pass thresholded 0/1 predictions,
    # not probabilities, so this value is not a calibrated log loss - confirm
    # whether it should be computed from model.predict() output instead.
    logloss = metrics.log_loss(y_real, y_pred)

  else:
    names = column_names[i]
    true_col = y_real.iloc[:, i]
    pred_col = y_pred[:, i]
    accur = metrics.accuracy_score(true_col, pred_col)
    f1 = metrics.f1_score(true_col, pred_col, zero_division=0)
    prec = metrics.precision_score(true_col, pred_col, zero_division=0)
    rec = metrics.recall_score(true_col, pred_col, zero_division=0)
    jacc = float(metrics.jaccard_score(true_col, pred_col, zero_division=0))
    hamm = metrics.hamming_loss(true_col, pred_col)
    # Passing the label set explicitly keeps log_loss from raising when a
    # label column contains a single class in this split.
    logloss = metrics.log_loss(true_col, pred_col, labels=[0, 1])

  return [names, accur, f1, prec, rec, jacc, hamm, logloss]

In [None]:
def compute_all_comparisons(func_model, data_model, model_date='2025_08_09', callbacks=False, ratio=10):
    """Load a saved Conv2D model and score it on its train and test splits.

    The split and the image `ratio` must match the ones used at training time
    in `compute_model_analysis`: 20% held out, 30% of that as test, both
    stratified on the number of positive labels per sample, random_state=42,
    and ratio=10. With the same data this reproduces the training-time
    train/test partition, so test rows were not seen during fitting and the
    image width matches the saved model's input.

    Args:
        func_model: aggregation tag ('mean', 'min' or 'max') of the model.
        data_model: preprocessing tag used in the model file name.
        model_date: date part of the model file name; pass the value that was
            used as `current_date` when the model was trained.
        callbacks: callbacks flag that is part of the model file name.
        ratio: column step forwarded to `turn_spectra_into_image`.

    Returns:
        DataFrame with one row per (split, label) plus a 'general' row per
        split, the metric columns, and the `data` and `model` columns.
    """
    model_prefix = f'conv2d_{func_model}_{data_model}'
    model_name = f'{model_prefix}_{model_date}_call_{callbacks}'
    # `keras` is not imported as a top-level name in this notebook; go through `tf`.
    model = tf.keras.models.load_model(os.path.join(MODELS_DIR, f'{model_name}.keras'))

    dataset_final = get_dataset(df_spectra_all, dataset_y, func_model)
    bin_cols = [col for col in dataset_final.columns if 'bin' in col]
    X = dataset_final[bin_cols].apply(pd.to_numeric, errors='coerce').fillna(0)
    Y = dataset_final[column_names].astype(int)

    # Same two-stage split as compute_model_analysis.
    X_train, X_test_temp, Y_train, Y_test_temp = train_test_split(X, Y, test_size=0.20, random_state=42, stratify=Y[column_names].sum(axis=1))
    X_validation, X_test, Y_validation, Y_test = train_test_split(X_test_temp, Y_test_temp, test_size=0.30, random_state=42, stratify=Y_test_temp[column_names].sum(axis=1))

    def to_images(frame):
        # iterrows walks the rows by position, so repeated index labels still
        # yield exactly one spectrum per row.
        images = np.array([turn_spectra_into_image(row, ratio=ratio) for _, row in frame.iterrows()])
        return images[..., np.newaxis]  # trailing channel axis for Conv2D

    train_predict = (model.predict(to_images(X_train)) > 0.5).astype(int)
    test_predict = (model.predict(to_images(X_test)) > 0.5).astype(int)

    metrics_list = ['accuracy', 'f1_score', 'precision', 'recall', 'jaccard', 'hamming', 'log_loss']
    result_columns = ['metric'] + metrics_list
    label_positions = range(len(column_names))

    def score(y_true, y_hat, split_name):
        # First row is the all-label summary, followed by one row per label.
        rows = [compute_general_metrics(y_true, y_hat)] + [compute_general_metrics(y_true, y_hat, x) for x in label_positions]
        scored = pd.DataFrame(rows)
        scored.columns = result_columns
        scored['data'] = split_name
        return scored

    merged_df = pd.concat([score(Y_train, train_predict, 'train'), score(Y_test, test_predict, 'test')], ignore_index=True)
    merged_df['model'] = model_prefix
    return merged_df
# Evaluate every (aggregation, preprocessing) combination and collect the
# resulting metric tables in order.
results_list = []
for func_model in ('min', 'max', 'mean'):
    for data_model in ('normal',):
        results_list += [compute_all_comparisons(func_model, data_model)]

In [None]:
# Stack the per-model metric tables into one frame and display it.
all_metrics = pd.concat(objs=results_list)
all_metrics

In [None]:
# Rebuild the combined table, then show only the test-split rows ordered by
# F1 score, Hamming loss and model name (all ascending).
all_metrics = pd.concat(results_list)
all_metrics.loc[all_metrics['data'].eq('test')].sort_values(by=['f1_score', 'hamming', 'model'])

In [None]:
# Persist the combined metrics. The results folder is created first so the
# export does not fail when the directory does not exist yet.
os.makedirs(RESULTS_DIR, exist_ok=True)
all_metrics.to_csv(os.path.join(RESULTS_DIR, 'conv2d_all_results_2.csv'), index=False)