Tenga en cuenta que el archivo `unet_model.py` utilizado en este tutorial es solo para demostración.
El autor original de `unet_model.py` es Supervisely.
Consultar README de pv-vision y `unet_model.py` para conocer los términos de uso.
Puede cambiar el modelo aquí con otros pesos para uso personal.


In [None]:
# Importar librerías

import torch 
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torch.nn import DataParallel

from torch.nn import CrossEntropyLoss
from torch.optim import Adam
from torch.optim.lr_scheduler import StepLR
from torchvision.io import read_image, ImageReadMode
from torchvision.datasets.vision import VisionDataset
from torchvision.models.segmentation import deeplabv3_resnet50, DeepLabV3_ResNet50_Weights
from torchvision.models.segmentation.deeplabv3 import DeepLabHead

from torchvision.utils import draw_segmentation_masks
import torchvision.transforms.functional as F

import cv2 as cv
import numpy as np
import matplotlib.pyplot as plt

import requests
import copy

from unet_model import construct_unet
from pathlib import Path
from PIL import Image
from imutils.paths import list_images
import os

# Importar Model Handler
from pv_vision.nn import ModelHandler

In [None]:
# will put this method into util in the future
class SolarDataset(VisionDataset):
    """ Clase para cargar el dataset de segmentación de paneles solares. 
    """
    def __init__(self, 
                 root, 
                 image_folder, 
                 mask_folder,
                 transforms,
                 mode = "train",
                 random_seed=42):
        super().__init__(root, transforms)
        self.image_path = Path(self.root) / image_folder
        self.mask_path = Path(self.root) / mask_folder

        if not os.path.exists(self.image_path):
            raise OSError(f"{self.image_path} not found.")

        if not os.path.exists(self.mask_path):
            raise OSError(f"{self.mask_path} not found.")

        self.image_list = sorted(list(list_images(self.image_path)))
        self.mask_list = sorted(list(list_images(self.mask_path)))

        self.image_list = np.array(self.image_list)
        self.mask_list = np.array(self.mask_list)

        np.random.seed(random_seed)
        index = np.arange(len(self.image_list))
        np.random.shuffle(index)
        self.image_list = self.image_list[index]
        self.mask_list = self.mask_list[index]

    def __len__(self):
        return len(self.image_list)

    def __getname__(self, index):
        image_name = os.path.splitext(os.path.split(self.image_list[index])[-1])[0]
        mask_name = os.path.splitext(os.path.split(self.mask_list[index])[-1])[0]

        if image_name == mask_name:
            return image_name
        else:
            return False
    
    def __getraw__(self, index):
        if not self.__getname__(index):
            raise ValueError("{}: Image doesn't match with mask".format(os.path.split(self.image_list[index])[-1]))
        image = Image.open(self.image_list[index])
        mask = Image.open(self.mask_list[index]).convert('L')
        mask = np.array(mask)
        mask = Image.fromarray(mask)

        return image, mask

    def __getitem__(self, index):
        image, mask = self.__getraw__(index)
        image, mask = self.transforms(image, mask)

        return image, mask

In [None]:
# Clase Compose, realiza una serie de transformaciones a la imagen y la máscara
class Compose:
    def __init__(self, transforms):
        """
        transforms: a list of transform
        """
        self.transforms = transforms
    
    def __call__(self, image, target):
        """
        image: input image
        target: input mask
        """
        for t in self.transforms:
            image, target = t(image, target)
        return image, target

# Clase FixResize, redimensiona la imagen y la máscara a un tamaño específico
class FixResize:
    # UNet requires input size to be multiple of 16
    def __init__(self, size):
        self.size = size

    def __call__(self, image, target):
        image = F.resize(image, (self.size, self.size), interpolation=transforms.InterpolationMode.BILINEAR)
        target = F.resize(target, (self.size, self.size), interpolation=transforms.InterpolationMode.NEAREST)
        return image, target

