# Importación de Librerías
En esta sección se importarán todas las librerías necesarias para correr el código.

In [None]:
# Importación de Librerías
from google.colab import drive
drive.mount('/content/drive')

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import torch

import sys
import itertools
import os
import re
import time as t
import math
import logging
import pickle
from datetime import datetime
from collections import OrderedDict
from tqdm import tqdm
from copy import deepcopy

from tabulate import tabulate

from sklearn.metrics import confusion_matrix, f1_score, accuracy_score
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.model_selection import KFold, train_test_split

from imblearn.over_sampling import BorderlineSMOTE

import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torch.profiler

!pip install torch_pruning
import torch_pruning as tp

!pip install ptflops
from ptflops import get_model_complexity_info

!pip install timm
from timm.data import resolve_data_config, create_loader

!pip install apex

try:
    from apex import amp
    from apex.parallel import DistributedDataParallel as DDP
    from apex.parallel import convert_syncbn_model
    from apex.parallel.optimized_sync_batchnorm import SyncBatchNorm
    has_apex = True
except ImportError:
    from torch.nn.parallel import DistributedDataParallel as DDP
    from torch.nn import SyncBatchNorm
    has_apex = False

# Diseño estructural de funciones
En esta sección se definen todas las funciones necesarias para el programa, incluyendo las de carga y preprocesamiento de datos, búsqueda de hiperparámetros, entrenamiento, evaluación y comparación de modelos.

In [None]:
# Load the feature matrix and the label vector from disk
def datos(ruta_x, ruta_y):
    """Read the feature and label CSV files, both parsed without a header row.

    Args:
        ruta_x: Path of the CSV file holding the features.
        ruta_y: Path of the CSV file holding the labels.

    Returns:
        A ``(X, y)`` tuple of DataFrames, in that order.
    """
    features, labels = (pd.read_csv(path, header=None) for path in (ruta_x, ruta_y))
    return features, labels

# Concatenate two DataFrames, tolerating a missing operand
def concat(df_2=None, df_1=None):
    """Stack ``df_2`` on top of ``df_1``.

    If either argument is ``None`` the other one is returned unchanged
    (so ``None`` is returned when both are ``None``).

    Args:
        df_2: First (upper) DataFrame, or ``None``.
        df_1: Second (lower) DataFrame, or ``None``.

    Returns:
        The row-wise concatenation, keeping each frame's original index.
    """
    if df_2 is not None and df_1 is not None:
        return pd.concat([df_2, df_1])
    return df_1 if df_2 is None else df_2

# Oversample the minority classes with BorderlineSMOTE
def smote(X, y):
    """Resample ``(X, y)`` with a default-configured ``BorderlineSMOTE``.

    Args:
        X: Feature matrix.
        y: Class labels aligned with ``X``.

    Returns:
        A ``(X_resampled, y_resampled)`` tuple as produced by ``fit_resample``.
    """
    sampler = BorderlineSMOTE()
    X_balanced, y_balanced = sampler.fit_resample(X, y)
    return X_balanced, y_balanced

# Plot the class balance before and after SMOTE
def graph_smote(y, y_smote):
    """Draw a bar chart comparing class percentages before and after resampling.

    Args:
        y: Original labels; must provide ``value_counts()`` and ``len()``.
        y_smote: Resampled labels, same requirements as ``y``.

    The five label codes 0-4 are shown as W, N1, N2, N3 and R. Codes that do
    not occur are plotted as 0 %. The figure is drawn on the current
    matplotlib state; nothing is returned.
    """
    state_labels = {0: 'W', 1: 'N1', 2: 'N2', 3: 'N3', 4: 'R'}

    # Count once: the counts do not depend on the class being reported.
    original_counts = y.value_counts()
    improved_counts = y_smote.value_counts()
    total_original = len(y)
    total_improved = len(y_smote)

    # Build the frame in one go so both columns are numeric from the start
    # (row-by-row enlargement of an empty frame starts from object dtype).
    percentages = {
        label: [
            (original_counts.get(code, 0) / total_original) * 100,
            (improved_counts.get(code, 0) / total_improved) * 100,
        ]
        for code, label in state_labels.items()
    }
    df = pd.DataFrame.from_dict(percentages, orient='index', columns=['Original', 'Improved'])

    ax = df.plot(kind='bar', color=['lightcoral', 'mediumpurple'], figsize=(10, 7))
    plt.xlabel('Sleep State')
    plt.ylabel('% of Dataset')
    plt.title('Class Balance Comparison')
    plt.legend(['Original', 'Improved'])

    # Write the percentage just above each bar.
    for p in ax.patches:
        ax.annotate(format(p.get_height(), '.1f') + '%',
                    (p.get_x() + p.get_width() / 2., p.get_height()),
                    ha='center',
                    va='center',
                    xytext=(0, 10),
                    textcoords='offset points')

# Min-max normalisation of the features
def norm(X):
    """Rescale every column of ``X`` with a freshly fitted ``MinMaxScaler``.

    Args:
        X: DataFrame of features.

    Returns:
        A DataFrame with the scaled values and the same index and columns as ``X``.
    """
    scaled_values = MinMaxScaler().fit_transform(X)
    return pd.DataFrame(scaled_values, index=X.index, columns=X.columns)

# Standardisation of the features
def estd(X):
    """Standardise every column of ``X`` with a freshly fitted ``StandardScaler``.

    Args:
        X: DataFrame of features.

    Returns:
        A DataFrame with the standardised values and the same index and columns as ``X``.
    """
    standardized_values = StandardScaler().fit_transform(X)
    return pd.DataFrame(standardized_values, index=X.index, columns=X.columns)

# Input dimensionality and number of classes
def dims_class(X, y):
    """Return the number of feature columns and the number of distinct labels.

    Args:
        X: 2-D feature container exposing ``shape``.
        y: DataFrame whose first column holds the class labels.

    Returns:
        ``(input_dims, n_classes)``.
    """
    _, input_dims = X.shape[0], X.shape[1]
    first_label_column = y.iloc[:, 0]
    n_classes = len(pd.unique(first_label_column))
    return input_dims, n_classes

