# Deep Learning: Laparoscopic segmentation.
Dataset: [CholecSeg8k on Kaggle](https://www.kaggle.com/datasets/newslab/cholecseg8k?resource=download)

In [1]:
import os

# Configure the cuBLAS workspace through the environment; this runs before `torch` is imported in the next cell.
# NOTE(review): presumably needed so CUDA matrix multiplications stay reproducible once
# torch.use_deterministic_algorithms(True) is enabled later in the notebook — confirm against the PyTorch docs.
os.environ["CUBLAS_WORKSPACE_CONFIG"]=":4096:8"

In [2]:
import torch
from glob import glob
import numpy as np
import time
import matplotlib.pyplot as plt
from torch.utils.data import Dataset
import cv2
import random
from pytorch_Uformer import modelPool
from pytorch_Uformer import losses
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import math

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
import torch
# Ask PyTorch to release the CUDA memory it is caching before the run starts.
torch.cuda.empty_cache()

# Original note (translated): uncomment the following line if your computer does not have an NVIDIA GPU
# and you want the code to run on a CPU instead of a GPU.
# NOTE(review): the commented expression only queries CUDA availability and the device name;
# it does not switch devices by itself (DEVICE is set to "cpu" in the configuration cell).
#torch.cuda.is_available(), torch.cuda.get_device_name(0)   

In [4]:
SEED = 17
# Feed the same seed to every random number generator used by the notebook:
# Python's `random`, NumPy, and PyTorch (CPU and all CUDA devices).
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
    _seed_fn(SEED)
del _seed_fn
# Opt in to PyTorch's deterministic-algorithms mode.
torch.use_deterministic_algorithms(True)

In [5]:
# --- Runtime ---------------------------------------------------------------
# Device used for training and evaluation.
DEVICE = "cpu"
# Whether the data loaders should pin host memory.
PIN_MEMORY = False

# --- Problem size ----------------------------------------------------------
NUM_CLASSES = 13

# --- Optimisation: initial learning rate, epoch count and batch size --------
INIT_LR = 3e-4
NUM_EPOCHS = 100
BATCH_SIZE = 32

# --- Network input resolution (pixels) ---------------------------------------
INPUT_IMAGE_WIDTH = INPUT_IMAGE_HEIGHT = 64

# One weight per class (NUM_CLASSES entries), handed to the Dice loss.
CLASS_WEIGHTS = [
    2.45780159e-01,
    2.24610010e-01,
    2.24005501e-01,
    1.87119512e-02,
    1.46965911e-01,
    2.54698498e-02,
    2.35562466e-02,
    4.28713053e-03,
    3.93593665e-04,
    1.38500705e-02,
    6.78984614e-02,
    1.01436955e-04,
    4.36967906e-03,
]

## Data preparation.

In [6]:
# glob() yields paths in arbitrary, filesystem-dependent order. Both lists are sorted so that
# image i and mask i refer to the same frame, and so that the seeded train/test split
# made later in the notebook picks the same files on every machine.
IMAGE_DATASET_PATH = sorted(glob('../*/*/*/frame_*_endo.png'))
MASK_DATASET_PATH  = sorted(glob('../*/*/*/frame_*_endo_watershed_mask.png'))
N = len(IMAGE_DATASET_PATH)
assert len(IMAGE_DATASET_PATH) == len(MASK_DATASET_PATH)

# Verify the pairing explicitly: stripping the two known suffixes must leave the same frame prefix.
_unpaired = [
    (img_path, mask_path)
    for img_path, mask_path in zip(IMAGE_DATASET_PATH, MASK_DATASET_PATH)
    if img_path[:-len('_endo.png')] != mask_path[:-len('_endo_watershed_mask.png')]
]
assert not _unpaired, f'Image/mask paths are not aligned, e.g. {_unpaired[:3]}'

print(f'Number of samples: {len(MASK_DATASET_PATH)}')

Number of samples: 8080


In [7]:
# https://gist.github.com/IdeaKing/11cf5e146d23c5bb219ba3508cca89ec
def resize_with_pad(image: np.ndarray,
                    new_shape,
                    padding_color = (0, 0, 0)) -> np.ndarray:
    """Resize an image to fit inside `new_shape`, keeping its aspect ratio, then pad.

    The image is scaled with nearest-neighbour interpolation and centred on a
    canvas of the requested size. When the leftover space is odd, the extra
    pixel goes to the bottom / right side.

    Params:
        image: Image to be resized, indexed as (height, width, ...).
        new_shape: Expected (width, height) of new image.
        padding_color: Tuple in BGR of padding color
    Returns:
        image: Resized image with padding, of size `new_shape`.
    """
    target_w, target_h = new_shape[0], new_shape[1]
    src_h, src_w = image.shape[0], image.shape[1]

    # Scale by the tighter of the two axes so the result always fits inside the
    # target. For a square target this equals max(new_shape) / max(original_shape),
    # but it also stays correct (no negative padding) for non-square targets.
    ratio = min(float(target_w) / src_w, float(target_h) / src_h)
    # Never let a very thin image collapse to a zero-sized axis.
    new_size = (max(1, int(src_w * ratio)), max(1, int(src_h * ratio)))

    image = cv2.resize(image, new_size, interpolation=cv2.INTER_NEAREST)

    delta_w = target_w - new_size[0]
    delta_h = target_h - new_size[1]
    top, bottom = delta_h // 2, delta_h - (delta_h // 2)
    left, right = delta_w // 2, delta_w - (delta_w // 2)
    image = cv2.copyMakeBorder(image, top, bottom, left, right, cv2.BORDER_CONSTANT, value=padding_color)
    return image

In [8]:
def to_categorical(y, num_classes=None, dtype="float32"):
    """One-hot encode integer class ids along a new trailing axis.

    Params:
        y: Array-like of integer class ids. A trailing axis of length 1 is
            dropped before encoding (for inputs with more than one axis).
        num_classes: Length of the one-hot axis; when falsy it is inferred as
            the largest id in `y` plus one.
        dtype: dtype of the returned array.
    Returns:
        Array of shape `y.shape (minus a trailing 1) + (num_classes,)`.
    """
    class_ids = np.array(y, dtype="int")
    leading_shape = class_ids.shape

    # (..., 1) inputs are treated like (...,) inputs.
    if len(leading_shape) > 1 and leading_shape[-1] == 1:
        leading_shape = leading_shape[:-1]

    flat_ids = class_ids.ravel()
    if not num_classes:
        num_classes = np.max(flat_ids) + 1

    count = flat_ids.shape[0]
    one_hot = np.zeros((count, num_classes), dtype=dtype)
    # Row k gets a single 1 in the column given by the k-th class id.
    one_hot[np.arange(count), flat_ids] = 1
    return one_hot.reshape(leading_shape + (num_classes,))

# Gray values of the watershed mask, in class order: the value at position k maps to class k + 1.
# Any value not listed here stays at class 0.
labels = [[11],[21],[13],[12],[31],[23],[24],[25],[32],[22],[33],[5]]
def encode_mask(masks_arr, n_classes=NUM_CLASSES, H=INPUT_IMAGE_HEIGHT, W=INPUT_IMAGE_WIDTH, labels_arr=labels):
    """Convert a gray-value mask into a one-hot class volume.

    Params:
        masks_arr: Mask array whose last axis holds the value compared against
            each entry of `labels_arr`; its leading axes must be (H, W).
        n_classes: Number of channels in the one-hot output.
        H, W: Height and width of the mask.
        labels_arr: Sequence of label values; entry k becomes class k + 1.
    Returns:
        float32 array of shape (H, W, n_classes).
    """
    class_ids = np.zeros((H, W, 1), dtype=np.uint8)

    for class_id, gray_value in enumerate(labels_arr, start=1):
        # Pixels whose whole last axis equals this label value.
        hits = np.all(masks_arr == gray_value, axis=-1)
        class_ids[hits] = class_id

    return to_categorical(class_ids, num_classes=n_classes)

class SegmentationDataset(Dataset):
    """Dataset of (image, one-hot mask) tensor pairs loaded from disk on demand.

    Each sample is cropped, resized with padding to
    (INPUT_IMAGE_WIDTH, INPUT_IMAGE_HEIGHT) and returned channel-first:
    the image as float32 RGB in [0, 1] with shape (3, H, W), the mask as a
    float32 one-hot volume with shape (NUM_CLASSES, H, W).

    Raises:
        ValueError: if the two path lists differ in length.
        FileNotFoundError: (on item access) if an image or mask cannot be read.
    """

    def __init__(self, imagePaths, maskPaths):
        # Image i and mask i are paired by index, so the lists must line up.
        if len(imagePaths) != len(maskPaths):
            raise ValueError(
                f"Got {len(imagePaths)} image paths but {len(maskPaths)} mask paths"
            )
        self.imagePaths = imagePaths
        self.maskPaths = maskPaths

    def __len__(self):
        # return the number of total samples contained in the dataset
        return len(self.imagePaths)

    @staticmethod
    def _read_cropped(path):
        """Read a BGR image from `path` and trim its fixed border."""
        frame = cv2.imread(path)
        # cv2.imread signals failure by returning None instead of raising.
        if frame is None:
            raise FileNotFoundError(f"Could not read image file: {path}")
        # Drop 10 rows at top/bottom and 120 columns at left/right.
        # NOTE(review): presumably removes the black frame border — confirm.
        return frame[10:-10, 120:-120, :]

    def __getitem__(self, idx):
        # Image: crop -> resize/pad -> BGR to RGB -> scale to [0, 1] -> (C, H, W)
        image = self._read_cropped(self.imagePaths[idx])
        image = resize_with_pad(image, (INPUT_IMAGE_WIDTH, INPUT_IMAGE_HEIGHT))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = image / 255
        image = torch.from_numpy(np.float32(image)).permute(2, 0, 1)

        # Mask: read as 3-channel, crop -> resize/pad -> single gray channel -> one-hot -> (C, H, W).
        # The pad value 50 is not one of the entries in `labels`, so padded pixels end up in class 0.
        mask = self._read_cropped(self.maskPaths[idx])
        mask = resize_with_pad(mask, (INPUT_IMAGE_WIDTH, INPUT_IMAGE_HEIGHT), (50, 50, 50))
        mask = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY)
        mask = np.expand_dims(mask, axis=-1)
        mask = encode_mask(mask)
        mask = torch.from_numpy(np.float32(mask)).permute(2, 0, 1)

        assert image.shape == (3, INPUT_IMAGE_HEIGHT, INPUT_IMAGE_WIDTH), f"Bad image shape {image.shape}"
        assert mask.shape == (NUM_CLASSES, INPUT_IMAGE_HEIGHT, INPUT_IMAGE_WIDTH), f"Bad mask shape {mask.shape}"

        # return a tuple of the image and its mask
        return image, mask

