In [1]:
# Importación de bibliotecas necesarias if you want to download the dataset from kaggle
#import kagglehub
#import shutil

# Descarga del conjunto de datos de Kaggle
# Este conjunto contiene imágenes de tomografías computarizadas del cerebro
# con casos de ictus y casos normales
#path = kagglehub.dataset_download("afridirahman/brain-stroke-ct-image-dataset")
#print("Ruta a los archivos del dataset:", path)

# Copia de los datos descargados a una ubicación local
# Esto nos permite trabajar con los datos sin necesidad de volver a descargarlos
#src = path  # Usamos la variable path que contiene la ubicación de descarga
#dst = r'.\Dataset_kaggle'

#shutil.copytree(src, dst)
#print(f"Dataset copiado de {src} a {dst}")

Ruta a los archivos del dataset: C:\Users\Usuario\.cache\kagglehub\datasets\afridirahman\brain-stroke-ct-image-dataset\versions\1
Dataset copiado de C:\Users\Usuario\.cache\kagglehub\datasets\afridirahman\brain-stroke-ct-image-dataset\versions\1 a .\Dataset_kaggle


In [2]:
# Importación de bibliotecas necesarias para el procesamiento de datos
import os
import shutil
import random
from pathlib import Path

# Configuración de directorios y parámetros
source_dir = '../../data/Brain_Data_Organised'  # Directorio fuente con las imágenes originales
target_dir = 'Dataset_img_for_CNN'  # Directorio donde organizaremos los datos para el entrenamiento
split_ratio = 0.8  # Proporción 80% para entrenamiento, 20% para prueba
classes = ['Stroke', 'Normal']  # Las dos clases que queremos clasificar

# Creación de la estructura de directorios para el entrenamiento
print("📁 Creando directorios de destino...")
for phase in ['train', 'test']:
    for cls in classes:
        os.makedirs(os.path.join(target_dir, phase, cls), exist_ok=True)

# Configuración de la semilla aleatoria para reproducibilidad
random.seed(42)  # Esto asegura que obtengamos los mismos resultados cada vez que ejecutemos el código

# Procesamiento de cada clase
for cls in classes:
    cls_dir = os.path.join(source_dir, cls)
    
    print(f"\n🔄 Procesando clase: {cls}")
    print(f"   Buscando archivos en: {cls_dir}")
    
    # Verificación de la existencia del directorio
    if not os.path.exists(cls_dir):
        print(f"❌ ¡Carpeta {cls_dir} no encontrada!")
        continue
    
    # Búsqueda de imágenes en formatos comunes
    image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp', '*.tiff', '*.tif']
    images = []
    
    # Recopilación de todas las imágenes disponibles
    for ext in image_extensions:
        images.extend(list(Path(cls_dir).glob(ext)))
        images.extend(list(Path(cls_dir).glob(ext.upper())))  # También en mayúsculas
    
    print(f"   Imágenes encontradas: {len(images)}")
    
    if len(images) == 0:
        print(f"⚠️  No hay imágenes en la carpeta {cls_dir}")
        continue
    
    # División aleatoria de las imágenes
    random.shuffle(images)
    split_idx = int(len(images) * split_ratio)
    train_images = images[:split_idx]
    test_images = images[split_idx:]
    
    print(f"   División: {len(train_images)} para entrenamiento, {len(test_images)} para prueba")
    
    # Copia de archivos a sus respectivos directorios
    for i, img_path in enumerate(train_images):
        try:
            dest_path = os.path.join(target_dir, 'train', cls, img_path.name)
            shutil.copy2(img_path, dest_path)
            if i == 0:  # Imprimir el primer archivo para verificación
                print(f"   ✅ Primer archivo de entrenamiento: {img_path.name}")
        except Exception as e:
            print(f"   ❌ Error al copiar {img_path.name}: {e}")
    
    for i, img_path in enumerate(test_images):
        try:
            dest_path = os.path.join(target_dir, 'test', cls, img_path.name)
            shutil.copy2(img_path, dest_path)
            if i == 0:  # Imprimir el primer archivo para verificación
                print(f"   ✅ Primer archivo de prueba: {img_path.name}")
        except Exception as e:
            print(f"   ❌ Error al copiar {img_path.name}: {e}")

# Verificación de resultados
print("\n" + "="*50)
print("📊 Estadísticas finales:")
for phase in ['train', 'test']:
    for cls in classes:
        path = os.path.join(target_dir, phase, cls)
        if os.path.exists(path):
            count = len(os.listdir(path))
            print(f"   {phase}/{cls}: {count} archivos")

