In [6]:
import numpy
from datasets import load_dataset
import pandas as pd
import os
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix, roc_curve, auc, precision_score, recall_score
import matplotlib.pyplot as plt
import torch
from torch.utils.data import Dataset
from torchvision import transforms
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
import torch.nn.functional as F
from PIL import Image
import wandb

Asignamos el dataset a la variable **dataset**

In [7]:
dataset = load_dataset("cats_vs_dogs")

Creamos un *DataFrame* llamado **mydataset**, el cual almacenará el path de cada imagen junto a su etiqueta (perro o gato). Además creamos un directorio llamado dataset y almacenamos allí las imágenes.


In [8]:
main_dir = './dataset'
os.makedirs(main_dir, exist_ok=True)

mydataset = pd.DataFrame(columns=['image_path', 'label'])

for i in range(len(dataset['train'])):
    img_path = f"{main_dir}/img_{i}.jpeg"

    if not os.path.exists(img_path):
        dataset['train'][i]['image'].save(img_path)

    mydataset.at[i, 'image_path'] = img_path
    mydataset.at[i, 'label'] = dataset['train'][i]['labels']

mydataset.head()

Unnamed: 0,image_path,label
0,./dataset/img_0.jpeg,0
1,./dataset/img_1.jpeg,0
2,./dataset/img_2.jpeg,0
3,./dataset/img_3.jpeg,0
4,./dataset/img_4.jpeg,0


Definimos la semilla para que al divir el dataset en trainn y test, sea la misma división de datos. Por otro lado, creamos un diccionario para almacenar los parámetros que usaremos. Además, especificamos la proporción de datos que serán para testeo y para validación.

> **Aclaración:** los datos de validación surgen de una proporción sobre los datos de testeo.



In [9]:
seed = 42
test_size = 0.15
val_size = 0.20

exp_config = dict()
exp_config['seed'] = seed
exp_config['test_size'] = test_size
exp_config['val_size'] = val_size

Dividimos el dataset en *train*, *test*, *val*.


In [10]:
train_val_df, test_df = train_test_split(mydataset, test_size=test_size, stratify=mydataset['label'], random_state=seed)

train_df, val_df = train_test_split(train_val_df, test_size=val_size, stratify=train_val_df['label'], random_state=seed)

Añadimos parámetros de configuración al diccionario.

In [11]:
exp_config['train_n_cats'] = train_df['label'].value_counts()[0]
exp_config['train_n_dogs'] = train_df['label'].value_counts()[1]
exp_config['val_n_cats'] = val_df['label'].value_counts()[0]
exp_config['val_n_dogs'] = val_df['label'].value_counts()[1]
exp_config['test_n_cats'] = test_df['label'].value_counts()[0]
exp_config['test_n_dogs'] = test_df['label'].value_counts()[1]

La clase CatsDogsDataset es una implementación personalizada de un conjunto de datos para un programa de visión por computadora que clasifica imágenes de gatos y perros

**Explicación**
1. Constructor (__init__):  
- img_path_list: Lista de rutas de las imágenes.
- lab_list: Lista de etiquetas correspondientes a las imágenes (0 para gatos, 1 para perros).
- transform: Transformaciones opcionales que se aplicarán a las imágenes (por ejemplo, redimensionar, normalizar).
2. Método __len__:  
- Devuelve la cantidad de imágenes en el conjunto de datos.
3. Método __getitem__:
- idx: Índice de la imagen y etiqueta que se desea obtener.
- img_path: Obtiene la ruta de la imagen en el índice idx.
- image: Abre la imagen y la convierte a formato RGB.
- label: Obtiene la etiqueta correspondiente a la imagen y la convierte a un tensor de PyTorch.
- Si se especificaron transformaciones, se aplican a la imagen.
- Devuelve la imagen transformada y su etiqueta correspondiente.

In [12]:
class CatsDogsDataset(Dataset):
    def __init__(self, img_path_list, lab_list, transform=None):
        self.transform = transform
        self.images = img_path_list
        self.labels = lab_list

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img_path = self.images[idx]
        image = Image.open(img_path).convert("RGB")

        label = self.labels[idx]
        label = torch.Tensor([label])

        if self.transform:
            image = self.transform(image)

        return image, label

Definimos la resolución de las imágenes que serán procesadas.

In [13]:
input_size = (224,224)
exp_config['input_size'] = input_size

Como las imágenes son a color en formato RGB, definiremos 3 canales

In [14]:
n_channels = 3
exp_config['n_channels'] = n_channels

Creamos el *transformador* que será usado, el cual redimensiona las imágenes a la resolución dada.

In [15]:
transform = transforms.Compose([
    transforms.Resize(input_size),
    transforms.ToTensor(),
])

