In [None]:
# Mount Google Drive at /content/drive (Colab only). The training loop later
# moves checkpoints into a folder under this mount point.
from google.colab import drive
drive.mount('/content/drive')

**The following commands will:**
- Install gdown, used to download files from Google Drive
- Download the BID dataset (~7GB) from its Google Drive
- Unzip the downloaded dataset file
- Rename the dataset main folder
- Create two folders (`imgs/` and `masks/`) to start organizing the dataset
- Create a folder for checkpoint files
- Delete the zip file

In [None]:
# Colab setup: download the BID dataset and lay out the working folders.
!pip install gdown
# The URL is quoted so the shell never treats "?" as a glob character.
!gdown "https://drive.google.com/uc?id=1adk1HM2j-HB0YT7UFdt2_xyr0BEdoshx"
!unzip /content/BID\ Dataset.zip
!mv /content/BID\ Dataset/ /content/BID-Dataset/
# -p: do not fail when the folders already exist (e.g. the cell is run twice).
!mkdir -p /content/BID-Dataset/imgs/ /content/BID-Dataset/masks/
!mkdir -p /content/checkpoints/
!rm /content/BID\ Dataset.zip

**Below there are two functions:**

1.   **get_img_list** -> receives the directory of a subset that contains images and their masks, and returns a list with the path (directory + filename) of every `.jpg` file found in it.
2.   **organize_dataset** -> receives that list of files and moves all images into the folder "imgs" and all masks into the folder "masks".



In [None]:
from glob import glob
from tqdm import tqdm
from os import rename
from shutil import move

def get_img_list(directory):
    """Return the paths of every ``.jpg`` file found under ``directory``.

    The search is recursive. ``directory`` may be given with or without a
    trailing slash; the pattern is assembled with ``os.path.join`` so a missing
    slash no longer produces a wrong glob pattern such as ``"dir**/*.jpg"``.

    Args:
        directory: Folder to search (str).

    Returns:
        list[str]: Matching paths, in the order ``glob`` yields them.
    """
    # Local import so this block does not depend on the cell's import list.
    from os.path import join

    return glob(join(directory, '**', '*.jpg'), recursive=True)


def organize_dataset(img_list):
    """Move every file in ``img_list`` into a sibling ``imgs`` or ``masks`` folder.

    For a path ``<root>/<subset>/<id>_<...>.jpg`` the file is moved to
    ``<root>/masks/<id>.jpg`` when its name splits into exactly three
    underscore-separated parts (e.g. ``00000000_gt_segmentation.jpg``), and to
    ``<root>/imgs/<id>.jpg`` otherwise (e.g. ``00000000_in.jpg``).

    Args:
        img_list: Iterable of '/'-separated file paths.
    """
    for source in tqdm(img_list):
        parts = source.split("/")

        # Remove the file name, then the folder that directly contains it.
        name_tokens = parts.pop().split("_")
        parts.pop()
        root = '/'.join(parts)

        # Only the leading id is kept in the new file name.
        target_name = name_tokens[0] + ".jpg"

        # Three tokens -> mask; anything else -> input image.
        subfolder = "/masks/" if len(name_tokens) == 3 else "/imgs/"

        move(source, root + subfolder + target_name)

In [None]:
# Root folder of the unpacked dataset and the per-document-type subfolders
# whose files are redistributed into base_dir/imgs and base_dir/masks.
base_dir = "/content/BID-Dataset/"
subsets = ["CNH_Aberta/", "CNH_Frente/", "CNH_Verso/", "CPF_Frente/",
           "CPF_Verso/", "RG_Aberto/", "RG_Frente/", "RG_Verso/"]

# Collect the .jpg files of each subset and move them into place.
for subset in subsets:
    all_img = get_img_list(base_dir+subset)
    organize_dataset(all_img)

Here we define a function for encoding the masks. It receives as arguments the list of mask paths and the threshold for binarization.

In [None]:
# NOTE: this cell is currently disabled (every line is commented out).
# BasicDataset.preprocess thresholds the masks when they are loaded.
# NOTE(review): if this is re-enabled, `a` below is undefined (probably meant
# `img`) and numpy is not imported before this cell.
# import cv2

# def hot_encoding_mask(img_list, threshold=100):

#     for img_path in tqdm(img_list):

#         img = cv2.imread(img_path)
#         img = (a > threshold).astype(np.int8)
#         cv2.imwrite(img_path, img)

In [None]:
# Disabled call of the (also disabled) hot_encoding_mask helper.
#masks_folder = "masks/"
#masks_list = get_img_list(base_dir+masks_folder)
#hot_encoding_mask(masks_list, 100)

Below we have all the code to build the U-Net.

Thanks to the repository: https://github.com/milesial/Pytorch-UNet

In [None]:
""" Parts of the U-Net model """

import torch
import torch.nn as nn
import torch.nn.functional as F