# Exhaustive search for the best hyperparameters
def hyperparameter_search(X, y, param_grid):
    """Grid-search training hyperparameters for ``DeepSleepNet``.

    Every combination in ``param_grid`` trains a fresh model on a fixed
    80/20 split (``random_state=42``) and is scored by accuracy on the
    held-out 20 %.

    Args:
        X: DataFrame of features, one sample per row.
        y: Labels aligned with ``X``.
        param_grid: Mapping from hyperparameter name to the list of values to
            try. ``epochs``, ``batch_size``, ``learning_rate`` and
            ``optimizer`` (``'adam'`` or ``'sgd'``) are required;
            ``n_rnn_layers``, ``dropout`` and ``momentum`` are optional.

    Returns:
        ``(best_params, best_accuracy)``. ``best_params`` is ``None`` when no
        combination scored above 0.

    Raises:
        ValueError: If a combination names an unsupported optimizer.
    """
    supported_optimizers = ('adam', 'sgd')

    # The split is deterministic, so it (and the tensors derived from it) is
    # built once instead of once per hyperparameter combination.
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    X_train_tensor = torch.tensor(X_train.values, dtype=torch.float32).unsqueeze(1)
    y_train_tensor = torch.tensor(y_train.values, dtype=torch.long).squeeze()
    X_test_tensor = torch.tensor(X_test.values, dtype=torch.float32).unsqueeze(1)
    train_dataset = TensorDataset(X_train_tensor, y_train_tensor)

    best_accuracy = 0.0
    best_params = None

    keys = list(param_grid.keys())

    for params in itertools.product(*param_grid.values()):
        param_dict = dict(zip(keys, params))
        print(f"\nEvaluando combinación de hiperparámetros: {param_dict}")

        optimizer_name = param_dict['optimizer']
        if optimizer_name not in supported_optimizers:
            # Previously this surfaced later as an UnboundLocalError.
            raise ValueError(
                f"Unsupported optimizer {optimizer_name!r}; expected one of {supported_optimizers}"
            )

        train_loader = DataLoader(train_dataset, batch_size=param_dict['batch_size'], shuffle=True)

        model_instance = DeepSleepNet(
            input_channels=1,
            n_rnn_layers=param_dict.get('n_rnn_layers', 2),
            dropout=param_dict.get('dropout', 0.5)
        )

        if optimizer_name == 'adam':
            optimizer = optim.Adam(model_instance.parameters(), lr=param_dict['learning_rate'])
        else:
            optimizer = optim.SGD(
                model_instance.parameters(),
                lr=param_dict['learning_rate'],
                momentum=param_dict.get('momentum', 0.0),
            )

        criterion = nn.CrossEntropyLoss()

        start_time = t.time()
        model_instance.train()
        for epoch in range(param_dict['epochs']):
            running_loss = 0.0
            for X_batch, y_batch in train_loader:
                optimizer.zero_grad()
                outputs = model_instance(X_batch)
                loss = criterion(outputs, y_batch)
                loss.backward()
                optimizer.step()
                running_loss += loss.item()
            print(f"Epoch {epoch+1}/{param_dict['epochs']}, Loss: {running_loss/len(train_loader)}")
        end_time = t.time()

        # Score the whole held-out split in a single forward pass.
        model_instance.eval()
        with torch.no_grad():
            y_pred = model_instance(X_test_tensor)
            _, y_pred_classes = torch.max(y_pred, 1)
        accuracy = accuracy_score(y_test, y_pred_classes.numpy())

        total_time = end_time - start_time

        print(f"Accuracy: {accuracy:.4f}")
        print(f"Total Time: {total_time:.2f} seconds")

        if accuracy > best_accuracy:
            best_accuracy = accuracy
            best_params = param_dict

    print(f"\nMejores Hiperparámetros: {best_params}")
    print(f"Mejor Precisión: {best_accuracy:.4f}")

    return best_params, best_accuracy

# Training loop
def train_model(model, X, y, X_val, y_val, epochs, batch_size, learning_rate, optimizer_type, momentum=0.0):
    """Train ``model`` with cross-entropy loss and track validation accuracy.

    Args:
        model: ``nn.Module`` that accepts input of shape
            ``(batch, 1, n_features)`` (a channel axis is inserted here).
        X, y: Training features and labels (pandas objects exposing ``.values``).
        X_val, y_val: Validation features and labels, same format.
        epochs: Number of passes over the training data.
        batch_size: Mini-batch size for the shuffled training loader.
        learning_rate: Optimizer learning rate.
        optimizer_type: ``'adam'`` or ``'sgd'``.
        momentum: Momentum, used only by SGD.

    Returns:
        ``(model, training_time, epoch_accuracies)``: the trained model
        (left in training mode), the wall-clock seconds spent in the loop
        (including the per-epoch validation pass), and the validation
        accuracy measured after each epoch.

    Raises:
        ValueError: If ``optimizer_type`` is not ``'adam'`` or ``'sgd'``.
    """
    # Fail fast: an unknown optimizer used to surface only as an
    # UnboundLocalError on the first training step.
    if optimizer_type == 'adam':
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    elif optimizer_type == 'sgd':
        optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum)
    else:
        raise ValueError(
            f"Unsupported optimizer_type {optimizer_type!r}; expected 'adam' or 'sgd'"
        )

    X_tensor = torch.tensor(X.values, dtype=torch.float32).unsqueeze(1)
    y_tensor = torch.tensor(y.values, dtype=torch.long).squeeze()
    X_val_tensor = torch.tensor(X_val.values, dtype=torch.float32).unsqueeze(1)
    y_val_tensor = torch.tensor(y_val.values, dtype=torch.long).squeeze()

    train_dataset = TensorDataset(X_tensor, y_tensor)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    criterion = nn.CrossEntropyLoss()

    model.train()
    start_time = t.time()
    epoch_accuracies = []
    for epoch in range(epochs):
        running_loss = 0.0
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        # Validation accuracy for this epoch
        model.eval()
        with torch.no_grad():
            y_pred = model(X_val_tensor)
            _, y_pred_classes = torch.max(y_pred, 1)
            accuracy = accuracy_score(y_val_tensor.numpy(), y_pred_classes.numpy())
            epoch_accuracies.append(accuracy)
        model.train()  # back to training mode for the next epoch

        print(f"Epoch {epoch+1}/{epochs}, Loss: {running_loss/len(train_loader)}, Accuracy: {accuracy:.4f}")

    end_time = t.time()
    training_time = end_time - start_time
    print(f"Training Time: {training_time:.2f} seconds")

    return model, training_time, epoch_accuracies