Creamos los datasets de
- Train
- Test
- Val

> **Aclaración:** son creados a partir de la clase CatsDogsDataset y usan como parámetro el *transformer* definido previamente.


In [16]:
train_dataset = CatsDogsDataset(train_df['image_path'].tolist(), train_df['label'].tolist(), transform)
val_dataset = CatsDogsDataset(val_df['image_path'].tolist(), val_df['label'].tolist(), transform)
test_dataset = CatsDogsDataset(test_df['image_path'].tolist(), test_df['label'].tolist(), transform)

Creamos los *dataloaders* de train, val y test y definimos el tamaño de lote.

> **Aclaración:** el batch size de test es 1 y además los datos no serán mezclados por cada épocas y no se eliminarán datos para alcanzar el tamaño de lote establecido.

In [17]:
from torch.utils.data import DataLoader

batch_size = 64
exp_config['batch_size'] = batch_size

train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
val_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False, drop_last=False)

## Modelos

Definimos 2 modelos, uno hecho *from scratch* y otro en base a *ResNet18*.


In [18]:
#Definición de mi red
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, padding=1)  # Capa convolucional
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)        # Max pooling
        self.relu = nn.ReLU()                                    # Activación ReLu no lineal
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1) # Otra capa convolucional
        self.flatten = nn.Flatten()                              # Capa flatten
        self.fc1 = nn.Linear(32 * 56 * 56, 64)                   # Capa completamente conectada (el calculo del primer parámetro depende de dim entrada y capas previas)
        self.fc2 = nn.Linear(64, 1)                              # Capa de salida para clasificación binaria (1 neurona + Act. Sigmoide)
        self.sigmoid = nn.Sigmoid()                              # Activación sigmoide

    def forward(self, x):
      x = self.conv1(x)     #(E: Nx3x224x224 -> S:NX16x224x224)
      x = self.relu(x)      #
      x = self.pool(x)      # (->S: Nx16x112x112)

      x = self.conv2(x)     # (->S: Nx32x112x112)
      x = self.relu(x)      #
      x = self.pool(x)      #  (->S: Nx32x56x56)

      x = self.flatten(x)   # -> S: 32×56×56=100352
      x = self.fc1(x)       # -> S: 64
      x = self.relu(x)      #
      x = self.fc2(x)       # -> S: 1
      x = torch.sigmoid(x)  #

      return x

In [19]:
class ResNet18(nn.Module):
    def __init__(self):
        super(ResNet18, self).__init__()
        self.base_model = models.resnet18(pretrained=True)  # Cargar ResNet18 preentrenada
        self.base_model.fc = nn.Linear(self.base_model.fc.in_features, 1)  # Modificar la capa de salida

    def forward(self, x):
        x = self.base_model(x)
        x = torch.sigmoid (x) # Activación sigmoide en la salida (otra manera de aplicarla)

        return x

## Entrenamiento


In [20]:
if os.name == 'posix':
    device = torch.device("mps" if torch.mps.is_available() else "cpu")
elif os.name == 'nt':
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
else:
    device = torch.device("cpu")
    
device

device(type='mps')

## Elección de modelo, función de costo y optimizador.

In [21]:
model = SimpleCNN().to(device)
exp_config['model'] = 'SimpleCNN'
criterion = nn.BCELoss() #BinaryCrossEntropy o Entropía Cruzada Binaria
exp_config['model'] = 'BCELoss'
lr = 0.001
optimizer = optim.Adam(model.parameters(), lr=lr) #Importante pasarle los parámetros del modelo al optimizador !
exp_config['optimizador'] = 'Adam'
exp_config['learning_rate'] = lr

Parámetros seleccionados hasta el momento.

In [22]:
exp_config

{'seed': 42,
 'test_size': 0.15,
 'val_size': 0.2,
 'train_n_cats': 7984,
 'train_n_dogs': 7934,
 'val_n_cats': 1996,
 'val_n_dogs': 1984,
 'test_n_cats': 1761,
 'test_n_dogs': 1751,
 'input_size': (224, 224),
 'n_channels': 3,
 'batch_size': 64,
 'model': 'BCELoss',
 'optimizador': 'Adam',
 'learning_rate': 0.001}

## Función de entrenamiento y validación.