print("\n✅ ¡Distribución completada!")

📁 Creando directorios de destino...

🔄 Procesando clase: Stroke
   Buscando archivos en: Dataset_kaggle/Brain_Data_Organised\Stroke
   Imágenes encontradas: 1900
   División: 1520 para entrenamiento, 380 para prueba
   ✅ Primer archivo de entrenamiento: 67 (14).jpg
   ✅ Primer archivo de prueba: 75 (24).jpg

🔄 Procesando clase: Normal
   Buscando archivos en: Dataset_kaggle/Brain_Data_Organised\Normal
   Imágenes encontradas: 3102
   División: 2481 para entrenamiento, 621 para prueba
   ✅ Primer archivo de entrenamiento: 125 (11).jpg
   ✅ Primer archivo de prueba: 113 (3).jpg

📊 Estadísticas finales:
   train/Stroke: 913 archivos
   train/Normal: 1498 archivos
   test/Stroke: 343 archivos
   test/Normal: 568 archivos

✅ ¡Distribución completada!


In [3]:
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split
from sklearn.metrics import classification_report
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import mlflow
import mlflow.pytorch

## 2. Аumentación

La aumentación de datos es un proceso muy potente que permite aumentar la cantidad de datos de entrenamiento. Mediante rotaciones, reflejos, adición de ruido, desplazamientos y otras transformaciones, la imagen cambia ligeramente, pero mantiene su etiqueta original. Con la función Compose podemos combinar varias transformaciones de imagen y luego aplicarlas al leer el conjunto de datos. La lista completa de aumentaciones está disponible [aquí](https://pytorch.org/vision/stable/transforms.html). Estúdiala y experimenta con diferentes transformaciones de imagen.

Una herramienta bastante potente y eficiente para la aumentación de imágenes es la biblioteca `albumentations`.

In [4]:
# Transformaciones para el conjunto de entrenamiento
train_transforms = transforms.Compose([
    transforms.Resize((224, 224)),  # Redimensionar todas las imágenes a 224x224 píxeles
    transforms.Grayscale(num_output_channels=3),  # Convertir a escala de grises pero mantener 3 canales
    transforms.RandomHorizontalFlip(),  # Volteo horizontal aleatorio para aumentar la variedad
    transforms.RandomRotation(5),  # Rotación aleatoria de hasta 5 grados
    transforms.ColorJitter(brightness=0.1, contrast=0.1),  # Variación aleatoria de brillo y contraste
    transforms.ToTensor(),  # Convertir imagen a tensor
    transforms.Normalize(mean=[0.5], std=[0.5])  # Normalización de los valores de píxeles
])

# Transformaciones para el conjunto de validación y prueba
val_test_transforms = transforms.Compose([
    transforms.Resize((224, 224)),  # Mismo tamaño que el conjunto de entrenamiento
    transforms.Grayscale(num_output_channels=3),  # Misma conversión a escala de grises
    transforms.ToTensor(),  # Convertir a tensor
    transforms.Normalize(mean=[0.5], std=[0.5])  # Misma normalización
])

# Load the full training dataset
train_dataset = datasets.ImageFolder(
    root='Dataset_img_for_CNN/train',
    transform=train_transforms
)

test_dataset = datasets.ImageFolder(
    root='Dataset_img_for_CNN/test',
    transform=val_test_transforms
)

# Split training data into train and validation sets (80-20 split)
train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size

# Use random_split to create train and validation datasets
train_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])

# Create validation dataset with appropriate transforms
val_dataset.dataset = datasets.ImageFolder(
    root='Dataset_img_for_CNN/train',
    transform=val_test_transforms
)
val_dataset = torch.utils.data.Subset(val_dataset.dataset, val_dataset.indices)

# Create data loaders
train_loader = DataLoader(
    train_dataset,
    batch_size=32,
    shuffle=True,
    num_workers=4
)

val_loader = DataLoader(
    val_dataset,
    batch_size=32,
    shuffle=False,
    num_workers=4
)

test_loader = DataLoader(
    test_dataset,
    batch_size=32,
    shuffle=False,
    num_workers=4
)

print(f"Training samples: {len(train_dataset)}")
print(f"Validation samples: {len(val_dataset)}")
print(f"Test samples: {len(test_dataset)}")


Training samples: 1928
Validation samples: 483
Test samples: 911



## 3. Regularización y normalización en redes neuronales