# Clase RandomHorizontalFlip, realiza un flip horizontal a la imagen y la máscara
class ToTensor:
    """Transform the image to tensor. Scale the image to [0,1] float32.
    Transform the mask to tensor.
    """
    def __call__(self, image, target):
        image = transforms.ToTensor()(image)
        target = torch.as_tensor(np.array(target), dtype=torch.int64)
        return image, target

# Clase RandomHorizontalFlip, realiza un flip horizontal a la imagen y la máscara
class PILToTensor:
    """Transform the image to tensor. Keep raw type."""
    def __call__(self, image, target):
        image = F.pil_to_tensor(image)
        target = torch.as_tensor(np.array(target), dtype=torch.int64)
        return image, target

# Clase RandomHorizontalFlip, realiza un flip horizontal a la imagen y la máscara
class Normalize:
    def __init__(self, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
        self.mean = mean
        self.std = std
    
    def __call__(self, image, target):
        image = F.normalize(image, mean=self.mean, std=self.std)
        return image, target

In [None]:
# Carpeta de imágenes
#images = [cv.imread(file) for file in list_images('/home/franklin/PVDefectDetect/TestsPV-vision/examples/crack_segmentation/img_for_prediction')]
#images = [cv.imread(file) for file in list_images('/home/franklin/PVDefectDetect/Test_crack')]
image_path = 'D:/Documentos/Universidad de Cuenca/Trabajo de Titulación/Datasets_EL/CeldasIndividuales/Mono2_V40_I5_t28'
images = [cv.imread(file) for file in list_images(image_path)]
transformers = Compose([FixResize(256), ToTensor(), Normalize()])

origin_path = 'D:/Documentos/Universidad de Cuenca/Trabajo de Titulación/CellAnotation_no_humanMasks/dataset_cells'

valset = SolarDataset(root=origin_path, image_folder='images', 
                      mask_folder='annotations', transforms = transformers)

# Verificar que las imágenes se cargaron correctamente
print(f'Número de imágenes: {len(images)}')


El siguiente código define una clase `myDataset` que hereda de la clase `Dataset` de PyTorch. Esta clase se utiliza para cargar y transformar las imágenes que se pasarán al modelo. La transformación se aplica a cada imagen cuando se accede a ella, no todas a la vez al principio. Esto es más eficiente en términos de memoria, especialmente cuando se trabaja con conjuntos de datos grandes.

In [None]:
# Definir una clase personalizada que hereda de Dataset
class myDataset(Dataset):
    # El método de inicialización se llama cuando se crea una instancia de la clase
    def __init__(self, images, transform):
        # Guardar las imágenes y la transformación como atributos de la instancia
        self.images = images
        self.transform = transform

    # El método __len__ devuelve el número de elementos en el conjunto de datos
    def __len__(self):
        return len(self.images)

    # El método __getitem__ se utiliza para obtener un elemento del conjunto de datos
    def __getitem__(self, idx):
        # Redimensionar la imagen al tamaño deseado
        image = cv.resize(self.images[idx], (256, 256))
        # Aplicar la transformación a la imagen
        image = self.transform(image)
        
        # Devolver la imagen transformada
        return image

El siguiente código define una transformación que se aplica a las imágenes antes de pasarlas al modelo. La transformación consta de dos pasos: convertir la imagen a un tensor de PyTorch y normalizar los valores de los píxeles. Luego, se crea una instancia de la clase `myDataset` que se utiliza para cargar y transformar las imágenes.

In [None]:
# Definir la transformación de las imágenes que se pasará al manejador del modelo
transform = transforms.Compose([
    # Convertir la imagen a un tensor de PyTorch y escalar los valores de los píxeles entre 0 y 1
    transforms.ToTensor(),
    # Normalizar cada canal de color de la imagen. Los valores de la media y la desviación estándar se especifican para cada canal (RGB). 
    # Estos valores son los valores de media y desviación estándar del conjunto de datos ImageNet.
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

# Crear una instancia de la clase myDataset. Esta clase es un tipo personalizado de Dataset que se utiliza para cargar y transformar las imágenes.
# La lista de imágenes y la transformación compuesta se pasan como argumentos al inicializar el conjunto de datos.
imgset = myDataset(images, transform)

In [None]:
# Carpeta de pesos del modelo

#weight_path = '/home/franklin/supervisely/neural network weights/crack_segmentation/unet_oversample_low_final_model_for_paper/model.pt'
weight_path = 'D:/Documentos/PV_Vision/Neural_Network_W/crack_segmentation/unet_oversample_low_final_model_for_paper/model.pt'

In [None]:
# Definir el dispositivo en el que se ejecutará el modelo. Si hay una GPU disponible, se utilizará. De lo contrario, se utilizará la CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f'Dispositivo: {device}')

# Crear una instancia del modelo U-Net con 5 canales de salida. 
# El número de canales de salida generalmente corresponde al número de clases que el modelo está diseñado para predecir.
unet = construct_unet(5)

# Envolver el modelo en un objeto DataParallel. 
# Esto permite que el modelo se ejecute en paralelo en múltiples GPUs, si están disponibles.
unet = DataParallel(unet)

Dispositivo: cuda:0




Este código define el dispositivo en el que se ejecutará el modelo (GPU si está disponible, de lo contrario CPU), crea una instancia del modelo U-Net, y luego envuelve el modelo en un objeto `DataParallel` para permitir la ejecución en paralelo en múltiples GPUs.

In [None]:
# Inicializar el manejador del modelo (ModelHandler). 
# Este objeto se encargará de la gestión del modelo, incluyendo la carga de los datos, la ejecución del modelo y el almacenamiento de los resultados.
modelhandler = ModelHandler(
    # El modelo que se va a utilizar. En este caso, es la instancia de U-Net que se ha creado anteriormente.
    model=unet,
    # El conjunto de datos que se utilizará para las pruebas. En este caso, es el conjunto de imágenes que se ha cargado y transformado anteriormente.
    test_dataset=imgset,
    # Indica que sólo se realizarán predicciones, no se entrenará el modelo.
    predict_only=True,
    # El tamaño del lote que se utilizará durante la validación. En este caso, se procesarán 2 imágenes a la vez.
    batch_size_val=2,
    # El dispositivo en el que se ejecutará el modelo. En este caso, es el dispositivo que hemos definido anteriormente (GPU si está disponible, de lo contrario CPU).
    device=device,
    # El directorio donde se guardarán los resultados. En este caso, los resultados se guardarán en un directorio llamado 'output'.
    save_dir='D:/Documentos/Universidad de Cuenca/Trabajo de Titulación/Predicciones/Modulos/Celdas/output',
    # El nombre que se utilizará para guardar los resultados. En este caso, los resultados se guardarán con el nombre 'unet_prediction'.
    save_name='unet_cell_prediction'
)

# Cargar los pesos del modelo desde el archivo especificado por 'weight_path'.
# Esto permite utilizar un modelo que ha sido entrenado previamente, en lugar de tener que entrenar el modelo desde cero.
modelhandler.load_model(weight_path)



El código anterior inicializa un objeto `ModelHandler` que se encargará de la gestión del modelo, incluyendo la carga de los datos, la ejecución del modelo y el almacenamiento de los resultados. Luego, carga los pesos del modelo desde un archivo especificado.

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = DataParallel(DeepLab_pretrained(5))
criterion = CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=0.01)
lr_scheduler = StepLR(optimizer, step_size=5, gamma=0.2)

Dispositivo: cuda:0


In [None]:
# initialize modelhandler
# The output is stored in the output folder
modelhandler = ModelHandler(
    model=model,
    model_output='out',
    train_dataset=trainset,
    val_dataset=valset,
    test_dataset=testset,
    batch_size_train=32,
    batch_size_val=32,
    lr_scheduler=lr_scheduler,
    num_epochs=10,
    criterion=criterion,
    optimizer=optimizer,
    device=device,
    save_dir='checkpoints',
    save_name='deeplab.pt'
)