#Medical image segmentation
Medical image segmentation, identifying the pixels of organs or lesions from background medical images such as CT or MRI images, is one of the most challenging tasks in medical image analysis that is to deliver critical information about the shapes and volumes of these organs.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
os.listdir('/content/drive/My Drive/new_data/')

['train', 'test']

# Study Case :
our last model for multi-labels learning we got a good
performance from the model CNN , to gives us the probabilistic of each label
in singal image . but here was an issue with classfier has not labeling when it
comes with tow classes ( Normal , Gloumia ) because the features with these
classes are similary to each other , so here we purpose a solution for improve
the mechanism of extracting features by using segmentation images technics and the goal beind Unet is creating a new database segemented Classes to classify , and comparing the results

Setup all functionalities being used for Data Preparation

Images in the training dataset had differing sizes, therefore images had to be resized before being used as input to the model.

Square images were resized to the shape 256×256 pixels. Rectangular images were resized to 256 pixels on their shortest side, then the middle 256×256 square was cropped from the image. Note: the network expects input images to have the shape 512x512, achieved via training augmentation

In [None]:
import os
import time
import random
import numpy as np
import cv2
import torch

""" Seeding the randomness. """
def seeding(seed):
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True

""" Create a directory. """
def create_dir(path):
    if not os.path.exists(path):
        os.makedirs(path)

""" Calculate the time taken """
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

In [None]:
import os
import numpy as np
import cv2
import torch
from torch.utils.data import Dataset

class DriveDataset(Dataset):
    def __init__(self, images_path, masks_path):

        self.images_path = images_path
        self.masks_path = masks_path
        self.n_samples = len(images_path)

    def __getitem__(self, index):
        """ Reading image """
        image = cv2.imread(self.images_path[index], cv2.IMREAD_COLOR)
        image = image/255.0 ## (512, 512, 3)
        image = np.transpose(image, (2, 0, 1))  ## (3, 512, 512)
        image = image.astype(np.float32)
        image = torch.from_numpy(image)

        """ Reading mask """
        mask = cv2.imread(self.masks_path[index], cv2.IMREAD_GRAYSCALE)
        mask = mask/255.0   ## (512, 512)
        mask = np.expand_dims(mask, axis=0) ## (1, 512, 512)
        mask = mask.astype(np.float32)
        mask = torch.from_numpy(mask)

        return image, mask

    def __len__(self):
        return self.n_samples

# Loss Function DiceLoss


The Dice coefficient tells you how well your model is performing when it comes to detecting boundaries with regards to your ground truth data. The loss is computed with 1 - Dice coefficient where the the dice coefficient is between 0-1.

Over every epoch the loss will determine the acceleration of learning and the updates of weights to reduce the loss as much as possible. The dice coefficient also takes into account global and local composition of pixels, thereby providing better boundary detection than a weighted cross entropy. link https://arxiv.org/pdf/1608.04117.pdf


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class DiceLoss(nn.Module):
    def __init__(self, weight=None, size_average=True):
        super(DiceLoss, self).__init__()

    def forward(self, inputs, targets, smooth=1):

        #comment out if your model contains a sigmoid or equivalent activation layer
        inputs = torch.sigmoid(inputs)

        #flatten label and prediction tensors
        inputs = inputs.view(-1)
        targets = targets.view(-1)

        intersection = (inputs * targets).sum()
        dice = (2.*intersection + smooth)/(inputs.sum() + targets.sum() + smooth)

        return 1 - dice

class DiceBCELoss(nn.Module):
    def __init__(self, weight=None, size_average=True):
        super(DiceBCELoss, self).__init__()

    def forward(self, inputs, targets, smooth=1):

        #comment out if your model contains a sigmoid or equivalent activation layer
        inputs = torch.sigmoid(inputs)

        #flatten label and prediction tensors
        inputs = inputs.view(-1)
        targets = targets.view(-1)

        intersection = (inputs * targets).sum()
        dice_loss = 1 - (2.*intersection + smooth)/(inputs.sum() + targets.sum() + smooth)
        BCE = F.binary_cross_entropy(inputs, targets, reduction='mean')
        Dice_BCE = BCE + dice_loss

        return Dice_BCE

#U-Net 2D U-Net
One of the most well-known structures for medical image segmentation is U-Net, initially proposed by Ronneberger et al. using the concept of deconvolution introduced by. This model is built upon the elegant architecture of FCN. Besides the increased depth of network to 19 layers, U-Net benefits from a superior design of skip connections between different stages of the network. It employs some modifications to overcome the trade-off between localization and the use of context. This trade-off rises since the large-sized patches require more pooling layers and consequently will reduce the localization accuracy. On the other hand, small-sized patches can only observe small context of input. The proposed structure consists of two paths of analysis and synthesis. The analysis path follows the structure of CNN (see Fig. 4). The synthesis path, commonly known as expansion phase, consists of an upsampling layer followed by a deconvolution layer. The most important property of U-Net is the shortcut connections between the layers of equal resolution in analysis path to expansion path. These connections provides essential high-resolution features to the deconvolution layers. link https://towardsdatascience.com/u-net-b229b32b4a71