In [9]:
# Build the Uformer network.
# NOTE(review): in_chans is 13 here, the same value as NUM_CLASSES — confirm how modelPool.Uformer interprets it.
uformer = modelPool.Uformer(
    img_size=INPUT_IMAGE_HEIGHT,
    in_chans=13,
    embed_dim=16,
    win_size=8,
    token_projection='linear',
    token_mlp='leff',
    modulator=True,
)

# Hold out 20% of the (image, mask) path pairs for testing, with a fixed split seed.
(trainImages, testImages,
 trainMasks, testMasks) = train_test_split(
    IMAGE_DATASET_PATH,
    MASK_DATASET_PATH,
    test_size=0.2,
    random_state=43,
)

In [10]:
# DataLoader is only needed from this cell onwards.
from torch.utils.data import DataLoader

# Wrap the train and test path lists in datasets; samples are read and preprocessed on access.
trainDS = SegmentationDataset(imagePaths=trainImages, maskPaths=trainMasks)
testDS = SegmentationDataset(imagePaths=testImages, maskPaths=testMasks)
print(f"[INFO] found {len(trainDS)} examples in the training set...")
print(f"[INFO] found {len(testDS)} examples in the test set...")
# Settings for the training and test data loaders created below.
# NOTE(review): num_workers is assigned but is not passed to either DataLoader in this cell,
# so the loaders use their default worker setting — confirm whether that is intended.
num_workers = 4
def seed_worker(worker_id):
    """Re-seed NumPy and Python's `random` from the current PyTorch seed.

    Intended as a DataLoader `worker_init_fn`; `worker_id` is accepted for that
    signature but not used. The PyTorch seed is reduced to 32 bits first.
    """
    derived_seed = torch.initial_seed() % (1 << 32)
    for reseed in (np.random.seed, random.seed):
        reseed(derived_seed)