def count_parameters(model):
    """Return the total number of scalar entries in the trainable parameters of ``model``."""
    trainable_total = 0
    for parameter in model.parameters():
        if parameter.requires_grad:
            trainable_total += parameter.numel()
    return trainable_total

def accuracy(X, y, model, param_dict, test_size=0.2):
    """Train ``model`` on a hold-out split and collect accuracy and cost metrics.

    Args:
        X: DataFrame of features.
        y: Labels aligned with ``X``.
        model: The model to train. It is used as-is (not copied), so the
            caller's object is trained in place.
        param_dict: Needs ``epochs``, ``batch_size``, ``learning_rate`` and
            ``optimizer``; ``momentum`` is optional.
        test_size: Fraction of the data held out for validation.

    Returns:
        Dict with keys ``training_accuracy``, ``validation_accuracy``,
        ``training_time``, ``macs``, ``flops`` and ``parameters``.
        ``training_accuracy`` is the mean of the per-epoch accuracies reported
        by ``train_model``, which are measured on the validation split.
    """
    # Split the dataset into training and validation sets
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=test_size, random_state=42)

    # Use the model passed in directly (e.g. an already pruned one)
    model_instance = model

    # Print model details so the caller can check which model is being trained
    print("Inside accuracy function - model details:")
    print(model_instance)
    print("Number of parameters inside accuracy function:", count_parameters(model_instance))

    # Time the whole train_model call; its own timing value is not used here.
    start_time = t.time()
    model_instance, _inner_training_time, epoch_accuracies = train_model(
        model_instance, X_train, y_train, X_val, y_val, param_dict['epochs'],
        param_dict['batch_size'], param_dict['learning_rate'],
        param_dict['optimizer'], param_dict.get('momentum', 0.0)
    )
    training_time = t.time() - start_time

    average_training_accuracy = np.mean(epoch_accuracies)
    print(f"Training completed - Average Accuracy: {average_training_accuracy:.4f}, Training Time: {training_time:.2f} seconds")

    model_instance.eval()
    X_val_tensor = torch.tensor(X_val.values, dtype=torch.float32).unsqueeze(1)
    with torch.no_grad():
        y_pred = model_instance(X_val_tensor)
        _, y_pred_classes = torch.max(y_pred, 1)
    validation_accuracy = accuracy_score(y_val, y_pred_classes.numpy())

    print(f"Validation Accuracy: {validation_accuracy:.4f}")

    # ptflops returns (MACs, parameter count), not (MACs, FLOPs). FLOPs are
    # derived with the usual convention of 2 FLOPs per multiply-accumulate.
    macs, _ptflops_params = get_model_complexity_info(
        model_instance, (1, X_val_tensor.size(2)), as_strings=False, print_per_layer_stat=False
    )
    flops = 2 * macs if macs is not None else None
    total_params = count_parameters(model_instance)

    print(f"MACs: {macs}, FLOPs: {flops}, Parameters: {total_params}")

    metrics = {
        'training_accuracy': average_training_accuracy,
        'validation_accuracy': validation_accuracy,
        'training_time': training_time,
        'macs': macs,
        'flops': flops,
        'parameters': total_params
    }

    return metrics

# Cross-validation
def cross_validation_accuracy(X, y, model, param_dict, folds=5):
    """Run K-fold cross-validation, training a fresh ``DeepSleepNet`` per fold.

    Args:
        X: DataFrame of features.
        y: Labels aligned with ``X`` (must support ``.iloc``).
        model: Currently unused; a new ``DeepSleepNet`` is built for every
            fold. The parameter is kept so existing callers keep working.
        param_dict: Needs ``epochs``, ``batch_size``, ``learning_rate`` and
            ``optimizer``; ``n_rnn_layers``, ``dropout`` and ``momentum`` are
            optional.
        folds: Number of folds. The folds are contiguous (no shuffling).

    Returns:
        Dict with ``average_accuracy``, ``total_training_time`` and the
        per-fold lists ``times_per_fold``, ``macs_per_fold``,
        ``flops_per_fold`` and ``accuracies_per_fold``.
    """
    kf = KFold(n_splits=folds)
    total_accuracy = 0.0
    total_training_time = 0.0
    times_per_fold = []
    macs_per_fold = []
    flops_per_fold = []
    accuracies_per_fold = []

    for fold, (train_index, val_index) in enumerate(kf.split(X), 1):
        print(f"\nFold {fold}/{folds}")

        X_train, X_val = X.iloc[train_index], X.iloc[val_index]
        y_train, y_val = y.iloc[train_index], y.iloc[val_index]

        model_instance = DeepSleepNet(
            input_channels=1,
            n_rnn_layers=param_dict.get('n_rnn_layers', 2),
            dropout=param_dict.get('dropout', 0.5)
        )

        # Time the whole train_model call; its own timing value is not used here.
        start_time = t.time()
        model_instance, _inner_training_time, epoch_accuracies = train_model(
            model_instance, X_train, y_train, X_val, y_val, param_dict['epochs'],
            param_dict['batch_size'], param_dict['learning_rate'],
            param_dict['optimizer'], param_dict.get('momentum', 0.0)
        )
        fold_training_time = t.time() - start_time
        total_training_time += fold_training_time
        times_per_fold.append(fold_training_time)

        fold_accuracy = np.mean(epoch_accuracies)
        print(f"Fold {fold} - Average Accuracy: {fold_accuracy:.4f}, Training Time: {fold_training_time:.2f} seconds")

        model_instance.eval()
        X_val_tensor = torch.tensor(X_val.values, dtype=torch.float32).unsqueeze(1)
        with torch.no_grad():
            y_pred = model_instance(X_val_tensor)
            _, y_pred_classes = torch.max(y_pred, 1)
        # Named so it does not shadow the module-level accuracy() function.
        fold_val_accuracy = accuracy_score(y_val, y_pred_classes.numpy())
        total_accuracy += fold_val_accuracy
        accuracies_per_fold.append(fold_val_accuracy)

        # ptflops returns (MACs, parameter count), not (MACs, FLOPs). FLOPs
        # are derived with the usual convention of 2 FLOPs per MAC.
        macs, _ptflops_params = get_model_complexity_info(
            model_instance, (1, X_val_tensor.size(2)), as_strings=False, print_per_layer_stat=False
        )
        macs_per_fold.append(macs)
        flops_per_fold.append(2 * macs if macs is not None else None)

    average_accuracy = total_accuracy / folds
    print(f"\nFinal Average Accuracy: {average_accuracy:.4f}")
    print(f"Total Training Time for all folds: {total_training_time:.2f} seconds")

    metrics = {
        'average_accuracy': average_accuracy,
        'total_training_time': total_training_time,
        'times_per_fold': times_per_fold,
        'macs_per_fold': macs_per_fold,
        'flops_per_fold': flops_per_fold,
        'accuracies_per_fold': accuracies_per_fold
    }

    return metrics