In [None]:
import torch
import torch.nn as nn

class conv_block(nn.Module):
    def __init__(self, in_c, out_c):
        super().__init__()

        self.conv1 = nn.Conv2d(in_c, out_c, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(out_c)

        self.conv2 = nn.Conv2d(out_c, out_c, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(out_c)

        self.relu = nn.ReLU()

    def forward(self, inputs):
        x = self.conv1(inputs)
        x = self.bn1(x)
        x = self.relu(x)

        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)

        return x

class encoder_block(nn.Module):
    def __init__(self, in_c, out_c):
        super().__init__()

        self.conv = conv_block(in_c, out_c)
        self.pool = nn.MaxPool2d((2, 2))

    def forward(self, inputs):
        x = self.conv(inputs)
        p = self.pool(x)

        return x, p

class decoder_block(nn.Module):
    def __init__(self, in_c, out_c):
        super().__init__()

        self.up = nn.ConvTranspose2d(in_c, out_c, kernel_size=2, stride=2, padding=0)
        self.conv = conv_block(out_c+out_c, out_c)

    def forward(self, inputs, skip):
        x = self.up(inputs)
        x = torch.cat([x, skip], axis=1)
        x = self.conv(x)
        return x

class build_unet(nn.Module):
    def __init__(self):
        super().__init__()

        """ Encoder """
        self.e1 = encoder_block(3, 64)
        self.e2 = encoder_block(64, 128)
        self.e3 = encoder_block(128, 256)
        self.e4 = encoder_block(256, 512)

        """ Bottleneck """
        self.b = conv_block(512, 1024)

        """ Decoder """
        self.d1 = decoder_block(1024, 512)
        self.d2 = decoder_block(512, 256)
        self.d3 = decoder_block(256, 128)
        self.d4 = decoder_block(128, 64)

        """ Classifier """
        self.outputs = nn.Conv2d(64, 1, kernel_size=1, padding=0)

    def forward(self, inputs):
        """ Encoder """
        s1, p1 = self.e1(inputs)
        s2, p2 = self.e2(p1)
        s3, p3 = self.e3(p2)
        s4, p4 = self.e4(p3)

        """ Bottleneck """
        b = self.b(p4)

        """ Decoder """
        d1 = self.d1(b, s4)
        d2 = self.d2(d1, s3)
        d3 = self.d3(d2, s2)
        d4 = self.d4(d3, s1)

        outputs = self.outputs(d4)

        return outputs

if __name__ == "__main__":
    x = torch.randn((2, 3, 512, 512))
    f = build_unet()
    y = f(x)
    print(y.shape)

torch.Size([2, 1, 512, 512])


# trainig Model
Loss calculation in UNet

What kind of loss one would use in such an intrinsic image segmentation? Well, it is defined simply in the paper itself.

    The energy function is computed by a pixel-wise soft-max over the final feature map combined with the Dice-Loss loss function

UNet uses a rather novel loss weighting scheme for each pixel such that there is a higher weight at the border of segmented objects. This loss weighting scheme helped the U-Net model segment eyes diseases in medical images in a discontinuous fashion such that individual cells may be easily identified within the binary segmentation map.

First of all pixel-wise softmax applied on the resultant image which is followed by  Dice-Loss loss function. So we are classifying each pixel into one of the classes. The idea is that even in segmentation every pixel have to lie in some category and we just need to make sure that they do. So we just converted a segmentation problem into a binary-class classification one and it performed very well as compared to the traditional loss functions.

In [None]:
import torch.backends.cudnn as cudnn


**Save the model**

Now, lets save the model, so later we can reload and make predicions without the need to retrain. The model is then converted to JSON format and written to model.json in the local directory. The network weights are written to model.pth in the local directory.

In [None]:
import os
import time
from glob import glob

import torch
from torch.utils.data import DataLoader
import torch.nn as nn

#from data import DriveDataset
#from model import build_unet
#from loss import DiceLoss, DiceBCELoss
#from utils import seeding, create_dir, epoch_time

def train(model, loader, optimizer, loss_fn, device):
    epoch_loss = 0.0

    model.train()
    for x, y in loader:
        x = x.to(device, dtype=torch.float32)
        y = y.to(device, dtype=torch.float32)

        optimizer.zero_grad()
        y_pred = model(x)
        loss = loss_fn(y_pred, y)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()

    epoch_loss = epoch_loss/len(loader)
    return epoch_loss

def evaluate(model, loader, loss_fn, device):
    epoch_loss = 0.0

    model.eval()
    with torch.no_grad():
        for x, y in loader:
            x = x.to(device, dtype=torch.float32)
            y = y.to(device, dtype=torch.float32)

            y_pred = model(x)
            loss = loss_fn(y_pred, y)
            epoch_loss += loss.item()

        epoch_loss = epoch_loss/len(loader)
    return epoch_loss

if __name__ == "__main__":
    """ Seeding """
    seeding(42)

    """ Directories """
    create_dir("files")

    """ Load dataset """
    train_x = sorted(glob("/content/drive/My Drive/new_data/train/image/*"))
    train_y = sorted(glob("/content/drive/My Drive/new_data/train/mask/*"))

    valid_x = sorted(glob("/content/drive/My Drive/new_data/test/image/*"))
    valid_y = sorted(glob("/content/drive/My Drive/new_data/test/mask/*"))

    data_str = f"Dataset Size:\nTrain: {len(train_x)} - Valid: {len(valid_x)}\n"
    print(data_str)

    """ Hyperparameters """
    H = 512
    W = 512
    size = (H, W)
    batch_size = 2
    num_epochs = 50
    lr = 1e-4
    checkpoint_path = "/content/drive/My Drive/files/checkpoint.pth"

    """ Dataset and loader """
    train_dataset = DriveDataset(train_x, train_y)
    valid_dataset = DriveDataset(valid_x, valid_y)

    train_loader = DataLoader(
        dataset=train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=2
    )

    valid_loader = DataLoader(
        dataset=valid_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=2
    )
   

    device = torch.device('cuda')   ## GTX 1060 6GB
    model = build_unet()
    model = model.to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, verbose=True)
    loss_fn = DiceBCELoss()

    """ Training the model """
    best_valid_loss = float("inf")

    for epoch in range(num_epochs):
        start_time = time.time()

        train_loss = train(model, train_loader, optimizer, loss_fn, device)
        valid_loss = evaluate(model, valid_loader, loss_fn, device)

        """ Saving the model """
        if valid_loss < best_valid_loss:
            data_str = f"Valid loss improved from {best_valid_loss:2.4f} to {valid_loss:2.4f}. Saving checkpoint: {checkpoint_path}"
            print(data_str)

            best_valid_loss = valid_loss
            torch.save(model.state_dict(), checkpoint_path)

        end_time = time.time()
        epoch_mins, epoch_secs = epoch_time(start_time, end_time)

        data_str = f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s\n'
        data_str += f'\tTrain Loss: {train_loss:.3f}\n'
        data_str += f'\t Val. Loss: {valid_loss:.3f}\n'
        print(data_str)

