## 1. Load the libraries and datasets

In [None]:
%pip install ujson > /dev/null

from google.colab import drive
from google.colab import files
import gdown
import os
import pandas as pd
from tqdm.auto import tqdm
import numpy as np
import cv2
from matplotlib import pyplot as plt
import shutil
import random
from numpy import asarray

# PyTorch Dependencies
import torchvision
import torch.nn.functional as F
import torch
import gc
import torch.optim as optim

drive.mount('/content/gdrive')
print("Done")

Mounted at /content/gdrive
Done


In [None]:
# Download annotated subsets
!unzip -qn '/content/gdrive/MyDrive/validation_data.zip' > /dev/null

# Download original images
!unzip -qn '/content/gdrive/MyDrive/validation-subset-vein-detection.zip' > /dev/null

## 2. Load the pretrained model

In [None]:
# DefinCustomize writetemplate to write files from cells
from IPython.core.magic import register_line_cell_magic

@register_line_cell_magic
def writetemplate(line, cell):
    with open(line, 'w') as f:
        f.write(cell.format(**globals()))

In [None]:
# Define the model
%%writetemplate /content/UnetResnet.py

import torch
import torchvision
import torch.nn.functional as F

def conv3x3_bn(ci, co):
    return torch.nn.Sequential(
        torch.nn.Conv2d(ci, co, 3, padding=1),
        torch.nn.BatchNorm2d(co),
        torch.nn.ReLU(inplace=True)
    )


def encoder_conv(ci, co):
    return torch.nn.Sequential(
        torch.nn.MaxPool2d(2),
        conv3x3_bn(ci, co),
        conv3x3_bn(co, co),
    )

class deconv(torch.nn.Module):
    def __init__(self, ci, co):
        super(deconv, self).__init__()
        self.upsample = torch.nn.ConvTranspose2d(ci, co, 2, stride=2)
        self.conv1 = conv3x3_bn(ci, co)
        self.conv2 = conv3x3_bn(co, co)

    def forward(self, x1, x2):
        x1 = self.upsample(x1)
        diffX = x2.size()[2] - x1.size()[2]
        diffY = x2.size()[3] - x1.size()[3]
        x1 = F.pad(x1, (diffX, 0, diffY, 0))
        # concatenamos los tensores
        x = torch.cat([x2, x1], dim=1)
        x = self.conv1(x)
        x = self.conv2(x)
        return x


class out_conv(torch.nn.Module):
    def __init__(self, ci, co, coo):
        super(out_conv, self).__init__()
        self.upsample = torch.nn.ConvTranspose2d(ci, co, 2, stride=2)
        self.conv = conv3x3_bn(ci, co)
        self.final = torch.nn.Conv2d(co, coo, 1)

    def forward(self, x1, x2):
        x1 = self.upsample(x1)
        diffX = x2.size()[2] - x1.size()[2]
        diffY = x2.size()[3] - x1.size()[3]
        x1 = F.pad(x1, (diffX, 0, diffY, 0))
        x = self.conv(x1)
        x = self.final(x)
        return x


class UNetResnet(torch.nn.Module):
    def __init__(self, n_classes=3, in_ch=1):
        super().__init__()

        self.encoder = torchvision.models.resnet18(pretrained=True)
        if in_ch != 3:
            self.encoder.conv1 = torch.nn.Conv2d(in_ch, 64, kernel_size=7, stride=2, padding=3, bias=False)

        self.deconv1 = deconv(512, 256)
        self.deconv2 = deconv(256, 128)
        self.deconv3 = deconv(128, 64)
        self.out = out_conv(64, 64, n_classes)

    def forward(self, x):
        x_in = torch.tensor(x.clone())
        x = self.encoder.relu(self.encoder.bn1(self.encoder.conv1(x)))
        x1 = self.encoder.layer1(x)
        x2 = self.encoder.layer2(x1)
        x3 = self.encoder.layer3(x2)
        x = self.encoder.layer4(x3)
        x = self.deconv1(x, x3)
        x = self.deconv2(x, x2)
        x = self.deconv3(x, x1)
        x = self.out(x, x_in)
        return x

In [None]:
from UnetResnet import UNetResnet

checkpoint = torch.load('/content/gdrive/MyDrive/modelo_final.pth',map_location=torch.device('cpu'))
model = UNetResnet()
model.load_state_dict(checkpoint)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)
model.eval()

Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth


  0%|          | 0.00/44.7M [00:00<?, ?B/s]