class DoubleConv(nn.Module):
    """Two stacked 3x3 convolutions (padding 1), each followed by a ReLU.

    No batch normalisation is applied in this variant.

    Args:
        in_channels: Channels of the input tensor.
        out_channels: Channels produced by the second convolution.
        mid_channels: Channels between the two convolutions; a falsy value
            (``None`` or ``0``) means "use ``out_channels``".
    """

    def __init__(self, in_channels, out_channels, mid_channels=None):
        super().__init__()
        hidden = mid_channels if mid_channels else out_channels
        # Layer positions (0 and 2 hold the convolutions) define the
        # state_dict keys, so the order must not change.
        layers = [
            nn.Conv2d(in_channels, hidden, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(hidden, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
        ]
        self.double_conv = nn.Sequential(*layers)

    def forward(self, x):
        out = self.double_conv(x)
        return out


class Down(nn.Module):
    """Encoder stage: 2x2 max-pooling followed by a double-convolution block.

    Args:
        in_channels: Channels of the input tensor.
        out_channels: Channels produced by the stage.
        mode: ``'scconv'`` selects ``DoubleSCConv``, ``'bottleneck'`` selects
            ``DoubleBottleneck``; any other value selects ``DoubleConv``.
    """

    def __init__(self, in_channels, out_channels, mode='baseline'):
        super().__init__()
        self.mode = mode

        # Only the class that is actually selected is looked up.
        if mode == 'scconv':
            block = DoubleSCConv(in_channels, out_channels)
        elif mode == 'bottleneck':
            block = DoubleBottleneck(in_channels, out_channels)
        else:
            block = DoubleConv(in_channels, out_channels)

        self.maxpool_conv = nn.Sequential(nn.MaxPool2d(2), block)

    def forward(self, x):
        pooled_and_convolved = self.maxpool_conv(x)
        return pooled_and_convolved


class Up(nn.Module):
    """Decoder stage: upsample, pad to the skip tensor's size, concat, convolve.

    Args:
        in_channels: Channels of the concatenated (skip + upsampled) tensor.
        out_channels: Channels produced by the stage.
        bilinear: If true, upsample with bilinear interpolation and pick the
            convolution block according to ``mode``. Otherwise use a transposed
            convolution followed by a plain ``DoubleConv`` (``mode`` is not
            consulted in that case).
        mode: ``'scconv'``, ``'bottleneck'`` or anything else for the baseline.
    """

    def __init__(self, in_channels, out_channels, bilinear=True, mode='baseline'):
        super().__init__()
        self.mode = mode

        if not bilinear:
            self.up = nn.ConvTranspose2d(in_channels , in_channels // 2, kernel_size=2, stride=2)
            self.conv = DoubleConv(in_channels, out_channels)
            return

        # Bilinear upsampling keeps the channel count; the convolution block
        # is the one that reduces it.
        self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        if mode == 'scconv':
            self.conv = DoubleSCConv(in_channels, out_channels, None)
        elif mode == 'bottleneck':
            self.conv = DoubleBottleneck(in_channels, out_channels, None)
        else:
            self.conv = DoubleConv(in_channels, out_channels, None)

    def forward(self, x1, x2):
        upsampled = self.up(x1)

        # Dimensions 2 and 3 are the spatial ones; dimension 1 is channels.
        gap_h = x2.shape[2] - upsampled.shape[2]
        gap_w = x2.shape[3] - upsampled.shape[3]
        left, top = gap_w // 2, gap_h // 2

        # Pad so the upsampled tensor matches the skip connection's size.
        # If you have padding issues, see
        # https://github.com/HaiyongJiang/U-Net-Pytorch-Unstructured-Buggy/commit/0e854509c2cea854e247a9c615f175f76fbb2e3a
        # https://github.com/xiaopeng-liao/Pytorch-UNet/commit/8ebac70e633bac59fc22bb5195e513d5832fb3bd
        upsampled = F.pad(upsampled, [left, gap_w - left, top, gap_h - top])

        merged = torch.cat([x2, upsampled], dim=1)
        return self.conv(merged)


class OutConv(nn.Module):
    """Final 1x1 convolution mapping feature channels to output channels."""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        projected = self.conv(x)
        return projected

""" Full assembly of the parts to form the complete network """

class UNet(nn.Module):
    """U-Net with four encoder and four decoder stages (16 to 256 channels).

    Args:
        n_channels: Channels of the input image.
        n_classes: Channels of the output logits.
        bilinear: Passed to every ``Up`` stage; also halves the width of the
            deepest encoder stage and of the first three decoder stages.
        mode: ``'scconv'``, ``'bottleneck'`` or anything else for the baseline
            ``DoubleConv`` blocks.
    """

    def __init__(self, n_channels, n_classes, bilinear=True, mode='baseline'):
        super().__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.bilinear = bilinear
        self.mode = mode

        # Input block: n_channels -> 16, spatial size unchanged.
        if mode == 'scconv':
            stem = DoubleSCConv(n_channels, 16)
        elif mode == 'bottleneck':
            stem = DoubleBottleneck(n_channels, 16)
        else:
            stem = DoubleConv(n_channels, 16)
        self.inc = stem

        # Encoder.
        self.down1 = Down(16, 32, mode)
        self.down2 = Down(32, 64, mode)
        self.down3 = Down(64, 128, mode)
        factor = 2 if bilinear else 1
        self.down4 = Down(128, 256 // factor, mode)

        # Decoder.
        self.up1 = Up(256, 128 // factor, bilinear, mode)
        self.up2 = Up(128, 64 // factor, bilinear, mode)
        self.up3 = Up(64, 32 // factor, bilinear, mode)
        self.up4 = Up(32, 16, bilinear, mode)
        self.outc = OutConv(16, n_classes)

    def forward(self, x):
        # Encoder pass, remembering every intermediate map for the skips.
        skips = [self.inc(x)]
        for stage in (self.down1, self.down2, self.down3, self.down4):
            skips.append(stage(skips[-1]))

        # Decoder pass: consume the skips from deepest to shallowest.
        out = skips.pop()
        for stage in (self.up1, self.up2, self.up3, self.up4):
            out = stage(out, skips.pop())

        logits = self.outc(out)
        return logits


In [None]:
class SCBottleneck(nn.Module):
    """Residual bottleneck with a plain 3x3 branch and an ``SCConv`` branch.

    The input is projected by two separate 1x1 convolutions. One projection
    goes through a 3x3 convolution (``k1``), the other through ``SCConv``. Both
    results are concatenated on the channel axis, fused by a 1x1 convolution
    and added to the residual.

    Args:
        inplanes: Channels of the input tensor.
        planes: Channels of the output tensor.
        stride: Stride of the ``k1`` convolution (or of the average pool when
            ``avd`` is active).
        downsample: Optional module applied to the input to build the residual.
        cardinality: Number of groups of the ``k1`` convolution; also scales
            the branch width.
        bottleneck_width: Scales the branch width relative to ``planes``.
        avd: Enables the average-pool layer when ``stride > 1`` or ``is_first``.
        dilation: Dilation (and padding) of the ``k1`` convolution.
        is_first: See ``avd``.
        norm_layer: Normalisation layer constructor.

    NOTE(review): when ``downsample`` is None the residual is the raw input, so
    the addition presumably requires ``inplanes == planes`` -- confirm.
    """
    # Neither class attribute is read anywhere inside this class.
    expansion = 4
    pooling_r = 4 # down-sampling rate of the avg pooling layer in the K3 path of SC-Conv.

    def __init__(self, inplanes, planes, stride=1, downsample=None,
                 cardinality=1, bottleneck_width=32,
                 avd=False, dilation=1, is_first=False,
                 norm_layer=nn.BatchNorm2d):
        super(SCBottleneck, self).__init__()
        # Width of each of the two branches.
        group_width = int(planes * (bottleneck_width / 64.)) * cardinality
        # Two independent 1x1 projections, one per branch.
        self.conv1_a = nn.Conv2d(inplanes, group_width, kernel_size=1, bias=False)
        self.bn1_a = norm_layer(group_width)
        self.conv1_b = nn.Conv2d(inplanes, group_width, kernel_size=1, bias=False)
        self.bn1_b = norm_layer(group_width)
        self.avd = avd and (stride > 1 or is_first)

        if self.avd:
            # The pooling layer takes over the stride, so k1 runs at stride 1.
            self.avd_layer = nn.AvgPool2d(3, stride, padding=1)
            stride = 1

        # Branch a: 3x3 convolution + normalisation.
        self.k1 = nn.Sequential(
                    nn.Conv2d(
                        group_width, group_width, kernel_size=3, stride=stride,
                        padding=dilation, dilation=dilation,
                        groups=cardinality, bias=False),
                    norm_layer(group_width),
                    )

        # Branch b: SCConv block with equal input and output width.
        self.scconv = SCConv(group_width, group_width, kernel_size=3, padding=1)

        # 1x1 fusion of the concatenated branches back to `planes` channels.
        self.conv3 = nn.Conv2d(
            group_width * 2, planes, kernel_size=1, bias=False)
        self.bn3 = norm_layer(planes)

        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.dilation = dilation
        self.stride = stride

    def forward(self, x):
        """Run both branches on ``x``, fuse them and add the residual."""
        residual = x

        out_a= self.conv1_a(x)
        out_a = self.bn1_a(out_a)

        out_b = self.conv1_b(x)
        out_b = self.bn1_b(out_b)

        out_a = self.relu(out_a)
        out_b = self.relu(out_b)

        out_a = self.k1(out_a)
        out_b = self.scconv(out_b)

        out_a = self.relu(out_a)
        out_b = self.relu(out_b)

        if self.avd:
            out_a = self.avd_layer(out_a)
            out_b = self.avd_layer(out_b)

        # Concatenate on the channel axis and fuse.
        out = self.conv3(torch.cat([out_a, out_b], dim=1))
        out = self.bn3(out)

        if self.downsample is not None:
            residual = self.downsample(x)

        out += residual
        out = self.relu(out)

        return out


class DoubleBottleneck(nn.Module):
    """Conv3x3 => BN => ReLU, followed by SCBottleneck => BN => ReLU.

    Args:
        in_channels: Channels of the input tensor.
        out_channels: Channels produced by the block.
        mid_channels: Channels between the two halves; a falsy value means
            "use ``out_channels``".
    """

    def __init__(self, in_channels, out_channels, mid_channels=None):
        super().__init__()
        hidden = mid_channels if mid_channels else out_channels

        # First half: ordinary convolution.
        first_half = [
            nn.Conv2d(in_channels, hidden, kernel_size=3, padding=1),
            nn.BatchNorm2d(hidden),
            nn.ReLU(inplace=True),
        ]
        # Second half: self-calibrated bottleneck.
        second_half = [
            SCBottleneck(hidden, out_channels, stride=1, downsample=None,
                         cardinality=1, bottleneck_width=32,
                         avd=False, dilation=1, is_first=False,
                         norm_layer=nn.BatchNorm2d),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        ]
        self.double_conv = nn.Sequential(*first_half, *second_half)

    def forward(self, x):
        out = self.double_conv(x)
        return out

class DoubleSCConv(nn.Module):
    """(SCConv => BN => ReLU) * 2.

    Args:
        in_channels: Channels of the input tensor.
        out_channels: Channels produced by the block.
        mid_channels: Channels between the two SCConv layers; a falsy value
            means "use ``out_channels``".
    """

    def __init__(self, in_channels, out_channels, mid_channels=None):
        super().__init__()
        hidden = mid_channels if mid_channels else out_channels

        stages = []
        for n_in, n_out in ((in_channels, hidden), (hidden, out_channels)):
            stages.append(SCConv(n_in, n_out, kernel_size=3, padding=1))
            stages.append(nn.BatchNorm2d(n_out))
            stages.append(nn.ReLU(inplace=True))
        self.double_conv = nn.Sequential(*stages)

    def forward(self, x):
        out = self.double_conv(x)
        return out

class SCConv(nn.Module):
    """Self-calibrated convolution.

    Computes ``k4(k3(x) * sigmoid(x + upsample(k2(x))))`` where ``k2`` works on
    a 4x average-pooled copy of the input and ``k3``/``k4`` are 3x3
    convolutions followed by batch normalisation.

    Args:
        in_channels: Channels of the input tensor.
        mid_channels: Channels of the output tensor. ``None`` now means "same
            as ``in_channels``" (previously the default crashed inside
            ``nn.Conv2d``).
        kernel_size: Accepted for call compatibility; every convolution in this
            block uses a fixed 3x3 kernel.
        padding: Accepted for call compatibility; every convolution in this
            block uses a fixed padding of 1.

    Note:
        ``k2`` starts with ``AvgPool2d(kernel_size=4, stride=4)``, so inputs
        smaller than 4 pixels in height or width are not supported.
    """

    def __init__(self, in_channels, mid_channels=None, kernel_size=3, padding=1):
        super(SCConv, self).__init__()
        if mid_channels is None:
            mid_channels = in_channels

        # Calibration branch, evaluated at a quarter of the resolution.
        self.k2 = nn.Sequential(
                    nn.AvgPool2d(kernel_size=4, stride=4),
                    nn.Conv2d(in_channels, in_channels, kernel_size=3, padding=1, dilation=1, stride=1, bias=False, groups=1),
                    nn.BatchNorm2d(in_channels),
                    )
        # Feature branch at full resolution.
        self.k3 = nn.Sequential(
                    nn.Conv2d(in_channels, in_channels, kernel_size=3, padding=1, dilation=1, stride=1, bias=False, groups=1),
                    nn.BatchNorm2d(in_channels),
                    )
        # Output projection to mid_channels.
        self.k4 = nn.Sequential(
                    nn.Conv2d(in_channels, mid_channels, kernel_size=3, padding=1, dilation=1, stride=1, bias=False, groups=1),
                    nn.BatchNorm2d(mid_channels),
                    )

    def forward(self, x):
        identity = x

        # Gate: sigmoid(identity + k2 resized back to the input's spatial size).
        out = torch.sigmoid(torch.add(identity, F.interpolate(self.k2(x), identity.size()[2:])))
        # Calibrated features: k3 * gate.
        out = torch.mul(self.k3(x), out)
        out = self.k4(out)

        return out

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# Sanity check: build the 'scconv' variant and print a layer-by-layer summary
# for a single-channel 512x512 input.
from torchsummary import summary
net = UNet(n_channels=1, n_classes=1, bilinear=True, mode='scconv')
net.to(device=device)
#print(net)
summary(net, (1, 512, 512))

Below we have all the code to build the dataset to be used in our experiments.

Thanks to the repository: https://github.com/milesial/Pytorch-UNet

In [None]:
from torch.utils.data import Dataset
from os import listdir
from os.path import splitext
from PIL import Image
import numpy as np
import cv2

class BasicDataset(Dataset):
    """Image/mask pairs read from two folders and matched by file stem.

    Every sample is reduced to its green channel, resized to ``TARGET_SIZE``
    and returned as float tensors; masks are binarised with ``MASK_THRESHOLD``.

    Args:
        imgs_dir: Folder with the input images (used as a string prefix, so it
            should end with a path separator).
        masks_dir: Folder with the masks (same remark).
        scale: Pre-resize factor applied before the fixed-size resize; must be
            in ``(0, 1]``.
        mask_suffix: Text appended to the image stem to obtain the mask stem.
    """

    # (width, height) every image and mask is resized to.
    TARGET_SIZE = (512, 512)
    # Grey level above which a mask pixel counts as foreground.
    MASK_THRESHOLD = 100

    def __init__(self, imgs_dir, masks_dir, scale=1, mask_suffix=''):
        assert 0 < scale <= 1, 'Scale must be between 0 and 1'
        self.imgs_dir = imgs_dir
        self.masks_dir = masks_dir
        self.scale = scale
        self.mask_suffix = mask_suffix

        # listdir() order is filesystem dependent; sorting keeps the sample
        # order, and therefore any seeded random split, reproducible.
        self.ids = sorted(splitext(file)[0] for file in listdir(imgs_dir)
                          if not file.startswith('.'))
        logging.info(f'Creating dataset with {len(self.ids)} examples')

    def __len__(self):
        return len(self.ids)

    @staticmethod
    def _green_channel(pil_img):
        """Return the green channel of ``pil_img`` regardless of its mode.

        Converting to RGB first makes grayscale, palette and RGBA files work;
        a plain three-way ``split()`` only works for three-band images.
        """
        return pil_img.convert('RGB').split()[1]

    @classmethod
    def preprocess(cls, pil_img, scale, mask_flag=False):
        """Convert a PIL image into a channel-first numpy array.

        Args:
            pil_img: PIL image (single- or multi-channel).
            scale: Pre-resize factor applied to width and height.
            mask_flag: If true, return a ``uint8`` array of 0/1 obtained by
                thresholding at ``MASK_THRESHOLD``; otherwise return the image
                divided by 255 when its maximum exceeds 1.

        Returns:
            numpy.ndarray of shape ``(C, 512, 512)``.
        """
        w, h = pil_img.size
        newW, newH = int(scale * w), int(scale * h)
        assert newW > 0 and newH > 0, 'Scale is too small'
        pil_img = pil_img.resize((newW, newH))

        img_nd = np.array(pil_img)
        img_nd = cv2.resize(img_nd, cls.TARGET_SIZE, interpolation=cv2.INTER_AREA)

        # Single-channel arrays come back as HxW; add the channel axis.
        if len(img_nd.shape) == 2:
            img_nd = np.expand_dims(img_nd, axis=2)

        if mask_flag:
            msk = (img_nd > cls.MASK_THRESHOLD).astype(np.uint8)
            return msk.transpose((2, 0, 1))

        # HWC to CHW
        img_trans = img_nd.transpose((2, 0, 1))
        if img_trans.max() > 1:
            img_trans = img_trans / 255

        return img_trans

    def __getitem__(self, i):
        """Return ``{'image': FloatTensor, 'mask': FloatTensor}`` for sample ``i``."""
        idx = self.ids[i]
        mask_file = glob(self.masks_dir + idx + self.mask_suffix + '.*')
        img_file = glob(self.imgs_dir + idx + '.*')

        assert len(mask_file) == 1, \
            f'Either no mask or multiple masks found for the ID {idx}: {mask_file}'
        assert len(img_file) == 1, \
            f'Either no image or multiple images found for the ID {idx}: {img_file}'

        # Grayscale adaptation: only the green channel of each file is used.
        mask = self._green_channel(Image.open(mask_file[0]))
        img = self._green_channel(Image.open(img_file[0]))

        assert img.size == mask.size, \
            f'Image and mask {idx} should be the same size, but are {img.size} and {mask.size}'

        img = self.preprocess(img, self.scale, mask_flag=False)
        mask = self.preprocess(mask, self.scale, mask_flag=True)

        return {
            'image': torch.from_numpy(img).type(torch.FloatTensor),
            'mask': torch.from_numpy(mask).type(torch.FloatTensor)
        }


class CarvanaDataset(BasicDataset):
    """``BasicDataset`` whose mask files carry the ``_mask`` suffix."""

    def __init__(self, imgs_dir, masks_dir, scale=1):
        super(CarvanaDataset, self).__init__(
            imgs_dir, masks_dir, scale, mask_suffix='_mask'
        )

Below is the code to evaluate our model.
Thanks to the repository (modified): https://github.com/milesial/Pytorch-UNet

In [None]:
from torch.autograd import Function
from sklearn.metrics import jaccard_similarity_score as iou


def transform(tensor):
    """Copy ``tensor`` to the CPU and return it as a flat 1-D numpy array."""
    on_host = tensor.cpu()
    as_array = on_host.numpy()
    return as_array.flatten()


def _binary_iou(pred, target, eps=1e-6):
    """Intersection-over-union of two 0/1-valued tensors, pooled over all elements."""
    pred = pred.reshape(-1)
    target = target.reshape(-1)
    inter = (pred * target).sum()
    union = pred.sum() + target.sum() - inter
    # eps keeps the ratio defined (and equal to 1) when both tensors are empty.
    return ((inter + eps) / (union + eps)).item()


def evaluation_v2(net, loader, device):
    """Evaluate ``net`` on ``loader`` and return two averaged scores.

    For a single-class network the predictions are thresholded at 0.5 and the
    function returns ``(mean Dice, mean IoU)`` over the batches. IoU is
    computed directly as intersection / union; the former call to
    ``sklearn.metrics.jaccard_similarity_score`` reported plain accuracy on
    binary input and that function no longer exists in current scikit-learn.

    For a multi-class network the function returns
    ``(mean cross-entropy, nan)``; the previous code raised ``NameError`` in
    that branch because it accumulated into an undefined variable.

    Args:
        net: Model exposing ``n_classes``.
        loader: Iterable of batches shaped like ``{'image': ..., 'mask': ...}``.
            Masks are assumed to hold only 0 and 1 in the single-class case.
        device: Device the batches are moved to.

    Returns:
        tuple[float, float]; ``(0.0, 0.0)`` when ``loader`` yields no batch.
    """
    net.eval()
    mask_type = torch.float32 if net.n_classes == 1 else torch.long
    n_val = len(loader)  # number of batches
    tot_iou = 0
    tot_dice = 0
    tot_ce = 0

    print("evaluating...")
    try:
        for batch in tqdm(loader):
            imgs, true_masks = batch['image'], batch['mask']
            imgs = imgs.to(device=device, dtype=torch.float32)
            true_masks = true_masks.to(device=device, dtype=mask_type)

            with torch.no_grad():
                mask_pred = net(imgs)

            if net.n_classes > 1:
                tot_ce += F.cross_entropy(mask_pred, true_masks).item()
            else:
                pred = (torch.sigmoid(mask_pred) > 0.5).float()
                tot_dice += dice_coeff(pred, true_masks).item()
                tot_iou += _binary_iou(pred, true_masks)
    finally:
        # Always hand the network back in training mode, even after an error.
        net.train()

    if n_val == 0:
        # Nothing to average (e.g. drop_last removed the only batch).
        return 0.0, 0.0
    if net.n_classes > 1:
        return tot_ce / n_val, float('nan')
    return (tot_dice / n_val), (tot_iou / n_val)

Below we have all the code to run the training process with the proposed U-Net on the organized dataset.

Thanks to the repository: https://github.com/milesial/Pytorch-UNet

In [None]:
from torch.utils.data import DataLoader, random_split
from torch.utils.tensorboard import SummaryWriter
from torch import optim
import torch
import torch.nn.functional as F
from tqdm import tqdm
import os
import shutil

import torch
from torch.autograd import Function

from google.colab import files

def train_net(net,
              device,
              epochs=5,
              batch_size=1,
              lr=0.001,
              val_percent=0.1,
              save_cp=True,
              img_scale=0.5,
              dir_checkpoint="",
              mode = ''):
    """Train ``net`` on the dataset found in the global ``dir_img`` / ``dir_mask``.

    The dataset is split once (seed 0) into training and validation parts.
    After every epoch the model is validated with ``evaluation_v2``, the
    learning-rate scheduler is stepped with the first validation score and,
    if requested, a checkpoint is written and moved to Google Drive.

    Args:
        net: Model exposing ``n_channels`` and ``n_classes``.
        device: Device used for training.
        epochs: Number of passes over the training split.
        batch_size: Batch size of both loaders.
        lr: Initial learning rate of the RMSprop optimiser.
        val_percent: Fraction (0-1) of the dataset used for validation.
        save_cp: Whether to save a checkpoint after every epoch.
        img_scale: Scale passed to ``BasicDataset``.
        dir_checkpoint: Folder where the checkpoint is first written.
        mode: Prefix of the checkpoint file name.

    Returns:
        tuple: ``(net, train_loader, val_loader)``.

    Note:
        Relies on the module-level names ``dir_img`` and ``dir_mask`` and on the
        hard-coded Drive folder ``/content/drive/MyDrive/Projeto-DL/``.
    """

    dataset = BasicDataset(dir_img, dir_mask, img_scale)
    n_val = int(len(dataset) * val_percent)
    n_train = len(dataset) - n_val

    torch.manual_seed(0)
    train, val = random_split(dataset, [n_train, n_val])
    train_loader = DataLoader(train, batch_size=batch_size, shuffle=True, num_workers=8, pin_memory=True)
    val_loader = DataLoader(val, batch_size=batch_size, shuffle=False, num_workers=8, pin_memory=True, drop_last=True)

    writer = SummaryWriter(comment=f'LR_{lr}_BS_{batch_size}_SCALE_{img_scale}')
    global_step = 0

    logging.info(f'''Starting training:
        Epochs:          {epochs}
        Batch size:      {batch_size}
        Learning rate:   {lr}
        Training size:   {n_train}
        Validation size: {n_val}
        Checkpoints:     {save_cp}
        Device:          {device.type}
        Images scaling:  {img_scale}
    ''')

    optimizer = optim.RMSprop(net.parameters(), lr=lr, weight_decay=1e-8, momentum=0.9)
    # 'max' for single-class (Dice is maximised), 'min' for multi-class (loss).
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min' if net.n_classes > 1 else 'max', patience=2)
    if net.n_classes > 1:
        criterion = nn.CrossEntropyLoss()
    else:
        criterion = nn.BCEWithLogitsLoss()
    mask_type = torch.float32 if net.n_classes == 1 else torch.long

    try:
        for epoch in range(epochs):
            net.train()

            with tqdm(total=n_train, desc=f'Epoch {epoch + 1}/{epochs}', unit='img') as pbar:
                for batch in train_loader:
                    imgs = batch['image']
                    true_masks = batch['mask']
                    assert imgs.shape[1] == net.n_channels, \
                        f'Network has been defined with {net.n_channels} input channels, ' \
                        f'but loaded images have {imgs.shape[1]} channels. Please check that ' \
                        'the images are loaded correctly.'

                    imgs = imgs.to(device=device, dtype=torch.float32)
                    true_masks = true_masks.to(device=device, dtype=mask_type)

                    masks_pred = net(imgs)
                    loss = criterion(masks_pred, true_masks)
                    writer.add_scalar('Loss/train', loss.item(), global_step)

                    pbar.set_postfix(**{'loss (batch)': loss.item()})

                    optimizer.zero_grad()
                    loss.backward()
                    nn.utils.clip_grad_value_(net.parameters(), 0.1)
                    optimizer.step()

                    pbar.update(imgs.shape[0])
                    global_step += 1
                    # The per-step validation / histogram logging that used to
                    # sit here as a disabled string literal has been dropped;
                    # validation now happens once per epoch below.

            dice, iou_score = evaluation_v2(net, val_loader, device)
            print("Validation result is:")
            print(dice, iou_score)

            # The scheduler was created but never stepped, so the learning
            # rate could never decay. Feed it the first validation score.
            scheduler.step(dice)

            if save_cp:
                if dir_checkpoint and not os.path.isdir(dir_checkpoint):
                    os.makedirs(dir_checkpoint)
                    logging.info('Created checkpoint directory')

                fname_cp = mode + '_' + f'CP_epoch{epoch + 1}.pth'

                cp_file = os.path.join(dir_checkpoint, fname_cp)
                torch.save(net.state_dict(), cp_file)
                print("checkpoint saved!")

                # The checkpoint is moved (not copied) to Drive, so it no
                # longer exists in dir_checkpoint afterwards.
                destination = '/content/drive/MyDrive/Projeto-DL/' + fname_cp
                shutil.move(cp_file, destination)
                print("checkpoint downloaded!")
    finally:
        # Flush and release the TensorBoard writer even if training fails.
        writer.close()

    return net, train_loader, val_loader

In [None]:
from torch.autograd import Function

from sklearn.metrics import jaccard_similarity_score as iou

class DiceCoeff(Function):
    """Dice coefficient of a single prediction/target pair.

    Written in the instance-method autograd ``Function`` style: ``forward`` and
    ``backward`` take ``self`` and keep intermediate values on the instance.
    ``dice_coeff`` in this notebook calls ``forward`` directly on a fresh
    instance.
    NOTE(review): current PyTorch expects static ``forward``/``backward``
    methods invoked through ``.apply`` -- confirm before relying on
    ``backward`` here.
    """

    def forward(self, input, target):
        """Return ``(2 * <input, target> + eps) / (sum(input) + sum(target) + eps)``."""
        self.save_for_backward(input, target)
        # Small constant that keeps the ratio defined for empty masks.
        eps = 0.0001
        self.inter = torch.dot(input.view(-1), target.view(-1))
        self.union = torch.sum(input) + torch.sum(target) + eps

        t = (2 * self.inter.float() + eps) / self.union.float()
        return t

    # This function has only a single output, so it gets only one gradient
    def backward(self, grad_output):
        """Return the gradient w.r.t. ``input``; the target gets no gradient."""

        input, target = self.saved_variables
        grad_input = grad_target = None

        if self.needs_input_grad[0]:
            grad_input = grad_output * 2 * (target * self.union - self.inter) \
                         / (self.union * self.union)
        if self.needs_input_grad[1]:
            grad_target = None

        return grad_input, grad_target


def dice_coeff(input, target):
    """Mean Dice coefficient over the first (batch) dimension.

    Each pair ``(input[k], target[k])`` contributes
    ``(2 * <p, t> + eps) / (sum(p) + sum(t) + eps)`` with ``eps = 0.0001``,
    the same formula ``DiceCoeff.forward`` uses. It is computed here with
    ordinary tensor operations, so no autograd ``Function`` has to be
    instantiated.

    Args:
        input: Batch of predictions.
        target: Batch of targets with the same shape as ``input``.

    Returns:
        torch.Tensor of shape ``(1,)`` on ``input``'s device. An empty batch
        yields zero (the previous code raised ``NameError`` in that case).
    """
    eps = 0.0001
    # Allocate on the input's own device rather than the default CUDA device.
    total = torch.zeros(1, device=input.device)
    count = 0

    for pred, true in zip(input, target):
        pred = pred.reshape(-1).float()
        true = true.reshape(-1).float()
        inter = torch.dot(pred, true)
        union = pred.sum() + true.sum() + eps
        total = total + (2 * inter + eps) / union
        count += 1

    if count == 0:
        return total
    return total / count

def iou_pytorch(outputs: torch.Tensor, labels: torch.Tensor, smooth: float = 1e-6):
    """Per-sample thresholded IoU score of binary masks.

    The IoU of every sample is mapped onto the thresholds 0.5, 0.55, ..., 0.95:
    the result is the fraction (in steps of 0.1) of those thresholds the IoU
    exceeds.

    Args:
        outputs: Predictions shaped ``B x H x W`` or ``B x 1 x H x W``; any
            non-zero value counts as foreground.
        labels: Targets with the same layout as ``outputs``.
        smooth: Added to numerator and denominator to avoid 0/0. The original
            code read an undefined global ``SMOOTH`` here.

    Returns:
        torch.Tensor of shape ``(B,)`` with values in ``{0.0, 0.1, ..., 1.0}``.
    """
    # Drop the channel axis from both tensors so they line up element-wise.
    if outputs.dim() == 4:
        outputs = outputs.squeeze(1)  # BATCH x 1 x H x W => BATCH x H x W
    if labels.dim() == 4:
        labels = labels.squeeze(1)

    # Bitwise & and | are not defined for float tensors; compare as booleans.
    outputs = outputs != 0
    labels = labels != 0

    intersection = (outputs & labels).float().sum((1, 2))  # zero if truth or prediction is empty
    union = (outputs | labels).float().sum((1, 2))         # zero if both are empty

    iou = (intersection + smooth) / (union + smooth)  # smoothed to avoid 0/0

    # Equivalent to comparing against thresholds 0.5, 0.55, ..., 0.95.
    thresholded = torch.clamp(20 * (iou - 0.5), 0, 10).ceil() / 10

    return thresholded

def eval_net(net, loader, device):
    """Evaluate ``net`` on ``loader`` and return the batch-averaged score.

    Multi-class networks are scored with cross-entropy. Single-class networks
    are thresholded at 0.5 and scored with ``iou_pytorch``, whose result is
    also printed for every batch.
    """
    net.eval()
    if net.n_classes == 1:
        mask_type = torch.float32
    else:
        mask_type = torch.long
    n_val = len(loader)  # number of batches
    tot = 0

    print("evaluating...")
    for batch in loader:
        imgs = batch['image']
        true_masks = batch['mask']
        imgs = imgs.to(device=device, dtype=torch.float32)
        true_masks = true_masks.to(device=device, dtype=mask_type)

        with torch.no_grad():
            mask_pred = net(imgs)

        if net.n_classes > 1:
            tot += F.cross_entropy(mask_pred, true_masks).item()
            continue

        # Binarise the sigmoid probabilities before scoring.
        pred = (torch.sigmoid(mask_pred) > 0.5).float()
        iou_actual = iou_pytorch(pred, true_masks)
        tot += iou_actual
        print(iou_actual)

    net.train()
    return tot / n_val

Below we have all the code that sets the variables needed for training:

In [None]:
import logging

# Training hyper-parameters.
# NOTE(review): `epochs` and `load` are not read by the training call in this
# notebook (that call passes epochs=100 literally) -- confirm the intent.
epochs=5
batch_size=5
learning_rate=0.0001
load=False
scale=1
# Validation share in percent; the training call divides it by 100.
validation=25.0

# Network variant: 'scconv', 'bottleneck', or anything else for the baseline.
mode='scconv'

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# NOTE(review): no logging configuration is visible in this notebook, so INFO
# messages may not be displayed -- confirm.
logging.info(f'Using device {device}')

#net = UNet(n_channels=1, n_classes=1, bilinear=True)
#net = UNet(n_channels=1, n_classes=1, bilinear=True, useSCConv=True)
net = UNet(n_channels=1, n_classes=1, bilinear=True, mode=mode)
net.to(device=device)

# Folders read by train_net (as globals) and by the prediction cells.
dir_img = '/content/BID-Dataset/imgs/'
dir_mask = '/content/BID-Dataset/masks/'
dir_checkpoint = '/content/checkpoints/'

In [None]:
# Run the training. The number of epochs is fixed to 100 here rather than
# taken from the `epochs` variable; `validation` is converted from percent to
# a fraction.
net, data_train, data_val = train_net(net=net, epochs=100,
                              batch_size=batch_size,
                              lr=learning_rate, device=device,
                              img_scale=scale,
                              val_percent=validation / 100,
                              dir_checkpoint = dir_checkpoint,
                              mode = mode)

Code for prediction of images!

In [None]:
from PIL import Image
from matplotlib.pyplot import imshow
import matplotlib.pyplot as plt
from torchvision import transforms
import numpy as np
import cv2

def imshow_v2(img, binary=False):
    """Display ``img`` with the 'Greys' colormap; invert it first if ``binary``."""
    shown = 1- img if binary else img

    plt.subplot()
    plt.imshow(shown, cmap='Greys',  interpolation='nearest')

    plt.show()

def get_net(cp_model, mode):
    """Build a 1-channel, 1-class bilinear U-Net and load weights from ``cp_model``.

    Args:
        cp_model: Path of the checkpoint (state dict) file.
        mode: Network variant passed to ``UNet``.

    Returns:
        The model, moved to the global ``device``, with the weights loaded.
    """
    model = UNet(n_channels=1, n_classes=1, bilinear=True, mode=mode)
    model.to(device=device)

    weights = torch.load(cp_model, map_location=device)
    model.load_state_dict(weights)

    return model

def open_image(img_file, split = True):
    """Open an image and return it together with its green channel.

    Args:
        img_file: Path of the image file.
        split: Unused; kept so existing calls keep working.

    Returns:
        tuple: ``(image as opened, green channel as a single-band PIL image)``.
    """
    img = Image.open(img_file)
    # Convert first so grayscale, palette and RGBA files work too; a plain
    # three-way split() raises for anything that is not three-band.
    green = img.convert('RGB').split()[1]

    return img, green

def predict_img(net,
                full_img,
                device,
                scale_factor=1,
                out_threshold=0.5):
    """Predict a binary mask for one PIL image.

    Args:
        net: Trained model exposing ``n_classes``.
        full_img: PIL image to segment.
        device: Device used for inference.
        scale_factor: Scale passed to ``BasicDataset.preprocess``.
        out_threshold: Probability above which a pixel is foreground.

    Returns:
        numpy.ndarray of ints (0/1) holding the thresholded mask.
    """
    net.eval()

    # Preprocess, add the batch dimension and move to the target device.
    batch = torch.from_numpy(BasicDataset.preprocess(full_img, scale_factor))
    batch = batch.unsqueeze(0).to(device=device, dtype=torch.float32)

    with torch.no_grad():
        logits = net(batch)

        if net.n_classes > 1:
            probs = F.softmax(logits, dim=1)
        else:
            probs = torch.sigmoid(logits)
        probs = probs.squeeze(0)

        # Resize the probability map using the height of the original image.
        to_image_height = transforms.Compose(
            [
                transforms.ToPILImage(),
                transforms.Resize(full_img.size[1]),
                transforms.ToTensor()
            ]
        )
        resized = to_image_height(probs.cpu())
        full_mask = resized.squeeze().cpu().numpy()

    binary_mask = full_mask > out_threshold
    return binary_mask.astype(int)

# New section

In [None]:
# The weights file must be uploaded first (if the model was not trained in
# this session).
# NOTE(review): train_net names its checkpoints "<mode>_CP_epoch<N>.pth" and
# moves them to Google Drive, so this path presumably refers to a manually
# uploaded file -- confirm.
cp_model = dir_checkpoint + "CP_epoch5.pth"
#net = (cp_model, "scconv")
#print(net)

In [None]:
# Build the bottleneck variant and load the uploaded weights. The mode string
# was misspelled ("botleneck"), which silently selected the baseline network
# because every unknown mode falls through to the default branch.
net = UNet(n_channels=1, n_classes=1, bilinear=True, mode="bottleneck")
net.to(device=device)
net.load_state_dict(torch.load(cp_model, map_location=device))

In [None]:
# Pick one sample and predict its mask.
f = "00020001.jpg"
img_file = dir_img + f
original_mask = dir_mask + f

imgrgb, img = open_image(img_file)


mask = predict_img(net=net, full_img=img,
                  scale_factor=0.5,
                  out_threshold=0.5,
                  device=device)

# PIL's resize expects (width, height), whereas mask.shape is (height, width);
# the two were swapped before, which only worked for square masks.
img = img.resize((mask.shape[1], mask.shape[0]))
imgrgb = imgrgb.resize((mask.shape[1], mask.shape[0]))

In [None]:
# Display the resized input image.
imshow_v2(imgrgb)

In [None]:
# Display the predicted mask; binary=True makes imshow_v2 plot 1 - mask.
imshow_v2(mask, binary = True)