def compare_models(pruned_metrics, unpruned_metrics):
    """Build a side-by-side table of the metrics of a pruned and an unpruned model.

    Args:
        pruned_metrics: Dict with keys ``training_accuracy``,
            ``validation_accuracy``, ``training_time``, ``macs``, ``flops``
            and ``parameters``.
        unpruned_metrics: Same keys, for the unpruned model.

    Returns:
        DataFrame with columns ``Metric``, ``Pruned Model`` and
        ``Unpruned Model``. Integers are rendered with thousands separators
        and other real numbers with six decimals; anything else is left as is.

    Raises:
        KeyError: If either dict lacks one of the required keys.
    """
    import numbers  # local import: only needed for the value formatting below

    # Print the metrics dictionaries for debugging
    print("Pruned Metrics:", pruned_metrics)
    print("Unpruned Metrics:", unpruned_metrics)

    rows = [
        ('Training Accuracy', 'training_accuracy'),
        ('Validation Accuracy', 'validation_accuracy'),
        ('Training Time', 'training_time'),
        ('MACs', 'macs'),
        ('FLOPs', 'flops'),
        ('Parameters', 'parameters'),
    ]

    def format_value(x):
        # Avoid scientific notation. numbers.Integral/Real also match NumPy scalars.
        if isinstance(x, numbers.Integral):
            return f'{int(x):,}'
        if isinstance(x, numbers.Real):
            return f'{float(x):.6f}'
        return x

    # Format before building the frame: a column mixing ints and floats is
    # coerced to float64 by pandas, which would lose the integer formatting.
    comparison_df = pd.DataFrame({
        'Metric': [label for label, _ in rows],
        'Pruned Model': [format_value(pruned_metrics[key]) for _, key in rows],
        'Unpruned Model': [format_value(unpruned_metrics[key]) for _, key in rows],
    })

    return comparison_df


# Creación de Modelo Clasificador
En esta sección se implementan las clases para el diseño del clasificador y se ejecuta el clasificador sobre los datos sin comprimir el modelo.

Artículo publicado en: [Papers with Code](https://paperswithcode.com/).

DeepSleepNet: A Model for Automatic Sleep Stage Scoring based on Raw Single-Channel EEG

Puede acceder al artículo y fuente de código en el siguiente [enlace](https://paperswithcode.com/paper/deepsleepnet-a-model-for-automatic-sleep).

El clasificador fue modificado para su funcionamiento en PyTorch para implementar métodos de compresión en PyTorch.

In [None]:
class DeepFeatureNet(nn.Module):
    """Two-branch 1-D convolutional feature extractor.

    ``features_s`` uses a short first kernel (50 samples, stride 6) and
    ``features_l`` a long one (400 samples, stride 50). Both branches end
    with 128 channels and their outputs are concatenated along the last
    (length) axis.
    """

    def __init__(self, input_channels):
        """Build both branches for inputs with ``input_channels`` channels."""
        super().__init__()
        self.features_s = nn.Sequential(
            nn.Conv1d(input_channels, 64, 50, 6, padding=24),
            nn.BatchNorm1d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=8, stride=8, padding=4),
            nn.Dropout(),
            nn.Conv1d(64, 128, 6, padding=3),
            nn.BatchNorm1d(128),
            nn.Conv1d(128, 128, 6, padding=3),
            nn.BatchNorm1d(128),
            nn.Conv1d(128, 128, 6, padding=3),
            nn.BatchNorm1d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2, padding=1),
        )
        self.features_l = nn.Sequential(
            nn.Conv1d(input_channels, 64, 400, 50, padding=200),
            nn.BatchNorm1d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=4, stride=4, padding=2),
            nn.Dropout(),
            nn.Conv1d(64, 128, 8, padding=3),
            nn.BatchNorm1d(128),
            nn.Conv1d(128, 128, 8, padding=3),
            nn.BatchNorm1d(128),
            nn.Conv1d(128, 128, 8, padding=3),
            nn.BatchNorm1d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2, padding=1),
        )

    def forward(self, x):
        """Map ``(batch, input_channels, length)`` to ``(batch, channels, steps)``."""
        x_s = self.features_s(x)
        x_l = self.features_l(x)
        # Concatenate along the length axis; both branches must therefore
        # produce the same number of channels.
        x = torch.cat((x_s, x_l), 2)
        # Shape-preserving for the 3-D tensor produced above.
        x = x.view(x.size(0), -1, x.size(2))
        return x

