In [None]:
#Librerias basicas
import torch
import matplotlib.pyplot as plt
import torch.nn.functional as F
import numpy as np
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torch.utils.data import IterableDataset
#from PIL import Image, ImageDraw
#import pandas as pd
#import os
import cv2
#import random
#import math
import torch.nn as nn
#import time
from functools import partial
from dataclasses import dataclass
from collections import OrderedDict

In [None]:
# Load the input array (X) and the two target arrays (Y) from .npy files in the working directory.
# NOTE(review): allow_pickle=True lets np.load unpickle arbitrary Python objects, which can
# execute code from an untrusted file. Keep it only if these files are trusted and actually
# need object arrays.
dataset_images, dataset_points, dataset_mask = (
    np.load(path, allow_pickle=True)
    for path in ("X_data_gray.npy", "Y_data_Pos_EF.npy", "Y_data_mask.npy")
)

In [None]:
##-------------------------UNET----------------------------------##
def conv3x3_bn(ci, co):
    """Return a Conv2d(3x3, padding=1) -> BatchNorm2d -> ReLU stack.

    Args:
        ci: number of input channels.
        co: number of output channels.

    Returns:
        A ``Sequential`` whose convolution keeps the spatial size
        (3x3 kernel with padding 1) and maps ``ci`` channels to ``co``.
    """
    layers = [
        nn.Conv2d(ci, co, kernel_size=3, padding=1),
        nn.BatchNorm2d(co),
        nn.ReLU(inplace=True),
    ]
    return nn.Sequential(*layers)

def encoder_conv(ci, co):
    """Return one encoder stage: 2x2 max-pooling followed by two conv3x3_bn stacks.

    Args:
        ci: number of input channels.
        co: number of output channels (used by both convolution stacks).

    Returns:
        A ``Sequential`` of ``MaxPool2d(2)``, ``conv3x3_bn(ci, co)`` and
        ``conv3x3_bn(co, co)``, in that order.
    """
    stages = (
        nn.MaxPool2d(2),
        conv3x3_bn(ci, co),
        conv3x3_bn(co, co),
    )
    return nn.Sequential(*stages)

class deconv(torch.nn.Module):
    """Decoder stage: upsample, align with the encoder skip tensor, concatenate, convolve.

    Args:
        ci: channels of the tensor coming from the previous (deeper) layer.
        co: channels produced by this stage.

    ``conv1`` is built for ``ci`` input channels while the upsample emits ``co``,
    so the skip tensor must contribute ``ci - co`` channels to the concatenation.
    """

    def __init__(self, ci, co):
        super(deconv, self).__init__()
        # 2x2 transposed convolution with stride 2.
        self.upsample = torch.nn.ConvTranspose2d(ci, co, 2, stride=2)
        self.conv1 = conv3x3_bn(ci, co)
        self.conv2 = conv3x3_bn(co, co)

    def forward(self, x1, x2):
        """Combine the previous layer's output with the matching encoder output.

        Args:
            x1: output of the previous layer, indexed as (batch, channels, dim2, dim3).
            x2: output of the corresponding encoder stage (skip connection).

        Returns:
            Result of ``conv2(conv1(cat([x2, padded_upsampled_x1], dim=1)))``.
        """
        x1 = self.upsample(x1)
        # Size gap between the skip tensor and the upsampled tensor on each spatial axis.
        diff_h = x2.size(2) - x1.size(2)
        diff_w = x2.size(3) - x1.size(3)
        # F.pad consumes pads starting from the LAST dimension:
        # (dim3_before, dim3_after, dim2_before, dim2_after).
        # The dim-3 gap must therefore come first; passing the dim-2 gap first padded
        # the wrong axis and broke the concatenation whenever the two gaps differed.
        x1 = F.pad(x1, (diff_w, 0, diff_h, 0))
        # Concatenate along the channel axis, skip tensor first.
        x = torch.cat([x2, x1], dim=1)
        x = self.conv1(x)
        x = self.conv2(x)
        return x