In [32]:
def train(model, train_dataloader, criterion, optimizer, device):

    model.to(device) #Enviar el modelo al dispositivo
    model.train()  # Configurar el modelo en modo de entrenamiento
    running_loss = 0.0
    correct = 0
    total = 0

    for images, labels in train_dataloader:
        images, labels = images.to(device), labels.to(device)  # Enviar los datos al dispositivo

        optimizer.zero_grad()  # Reiniciar los gradientes
        outputs = model(images)  # Forward pass

        loss = criterion(outputs, labels)  # Calcular la pérdida
        loss.backward()  # Backward pass

        optimizer.step()  # Actualizar parámetros

        running_loss += loss.item()

        # Calcular exactitud
        threshold = 0.5  # Umbral para clasificar
        predicted = (outputs.detach() >= threshold)  # 1 si >= umbral, 0 si < umbral
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    train_avg_loss = running_loss / len(train_dataloader)
    train_accuracy = correct / total
    
    return train_avg_loss, train_accuracy

def validate(model, val_dataloader, criterion, device):

    model.eval()  # Configurar el modelo en modo de evaluación

    running_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():  # Desactivar gradientes
        for images, labels in val_dataloader:
            images, labels = images.to(device), labels.to(device)

            outputs = model(images)

            loss = criterion(outputs, labels)
            running_loss += loss.item()

            # Calcular precisión
            threshold = 0.5  # Umbral para clasificar
            predicted = (outputs.detach() >= threshold)  # 1 si >= umbral, 0 si < umbral
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    val_avg_loss = running_loss / len(val_dataloader)
    val_accuracy = correct / total
    
    return val_avg_loss, val_accuracy


## WandB

In [33]:
wandb.login(key="d567fa512c6502cc7986d8c90fd37c4f0969de0d")