UNetResnet(
  (encoder): ResNet(
    (conv1): Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_

## 3. Get predicted masks for the validation subset

In [None]:
# Folder with the masks predicted by the UnetResnet model
VALIDATION_MASKS_FOLDER = "/content/validation_masks"
# Folder with the base images that haven't been preprocessed
VALIDATION_IMAGES_FOLDER = "/content/validation"

# Check if the folder exists
if os.path.exists(VALIDATION_MASKS_FOLDER):
  !rm -R "$VALIDATION_MASKS_FOLDER"
  os.makedirs(VALIDATION_MASKS_FOLDER)
else:
  os.makedirs(VALIDATION_MASKS_FOLDER)

!mkdir "$VALIDATION_MASKS_FOLDER"

for image in tqdm(os.listdir(VALIDATION_IMAGES_FOLDER)):
  image_file_name = image
  image = cv2.imread(VALIDATION_IMAGES_FOLDER+"/"+image,0)

  # Apply CLAHE
  clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
  clahe_img = clahe.apply(image)

  # Normalise frame
  img = cv2.resize(clahe_img, (700,394), interpolation = cv2.INTER_AREA)
  center = img.shape[1]//2
  img = img[:,center-197:center+197].astype("float32")/255

  # Predict mask with background, arm, and vein segments
  with torch.no_grad():
      image = torch.from_numpy(img).unsqueeze(0)
      image = image.to(device)
      output = model(image.unsqueeze(0))[0]
      pred_mask = torch.argmax(output, axis=0).squeeze().cpu().numpy()

  # Polish the output mask's format
  pred_mask = pred_mask.astype(np.uint8)
  pred_mask[pred_mask==1] = 0
  pred_mask[pred_mask==2] = 1
  output = cv2.cvtColor(pred_mask, cv2.COLOR_GRAY2RGB)

  # Write the predicted mask
  cv2.imwrite(VALIDATION_MASKS_FOLDER+"/"+image_file_name, output)

mkdir: cannot create directory ‘/content/validation_masks’: File exists


  0%|          | 0/384 [00:00<?, ?it/s]

  x_in = torch.tensor(x.clone())


## 4. Calculate the accuracy of the model predictions

In [None]:
def validate(lower_threshold, upper_threshold, set_path):
  '''
    This function's main purpose is to find the total percentage of the intersection of annotated dots
    inside predicted segments, which are predicted by the UnetResnet model.
  '''
  intersections = []

  for image_path in tqdm(os.listdir(set_path), total = len(os.listdir(set_path))):
    # Open annotated image
    image = cv2.imread(set_path+image_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Apply colour thresholding to the annotated image
    mask = cv2.inRange(image, lower_threshold, upper_threshold)

    # Resize and restructure the annotated image to compare it with a predicted mask
    img = cv2.resize(mask, (700,394), interpolation = cv2.INTER_AREA)
    center = img.shape[1]//2
    mask = img[:,center-197:center+197]
    ret, mask = cv2.threshold(mask,127,200,cv2.THRESH_BINARY)
    mask[mask==200] = 1

    # Read the corresponding predicted mask (coming from the model)
    pred_mask = cv2.imread("/content/validation_masks/"+image_path, 0)

    # Find the intersection between the annotated image (which is now a mask) and the predicted mask
    intersection = cv2.bitwise_and(mask, pred_mask)

    # Find the percentage of good predicted pixels (ideal venupuncture dots) over the total
    # of annotated venopuncture pixels in the image (annotated by the professionals)
    sum = np.sum(intersection) / np.sum(mask)

    # Append the percentages to the intersections list
    intersections.append(sum)

  # Return the mean percentage for the given subset
  my_list = np.array(intersections)
  return np.mean(my_list[np.isfinite(my_list)])

result_herrera = validate(lower_threshold=np.array([200,0,0]), upper_threshold=np.array([255,180,180]), set_path="/content/valid_Aleyda_Herrera/")
result_galindo = validate(lower_threshold=np.array([200,0,0]), upper_threshold=np.array([255,180,180]), set_path="/content/valid_Paula_Galindo/")
result_garron = validate(lower_threshold=np.array([200,0,0]), upper_threshold=np.array([255,180,180]), set_path="/content/valid_Danithsa_Garron/")

print("Validation Aleyda Herrera: ", result_herrera)
print("Validation Danithsa Garron: ", result_garron)
print("Validation Paula Galindo: ", result_galindo)


  0%|          | 0/384 [00:00<?, ?it/s]



  0%|          | 0/384 [00:00<?, ?it/s]

  0%|          | 0/384 [00:00<?, ?it/s]

Validación Aleyda Herrera:  0.8154981549815498
Validación Danithsa Garron:  0.784375
Validación Paula Galindo:  0.886986301369863