class DeepSleepNet(nn.Module):
    """CNN feature extractor followed by a bidirectional LSTM.

    NOTE(review): ``forward`` returns a 1024-dimensional vector per sample;
    there is no projection to the number of classes in this class.
    """

    def __init__(self, input_channels, n_rnn_layers=2, dropout=0.5):
        """Build the network.

        Args:
            input_channels: Number of channels of the input signal.
            n_rnn_layers: Number of stacked LSTM layers.
            dropout: Dropout between LSTM layers.
        """
        super().__init__()
        self.feature_extractor = DeepFeatureNet(input_channels)

        # Probe the extractor with a dummy 3000-sample input to learn the
        # LSTM input size. The probe runs in eval mode and without autograd:
        # in training mode it would update the BatchNorm running statistics
        # (and num_batches_tracked) of a brand-new model with statistics of
        # an all-zero signal.
        self.feature_extractor.eval()
        with torch.no_grad():
            dummy_output = self.feature_extractor(torch.zeros(1, input_channels, 3000))
        self.feature_extractor.train()
        lstm_input_size = dummy_output.shape[1]  # channel axis feeds the LSTM

        self.features_seq = nn.LSTM(lstm_input_size, 512, batch_first=True, bidirectional=True, dropout=dropout, num_layers=n_rnn_layers)
        self.res = nn.Linear(512 * 2, 1024)  # 512 * 2 because the LSTM is bidirectional

    def forward(self, x):
        """Return a ``(batch, 1024)`` tensor for input ``(batch, input_channels, length)``."""
        x = self.feature_extractor(x)
        x = x.transpose(1, 2)  # (batch, steps, channels) as required by batch_first=True
        x_blstm, _ = self.features_seq(x)
        last_step = x_blstm[:, -1, :]  # LSTM output at the final time step
        x_res = self.res(last_step)
        # Element-wise product of the projected and the raw last-step output.
        x = torch.mul(x_res, last_step)
        return x


# Importación y Organización de Datos
En esta sección estaremos importando conjuntos de datos de sueño y estaremos uniéndolos para procesar una mayor cantidad de datos.

In [None]:
# Initialise the accumulators for the features (df_X) and the labels (df_y)
df_X = None
df_y = None

## Full run (currently disabled): load six recordings and concatenate them into one DataFrame
#for i in range(6):
#    X, y = datos(f"/content/drive/MyDrive/Investigacion/Dataset_1/SC_{i}.csv", f"/content/drive/MyDrive/Investigacion/Dataset_1/y_{i}.csv")
#    df_X = concat(df_X, X)
#    df_y = concat(df_y, y)

# Reduced run: range(1) loads only the first recording (SC_0.csv / y_0.csv)
for i in range(1):
    X, y = datos(f"/content/drive/MyDrive/Investigacion/Dataset_1/SC_{i}.csv", f"/content/drive/MyDrive/Investigacion/Dataset_1/y_{i}.csv")
    df_X = concat(df_X, X)
    df_y = concat(df_y, y)


# Preprocesamiento de Datos
En esta sección estaremos preprocesando los datos con BorderlineSmote, Estandarización y Normalización.

In [None]:
# BorderlineSMOTE on the raw data, plus a plot of the class balance before/after
# NOTE(review): resampling and scaling are applied to the full dataset, before
# any train/validation split is made later on — confirm this is intended.
X_smote, y_smote = smote(df_X, df_y)
graph_smote(df_y, y_smote)

# BorderlineSMOTE on the standardised data
X_std = estd(df_X)
X_std_smote, y_std_smote = smote(X_std, df_y)

# BorderlineSMOTE on the min-max normalised data
X_norm = norm(df_X)
X_norm_smote, y_norm_smote = smote(X_norm, df_y)


# Búsqueda de mejores Hiperparámetros
En esta sección se creó una función que itera sobre combinaciones de hiperparámetros para hallar los mejores valores para el modelo.

In [None]:
# Hyperparameter grid: 3 x 3 x 3 x 2 = 54 combinations per search
param_grid = {
    'epochs': [25, 50, 100],
    'batch_size': [100, 500, 1000],
    'learning_rate': [0.0001, 0.001, 0.01],
    'optimizer': ['adam', 'sgd']
}

# One full search per preprocessing variant: raw, standardised and normalised data
best_params, best_accuracy = hyperparameter_search(X_smote, y_smote, param_grid)
best_params_std, best_accuracy_std = hyperparameter_search(X_std_smote, y_std_smote, param_grid)
best_params_norm, best_accuracy_norm = hyperparameter_search(X_norm_smote, y_norm_smote, param_grid)


# Implementación de Clasificador sin Poda
En esta sección se aplica y entrena el modelo de clasificación con los mejores hiperparámetros hallados para el modelo de deep learning.


In [None]:
# Hyperparameters used for training; these are fixed values, not read from the
# search results of the previous cell
param_dict = {
    'epochs': 10,
    'batch_size': 1000,
    'learning_rate': 0.001,
    'optimizer': 'adam',
    'n_rnn_layers': 2,
    'dropout': 0.5
}

# Create the (unpruned) model
model = DeepSleepNet(
    input_channels=1,
    n_rnn_layers=param_dict['n_rnn_layers'],
    dropout=param_dict['dropout']
)


# Train the model on the standardised + SMOTE data and compute its metrics (no cross-validation)
unpruned_metrics = accuracy(X_std_smote, y_std_smote, model, param_dict)


# Creación del Modelo de Pruning
En esta sección veremos la definición de las clases de compresión, creando un modelo de pruning basado en Slimming Pruner.