# Dedicated, fixed-seed generator handed to both loaders.
g = torch.Generator()
g.manual_seed(0)

# Options shared by the training and test loaders.
_loader_kwargs = dict(
    batch_size=BATCH_SIZE,
    pin_memory=PIN_MEMORY,
    worker_init_fn=seed_worker,
    generator=g,
)

# Training batches are reshuffled every epoch; test batches keep the dataset order.
trainLoader = DataLoader(trainDS, shuffle=True, **_loader_kwargs)
testLoader = DataLoader(testDS, shuffle=False, **_loader_kwargs)

[INFO] found 6464 examples in the training set...
[INFO] found 1616 examples in the test set...


In [11]:
# Everything in this notebook runs on the CPU.
DEVICE = torch.device("cpu")
uformer = uformer.to(DEVICE)

# Normalises the network output over dim 1 (the channel axis of a B, C, H, W batch).
softmax = torch.nn.Softmax(1)

# Training objective and the two evaluation metrics.
lossFunc = losses.DiceLoss(class_weights=CLASS_WEIGHTS)
iouMetric = losses.MeanIoU()
gdlMetric = losses.DiceCoeficient()

# Kept for reference — moving the loss and metrics to the GPU when DEVICE is CUDA:
# if DEVICE.type == 'cuda':
#     lossFunc = lossFunc.cuda()
#     iouMetric = iouMetric.cuda()
#     gdlMetric = gdlMetric.cuda()