Dataset Size:
Train: 20 - Valid: 20

Valid loss improved from inf to 1.4844. Saving checkpoint: /content/drive/My Drive/files/checkpoint.pth
Epoch: 01 | Epoch Time: 0m 31s
	Train Loss: 1.328
	 Val. Loss: 1.484

Valid loss improved from 1.4844 to 1.4416. Saving checkpoint: /content/drive/My Drive/files/checkpoint.pth
Epoch: 02 | Epoch Time: 0m 27s
	Train Loss: 1.174
	 Val. Loss: 1.442

Valid loss improved from 1.4416 to 1.4034. Saving checkpoint: /content/drive/My Drive/files/checkpoint.pth
Epoch: 03 | Epoch Time: 0m 27s
	Train Loss: 1.078
	 Val. Loss: 1.403

Valid loss improved from 1.4034 to 1.3186. Saving checkpoint: /content/drive/My Drive/files/checkpoint.pth
Epoch: 04 | Epoch Time: 0m 28s
	Train Loss: 1.043
	 Val. Loss: 1.319

Valid loss improved from 1.3186 to 1.1644. Saving checkpoint: /content/drive/My Drive/files/checkpoint.pth
Epoch: 05 | Epoch Time: 0m 28s
	Train Loss: 1.011
	 Val. Loss: 1.164

Valid loss improved from 1.1644 to 1.0527. Saving checkpoint: /content/drive/My D

# obtain new Database
Load the model

Now, lets load save the model, so after saved in traning part  we can reload and make predicions without the need to retrain. The model is then converted to JSON format and written to model.json in the local directory. The network weights are written to model.pth in the local directory.
 Look at our files

The model and weight data is loaded from the saved files and a new model is created. It is important to compile the loaded model before it is used. This is so that predictions made using the model can use the appropriate efficient computation from the pytorch backend.

1.   create a new folder call it segemented_Classes
2.   create inside  that folder , each Class folder ( Normal , DiabeticRetinopathy , Myopia , gloumia )
3.    create a new folder call test to put all resized Classes folder resize the images Classes you have from origne dataset  into 512x512 to be compatible with input model Unet and the resized folder with name class
4.   finally step is , change the path file in test_x only for each segement class 



