Importar las librerias necesarias

In [44]:
import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
from torchvision.transforms import transforms
import torchvision
import matplotlib.pyplot as plt
import os
import pandas as pd
from torchvision.io import read_image
from pathlib import Path

Determinar si se usa CUDA o CPU

In [2]:
if torch.cuda.is_available(): 
 print("cuda") 
else: 
 print("cpu") 

cuda


Definir la ubicación del dataset

In [60]:
#file_path = pathlib.Path(__file__).parent.absolute() # Para .py
file_path = Path.cwd() # Para .ipnyb
dataset_path = base_folder = pathlib.Path(file_path) / "data/Dataset"

Definir las transformaciones que se realizan en entrenamiento, validación y en ambas.

In [46]:
def get_transforms(split):
    global_transforms = [
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Grayscale(),
        torchvision.transforms.Resize((48, 48))
    ]
    
    mean, std = 0.5, 0.5

    train_transforms = transforms.Compose([
        *global_transforms,
        #Data augmentation
        torchvision.transforms.Normalize((mean,), (std,))
    ])

    test_transforms = transforms.Compose([
        *global_transforms,
        torchvision.transforms.Normalize((mean,), (std,))
    ])

    if split == "train":
        return train_transforms
    else:
        return test_transforms





Crear la clase Dataset

In [47]:
class EyeDataset(Dataset):
    def __init__(self, root_dir, split = "train"):
        self.root_dir = root_dir
        self.split = split
        self.transform = get_transforms(self.split)

    def __getitem__(self, index):
        img_path = os.path.join(self.root_dir)
        image = read_image(img_path)
        image_name = image_path.split('/')[-1]
        label = image_name[16]
        if self.transform:
            image = self.transform(image)
        return (image, label)

    def __len__(self):
        return len(os.listdir(self.root_dir))

Crear el dataloader

In [48]:
def get_loader(batch_size, shuffle=True, num_workers=0):

    train_dataset = EyeDataset(root_dir = dataset_path, split = "train")
    val_dataset = EyeDataset(root_dir = dataset_path, split = "val")

    total_len = len(train_dataset)
    
    indices = torch.randperm(len(train_dataset)) # Sortea los indices
    val_size = total_len//3 # Un tercio del total aplicado piso
    train_dataset = torch.utils.data.Subset(train_dataset, indices[:-val_size]) # Selecciona todos los indices menos los ultimos val_size indices
    val_dataset = torch.utils.data.Subset(val_dataset, indices[-val_size:]) # Selecciona solo los ultimos val_size indices

    train_dataloader = DataLoader(
                train_dataset,
                batch_size=batch_size,
                shuffle=shuffle,
                num_workers=num_workers,
        )

    val_dataloader = DataLoader(
                val_dataset,
                batch_size=batch_size,
                shuffle=shuffle,
                num_workers=num_workers,
        )


    return train_dataset, train_dataloader, val_dataset, val_dataloader

Creación de la red

In [49]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import pathlib

In [50]:
class Network(nn.Module):
    def __init__(self, input_dim: int, n_classes: int) -> None:
        super().__init__()
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'

        out_dim =  self.calc_out_dim(input_dim,5, padding=2)

        self.conv1 = nn.Conv2d(1,16,kernel_size=5, padding=2)
        self.conv2 = nn.Conv2d(16,32,kernel_size=5)
        self.lineal1 = nn.Linear(32*self.calc_out_dim(out_dim,5)*self.calc_out_dim(out_dim,5),1024)
        self.lineal2 = nn.Linear(1024,n_classes)
        
        self.to(self.device)
 
    def calc_out_dim(self, in_dim, kernel_size, stride=1, padding=0):
        out_dim = math.floor((in_dim - kernel_size + 2*padding)/stride) + 1
        return out_dim

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        if torch.cuda.is_available():
            x = x.cuda()
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = torch.flatten(x, start_dim=1)
        x = self.lineal1(x)
        x = F.relu(x)
        logits = self.lineal2(x)
        proba = F.softmax(logits, dim=1)

        return logits, proba

    def predict(self, x):
        with torch.inference_mode():
            return self.forward(x)

    def save_model(self, model_name: str):
        models_path = file_path / 'models' / model_name
        torch.save(self.state_dict(),models_path)

    def load_model(self, model_name: str):
        self.load_state_dict(torch.load(file_path / 'models' / model_name))