opt = torch.optim.Adam(
    uformer.parameters(),
    lr=INIT_LR,
    betas=(0.9, 0.999),
    eps=1e-8,
    weight_decay=1e-5,
)

# Number of batches per epoch for each split (the last batch may be partial).
trainSteps = math.ceil(len(trainDS) / BATCH_SIZE)
testSteps = math.ceil(len(testDS) / BATCH_SIZE)

# Per-epoch training history.
H = {
    "train_loss": [],
    "test_loss": [],
    "train_IoU": [],
    "test_IoU": [],
    "train_DC": [],
    "test_DC": [],
}

# Rest of your code (the training loop)


In [12]:
# Set the device to CPU
DEVICE = torch.device("cpu")

# Best test IoU seen so far, kept as a plain float.
bestIoU = 0.0

# Define the number of epochs and other parameters as needed
NUM_EPOCHS = 100
trainSteps = len(trainLoader)
testSteps = len(testLoader)

# Initialize a dictionary to store training history (one float per epoch and key)
H = {"train_loss": [], "test_loss": [], "train_IoU": [], "test_IoU": [], "train_DC": [], "test_DC": []}

# Where the best checkpoint is written; create the folder up front so a missing
# directory does not abort the run at the first save.
BEST_MODEL_PATH = '../models/uformerpool_diceLoss.pth'
os.makedirs(os.path.dirname(BEST_MODEL_PATH), exist_ok=True)


def _run_epoch(loader, optimizer=None, show_progress=False):
    """Run one pass over `loader` and return the mean (loss, IoU, Dice) per batch.

    With an `optimizer` the model is put in training mode and updated after
    every batch; without one the model is put in evaluation mode and gradients
    are disabled. Running totals are accumulated as Python floats so that no
    autograd graph is kept alive beyond its own batch.
    """
    training = optimizer is not None
    uformer.train(training)

    totalLoss, totalIoU, totalDC = 0.0, 0.0, 0.0
    batches = tqdm(loader) if show_progress else loader

    with torch.set_grad_enabled(training):
        for x, y in batches:
            # Send the input to the device
            x, y = x.to(DEVICE), y.to(DEVICE)

            # Forward pass and loss
            pred = uformer(x)
            assert pred.shape == y.shape, f"{pred.shape} != {y.shape}"  # B, C, H, W
            pred = softmax(pred)
            loss = lossFunc(pred, y)

            if training:
                # Zero out accumulated gradients, backpropagate, update parameters
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            # Metrics are bookkeeping only: compute them without gradient
            # tracking and store them as floats.
            with torch.no_grad():
                totalLoss += float(loss)
                totalIoU += float(iouMetric(pred, y))
                totalDC += float(gdlMetric(pred, y))

    steps = len(loader)
    return totalLoss / steps, totalIoU / steps, totalDC / steps


# Loop over epochs
print("[INFO] training the network...")
startTime = time.time()
for e in range(NUM_EPOCHS):
    avgTrainLoss, avgTrainIoU, avgTrainDC = _run_epoch(trainLoader, optimizer=opt, show_progress=True)
    avgTestLoss, avgTestIoU, avgTestDC = _run_epoch(testLoader)

    # Update the training history
    H["train_loss"].append(avgTrainLoss)
    H["test_loss"].append(avgTestLoss)
    H["train_IoU"].append(avgTrainIoU)
    H['test_IoU'].append(avgTestIoU)
    H['train_DC'].append(avgTrainDC)
    H['test_DC'].append(avgTestDC)

    # Print the model training and validation information
    print("[INFO] EPOCH: {}/{}".format(e + 1, NUM_EPOCHS))
    print(f"Train loss: {avgTrainLoss:.6f}, Test loss: {avgTestLoss:.4f}, "
          f"Train IoU: {avgTrainIoU:.4f}, Test IoU: {avgTestIoU:.4f}, Train Dice: {avgTrainDC:.4f}, Test Dice: {avgTestDC:.4f}")

    # Check if the current epoch's IoU is the best and save the model
    if avgTestIoU > bestIoU:
        bestIoU = avgTestIoU
        torch.save(uformer.state_dict(), BEST_MODEL_PATH)
        print('Best IoU improved. Model saved.')