### Dropout
Si la red tiene una arquitectura compleja, es posible el sobreajuste (overfitting) - un proceso en el que el modelo se adapta demasiado a los datos de entrenamiento y luego da un rendimiento inferior en los datos de prueba. Para combatir esto, se puede utilizar Dropout. La idea del método es muy simple. Durante el entrenamiento, `torch.nn.Dropout` establece a cero cada elemento del tensor de entrada con una probabilidad $p$. Durante la inferencia, no se establece nada a cero, pero para mantener la escala de las salidas de la red, todos los elementos del tensor de entrada se dividen por $1 - p$.

![Dropout](https://github.com/hse-ds/iad-deep-learning/blob/master/2022/seminars/sem03/static/dropout.png?raw=1)

Para estabilizar y acelerar la convergencia del entrenamiento, se utiliza frecuentemente la normalización por lotes (batch normalization). En **PyTorch** también está implementada como una capa — [`torch.nn.BatchNorm2d`](https://pytorch.org/docs/stable/generated/torch.nn.BatchNorm2d.html). Generalmente, la normalización por lotes se inserta entre los bloques significativos de la red neuronal para mantener la distribución de los datos durante todo el forward pass. Tenga en cuenta que durante el entrenamiento, la media y la desviación estándar muestral se calculan de nuevo para cada lote, y la capa tiene dos parámetros numéricos entrenables para cada canal del tensor de entrada. Durante la inferencia, se utilizan como media y varianza las estimaciones obtenidas mediante promedios móviles durante el entrenamiento.

![Batch Norm](https://github.com/hse-ds/iad-deep-learning/blob/master/2022/seminars/sem03/static/batch_norm.png?raw=1)


![Typical CNN architecture](Typical%20CNN%20architecture.png)


In [5]:
# Definición de la arquitectura CNN
model = nn.Sequential(
    # Primera capa convolucional
    nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1),  # 3 canales de entrada, 32 filtros
    nn.BatchNorm2d(32),  # Normalización por lotes
    nn.ReLU(),  # Función de activación
    nn.MaxPool2d(2, 2),  # Reducción de dimensionalidad

    # Segunda capa convolucional
    nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),  # 32 canales de entrada, 64 filtros
    nn.BatchNorm2d(64),
    nn.ReLU(),
    nn.MaxPool2d(2, 2),

    # Tercera capa convolucional
    nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),  # 64 canales de entrada, 128 filtros
    nn.BatchNorm2d(128),
    nn.ReLU(),
    nn.MaxPool2d(2, 2),

    # Capas fully connected
    nn.Flatten(),  # Aplanar la salida para las capas densas
    nn.Dropout(0.1),  # Regularización para evitar overfitting
    nn.Linear(128*28*28, 512),  # Primera capa densa
    nn.ReLU(),
    nn.Dropout(0.1),
    nn.Linear(512, 2)  # Capa de salida (2 clases)
)

# Set device and move model to device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
print(f"Using device: {device}")

# Define optimizer and loss function
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
criterion = nn.CrossEntropyLoss()

Using device: cpu


### Class torch.nn.Conv2d

 
En **PyTorch**, la capa convolucional está representada en el módulo `torch.nn` por la clase [`Conv2d`](https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html) con los siguientes parámetros:
- `in_channels`: número de canales de entrada
- `out_channels`: número de canales de salida
- `kernel_size`: tamaño del kernel (núcleo)
- `stride`: paso (desplazamiento)
- `padding`: relleno
- `padding_mode`: modo de relleno (`'zeros'`, `'reflect'` y otros)
- `dilation`: dilatación

#### `kernel_size`

**Tamaño del kernel (núcleo)**. `int`, si el kernel es cuadrado, y una tupla de dos números si el kernel es rectangular. Define el tamaño del filtro con el que se realiza la convolución de la imagen.

**`kernel_size=3`**

![no_padding_no_strides.gif](static/no_padding_no_strides.gif)