Entrenamiento

In [51]:
import matplotlib.pyplot as plt
import numpy as np
import os
import cv2
import torch.optim as optim
from tqdm import tqdm
from network import Network

In [55]:
import datetime as dt
import pathlib
import matplotlib
import matplotlib.pyplot as plt

file_path = Path.cwd()

class PlotLosses():
    def __init__(self, logs={}):
        self.i = 0
        self.x = []
        self.losses = []
        self.val_losses = []
        self.fig = plt.figure()
        
        self.logs = []

    def on_epoch_end(self, epoch, train_loss, val_loss):        
        self.x.append(self.i)
        self.losses.append(train_loss)
        self.val_losses.append(val_loss)
        self.i += 1
        plt.cla()
        plt.plot(self.x, self.losses, label="Costo de entrenamiento promedio")
        plt.plot(self.x, self.val_losses, label="Costo de validación promedio")
        plt.xlabel('epochs')
        plt.ylabel('Loss')
        plt.legend()
        plt.show(block=False)
        plt.pause(5)

    def on_train_end(self):
        plt.show()
        today = dt.datetime.now().strftime("%Y-%m-%d")
        losses_file = file_path/ f'figures/losses_{today}.png'
        plt.savefig(losses_file)

In [56]:
def validation_step(val_loader, net, cost_function):
    val_loss = 0.0
    correct = 0
    total = 0
    for i, batch in enumerate(val_loader, 0):
        batch_imgs = batch['transformed']
        batch_labels = batch['label']
        if torch.cuda.is_available():
            batch_labels = batch_labels.cuda()
        with torch.inference_mode():
            outputs,proba = net(batch_imgs)
            loss = cost_function(outputs, batch_labels)
            val_loss += loss.item()

            predictions = torch.argmax(proba,dim=1)
            total += batch_labels.size(0)
            cor = (predictions == batch_labels).sum().item()
            correct += cor

    accuracy = 100 * float(correct) / total
    print(f"Accuracy: {accuracy}")
    return val_loss/len(val_loader)

def train():
    learning_rate = 1e-5
    n_epochs= 50
    batch_size = 32

    train_dataset, train_loader, _, _ = \
        get_loader(batch_size=batch_size,
                    shuffle=True)
    _, _, val_dataset, val_loader = \
        get_loader(batch_size=batch_size,
                    shuffle=False)

    print(f"Cargando datasets --> entrenamiento: {len(train_dataset)}, validacion: {len(val_dataset)}")

    plotter = PlotLosses()
    modelo = Network(input_dim = 48,
                     n_classes = 7)

    cost_function = nn.CrossEntropyLoss()

    optimizer = optim.Adam(modelo.parameters(), learning_rate, weight_decay=0.01)
    best_epoch_loss = np.inf
    best_epoch_loss_train = np.inf
    for epoch in range(n_epochs):
        train_loss = 0
        for i, batch in enumerate(tqdm(train_loader, desc=f"Epoch: {epoch}")):
            batch_imgs = batch['transformed']
            batch_labels = batch['label']
            if torch.cuda.is_available():
                batch_labels = batch_labels.cuda()
            optimizer.zero_grad()
            preds,_ = modelo(batch_imgs)
            loss = cost_function(preds, batch_labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        train_loss = train_loss/len(train_loader)
        val_loss = validation_step(val_loader, modelo, cost_function)
        tqdm.write(f"Epoch: {epoch}, train_loss: {train_loss:.2f}, val_loss: {val_loss:.2f}")

        if(val_loss<best_epoch_loss):
            modelo.save_model("modelo_val_1.pt")
            best_epoch_loss=val_loss
        if(train_loss<best_epoch_loss_train):
            modelo.save_model("modelo_ent_1.pt")
            best_epoch_loss=train_loss
        plotter.on_epoch_end(epoch, train_loss, val_loss)
    modelo.save_model("modelo_fin_1.pt")
    plotter.on_train_end()


In [61]:
train()

Cargando datasets --> entrenamiento: 56599, validacion: 28299


Epoch: 0:   0%|          | 0/1769 [00:00<?, ?it/s]


RuntimeError: unable to mmap 4403200 bytes from file </home/hij555/Documents/GPT-5/ProyectoFinal/ML23_GPT5/ml23/proyecto_final/Backend/data/Dataset>: No such device (19)

<Figure size 640x480 with 0 Axes>