# Display the total time needed to perform the training
endTime = time.time()
print("[INFO] Total time taken to train the model: {:.2f}s".format(endTime - startTime))
print(f'Best IoU is: {bestIoU:.5f}')


[INFO] training the network...


100%|██████████| 202/202 [45:30<00:00, 13.52s/it]


[INFO] EPOCH: 1/10
Train loss: 0.952134, Test loss: 0.9437, Train IoU: 0.5111, Test IoU: 0.6322, Train Dice: 0.6640, Test Dice: 0.7744
Best IoU improved. Model saved.


100%|██████████| 202/202 [50:20<00:00, 14.95s/it]


[INFO] EPOCH: 2/10
Train loss: 0.941758, Test loss: 0.9429, Train IoU: 0.6626, Test IoU: 0.6464, Train Dice: 0.7967, Test Dice: 0.7849
Best IoU improved. Model saved.


100%|██████████| 202/202 [55:25<00:00, 16.46s/it]


[INFO] EPOCH: 3/10
Train loss: 0.943370, Test loss: 0.9458, Train IoU: 0.6344, Test IoU: 0.6001, Train Dice: 0.7741, Test Dice: 0.7499


100%|██████████| 202/202 [1:02:58<00:00, 18.70s/it]


[INFO] EPOCH: 4/10
Train loss: 0.947572, Test loss: 0.9460, Train IoU: 0.5735, Test IoU: 0.5953, Train Dice: 0.7281, Test Dice: 0.7460


100%|██████████| 202/202 [1:02:23<00:00, 18.53s/it]


[INFO] EPOCH: 5/10
Train loss: 0.945482, Test loss: 0.9461, Train IoU: 0.6031, Test IoU: 0.5888, Train Dice: 0.7522, Test Dice: 0.7410


100%|██████████| 202/202 [1:03:55<00:00, 18.99s/it]


[INFO] EPOCH: 6/10
Train loss: 0.945870, Test loss: 0.9472, Train IoU: 0.5939, Test IoU: 0.5834, Train Dice: 0.7449, Test Dice: 0.7364


100%|██████████| 202/202 [1:04:53<00:00, 19.27s/it]


[INFO] EPOCH: 7/10
Train loss: 0.946413, Test loss: 0.9499, Train IoU: 0.5883, Test IoU: 0.5188, Train Dice: 0.7403, Test Dice: 0.6830


100%|██████████| 202/202 [1:03:45<00:00, 18.94s/it]


[INFO] EPOCH: 8/10
Train loss: 0.948139, Test loss: 0.9488, Train IoU: 0.5556, Test IoU: 0.5422, Train Dice: 0.7141, Test Dice: 0.7030


100%|██████████| 202/202 [1:01:13<00:00, 18.19s/it]


[INFO] EPOCH: 9/10
Train loss: 0.951651, Test loss: 0.9540, Train IoU: 0.5037, Test IoU: 0.4627, Train Dice: 0.6692, Test Dice: 0.6325


100%|██████████| 202/202 [52:40<00:00, 15.65s/it]


[INFO] EPOCH: 10/10
Train loss: 0.958688, Test loss: 0.9707, Train IoU: 0.4153, Test IoU: 0.2680, Train Dice: 0.5845, Test Dice: 0.4223
[INFO] Total time taken to train the model: 37287.77s
Best IoU is: 0.64644


In [13]:
# serialize the model to disk
# torch.save(uformer.state_dict(), '../models/uformerpool_tversky.pth')
# uformer.load_state_dict(torch.load('../models/uformer.pth'))

In [None]:
# Two panels side by side: loss curves on the left, IoU / Dice curves on the right.
fig, (loss_ax, metric_ax) = plt.subplots(1, 2, figsize=(12, 4))

for history_key in ('train_loss', 'test_loss'):
    loss_ax.plot(H[history_key])