Artículo publicado en: [Papers with Code](https://paperswithcode.com/).

Pruning Filters for Efficient ConvNets

Puede acceder al artículo y fuente de código en el siguiente [enlace](https://paperswithcode.com/sota/network-pruning-on-imagenet).

In [None]:
class MySlimmingPruner(tp.pruner.MetaPruner):
    """MetaPruner with an L1 (lasso) regulariser on BatchNorm scale factors."""

    def regularize(self, model, reg):
        """Add ``reg * sign(weight)`` to the gradient of every affine BatchNorm weight.

        Expects the gradients to be populated already (``weight.grad`` is
        accessed directly).
        """
        batchnorm_types = (nn.BatchNorm1d, nn.BatchNorm2d, nn.BatchNorm3d)
        for module in model.modules():
            if not isinstance(module, batchnorm_types) or module.affine != True:
                continue
            # Sub-gradient of the L1 penalty, which pushes scale factors towards zero
            l1_subgradient = reg * torch.sign(module.weight.data)
            module.weight.grad.data.add_(l1_subgradient)

class MySlimmingImportance(tp.importance.Importance):
    """Channel importance taken from the magnitude of BatchNorm scale factors."""

    def __call__(self, group, **kwargs):
        """Score the channels of a pruning group.

        Returns:
            A 1-D tensor with the mean ``|weight|`` over the affine BatchNorm
            layers found in ``group``, or ``None`` if the group has none.
        """
        batchnorm_types = (nn.BatchNorm1d, nn.BatchNorm2d, nn.BatchNorm3d)

        # 1. One importance row per affine BatchNorm layer in the group
        per_layer_scores = []  # (num_bns, num_channels)
        for dep, _idxs in group:
            target_layer = dep.target.module
            if isinstance(target_layer, batchnorm_types) and target_layer.affine:
                per_layer_scores.append(torch.abs(target_layer.weight.data))

        if not per_layer_scores:
            return None  # nothing to score: the group contains no BN layer

        # 2. Reduce to a 1-D score vector by averaging across layers
        return torch.stack(per_layer_scores, dim=0).mean(dim=0)  # (num_channels, )

# Any importance function works, as long as it maps a group to a 1-D score vector.
class RandomImportance(tp.importance.Importance):
    """Baseline importance: uniformly random scores."""

    @torch.no_grad()
    def __call__(self, group, **kwargs):
        """Return one random score in [0, 1) per index of the group's first entry."""
        first_entry = group[0]
        _dep, channel_idxs = first_entry
        return torch.rand(len(channel_idxs))


# Implementación Modo de Poda ConvNets (DepGraphs)
En esta sección se implementan las clases para el diseño de compresión y se ejecuta la poda en el clasificador antes de entrenar.



1.   features_s[0]: Conv1d(1, 64, kernel_size=(50,), stride=(6,), padding=(24,))
2.   features_s[5]: Conv1d(64, 128, kernel_size=(6,), stride=(1,), padding=(3,))
3.   features_s[7]: Conv1d(128, 128, kernel_size=(6,), stride=(1,), padding=(3,))
4.   features_s[9]: Conv1d(128, 128, kernel_size=(6,), stride=(1,), padding=(3,))
5.   features_l[0]: Conv1d(1, 64, kernel_size=(400,), stride=(50,), padding=(200,))
6.   features_l[5]: Conv1d(64, 128, kernel_size=(8,), stride=(1,), padding=(3,))
7.   features_l[7]: Conv1d(128, 128, kernel_size=(8,), stride=(1,), padding=(3,))
8.   features_l[9]: Conv1d(128, 128, kernel_size=(8,), stride=(1,), padding=(3,))





In [None]:
# Create a fresh model to be pruned
model = DeepSleepNet(
    input_channels=1,
    n_rnn_layers=param_dict['n_rnn_layers'],
    dropout=param_dict['dropout']
)

# Convert the data to a PyTorch tensor shaped (samples, 1, input_dims)
input_dims, n_classes = dims_class(X_smote, y_smote)
X_smote_tensor = torch.tensor(X_smote.values, dtype=torch.float32).view(-1, 1, input_dims)

# Use the first 64 samples as example inputs for tracing the dependency graph
example_inputs = X_smote_tensor[:64]

# Print the model before pruning
print("Before pruning:")
print(model)

# Count the parameters before pruning
print("Parameters before pruning:", count_parameters(model))

# Build the dependency graph for DeepSleepNet
DG = tp.DependencyGraph().build_dependency(model, example_inputs=example_inputs)

# Per-layer pruning ratios, paired by position with layers_to_prune below
pruning_ratios = [0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5]  # prune 50% of the channels
layers_to_prune = [
    model.feature_extractor.features_s[0],  # Conv1
    model.feature_extractor.features_s[5],  # Conv2
    model.feature_extractor.features_s[7],  # Conv3
    model.feature_extractor.features_s[9],  # Conv4

    model.feature_extractor.features_l[0],  # Conv5
    model.feature_extractor.features_l[5],  # Conv6
    model.feature_extractor.features_l[7],  # Conv7
    model.feature_extractor.features_l[9],  # Conv8
]

for layer, ratio in zip(layers_to_prune, pruning_ratios):
    num_channels = layer.out_channels
    # Every int(1/ratio)-th output channel is selected (every 2nd one for 0.5);
    # the selection is positional, not importance-based.
    pruning_idxs = list(range(0, num_channels, int(1/ratio)))
    print(f"Pruning indices for layer {layer}: {pruning_idxs}")  # show the selected indices
    pruning_group = DG.get_pruning_group(layer, tp.prune_conv_out_channels, idxs=pruning_idxs)
    # Only prune when the dependency check accepts the group
    if DG.check_pruning_group(pruning_group):
        pruning_group.prune()
    else:
        print(f"Skipping pruning for layer {layer} due to dependency check failure.")

# Print the model after pruning
print("After pruning:")
print(model)

# Count the parameters after pruning
print("Parameters after pruning:", count_parameters(model))

# Train the pruned model and compute its metrics (no cross-validation)
pruned_metrics_CN = accuracy(X_std_smote, y_std_smote, model, param_dict)


In [None]:
# Print the pruned vs. unpruned comparison table
comparison_df = compare_models(pruned_metrics_CN, unpruned_metrics)
print(comparison_df)


# Knapsack
En esta sección se ejecuta la poda Knapsack sobre el modelo DeepSleepNet.

[github](https://github.com/yoniaflalo/knapsack_pruning)

In [None]:
import torch
import torch.nn as nn

class KnapsackPruning:
    """Structured channel pruning of the two convolutional branches of a model.

    The model is expected to expose ``feature_extractor.features_s`` and
    ``feature_extractor.features_l``, each a sequence of ``Conv1d``,
    ``BatchNorm1d`` and parameter-free layers.

    Channel importance is currently a random placeholder (see
    ``compute_num_channels_per_layer_taylor``). ``data_config``, ``loader``
    and ``use_amp`` are stored but not used by the methods of this class.
    """

    def __init__(self, model, data_config, loader, args, use_amp=False):
        """Store the model and settings; ``args`` must provide ``pruning_ratio`` and ``lr``."""
        self.model = model
        self.data_config = data_config
        self.loader = loader
        self.args = args
        self.use_amp = use_amp
        self.new_net = None

    def compute_num_channels_per_layer_taylor(self):
        """Choose the output channels to remove from every ``Conv1d`` of the model.

        Returns:
            List of ``(module_name, channel_indices)`` pairs, one per ``Conv1d``.
        """
        list_channel_to_prune = []
        for name, module in self.model.named_modules():
            if isinstance(module, nn.Conv1d):
                importance_scores = torch.rand(module.out_channels)  # Placeholder for importance score calculation
                num_channels_to_prune = int(module.out_channels * self.args.pruning_ratio)
                # Always keep at least one channel so the layer stays usable.
                num_channels_to_prune = max(0, min(num_channels_to_prune, module.out_channels - 1))
                channels_to_prune = importance_scores.topk(num_channels_to_prune, largest=False)[1]
                list_channel_to_prune.append((name, channels_to_prune))
        return list_channel_to_prune

    def prune_batchnorm(self, batchnorm_layer, mask):
        """Return a new ``BatchNorm1d`` holding only the features selected by ``mask``.

        Args:
            batchnorm_layer: Layer to shrink.
            mask: Boolean tensor with one entry per feature; ``True`` = keep.

        Raises:
            ValueError: If ``mask`` does not have ``num_features`` entries.
        """
        if mask.numel() != batchnorm_layer.num_features:
            raise ValueError(
                f"Mask has {mask.numel()} entries but the BatchNorm layer has "
                f"{batchnorm_layer.num_features} features"
            )
        num_features_to_keep = int(mask.sum().item())

        print(f"Pruning BatchNorm1d: {batchnorm_layer}")
        if batchnorm_layer.running_mean is not None:
            print(f"Original running_mean shape: {batchnorm_layer.running_mean.shape}, running_var shape: {batchnorm_layer.running_var.shape}")
        print(f"Mask: {mask}")

        # Carry over the layer's configuration instead of silently resetting it.
        bn = nn.BatchNorm1d(
            num_features_to_keep,
            eps=batchnorm_layer.eps,
            momentum=batchnorm_layer.momentum,
            affine=batchnorm_layer.affine,
            track_running_stats=batchnorm_layer.track_running_stats,
        )
        reference = batchnorm_layer.weight if batchnorm_layer.weight is not None else batchnorm_layer.running_mean
        if reference is not None:
            bn = bn.to(device=reference.device, dtype=reference.dtype)
            mask = mask.to(reference.device)

        with torch.no_grad():
            if batchnorm_layer.affine:
                bn.weight.copy_(batchnorm_layer.weight[mask])
                bn.bias.copy_(batchnorm_layer.bias[mask])
            if batchnorm_layer.running_mean is not None:
                bn.running_mean.copy_(batchnorm_layer.running_mean[mask])
                bn.running_var.copy_(batchnorm_layer.running_var[mask])
                bn.num_batches_tracked.copy_(batchnorm_layer.num_batches_tracked)

        print(f"New BatchNorm1d layer: {bn}")
        if bn.running_mean is not None:
            print(f"New running_mean shape: {bn.running_mean.shape}, running_var shape: {bn.running_var.shape}")

        return bn

    def prune_conv1d_layer(self, layer, channels_to_prune, in_mask=None):
        """Return a smaller copy of ``layer`` plus the mask of kept output channels.

        Args:
            layer: ``Conv1d`` to shrink (only ``groups == 1`` is supported).
            channels_to_prune: Indices of the output channels to remove, or
                ``None`` to keep every output channel.
            in_mask: Optional boolean mask over the input channels
                (``True`` = keep), used when the layer feeding this one was pruned.

        Returns:
            ``(new_conv, mask)`` where ``mask`` is boolean over the original
            output channels with ``True`` for the channels that were kept.

        Raises:
            ValueError: For grouped convolutions or a wrongly sized ``in_mask``.
        """
        if layer.groups != 1:
            raise ValueError("Pruning of grouped Conv1d layers is not supported")

        device = layer.weight.device
        mask = torch.ones(layer.out_channels, dtype=torch.bool, device=device)
        if channels_to_prune is not None:
            mask[torch.as_tensor(channels_to_prune, dtype=torch.long, device=device)] = False

        print(f"Pruning Conv1d: {layer}")
        print(f"Original weight shape: {layer.weight.shape}")
        if layer.bias is not None:
            print(f"Original bias shape: {layer.bias.shape}")

        # Weight layout is (out_channels, in_channels, kernel): drop rows for
        # pruned outputs and columns for inputs removed upstream.
        new_weight = layer.weight.data[mask]
        if in_mask is not None:
            in_mask = in_mask.to(device)
            if in_mask.numel() != layer.in_channels:
                raise ValueError(
                    f"Input mask has {in_mask.numel()} entries but the layer has "
                    f"{layer.in_channels} input channels"
                )
            new_weight = new_weight[:, in_mask]

        new_conv = nn.Conv1d(
            in_channels=new_weight.shape[1],
            out_channels=new_weight.shape[0],
            kernel_size=layer.kernel_size,
            stride=layer.stride,
            padding=layer.padding,
            dilation=layer.dilation,
            padding_mode=layer.padding_mode,
            bias=layer.bias is not None
        )

        new_conv.weight.data = new_weight.clone()
        if layer.bias is not None:
            new_conv.bias.data = layer.bias.data[mask].clone()

        print(f"New Conv1d layer: {new_conv}")
        print(f"New weight shape: {new_conv.weight.shape}")
        if new_conv.bias is not None:
            print(f"New bias shape: {new_conv.bias.shape}")

        # The mask is returned so the layers that follow can be shrunk to match.
        return new_conv, mask

    def apply_pruning(self, layers, list_channel_to_prune, prefix):
        """Prune one branch and return it as a new ``nn.Sequential``.

        The mask of surviving channels is carried forward through BatchNorm
        and parameter-free layers and applied to the input channels of the
        next convolution, so consecutive layers stay shape-compatible. The
        output channels of the last convolution are never pruned, because
        whatever consumes the branch output was built for the original width.

        Args:
            layers: Module whose direct children form the branch.
            list_channel_to_prune: ``(module_name, indices)`` pairs as returned
                by ``compute_num_channels_per_layer_taylor``.
            prefix: Name of ``layers`` inside the model, used to look up its
                children in ``list_channel_to_prune``.

        Raises:
            ValueError: If a parameterised layer other than ``Conv1d`` or
                ``BatchNorm1d`` follows a pruned convolution.
        """
        planned = {}
        for layer_name, channels in list_channel_to_prune:
            planned.setdefault(layer_name, channels)

        children = list(layers.named_children())
        conv_names = [name for name, module in children if isinstance(module, nn.Conv1d)]
        last_conv_name = conv_names[-1] if conv_names else None

        new_layers = []
        flowing_mask = None  # surviving channels of the tensor entering the next layer
        for name, module in children:
            full_name = f"{prefix}.{name}"
            if isinstance(module, nn.Conv1d):
                channels_to_prune = None if name == last_conv_name else planned.get(full_name)
                if channels_to_prune is None and flowing_mask is None:
                    new_layers.append((name, module))  # nothing to change
                    continue
                new_conv, out_mask = self.prune_conv1d_layer(module, channels_to_prune, in_mask=flowing_mask)
                new_layers.append((name, new_conv))
                flowing_mask = out_mask if channels_to_prune is not None else None
            elif isinstance(module, nn.BatchNorm1d):
                if flowing_mask is not None:
                    print(f"Pruning BatchNorm1d for {full_name} with previous mask {flowing_mask}")
                    new_layers.append((name, self.prune_batchnorm(module, flowing_mask)))
                else:
                    new_layers.append((name, module))
            else:
                # Parameter-free layers (ReLU, pooling, dropout) keep the channel
                # layout, so the mask keeps flowing past them.
                if flowing_mask is not None and any(True for _ in module.parameters()):
                    raise ValueError(
                        f"Cannot propagate pruned channels through {full_name} "
                        f"({type(module).__name__})"
                    )
                new_layers.append((name, module))

        # Debugging: Check consistency of layers
        for i, (name, module) in enumerate(new_layers):
            if isinstance(module, nn.Conv1d):
                print(f"Layer {i} ({name}) is Conv1d with {module.out_channels} out_channels")
            elif isinstance(module, nn.BatchNorm1d):
                print(f"Layer {i} ({name}) is BatchNorm1d with {module.num_features} features")

        pruned = nn.Sequential()
        for name, module in new_layers:
            pruned.add_module(name, module)
        return pruned

    def redesign_module_deepfeature(self, list_channel_to_prune):
        """Replace both feature-extractor branches of the model by pruned versions."""
        self.model.feature_extractor.features_s = self.apply_pruning(
            self.model.feature_extractor.features_s,
            list_channel_to_prune,
            "feature_extractor.features_s"
        )
        self.model.feature_extractor.features_l = self.apply_pruning(
            self.model.feature_extractor.features_l,
            list_channel_to_prune,
            "feature_extractor.features_l"
        )

    def prune_model(self):
        """Prune the model in place and create an optimizer for it.

        Returns:
            ``(model, optimizer)``. The model is left in training mode and is
            moved to the GPU when one is available.
        """
        # Unwrap first so module names and attribute access refer to the real model.
        if isinstance(self.model, (nn.DataParallel, nn.parallel.DistributedDataParallel)):
            self.model = self.model.module

        list_channel_to_prune = self.compute_num_channels_per_layer_taylor()
        self.redesign_module_deepfeature(list_channel_to_prune)

        self.model.train()
        if torch.cuda.is_available():
            self.model = self.model.cuda()

        optimizer = self.create_optimizer(self.model)

        return self.model, optimizer

    def create_optimizer(self, model):
        """Return an Adam optimizer over ``model`` using ``args.lr``."""
        optimizer = torch.optim.Adam(model.parameters(), lr=self.args.lr)
        return optimizer


In [None]:
import argparse

# Example setup for arguments, data_config, and loader
data_config = None  # This should be your actual data configuration
loader = None  # This should be your actual data loader
args = argparse.Namespace(
    pruning_ratio=0.2,  # Example pruning ratio (20%)
    lr=0.001,            # Example learning rate for the optimizer
    batch_size=1000,      # Example batch size
    batch_size_prune=1000,  # Example pruning batch size
    use_time=False,     # Example flag for using timing in pruning
)  # Replace with actual arguments as needed

# Initialize the pruning class with the current `model`
# NOTE(review): at this point `model` appears to be the instance already pruned
# and trained in the DepGraph cell above — confirm that re-pruning it is intended.
knapsack_pruning = KnapsackPruning(model, data_config, loader, args)

# Prune the model (the optimizer returned here is not used below)
pruned_model, optimizer = knapsack_pruning.prune_model()

# Bring the model to the CPU before evaluation
device = torch.device("cpu")
pruned_model.to(device)

# Train and evaluate the pruned model with the module-level `accuracy` helper
pruned_metrics_KS = accuracy(X_std_smote, y_std_smote, pruned_model, param_dict)

# Print the pruned model's metrics
print("Pruned Model Metrics:")
print(pruned_metrics_KS)