[34m[1mwandb[0m: Currently logged in as: [33mintart-estudiantes[0m ([33mar-um[0m). Use [1m`wandb login --relogin`[0m to force relogin
[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /Users/francobertoldi/.netrc


True

In [34]:
#wandb.init(project="CNN_CatsvsDogs", entity="ar-um", tags = ["BERTOLDI_MANCUSO"])

In [35]:
#wandb.config.update(exp_config)

## Ajuste del modelo

In [36]:
num_epochs = 15
early_stopping_patience = 5
epochs_without_improvement = 0

exp_config['num_epochs'] = num_epochs
exp_config['early_stopping_patience'] = early_stopping_patience

checkpoint_path = './best_model.pth'

best_val_loss = float('inf')  # Inicializa con infinito positivo

for epoch in range(num_epochs):
    train_loss, train_accuracy = train(model, train_dataloader, criterion, optimizer, device)
    val_loss, val_accuracy = validate(model, val_dataloader, criterion, device)

    print(f'Epoch [{epoch + 1}/{num_epochs}], '
          f'Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}, '
          f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}')

    #Registro de metricas en WandB
    # wandb.log({"epochs": epoch,
    #           "train_acc": train_accuracy,
    #            "train_loss": train_loss,
    #            "val_acc": val_accuracy,
    #            "val_loss": val_loss})

    #Checkpoint
    if val_loss < best_val_loss:
      best_val_loss = val_loss #actualizo el valor de la mejor (menor) loss
      torch.save(model.state_dict(), checkpoint_path) #almaceno el mejor modelo
      epochs_without_improvement = 0 #reinicio este contador
      print("Checkpoint saved")

    #Early Stopping
    else:
      epochs_without_improvement +=1
      if epochs_without_improvement == early_stopping_patience:
        print("Early Stopping")
        break #Interrumpo el entrenamiento

#wandb.finish() No lo vamos a frenar acá para poder registrar los resultados en test

Epoch [1/15], Train Loss: 0.4095, Train Accuracy: 0.81, Validation Loss: 0.3217, Validation Accuracy: 0.87
Checkpoint saved
Epoch [2/15], Train Loss: 0.3205, Train Accuracy: 0.86, Validation Loss: 0.2375, Validation Accuracy: 0.90
Checkpoint saved
Epoch [3/15], Train Loss: 0.2226, Train Accuracy: 0.91, Validation Loss: 0.1517, Validation Accuracy: 0.95
Checkpoint saved
Epoch [4/15], Train Loss: 0.1297, Train Accuracy: 0.95, Validation Loss: 0.0695, Validation Accuracy: 0.98
Checkpoint saved
Epoch [5/15], Train Loss: 0.0626, Train Accuracy: 0.98, Validation Loss: 0.0337, Validation Accuracy: 0.99
Checkpoint saved
Epoch [6/15], Train Loss: 0.0366, Train Accuracy: 0.99, Validation Loss: 0.0264, Validation Accuracy: 0.99
Checkpoint saved
Epoch [7/15], Train Loss: 0.0196, Train Accuracy: 1.00, Validation Loss: 0.0227, Validation Accuracy: 1.00
Checkpoint saved
Epoch [8/15], Train Loss: 0.0214, Train Accuracy: 0.99, Validation Loss: 0.0110, Validation Accuracy: 1.00
Checkpoint saved
Epoch [9

## Testeo

In [37]:
#model = SimpleCNN()  # Reemplaza con la clase de tu modelo
#model = ResNet18()
model #si se hace todo en una sola corrida, basta con usar el modelo que ya esta instanciado y cargar los pesos

#Cargar los pesos del checkpoint
model.load_state_dict(torch.load(checkpoint_path))
model.to(device)

# Establece el modelo en modo evaluación
model.eval()

  model.load_state_dict(torch.load(checkpoint_path))


SimpleCNN(
  (conv1): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (relu): ReLU()
  (conv2): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (fc1): Linear(in_features=100352, out_features=64, bias=True)
  (fc2): Linear(in_features=64, out_features=1, bias=True)
  (sigmoid): Sigmoid()
)

In [69]:
import torch
import numpy as np

y_true = []
y_proba = []

# Iterar sobre el conjunto de prueba
for image, label in test_dataloader:
    image, label = image.to(device), label.to(device)  # Enviar los datos al dispositivo

    with torch.no_grad():  # Desactivar el cálculo de gradientes
        output = model(image)  # Realizar la inferencia

        y_true.append(label.to("cpu").float())  # Guardar los labels como tensores de PyTorch
        y_proba.append(output.to("cpu").float())  # Guardar las probabilidades como tensores de PyTorch


In [70]:
y_true

[tensor([[1.]]),
 tensor([[1.]]),
 tensor([[1.]]),
 tensor([[1.]]),
 tensor([[0.]]),
 tensor([[0.]]),
 tensor([[1.]]),
 tensor([[0.]]),
 tensor([[1.]]),
 tensor([[0.]]),
 tensor([[1.]]),
 tensor([[1.]]),
 tensor([[0.]]),
 tensor([[1.]]),
 tensor([[0.]]),
 tensor([[1.]]),
 tensor([[0.]]),
 tensor([[1.]]),
 tensor([[0.]]),
 tensor([[0.]]),
 tensor([[1.]]),
 tensor([[0.]]),
 tensor([[1.]]),
 tensor([[0.]]),
 tensor([[1.]]),
 tensor([[0.]]),
 tensor([[1.]]),
 tensor([[1.]]),
 tensor([[1.]]),
 tensor([[0.]]),
 tensor([[1.]]),
 tensor([[0.]]),
 tensor([[0.]]),
 tensor([[1.]]),
 tensor([[1.]]),
 tensor([[1.]]),
 tensor([[1.]]),
 tensor([[0.]]),
 tensor([[1.]]),
 tensor([[1.]]),
 tensor([[0.]]),
 tensor([[0.]]),
 tensor([[0.]]),
 tensor([[0.]]),
 tensor([[1.]]),
 tensor([[1.]]),
 tensor([[0.]]),
 tensor([[1.]]),
 tensor([[0.]]),
 tensor([[0.]]),
 tensor([[0.]]),
 tensor([[0.]]),
 tensor([[1.]]),
 tensor([[0.]]),
 tensor([[0.]]),
 tensor([[0.]]),
 tensor([[1.]]),
 tensor([[1.]]),
 tensor([[0.]]

In [71]:
# Definimos el umbral arbitrario
thr = 0.5

# Convertir las listas de tensores a un solo tensor
y_true_tensor = torch.cat(y_true)  # Concatenar todos los tensores de labels
y_proba_tensor = torch.cat(y_proba)  # Concatenar todos los tensores de probabilidades

# Aplicar el umbral y obtener las predicciones binarias
y_pred_tensor = (y_proba_tensor >= thr).int()

# Convertir a NumPy arrays al final
y_true = y_true_tensor.numpy()
y_pred = y_pred_tensor.numpy()

# Ahora y_pred debería tener la misma forma que y_true
print(y_true.shape, y_pred.shape)

(3512, 1) (3512, 1)


In [74]:
from sklearn.metrics import accuracy_score, confusion_matrix, roc_curve, auc, precision_score, recall_score

# Calcular métricas de clasificación
accuracy = accuracy_score(y_true, y_pred)
conf_matrix = confusion_matrix(y_true, y_pred)
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
specificity = recall_score(y_true, y_pred, pos_label=0)

# Calcular la curva ROC
# fpr, tpr, _ = roc_curve(y_true, y_proba)  # y_pred_proba es la probabilidad predicha
# roc_auc = auc(fpr, tpr)

print(f"Accuracy: {accuracy:.2f}")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")
print(f"Specificity: {specificity:.2f}")
print(f"Confusion Matrix:\n{conf_matrix}")

Accuracy: 0.75
Precision: 0.75
Recall: 0.74
Specificity: 0.75
Confusion Matrix:
[[1317  444]
 [ 451 1300]]
