# Ejemplo de Pruning
---

En este ejemplo vamos a ver como eliminar filtros completos de un modelo para ahorrar cómputo y memoria. Para ello, vamos a aplicar pruning estructurado para eliminar un porcentaje de filtros de cada capa. Para ello, vamos a seguir los siguientes pasos:

1.   Partimos de un modelo preentrenado.
2.   Aplicamos el pruning y eliminamos aquellos filtros menos importantes en función de un criterio predefinido (norma L2 en nuestro caso).
3.   Aplicamos un fine tuning para ajustar los pesos frente a los nuevos cambios

---

## 1. Instalar e importar las librerías necesarias

En este ejemplo vamos a trabajar con Pytorch y los modelos de torchvision

In [None]:
!pip3 install torchinfo



In [None]:
from torchvision.models import alexnet, AlexNet_Weights
from torchinfo import summary
import torch
import torchvision
import time
import numpy as np
import torch.nn as nn
import torch.nn.utils.prune as prune
import copy

## 2. Definir los modelos

Por simplicidad, vamos a trabajar con Alexnet que es un modelo lineal sin conexiones residuales. Esto no es por requirimientos del pruning, es por facilidad de programar la eliminación de filtros. A cualquier modelo se le puede aplicar pruning, simplemente hay que tener en cuenta dependencias de las conexiones residuales para eliminar los filtros a 0.

In [None]:
model = torchvision.models.alexnet(weights=AlexNet_Weights)
preprocessing = AlexNet_Weights.IMAGENET1K_V1.transforms()
summary(model, input_size=(1, 3, 224, 224))



Layer (type:depth-idx)                   Output Shape              Param #
AlexNet                                  [1, 1000]                 --
├─Sequential: 1-1                        [1, 256, 6, 6]            --
│    └─Conv2d: 2-1                       [1, 64, 55, 55]           23,296
│    └─ReLU: 2-2                         [1, 64, 55, 55]           --
│    └─MaxPool2d: 2-3                    [1, 64, 27, 27]           --
│    └─Conv2d: 2-4                       [1, 192, 27, 27]          307,392
│    └─ReLU: 2-5                         [1, 192, 27, 27]          --
│    └─MaxPool2d: 2-6                    [1, 192, 13, 13]          --
│    └─Conv2d: 2-7                       [1, 384, 13, 13]          663,936
│    └─ReLU: 2-8                         [1, 384, 13, 13]          --
│    └─Conv2d: 2-9                       [1, 256, 13, 13]          884,992
│    └─ReLU: 2-10                        [1, 256, 13, 13]          --
│    └─Conv2d: 2-11                      [1, 256, 13, 13]         

## 3. Definir un data loader

Por limitaciones de tiempo de cómputo, vamos a trabajar con CIFAR-10 pero cualquier dataset es válido. Primero, tenemos que crear un DataLoader de Pytorch para poder usar los datos con nuestro modelo.


In [None]:
trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                        download=True, transform=preprocessing)
train_data_loader = torch.utils.data.DataLoader(trainset, batch_size=128,
                                          shuffle=True, num_workers=2)

testset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                       download=True, transform=preprocessing)
test_data_loader = torch.utils.data.DataLoader(testset, batch_size=128,
                                         shuffle=False, num_workers=2)



## 4. Fine tuning a CIFAR-10
Como torchvision solo proporciona los pesos para ImageNet, tenemos que hacer un fine tuning inicial a CIFAR-10 para ajustar el modelo al nuevo dataset. Usar ImageNet en este ejemplo no es posible porque ocupa varios cientos de GB!

In [None]:
n_epochs = 1
print(f'** FT a CIFAR-10 **')
opt = torch.optim.Adam(model.parameters(), lr=0.0001)
loss_fn = torch.nn.CrossEntropyLoss()
model.train().to('cuda')
for epoch in range(n_epochs): # Entrenamos n epocas
    train_running_loss = 0.0
    train_running_correct = 0
    counter = 0
    time_start = time.time()
    for inputs, labels in train_data_loader: # Obtenemos todos los batch de entrenamiento y los usamos para entrenar
        inputs = inputs.to('cuda')
        labels = labels.to('cuda')
        opt.zero_grad()
        outs_model = model(inputs)
        loss = loss_fn(outs_model, labels)
        train_running_loss += loss.item()
        _, preds = torch.max(outs_model.data, 1)
        train_running_correct += (preds == labels).sum().item()
        counter = counter + 1
        loss.backward()
        opt.step()

    epoch_loss = train_running_loss / counter
    epoch_acc = 100. * (train_running_correct / len(train_data_loader.dataset))
    time_end = time.time() - time_start
    print(f'** Summary for epoch {epoch}: '
		f'loss: {epoch_loss:#.3g}, acc: {epoch_acc:#.3g}]  '
		f'time: {time_end:.3f}s **')