loss_ax.set_title('Model loss')
loss_ax.set_ylabel('loss')
loss_ax.set_xlabel('epoch')
loss_ax.legend(['train', 'val'], loc='upper right')

for history_key in ('train_IoU', 'test_IoU', 'train_DC', 'test_DC'):
    metric_ax.plot(H[history_key])
metric_ax.set_title('Model metrics')
metric_ax.set_ylabel('score')
metric_ax.set_xlabel('epoch')
metric_ax.legend(['train IoU', 'val IoU', 'train dice', 'val_dice'], loc='lower right')

plt.show()

In [15]:
def make_prediction(model, image):
    """Predict a per-pixel class map for a single channel-last image.

    Params:
        model: Network called on a (1, C, H, W) batch; switched to eval mode here.
        image: Array indexed as (H, W, C).
    Returns:
        uint8 array of shape (H, W, 1) holding the index of the highest-scoring
        class at every pixel.
    """
    # (H, W, C) -> (1, C, H, W) on the working device
    batch = torch.Tensor(image).permute((2, 0, 1)).to(DEVICE)
    batch = batch.unsqueeze(0)

    model.eval()
    with torch.no_grad():
        probabilities = softmax(model(batch))

    # Back to channel-last NumPy, then keep only the winning class per pixel.
    scores = probabilities.squeeze(0).permute((1, 2, 0)).cpu().detach().numpy()
    class_map = np.argmax(scores, axis=-1)
    return class_map[..., np.newaxis].astype(np.uint8)


In [16]:
def getPredImg(image_path):
    """Load an image file and preprocess it for prediction/plotting.

    The frame is cropped (10 rows top/bottom, 120 columns left/right), resized
    with padding to (INPUT_IMAGE_WIDTH, INPUT_IMAGE_HEIGHT), converted from BGR
    to RGB and scaled to [0, 1].

    Returns:
        Float array indexed as (H, W, 3).
    Raises:
        FileNotFoundError: if the file cannot be read.
    """
    image = cv2.imread(image_path)
    # cv2.imread returns None on failure instead of raising.
    if image is None:
        raise FileNotFoundError(f"Could not read image file: {image_path}")
    image = image[10:-10, 120:-120,:]
    image = resize_with_pad(image, (INPUT_IMAGE_WIDTH, INPUT_IMAGE_HEIGHT))
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = image / 255
    return image

def getPredMask(mask_path):
    """Load a watershed mask file and one-hot encode it.

    The mask is cropped (10 rows top/bottom, 120 columns left/right), resized
    with padding value 50 to (INPUT_IMAGE_WIDTH, INPUT_IMAGE_HEIGHT), reduced
    to one gray channel and passed through `encode_mask`.

    Returns:
        The one-hot array produced by `encode_mask`.
    Raises:
        FileNotFoundError: if the file cannot be read.
    """
    mask = cv2.imread(mask_path)
    # cv2.imread returns None on failure instead of raising.
    if mask is None:
        raise FileNotFoundError(f"Could not read mask file: {mask_path}")
    mask = mask[10:-10, 120:-120,:]
    mask = resize_with_pad(mask, (INPUT_IMAGE_WIDTH, INPUT_IMAGE_HEIGHT), (50, 50, 50))
    mask = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY)
    mask = np.expand_dims(mask, axis=-1)
    mask = encode_mask(mask)
    return mask

In [None]:
# 3 x 3 grid: each row shows one random test sample as image / ground-truth mask / prediction.
plt.figure(figsize=(10,10))
for row in range(3):

    idx = random.randint(0, len(testImages) -1)
    img = getPredImg(testImages[idx])
    mask = getPredMask(testMasks[idx])

    # Collapse the one-hot mask to class indices, keeping a trailing channel axis.
    mask = np.expand_dims(np.argmax(mask, axis=-1), axis=-1)
    pred = make_prediction(uformer, img)

    panels = (
        ('Image', img, {}),
        ('Mask', mask, dict(cmap='afmhot', norm=plt.Normalize(vmin=0, vmax=12))),
        ('Prediction', pred, dict(cmap='afmhot', norm=plt.Normalize(vmin=0, vmax=12))),
    )
    for col, (title, data, imshow_kwargs) in enumerate(panels, start=1):
        plt.subplot(3, 3, 3 * row + col)
        plt.imshow(data, **imshow_kwargs)
        plt.title(title)
        plt.axis("off")

plt.show()

--
**TFG - Adrián Contreras Castillo**
--