Esta y las siguientes animaciones están tomadas de [aquí](https://github.com/vdumoulin/conv_arithmetic).

#### `stride`

**Paso (stride)**. Define el paso, en píxeles, con el que se desplaza el filtro. `int`, si el desplazamiento es el mismo en horizontal y vertical. Una tupla de dos números, si los desplazamientos son diferentes.

**`stride=2`**

![no_padding_strides.gif](static/no_padding_strides.gif)

#### `padding`

**Relleno (padding)**. Cantidad de píxeles con los que se complementa la imagen. Similar al paso y al tamaño del kernel, puede ser tanto `int` como una tupla de dos números.

**`padding=1`**

![same_padding_no_strides.gif](static/same_padding_no_strides.gif)

In [None]:
# Configurar MLflow
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("CNN_Stroke_Detection")

# Entrenamiento con MLflow
with mlflow.start_run() as run:
    # Registrar hiperparámetros
    mlflow.log_params({
        "learning_rate": 0.0001,
        "batch_size": 32,
        "epochs": 20,
        "optimizer": "Adam",
        "model_type": "CNN"
    })

    # Bucle de entrenamiento
    for epoch in range(20):
        model.train()
        train_loss = 0
        train_correct = 0
        train_total = 0

        print(f"\nEpoch [{epoch+1}/20]")
        
        # Training loop
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            _, predicted = outputs.max(1)
            train_total += labels.size(0)
            train_correct += predicted.eq(labels).sum().item()

        train_loss = train_loss / len(train_loader)
        train_acc = 100. * train_correct / train_total

        # Validation loop
        model.eval()
        val_loss = 0
        val_correct = 0
        val_total = 0

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)

                val_loss += loss.item()
                _, predicted = outputs.max(1)
                val_total += labels.size(0)
                val_correct += predicted.eq(labels).sum().item()

        val_loss = val_loss / len(val_loader)
        val_acc = 100. * val_correct / val_total

        # Registrar métricas en MLflow
        mlflow.log_metrics({
            "train_loss": train_loss,
            "train_accuracy": train_acc,
            "val_loss": val_loss,
            "val_accuracy": val_acc
        }, step=epoch)

        print(f"  Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.2f}%")
        print(f"  Val Loss: {val_loss:.4f}, Val Accuracy: {val_acc:.2f}%")

        # Guardar el modelo en cada época
        mlflow.pytorch.log_model(model, f"model_epoch_{epoch+1}")

print('Training completed!')
print("=" * 70)



Epoch [1/20]




  Train Loss: 0.9939, Train Accuracy: 63.43%
  Val Loss: 0.5962, Val Accuracy: 70.39%





Epoch [2/20]




  Train Loss: 0.5424, Train Accuracy: 71.32%
  Val Loss: 0.5449, Val Accuracy: 71.64%





Epoch [3/20]




  Train Loss: 0.4784, Train Accuracy: 75.47%
  Val Loss: 0.4926, Val Accuracy: 75.57%





Epoch [4/20]




  Train Loss: 0.4284, Train Accuracy: 79.41%
  Val Loss: 0.4379, Val Accuracy: 79.50%





Epoch [5/20]




  Train Loss: 0.3922, Train Accuracy: 80.65%
  Val Loss: 0.4388, Val Accuracy: 78.47%





Epoch [6/20]




  Train Loss: 0.3373, Train Accuracy: 85.01%
  Val Loss: 0.3963, Val Accuracy: 80.75%





Epoch [7/20]




  Train Loss: 0.3196, Train Accuracy: 85.11%
  Val Loss: 0.3898, Val Accuracy: 85.30%





Epoch [8/20]




  Train Loss: 0.2695, Train Accuracy: 88.80%
  Val Loss: 0.3307, Val Accuracy: 85.71%





Epoch [9/20]




  Train Loss: 0.2541, Train Accuracy: 89.78%
  Val Loss: 0.3884, Val Accuracy: 82.40%





Epoch [10/20]




  Train Loss: 0.2173, Train Accuracy: 90.35%
  Val Loss: 0.2429, Val Accuracy: 90.89%





Epoch [11/20]




  Train Loss: 0.2136, Train Accuracy: 91.39%
  Val Loss: 0.2706, Val Accuracy: 90.06%





Epoch [12/20]




  Train Loss: 0.2078, Train Accuracy: 91.65%
  Val Loss: 0.2841, Val Accuracy: 87.37%





Epoch [13/20]




  Train Loss: 0.1624, Train Accuracy: 93.57%
  Val Loss: 0.3487, Val Accuracy: 84.27%





Epoch [14/20]




  Train Loss: 0.1438, Train Accuracy: 94.09%
  Val Loss: 0.2235, Val Accuracy: 91.51%





Epoch [15/20]




  Train Loss: 0.1319, Train Accuracy: 95.23%
  Val Loss: 0.2068, Val Accuracy: 92.96%





Epoch [16/20]




  Train Loss: 0.1154, Train Accuracy: 96.01%
  Val Loss: 0.3614, Val Accuracy: 83.64%





Epoch [17/20]




  Train Loss: 0.1009, Train Accuracy: 96.84%
  Val Loss: 0.2068, Val Accuracy: 92.13%