# Test
test_correct = 0
with torch.no_grad():
    time_start = time.time()
    for inputs, labels in test_data_loader: # Obtenemos todos los batch de test y los usamos para test
        inputs = inputs.to('cuda')
        labels = labels.to('cuda')
        outs_model = model(inputs)
        _, preds = torch.max(outs_model.data, 1)
        test_correct += (preds == labels).sum().item()

    acc = 100. * (test_correct / len(test_data_loader.dataset))
    time_end = time.time() - time_start
    print(f'** Summary for model: '
		f'acc: {acc:#.3g}]  '
		f'time: {time_end:.3f}s **')

** FT a CIFAR-10 **


KeyboardInterrupt: 

## 5. Aplicar el pruning
En este proceso vamos a aplicar el pruning, es decir, primero vamos a seleccionar los filtros a eliminar y luego, usando las funciones auxiliares que hemos creado, vamos a generar una nueva red eliminando esos filtros.

In [None]:
def prune_network_structured(model_to_prune, layers_to_prune_config):
    """
    Fase 1: Aplica poda estructurada para poner pesos a cero en las capas especificadas.
    Modifica el modelo in-situ.

    Args:
        model_to_prune (nn.Module): El modelo a podar.
        layers_to_prune_config (dict): Un diccionario que mapea el módulo a podar
                                        a la fracción de poda.
                                        Ej: {model.features[0]: 0.25}
    """
    print("--- FASE 1: Generando ceros con poda estructurada ---")
    for layer, amount in layers_to_prune_config.items():
        if amount > 0 and isinstance(layer, (nn.Conv2d, nn.Linear)):
            print(f"Podando el {amount*100:.0f}% de {layer.__class__.__name__} (capa {list(layers_to_prune_config.keys()).index(layer)})")
            prune.ln_structured(layer, name="weight", amount=amount, n=1, dim=0)
            # Hacemos la poda permanente en el tensor de pesos
            prune.remove(layer, "weight")
    print("Ceros generados.\n")
    return model_to_prune