class UNet(torch.nn.Module):
    """U-Net with three down-sampling and three up-sampling stages.

    Args:
        n_classes: number of output channels of the final 1x1 convolution.
        in_ch: number of channels of the input image.

    ``forward`` returns the output of the final convolution directly
    (no activation is applied inside the model).
    """

    def __init__(self, n_classes=1, in_ch=1):
        super().__init__()

        # Number of filters at each encoder/decoder depth.
        f0, f1, f2, f3 = 16, 32, 64, 128

        # First convolution stage, applied to the input image (no pooling).
        self.conv1 = torch.nn.Sequential(
            conv3x3_bn(in_ch, f0),
            conv3x3_bn(f0, f0),
        )
        # Encoder stages.
        self.conv2 = encoder_conv(f0, f1)
        self.conv3 = encoder_conv(f1, f2)
        self.conv4 = encoder_conv(f2, f3)

        # Decoder stages.
        self.deconv1 = deconv(f3, f2)
        self.deconv2 = deconv(f2, f1)
        self.deconv3 = deconv(f1, f0)

        # Final 1x1 convolution that produces the mask channels.
        self.out = torch.nn.Conv2d(f0, n_classes, 1)

    def forward(self, x):
        """Run the encoder, then the decoder with skip connections, then the 1x1 head."""
        # Encoder: keep every intermediate output for the skip connections.
        skip_a = self.conv1(x)
        skip_b = self.conv2(skip_a)
        skip_c = self.conv3(skip_b)
        deepest = self.conv4(skip_c)
        # Decoder: each stage receives the previous output and the matching encoder output.
        up = self.deconv1(deepest, skip_c)
        up = self.deconv2(up, skip_b)
        up = self.deconv3(up, skip_a)
        return self.out(up)

In [None]:
# Metric
def iou(outputs, labels):
    """Mean intersection-over-union between thresholded predictions and labels.

    Args:
        outputs: raw logits; a sigmoid is applied and values above 0.5 count as positive.
            Expected as (batch, num_classes, H, W); any tensor whose last two axes
            are the spatial ones works (e.g. (batch, H, W)).
        labels: target tensor with the same shape; values above 0.5 count as positive.

    Returns:
        numpy.float64: IoU computed separately for every mask (every index of the
        leading axes) and then averaged. A mask that is empty in both prediction
        and label scores 1.0 because of the smoothing term.
    """
    SMOOTH = 1e-6  # avoids 0/0 when both prediction and label are empty

    # Apply the sigmoid and binarize both tensors.
    preds = torch.sigmoid(outputs) > 0.5
    targets = labels > 0.5

    # Reduce over the two spatial axes only. Reducing over axes (1, 2) of a
    # (batch, classes, H, W) tensor would mix the class and height axes and
    # yield a per-column score instead of a per-mask score.
    spatial_axes = (-2, -1)
    intersection = (preds & targets).float().sum(spatial_axes)
    union = (preds | targets).float().sum(spatial_axes)
    per_mask_iou = (intersection + SMOOTH) / (union + SMOOTH)

    # Keep returning a numpy scalar, as the previous np.mean-based version did.
    return np.float64(per_mask_iou.mean().item())

In [None]:

# Custom dataset that pairs each image with its mask
class Dataset(torch.utils.data.Dataset):
    """In-memory dataset of (image, mask) float32 tensor pairs.

    Args:
        X: array whose shape unpacks into exactly three dimensions (samples and two
            spatial axes).
        y: array that can be reshaped to (samples, 1, <the two spatial axes of X>).

    Both arrays are reshaped to add a singleton channel axis, cast to float32
    and converted to tensors once, at construction time.
    """

    def __init__(self,X, y):
        count, dim_a, dim_b = X.shape
        with_channel = (count, 1, dim_a, dim_b)
        self.img = self._as_float_tensor(X, with_channel)
        self.mask = self._as_float_tensor(y, with_channel)

    @staticmethod
    def _as_float_tensor(array, shape):
        # Reshape first, then cast to float32 and copy into a tensor.
        return torch.tensor(array.reshape(shape).astype(np.float32))

    def __len__(self):
        """Number of samples held by the dataset."""
        return self.img.shape[0]

    def __getitem__(self, idx):
        """Return the ``(image, mask)`` pair stored at position ``idx``."""
        image, target = self.img[idx], self.mask[idx]
        return image, target
    