Epoch [18/20]




  Train Loss: 0.0963, Train Accuracy: 96.84%
  Val Loss: 0.2980, Val Accuracy: 88.20%





Epoch [19/20]




  Train Loss: 0.1055, Train Accuracy: 96.32%
  Val Loss: 0.2021, Val Accuracy: 93.37%


In [None]:
# Final evaluation on test set
print("Evaluating on test set...")
model.eval()

# Collect all predictions and true labels for classification report
all_predictions = []
all_labels = []
correct_test = 0
total_test = 0
test_loss = 0.0

with torch.no_grad():
    for images, labels in test_loader:
        # Move data to device
        images, labels = images.to(device), labels.to(device)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Statistics
        test_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total_test += labels.size(0)
        correct_test += (predicted == labels).sum().item()

        # Collect predictions and labels for classification report
        all_predictions.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Calculate final test metrics
test_accuracy = 100 * correct_test / total_test
avg_test_loss = test_loss / len(test_loader)

print(f"\nFINAL TEST RESULTS:")
print(f"Test Loss: {avg_test_loss:.4f}")
print(f"Test Accuracy: {test_accuracy:.2f}%")
print("=" * 70)

# Generate classification report
class_names = test_dataset.classes
print("\nDETAILED CLASSIFICATION REPORT:")
print(classification_report(all_labels, all_predictions,
                          target_names=class_names,
                          digits=4))

# Print training summary
print("\nTRAINING SUMMARY:")
print(f"Best validation accuracy: {max(val_accuracies):.2f}% (Epoch {val_accuracies.index(max(val_accuracies))+1})")
print(f"Final validation accuracy: {val_accuracies[-1]:.2f}%")
print(f"Final test accuracy: {test_accuracy:.2f}%")
print("=" * 70)

# Registrar métricas finales en MLflow
mlflow.log_metrics({
    'test_loss': avg_test_loss,
    'test_accuracy': test_accuracy
})

# Guardar el classification report como texto
report = classification_report(all_labels, all_predictions,
                          target_names=class_names,
                          digits=4)
mlflow.log_text(report, 'classification_report.txt')

In [None]:

# Save the final trained model
import os

# Create directory for saving models
save_dir = "saved_models"
os.makedirs(save_dir, exist_ok=True)

# Save the complete final model
model_path = f"{save_dir}/maryna_cnn_model.pth"
torch.save({
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'final_train_accuracy': train_accuracies[-1],
    'final_val_accuracy': val_accuracies[-1],
    'test_accuracy': test_accuracy,
    'num_epochs': num_epochs,
    'model_architecture': str(model),
    'class_names': class_names,
    'training_history': {
        'train_losses': train_losses,
        'train_accuracies': train_accuracies,
        'val_losses': val_losses,
        'val_accuracies': val_accuracies
    }
}, model_path)

print(f"Model saved to: {model_path}")

print("\n" + "="*50)
print("MODEL LOADING INSTRUCTIONS:")
print("="*50)
print("To load the model:")
print("checkpoint = torch.load('saved_models/maryna_cnn_model.pth')")
print("model.load_state_dict(checkpoint['model_state_dict'])")
print("test_accuracy = checkpoint['test_accuracy']")
print("="*50)

In [None]:
# Code to load the saved model

import torch
import torch.nn as nn

# Recreate the model architecture (must be identical to training)
model = nn.Sequential(
    nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1),
    nn.BatchNorm2d(32),
    nn.ReLU(),
    nn.MaxPool2d(2, 2),

    nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
    nn.BatchNorm2d(64),
    nn.ReLU(),
    nn.MaxPool2d(2, 2),

    nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
    nn.BatchNorm2d(128),
    nn.ReLU(),
    nn.MaxPool2d(2, 2),

    nn.Flatten(),
    nn.Dropout(0.1),
    nn.Linear(128*28*28, 512),
    nn.ReLU(),
    nn.Dropout(0.1),
    nn.Linear(512, 2)
)

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Load the saved checkpoint
checkpoint = torch.load('model_maryna.pth', map_location=device)

# Load the model weights
model.load_state_dict(checkpoint['model_state_dict'])

# Move model to device
model = model.to(device)

# Set model to evaluation mode
model.eval()

# Access saved information
test_accuracy = checkpoint['test_accuracy']
class_names = checkpoint['class_names']
training_history = checkpoint['training_history']

print(f"Model loaded successfully!")
print(f"Test Accuracy: {test_accuracy:.2f}%")
print(f"Classes: {class_names}")
print(f"Training completed in {checkpoint['num_epochs']} epochs")