def _prune_sequential_module(sequential_module, last_conv_output_channels=None):
    """
    Helper para la Fase 2: Reconstruye un módulo nn.Sequential eliminando
    físicamente las capas con pesos a cero.
    """
    new_layers = []
    # Índices de los canales/neuronas eliminados en la capa ANTERIOR
    last_layer_pruned_indices = None

    # Si es el clasificador, la primera capa lineal necesita un trato especial
    is_classifier = any(isinstance(m, nn.Linear) for m in sequential_module)
    first_linear_handled = not is_classifier

    for layer in sequential_module.children():
        if isinstance(layer, nn.Conv2d):
            # 1. Ajustar canales de entrada si la capa anterior fue podada
            if last_layer_pruned_indices is not None:
                keep_indices = [i for i in range(layer.in_channels) if i not in last_layer_pruned_indices]
                layer.in_channels = len(keep_indices)
                # La poda de grupos requiere una lógica más compleja, asumimos groups=1 o poda simétrica
                if layer.groups > 1 and len(keep_indices) % layer.groups != 0:
                     print(f"Warning: Group convolution might be inconsistent after pruning. In channels: {len(keep_indices)}, Groups: {layer.groups}")
                layer.weight = nn.Parameter(layer.weight.data.clone()[:, keep_indices, :, :])

            # 2. Identificar y podar filtros de salida nulos
            sum_of_weights = torch.sum(torch.abs(layer.weight.data), dim=(1, 2, 3))
            non_zero_indices = torch.where(sum_of_weights != 0)[0]

            # Crear y añadir la nueva capa
            new_conv = nn.Conv2d(
                in_channels=layer.in_channels,
                out_channels=len(non_zero_indices),
                kernel_size=layer.kernel_size, stride=layer.stride,
                padding=layer.padding, bias=(layer.bias is not None),
                groups=layer.groups
            )
            new_conv.weight.data = layer.weight.data[non_zero_indices].clone()
            if layer.bias is not None:
                new_conv.bias.data = layer.bias.data[non_zero_indices].clone()
            new_layers.append(new_conv)

            # 3. Guardar índices podados para la siguiente capa
            all_indices = set(range(layer.out_channels))
            kept_indices = set(non_zero_indices.tolist())
            last_layer_pruned_indices = list(all_indices - kept_indices)

        elif isinstance(layer, nn.Linear):
            # 1. Ajustar neuronas de entrada
            # Caso especial: primera capa del clasificador de AlexNet
            if not first_linear_handled and last_conv_output_channels is not None:
                # El in_features de AlexNet es out_channels * 6 * 6
                new_in_features = last_conv_output_channels * 6 * 6
                layer.in_features = new_in_features
                original_weights = layer.weight.data.clone()
                # Ajustamos la matriz de pesos para que coincida con la nueva entrada
                # Esto es una simplificación, puede que no sea óptimo para todas las arquitecturas
                layer.weight = nn.Parameter(original_weights[:, :new_in_features])
                first_linear_handled = True
            # Caso general: una capa lineal sigue a otra lineal que fue podada
            elif last_layer_pruned_indices is not None:
                keep_indices = [i for i in range(layer.in_features) if i not in last_layer_pruned_indices]
                layer.in_features = len(keep_indices)
                layer.weight = nn.Parameter(layer.weight.data.clone()[:, keep_indices])

            # 2. Identificar y podar neuronas de salida nulas
            sum_of_weights = torch.sum(torch.abs(layer.weight.data), dim=1)
            non_zero_indices = torch.where(sum_of_weights != 0)[0]

            # Evitar crear una capa final con cero neuronas si es la capa de salida
            if len(non_zero_indices) == 0 and len(new_layers) > 0 and isinstance(sequential_module[-1], nn.Linear):
                print("Warning: La capa de salida ha sido completamente podada. Se mantendrá 1 neurona para evitar errores.")
                non_zero_indices = torch.tensor([0]) # Mantener al menos una neurona

            new_linear = nn.Linear(
                in_features=layer.in_features,
                out_features=len(non_zero_indices),
                bias=(layer.bias is not None)
            )
            new_linear.weight.data = layer.weight.data[non_zero_indices].clone()
            if layer.bias is not None:
                new_linear.bias.data = layer.bias.data[non_zero_indices].clone()
            new_layers.append(new_linear)

            # 3. Guardar índices podados
            all_indices = set(range(layer.out_features))
            kept_indices = set(non_zero_indices.tolist())
            last_layer_pruned_indices = list(all_indices - kept_indices)

        elif isinstance(layer, nn.BatchNorm2d):
            if last_layer_pruned_indices:
                keep_indices = [i for i in range(layer.num_features) if i not in last_layer_pruned_indices]
                new_bn = nn.BatchNorm2d(len(keep_indices))
                new_bn.weight.data = layer.weight.data[keep_indices].clone()
                new_bn.bias.data = layer.bias.data[keep_indices].clone()
                new_bn.running_mean = layer.running_mean[keep_indices].clone()
                new_bn.running_var = layer.running_var[keep_indices].clone()
                new_layers.append(new_bn)
            else:
                new_layers.append(layer)

        else: # ReLU, MaxPool2d, Dropout, etc.
            new_layers.append(copy.deepcopy(layer))

    return nn.Sequential(*new_layers)

In [None]:
def create_compact_model(model_with_zeros):
    """
    Fase 2: Reconstruye un modelo AlexNet para eliminar físicamente las capas con ceros.

    Args:
        model_with_zeros (nn.Module): El modelo AlexNet que ya tiene filtros/neuronas a cero.

    Returns:
        nn.Module: Un nuevo modelo compacto y coherente.
    """
    print("--- FASE 2: Creando modelo compacto (eliminación física) ---")
    compact_model = copy.deepcopy(model_with_zeros)

    # 1. Podar el módulo 'features'
    compact_model.features = _prune_sequential_module(compact_model.features)
    print("Módulo 'features' reconstruido.")

    # 2. Calcular el nuevo tamaño de entrada para el clasificador
    last_conv = next(m for m in reversed(compact_model.features) if isinstance(m, nn.Conv2d))
    last_conv_output_channels = last_conv.out_channels
    print(f"Nuevos canales de salida de 'features': {last_conv_output_channels}")

    # 3. Podar el módulo 'classifier'
    compact_model.classifier = _prune_sequential_module(
        compact_model.classifier,
        last_conv_output_channels=last_conv_output_channels
    )
    print("Módulo 'classifier' reconstruido y ajustado.")
    print("Modelo compacto creado.\n")
    return compact_model

In [None]:
# --------------------------------------------------------------------------
# FASE 1: Aplicar poda para generar ceros
# --------------------------------------------------------------------------
model_to_prune = copy.deepcopy(model)

# Tasa de poda uniforme para todas las capas. ¡Puedes cambiar este valor!
PRUNING_RATE = 0.20 # 20%

# --- Creación automática de la configuración de poda ---
layers_to_prune_config = {}
# Recorrer módulos 'features' y 'classifier' para encontrar capas podables
for module in [model_to_prune.features, model_to_prune.classifier]:
    for layer in module.children():
        if isinstance(layer, (nn.Conv2d, nn.Linear)):
            # No podar la última capa (salida de clasificación)
            if layer == model_to_prune.classifier[-1]:
                  layers_to_prune_config[layer] = 0.0
            else:
                  layers_to_prune_config[layer] = PRUNING_RATE