In [None]:
import zipfile


In [None]:
zip_ref = zipfile.ZipFile("/content/drive/My Drive/train_s.zip", 'r')
zip_ref.extractall("/content/drive/My Drive/test")
zip_ref.close()


NameError: ignored

In [None]:
# resize pokeGAN.py
import os
import cv2

src = "/content/drive/My Drive/train/Myopia" #pokeRGB_black
dst = "/content/drive/My Drive/test/class4" # resized

os.mkdir(dst)

for each in os.listdir(src):
    img = cv2.imread(os.path.join(src,each))
    img = cv2.resize(img,(512,512))
    cv2.imwrite(os.path.join(dst,each), img)
    

In [None]:
import os, time
from operator import add
import numpy as np
from glob import glob
import cv2
from tqdm import tqdm
import imageio
import torch
from sklearn.metrics import accuracy_score, f1_score, jaccard_score, precision_score, recall_score

#from model import build_unet
#from utils import create_dir, seeding

def calculate_metrics(y_true, y_pred):
    """ Ground truth """
    y_true = y_true.cpu().numpy()
    y_true = y_true > 0.5
    y_true = y_true.astype(np.uint8)
    y_true = y_true.reshape(-1)

    """ Prediction """
    y_pred = y_pred.cpu().numpy()
    y_pred = y_pred > 0.5
    y_pred = y_pred.astype(np.uint8)
    y_pred = y_pred.reshape(-1)

    score_jaccard = jaccard_score(y_true, y_pred)
    score_f1 = f1_score(y_true, y_pred)
    score_recall = recall_score(y_true, y_pred)
    score_precision = precision_score(y_true, y_pred)
    score_acc = accuracy_score(y_true, y_pred)

    return [score_jaccard, score_f1, score_recall, score_precision, score_acc]

def mask_parse(mask):
    mask = np.expand_dims(mask, axis=-1)    ## (512, 512, 1)
    mask = np.concatenate([mask, mask, mask], axis=-1)  ## (512, 512, 3)
    return mask

if __name__ == "__main__":
    """ Seeding """
    seeding(42)

    """ Folders """
    create_dir("results")
    # important step 
    """ Load dataset and change path test_x when segement a new class just with name of class that have been resized """
    test_x = sorted(glob("/content/drive/My Drive/test/class4/*"))
    test_y = sorted(glob("/content/drive/My Drive/new_data/test/mask/*"))

    """ Hyperparameters """
    H = 512
    W = 512
    size = (W, H)
    checkpoint_path = "/content/drive/My Drive/files/checkpoint.pth"

    """ Load the checkpoint """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    model = build_unet()
    model = model.to(device)
    model.load_state_dict(torch.load(checkpoint_path, map_location=device))
    model.eval()

    metrics_score = [0.0, 0.0, 0.0, 0.0, 0.0]
    time_taken = []

    for i , (y,x)  in tqdm(enumerate(zip(test_x,test_x)), total=len(test_x)):
        """ Extract the name """
        name = x.split("/")[-1].split(".")[0]

        """ Reading image """
        image = cv2.imread(x, cv2.IMREAD_COLOR) ## (512, 512, 3)
        ## image = cv2.resize(image, size)
        x = np.transpose(image, (2, 0, 1))      ## (3, 512, 512)
        x = x/255.0
        x = np.expand_dims(x, axis=0)           ## (1, 3, 512, 512)
        x = x.astype(np.float32)
        x = torch.from_numpy(x)
        x = x.to(device)

        with torch.no_grad():
            """ Prediction and Calculating FPS """
            start_time = time.time()
            pred_y = model(x)
            pred_y = torch.sigmoid(pred_y)
            total_time = time.time() - start_time
            time_taken.append(total_time)

        """ Saving masks """
        #ori_mask = mask_parse(mask)
        pred_y = mask_parse(pred_y)
        line = np.ones((size[1], 10, 3)) * 128
        
        cat_images = np.concatenate(
            [pred_y * 255], axis=-1
        )
        cv2.imwrite(f"/content/drive/My Drive/dataset/segemented_Classes/Myopia/{name}.png", cat_images)

    # jaccard = metrics_score[0]/len(test_x)
    # f1 = metrics_score[1]/len(test_x)
    # recall = metrics_score[2]/len(test_x)
    # precision = metrics_score[3]/len(test_x)
    # acc = metrics_score[4]/len(test_x)
    # print(f"Jaccard: {jaccard:1.4f} - F1: {f1:1.4f} - Recall: {recall:1.4f} - Precision: {precision:1.4f} - Acc: {acc:1.4f}")

    fps = 1/np.mean(time_taken)
    print("FPS: ", fps)

  0%|          | 0/30 [00:00<?, ?it/s]


TypeError: ignored