In [None]:
# Split in stored order (no shuffling): the first 70% of the samples are used
# for training and the remainder for testing.
N, W, H = dataset_images.shape
N_training = int(0.7 * N)
dataset = {
    split: Dataset(dataset_images[rows], dataset_mask[rows])
    for split, rows in (
        ('train', slice(0, N_training)),
        ('test', slice(N_training, N)),
    )
}

# Display the size of both splits.
len(dataset['train']), len(dataset['test'])

In [None]:
# One loader per split. Only the training loader shuffles; both use pinned memory.
dataloader = {
    split: DataLoader(dataset[split], pin_memory=True, **options)
    for split, options in (
        ('train', {'batch_size': 50, 'shuffle': True}),
        ('test', {'batch_size': 100}),
    )
}


In [None]:
#funcion de entrenamiento
from tqdm import tqdm
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
# If CUDA raises a memory-related error, uncomment the line below to force the CPU.
#device = "cpu"
def fit(model, dataloader, epochs=100, lr=3e-4):
    """Train ``model`` with Adam and BCEWithLogitsLoss, evaluating after every epoch.

    Args:
        model: module mapping an image batch to logits with the same shape as the masks.
        dataloader: mapping with ``'train'`` and ``'test'`` iterables that yield
            ``(images, masks)`` batches.
        epochs: number of passes over the training loader.
        lr: learning rate given to Adam.

    Returns:
        dict with the lists ``'loss'``, ``'iou'``, ``'test_loss'`` and ``'test_iou'``;
        each holds one value per epoch, the mean of the per-batch values.

    The model is moved to the module-level ``device`` and trained in place.
    """
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = torch.nn.BCEWithLogitsLoss()
    model.to(device)

    def run_batch(batch, update_weights):
        # Forward one batch; when requested, also back-propagate and step the optimizer.
        imgs, masks = batch
        imgs, masks = imgs.to(device), masks.to(device)
        if update_weights:
            optimizer.zero_grad()
        y_hat = model(imgs)
        loss = criterion(y_hat, masks)
        if update_weights:
            loss.backward()
            optimizer.step()
        return loss.item(), iou(y_hat, masks)

    hist = {'loss': [], 'iou': [], 'test_loss': [], 'test_iou': []}
    for epoch in range(1, epochs+1):
        # ---- training pass ----
        bar = tqdm(dataloader['train'])
        train_loss, train_iou = [], []
        model.train()
        for batch in bar:
            batch_loss, batch_iou = run_batch(batch, update_weights=True)
            train_loss.append(batch_loss)
            train_iou.append(batch_iou)
            # Running means over the batches seen so far in this epoch.
            bar.set_description(f"loss {np.mean(train_loss):.5f} iou {np.mean(train_iou):.5f}")
        hist['loss'].append(np.mean(train_loss))
        hist['iou'].append(np.mean(train_iou))

        # ---- evaluation pass (no gradients, eval-mode layers) ----
        bar = tqdm(dataloader['test'])
        test_loss, test_iou = [], []
        model.eval()
        with torch.no_grad():
            for batch in bar:
                batch_loss, batch_iou = run_batch(batch, update_weights=False)
                test_loss.append(batch_loss)
                test_iou.append(batch_iou)
                bar.set_description(f"test_loss {np.mean(test_loss):.5f} test_iou {np.mean(test_iou):.5f}")
        hist['test_loss'].append(np.mean(test_loss))
        hist['test_iou'].append(np.mean(test_iou))

        print(f"\nEpoch {epoch}/{epochs} loss {np.mean(train_loss):.5f} iou {np.mean(train_iou):.5f} test_loss {np.mean(test_loss):.5f} test_iou {np.mean(test_iou):.5f}")
    return hist

In [None]:
# Build a UNet with its default arguments and train it for 15 epochs
# (fit's default learning rate is used).
model = UNet()
hist = fit(model=model, dataloader=dataloader, epochs=15)

In [None]:
# Save the trained model.
# NOTE(review): this pickles the whole module object rather than its state_dict, so
# loading the file presumably needs the same class definitions importable. Confirm
# that this is what downstream loaders expect.
torch.save(obj=model, f='mi_modelo_name.pt')