print(f"Configuración de poda creada para {len(layers_to_prune_config)} capas con una tasa del {PRUNING_RATE*100}%\n")

# Ejecutar la poda para poner pesos a cero
model_with_zeros = prune_network_structured(model_to_prune, layers_to_prune_config)

# --------------------------------------------------------------------------
# FASE 2: Reconstruir el modelo para eliminar los ceros físicamente
# --------------------------------------------------------------------------
compact_model = create_compact_model(model_with_zeros)

print("--- Modelo Compacto Final ---")
print(compact_model)
params_compact = sum(p.numel() for p in compact_model.parameters() if p.requires_grad)
print(f"Parámetros entrenables finales: {params_compact:,}")
params_original = sum(p.numel() for p in model.parameters() if p.requires_grad)
reduction = 100 * (1 - params_compact / params_original)
print(f"Reducción total de parámetros: {reduction:.2f}%\n")

## 6. Fine tuning final
Este paso es crítico para ajustar los pesos y mejorar el comportamiento del modelo ya que hemos eliminado un porcentaje de filtros que ahora no están y el rendimiento se va a ver resentido.

In [None]:
n_epochs = 1
print(f'** FT the pruned model **')
opt = torch.optim.Adam(compact_model.parameters(), lr=0.0001)
loss_fn = torch.nn.CrossEntropyLoss()
compact_model.train().to('cuda')
for epoch in range(n_epochs): # Entrenamos n epocas
    # Entrenamiento
    train_running_loss = 0.0
    train_running_correct = 0
    train_counter = 0
    time_start = time.time()
    for inputs, labels in train_data_loader: # Obtenemos todos los batch de entrenamiento y los usamos para entrenar
        inputs = inputs.to('cuda')
        labels = labels.to('cuda')
        opt.zero_grad()

        outputs = compact_model(inputs)
        loss = loss_fn(outputs, labels)
        train_running_loss += loss.item()
        _, preds = torch.max(outputs.data, 1)
        train_running_correct += (preds == labels).sum().item()
        train_counter = train_counter + 1
        loss.backward()
        opt.step()

    epoch_loss = train_running_loss / counter
    epoch_acc = 100. * (train_running_correct / len(train_data_loader.dataset))
    time_end = time.time() - time_start
    print(f'** Summary for epoch {epoch}: '
		f'loss: {epoch_loss:#.3g}, acc: {epoch_acc:#.3g}]  '
		f'time: {time_end:.3f}s **')

# Test
test_correct = 0
with torch.no_grad():
    time_start = time.time()
    for inputs, labels in test_data_loader: # Obtenemos todos los batch de test y los usamos para test
        inputs = inputs.to('cuda')
        labels = labels.to('cuda')
        outs_compact_model = compact_model(inputs)
        _, preds = torch.max(outs_compact_model.data, 1)
        test_correct += (preds == labels).sum().item()

    acc = 100. * (test_correct / len(test_data_loader.dataset))
    time_end = time.time() - time_start
    print(f'** Summary for compact_model: '
		f'acc: {acc:#.3g}]  '
		f'time: {time_end:.3f}s **')

## 7. Exportar el modelo

Una vez que hemos realizado el fine tuning del modelo compactado, exportamos el modelo a un fichero para poder usarlo en nuestra aplicación.

In [None]:
compact_model.eval()
torch.save(compact_model.state_dict(), '.compact_model.pt')

Además, vamos a realizar una  inferencia de prueba para analizar el rendimiento del modelo original y del compactado.

In [None]:
image = torch.Tensor(np.random.rand(1,3,224,244)).float().cuda()
model.eval().to('cuda')
compact_model.eval().to('cuda')

# Original model
times = []
for i in range(50):
    torch.cuda.synchronize()  # Sincroniza antes de empezar a medir
    time_start = time.time()
    model(image)
    torch.cuda.synchronize()  # Espera a que la ejecución en la GPU termine
    time_end = time.time() - time_start
    times.append(time_end)

time_end = np.mean(times)
print(f'Execution time of the original model: {time_end:.3f}s')

# Pruned model
times = []
for i in range(50):
    torch.cuda.synchronize()  # Sincroniza antes de empezar a medir
    time_start = time.time()
    compact_model(image)
    torch.cuda.synchronize()  # Espera a que la ejecución en la GPU termine
    time_end = time.time() - time_start
    times.append(time_end)

time_end = np.mean(times)
print(f'Execution time of the pruned model: {time_end:.3f}s')