In [None]:
!pip install albumentations==0.4.6
!pip install ternausnet > /dev/null
!pip install segmentation-models-pytorch

Collecting albumentations==0.4.6
  Downloading albumentations-0.4.6.tar.gz (117 kB)
[K     |████████████████████████████████| 117 kB 4.3 MB/s 
Collecting imgaug>=0.4.0
  Downloading imgaug-0.4.0-py2.py3-none-any.whl (948 kB)
[K     |████████████████████████████████| 948 kB 14.4 MB/s 
Building wheels for collected packages: albumentations
  Building wheel for albumentations (setup.py) ... [?25l[?25hdone
  Created wheel for albumentations: filename=albumentations-0.4.6-py3-none-any.whl size=65172 sha256=94bb55d62accbc60bcc2e04f72e69a1ef02ac2e97bc16a8e4b994185cb5d10e2
  Stored in directory: /root/.cache/pip/wheels/cf/34/0f/cb2a5f93561a181a4bcc84847ad6aaceea8b5a3127469616cc
Successfully built albumentations
Installing collected packages: imgaug, albumentations
  Attempting uninstall: imgaug
    Found existing installation: imgaug 0.2.9
    Uninstalling imgaug-0.2.9:
      Successfully uninstalled imgaug-0.2.9
  Attempting uninstall: albumentations
    Found existing installation: album

In [None]:
from collections import defaultdict
import copy
import random
import os
import zipfile
import albumentations
import albumentations as A
import albumentations.augmentations.functional as F
import albumentations.augmentations.transforms as G
from albumentations.pytorch import ToTensorV2
import cv2
import matplotlib.pyplot as plt
import numpy as np
import ternausnet.models
from tqdm import tqdm
import torch
import torch.backends.cudnn as cudnn
import torch.nn as nn
import torch.optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from google.colab import drive
import segmentation_models_pytorch as smp

cudnn.benchmark = True

In [None]:
# Mount Google Drive (Colab) so the dataset archives stored there can be read.
drive.mount('/content/gdrive')
# Local working directories: the archive is extracted into IMAGES_DIR and the
# generated masks are written to MASKS_DIR.
IMAGES_DIR = './images'
MASKS_DIR = './masks'

Mounted at /content/gdrive


In [None]:
DATASET_PATH = '/content/gdrive/My Drive/tomats.zip'
# Extract the image archive into IMAGES_DIR. The context manager guarantees the
# archive is closed afterwards; a bare `zip_object.close` (without parentheses)
# only references the method and never releases the file handle.
with zipfile.ZipFile(file=DATASET_PATH, mode='r') as zip_object:
    zip_object.extractall(IMAGES_DIR)

<bound method ZipFile.close of <zipfile.ZipFile filename='/content/gdrive/My Drive/tomats.zip' mode='r'>>

In [None]:
from pathlib import Path
from multiprocessing import Pool
from tqdm import tqdm


# Create the mask output directory. exist_ok=True replaces the separate
# "check then create" steps and keeps the cell safe to re-run.
os.makedirs(MASKS_DIR, exist_ok=True)

def make_mask(filename, IMAGES_DIR=IMAGES_DIR, MASKS_DIR=MASKS_DIR):
  """Build a colour-threshold mask for one image and save it as a PNG.

  The image is converted to HSV and thresholded with two ranges (hue 0-80 and
  hue 170-180), the union is cleaned with a 10x10 morphological opening, the
  result is scaled from {0, 255} to {0, 1} and written to MASKS_DIR under the
  same file name with ".JPG" replaced by ".png".

  Args:
    filename: Image file name relative to IMAGES_DIR.
    IMAGES_DIR: Directory containing the source image.
    MASKS_DIR: Directory the mask is written to.

  Raises:
    FileNotFoundError: If OpenCV cannot read the image.
  """
  image_path = os.path.join(IMAGES_DIR, filename)
  original_img = cv2.imread(image_path)
  # cv2.imread reports failure by returning None instead of raising.
  if original_img is None:
    raise FileNotFoundError(f"Could not read image: {image_path}")
  img = cv2.cvtColor(original_img, cv2.COLOR_BGR2HSV)
  mask1 = cv2.inRange(img, (0, 20, 20), (80, 255, 255))
  mask2 = cv2.inRange(img, (170, 50, 20), (180, 255, 255))
  mask = cv2.bitwise_or(mask1, mask2)
  # Opening removes small isolated blobs from the thresholded mask.
  kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (10, 10))
  mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel, iterations=1)
  # Integer division keeps the mask uint8 with values 0/1; true division
  # would hand a float64 array to cv2.imwrite.
  mask = mask // 255
  cv2.imwrite(os.path.join(MASKS_DIR, filename.replace(".JPG", ".png")), mask)


# Collect the source images. Match on the file extension (suffix) rather than
# a substring, so names that merely contain ".JPG" are not picked up.
filenames = [filename for filename in os.listdir(IMAGES_DIR) if filename.endswith('.JPG')]

# Generate the masks in parallel worker processes; wrapping imap in tqdm shows
# progress and list(...) forces the lazy iterator to run to completion.
with Pool(10) as p:
    list(tqdm(p.imap(make_mask, filenames), "Making masks", total=len(filenames)))

Making masks: 100%|██████████| 657/657 [05:37<00:00,  1.95it/s]


In [None]:
def preprocess_mask(mask):
    """Return a float32 copy of `mask` with label 2 mapped to 0 and labels 1/3 mapped to 1.

    Any other value is left as it is. The input array is not modified.
    """
    binary = mask.astype(np.float32)
    # Both selections are computed up front; they are disjoint, so the order
    # of the two assignments does not matter.
    to_background = binary == 2.0
    to_foreground = np.isin(binary, (1.0, 3.0))
    binary[to_background] = 0.0
    binary[to_foreground] = 1.0
    return binary

### Define a function to visualize images and their labels

In [None]:
def display_image_grid(images_filenames, images_directory, masks_directory, predicted_masks=None):
    """Plot each image next to its ground-truth mask and, optionally, a predicted mask.

    Args:
        images_filenames: Image file names; one grid row is drawn per name.
        images_directory: Directory containing the images.
        masks_directory: Directory containing the masks, stored under the
            image name with ".JPG" replaced by ".png".
        predicted_masks: Optional sequence of predicted masks, indexed in the
            same order as `images_filenames`; adds a third column when given.
    """
    cols = 3 if predicted_masks else 2
    rows = len(images_filenames)
    # squeeze=False keeps `ax` two-dimensional even for a single row, so the
    # ax[i, j] indexing below also works when only one image is passed.
    figure, ax = plt.subplots(nrows=rows, ncols=cols, figsize=(10, 24), squeeze=False)
    for i, image_filename in enumerate(images_filenames):
        image = cv2.imread(os.path.join(images_directory, image_filename))
        # OpenCV loads BGR; matplotlib expects RGB.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        mask_path = os.path.join(masks_directory, image_filename.replace(".JPG", ".png"))
        mask = cv2.imread(mask_path, cv2.IMREAD_UNCHANGED)
        mask = preprocess_mask(mask)
        ax[i, 0].imshow(image)
        ax[i, 1].imshow(mask, interpolation="nearest")

        ax[i, 0].set_title("Image")
        ax[i, 1].set_title("Ground truth mask")

        ax[i, 0].set_axis_off()
        ax[i, 1].set_axis_off()

        if predicted_masks:
            predicted_mask = predicted_masks[i]
            ax[i, 2].imshow(predicted_mask, interpolation="nearest")
            ax[i, 2].set_title("Predicted mask")
            ax[i, 2].set_axis_off()
    plt.tight_layout()
    plt.show()

In [None]:
# Sanity check: show the first four images next to their generated masks.
display_image_grid(filenames[0:4], IMAGES_DIR, MASKS_DIR)

In [None]:
from sklearn.model_selection import train_test_split

# Sort first so the seeded shuffle below starts from the same order on every
# run, independent of the order os.listdir returned.
filenames = sorted(filenames)

random.seed(42)
random.shuffle(filenames)

# The file names are passed as both inputs and labels, so y_train / y_val are
# just copies of the two name lists; 10% of the files go to validation.
train_images_filenames, val_images_filenames, y_train, y_val = train_test_split(filenames,
                                                    filenames,
                                                    test_size = 0.1,
                                                    random_state = 42)

# NOTE(review): the "test" set is the first 15 validation images, i.e. it
# overlaps the validation set rather than being a separate hold-out.
test_images_filenames = val_images_filenames[:15]

print(len(train_images_filenames), len(val_images_filenames), len(test_images_filenames))

591 66 15


### Define a PyTorch dataset class

In [None]:
class TomatosDataset(Dataset):
    """Dataset yielding (image, mask) pairs for segmentation training and validation.

    Args:
        images_filenames: Image file names, relative to `images_directory`.
        images_directory: Directory containing the images.
        masks_directory: Directory containing the masks, stored under the
            image name with ".JPG" replaced by ".png".
        transform: Optional callable invoked as `transform(image=..., mask=...)`
            that returns a mapping with "image" and "mask" entries.
    """

    def __init__(self, images_filenames, images_directory, masks_directory, transform=None):
        self.images_filenames = images_filenames
        self.images_directory = images_directory
        self.masks_directory = masks_directory
        self.transform = transform

    def __len__(self):
        """Return the number of images in the dataset."""
        return len(self.images_filenames)

    def __getitem__(self, idx):
        """Load image `idx` (as RGB) and its preprocessed mask, then apply the transform.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image or the mask.
        """
        image_filename = self.images_filenames[idx]
        image_path = os.path.join(self.images_directory, image_filename)
        mask_path = os.path.join(self.masks_directory, image_filename.replace(".JPG", ".png"))

        image = cv2.imread(image_path)
        # cv2.imread returns None on failure; fail here with the offending
        # path instead of an opaque error further down.
        if image is None:
            raise FileNotFoundError(f"Could not read image: {image_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        mask = cv2.imread(mask_path, cv2.IMREAD_UNCHANGED)
        if mask is None:
            raise FileNotFoundError(f"Could not read mask: {mask_path}")
        mask = preprocess_mask(mask)

        if self.transform is not None:
            transformed = self.transform(image=image, mask=mask)
            image = transformed["image"]
            mask = transformed["mask"]
        return image, mask

In [None]:
# Training pipeline: resize to 256x256, then random geometric and colour
# augmentations (each applied with probability 0.5), normalisation with the
# given per-channel mean/std, and conversion to a tensor.
train_transform = A.Compose(
    [
        A.Resize(256, 256),
        A.ShiftScaleRotate(shift_limit=0.2, scale_limit=0.2, rotate_limit=30, p=0.5),
        A.RGBShift(r_shift_limit=25, g_shift_limit=25, b_shift_limit=25, p=0.5),
        A.RandomBrightnessContrast(brightness_limit=0.3, contrast_limit=0.3, p=0.5),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2(),
    ]
)

# train_transform = transforms.Compose([A.Resize(256, 256), transforms.ToTensor()])

train_dataset = TomatosDataset(train_images_filenames, IMAGES_DIR, MASKS_DIR, transform=train_transform,)

# Validation pipeline: same resize / normalise / to-tensor steps, but without
# any random augmentation.
val_transform = A.Compose(
    [A.Resize(256, 256), A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)), ToTensorV2()]
)
val_dataset = TomatosDataset(val_images_filenames, IMAGES_DIR, MASKS_DIR, transform=val_transform,)

Let's define a function that takes a dataset and visualizes different augmentations applied to the same image and the associated mask.

In [None]:
def visualize_augmentations(dataset, idx=0, samples=5):
    """Show `samples` augmented versions of one dataset item next to their masks.

    A deep copy of the dataset is used, with Normalize and ToTensorV2 removed
    from its transform pipeline; the dataset passed in is left untouched.

    Args:
        dataset: Dataset whose `transform` is an iterable of albumentations transforms.
        idx: Index of the item to augment repeatedly.
        samples: Number of augmented versions (grid rows) to draw.
    """
    dataset = copy.deepcopy(dataset)
    dataset.transform = A.Compose([t for t in dataset.transform if not isinstance(t, (A.Normalize, ToTensorV2))])
    # squeeze=False keeps `ax` two-dimensional, so ax[i, j] also works for samples=1.
    figure, ax = plt.subplots(nrows=samples, ncols=2, figsize=(10, 24), squeeze=False)
    for i in range(samples):
        # Each access re-runs the transform pipeline on the same item.
        image, mask = dataset[idx]
        ax[i, 0].imshow(image)
        ax[i, 1].imshow(mask, interpolation="nearest")
        ax[i, 0].set_title("Augmented image")
        ax[i, 1].set_title("Augmented mask")
        ax[i, 0].set_axis_off()
        ax[i, 1].set_axis_off()
    plt.tight_layout()
    plt.show()

In [None]:
# Re-seed Python's RNG before previewing augmentations of training item 7
# (presumably so the preview is reproducible; confirm albumentations draws
# from `random` in the installed version).
random.seed(42)
visualize_augmentations(train_dataset, idx=7)

### Define helpers for training

In [None]:
import torch.nn.functional as FF

class DiceLossCustom(nn.Module):
    """Soft Dice loss computed from raw logits over the whole batch.

    The logits are passed through a sigmoid, predictions and targets are
    flattened, and the loss is

        1 - (2 * sum(p * t) + smooth) / (sum(p) + sum(t) + smooth)

    The `weight` and `size_average` constructor arguments are accepted but
    not used.
    """

    def __init__(self, weight=None, size_average=True):
        super().__init__()

    def forward(self, inputs, targets, smooth=1):
        """Return the scalar Dice loss for logits `inputs` against `targets`.

        Args:
            inputs: Raw model outputs (logits). Remove the sigmoid below if the
                model already ends with a sigmoid or equivalent activation.
            targets: Ground-truth tensor with the same number of elements.
            smooth: Additive term that keeps the ratio defined when both sums are zero.
        """
        # torch.sigmoid replaces the deprecated torch.nn.functional.sigmoid.
        probabilities = torch.sigmoid(inputs)

        # reshape (unlike view) also accepts non-contiguous tensors.
        flat_probabilities = probabilities.reshape(-1)
        flat_targets = targets.reshape(-1)

        intersection = (flat_probabilities * flat_targets).sum()
        dice = (2.0 * intersection + smooth) / (flat_probabilities.sum() + flat_targets.sum() + smooth)

        return 1 - dice

In [None]:
class MetricMonitor:
    """Keeps a running average per named metric and renders them on one line.

    Each metric is stored as a dict with "val" (running sum of the reported
    values), "count" (number of updates) and "avg" (sum divided by count).
    """

    def __init__(self, float_precision=4):
        self.float_precision = float_precision
        self.reset()

    @staticmethod
    def _new_entry():
        # Fresh accumulator for a metric that has not been seen yet.
        return {"val": 0, "count": 0, "avg": 0}

    def reset(self):
        """Forget every metric recorded so far."""
        self.metrics = defaultdict(self._new_entry)

    def update(self, metric_name, val):
        """Add one observation `val` to `metric_name` and refresh its average."""
        entry = self.metrics[metric_name]
        entry["val"] = entry["val"] + val
        entry["count"] = entry["count"] + 1
        entry["avg"] = entry["val"] / entry["count"]

    def get_value(self, metric_name):
        """Return the current average of `metric_name`."""
        return self.metrics[metric_name]["avg"]

    def __str__(self):
        template = "{metric_name}: {avg:.{float_precision}f}"
        rendered = []
        for name, entry in self.metrics.items():
            rendered.append(
                template.format(metric_name=name, avg=entry["avg"], float_precision=self.float_precision)
            )
        return " | ".join(rendered)

### Define functions for training and validation

In [None]:
def train(train_loader, model, criterion, dice_criterion, optimizer, epoch, params):
    """Run one training epoch.

    `criterion` is the loss that is back-propagated; `dice_criterion` is only
    evaluated for reporting. Running averages of both are shown in the
    progress bar description.
    """
    monitor = MetricMonitor()
    model.train()
    progress = tqdm(train_loader)
    for images, target in progress:
        images = images.to(params["device"], non_blocking=True)
        target = target.to(params["device"], non_blocking=True)
        # Drop the channel dimension so the logits line up with the target masks.
        logits = model(images).squeeze(1)

        loss = criterion(logits, target)
        dice = dice_criterion(logits, target)
        monitor.update("Loss", loss.item())
        monitor.update("DiceLoss", dice.item())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        description = "Epoch: {epoch}. Train.      {metric_monitor}".format(
            epoch=epoch, metric_monitor=monitor
        )
        progress.set_description(description)

In [None]:
def validate(val_loader, model, criterion, dice_criterion, epoch, params):
    """Evaluate the model on `val_loader` without gradients.

    Returns:
        The average of `criterion` over all validation batches.
    """
    monitor = MetricMonitor()
    model.eval()
    progress = tqdm(val_loader)
    with torch.no_grad():
        for images, target in progress:
            images = images.to(params["device"], non_blocking=True)
            target = target.to(params["device"], non_blocking=True)
            # Drop the channel dimension so the logits line up with the target masks.
            logits = model(images).squeeze(1)

            loss = criterion(logits, target)
            dice = dice_criterion(logits, target)
            monitor.update("Loss", loss.item())
            monitor.update("DiceLoss", dice.item())

            description = "Epoch: {epoch}. Validation. {metric_monitor}".format(
                epoch=epoch, metric_monitor=monitor
            )
            progress.set_description(description)
    return monitor.get_value('Loss')

In [None]:
def create_model(params):
    """Instantiate the ternausnet model named by params["model"] on params["device"].

    The model is constructed with `pretrained=True`.
    """
    model_factory = getattr(ternausnet.models, params["model"])
    return model_factory(pretrained=True).to(params["device"])

In [None]:
def train_and_validate(model, train_dataset, val_dataset, params):
    """Train `model` for params["epochs"] epochs, checkpointing on validation loss.

    After every epoch the model is validated; whenever the validation BCE loss
    improves on the best value seen so far, the whole model object is saved to
    './best_model.pth'.

    Args:
        model: Model to train, already placed on params["device"].
        train_dataset: Dataset used for training (shuffled every epoch).
        val_dataset: Dataset used for validation.
        params: Dict providing "batch_size", "num_workers", "device", "lr"
            and "epochs".

    Returns:
        The model as it is after the final epoch, which is not necessarily
        the checkpointed best one.
    """
    train_loader = DataLoader(
        train_dataset,
        batch_size=params["batch_size"],
        shuffle=True,
        num_workers=params["num_workers"],
        pin_memory=True,
    )
    val_loader = DataLoader(
        val_dataset,
        batch_size=params["batch_size"],
        shuffle=False,
        num_workers=params["num_workers"],
        pin_memory=True,
    )
    criterion = nn.BCEWithLogitsLoss().to(params["device"])
    dice_criterion = DiceLossCustom()
    optimizer = torch.optim.Adam(model.parameters(), lr=params["lr"])
    # Start from +inf rather than an arbitrary large number, so the first
    # finite validation loss is always checkpointed whatever its scale.
    min_loss = float("inf")
    for epoch in range(1, params["epochs"] + 1):
        train(train_loader, model, criterion, dice_criterion, optimizer, epoch, params)
        loss = validate(val_loader, model, criterion, dice_criterion, epoch, params)

        if min_loss > loss:
            torch.save(model, './best_model.pth')
            print(f'Model saved! Now loss={loss}')
            min_loss = loss

    return model

In [None]:
def predict(model, params, test_dataset, batch_size):
    """Run the model over `test_dataset` and return thresholded masks.

    Each dataset item must be `(image, (original_height, original_width))`.

    Returns:
        A list with one `(mask, original_height, original_width)` tuple per
        item, where `mask` is a float numpy array holding 1.0 where the
        sigmoid probability is at least 0.5 and 0.0 elsewhere.
    """
    loader = DataLoader(
        test_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=params["num_workers"],
        pin_memory=True,
    )
    model.eval()
    predictions = []
    with torch.no_grad():
        for images, (original_heights, original_widths) in loader:
            logits = model(images.to(params["device"], non_blocking=True))
            binary_masks = (torch.sigmoid(logits.squeeze(1)) >= 0.5).float().cpu().numpy()
            # Pair every mask in the batch with the size of its source image.
            predictions.extend(zip(binary_masks, original_heights.numpy(), original_widths.numpy()))
    return predictions

### Define training parameters 

Here we define a few training parameters such as model architecture, learning rate, batch size, epochs, etc.

In [None]:
# Training configuration shared by the model factory, the data loaders and
# the training loop.
params = {
    "model": "UNet11",
    # Use the GPU when one is present and fall back to the CPU otherwise, so
    # the notebook also runs on CPU-only runtimes.
    "device": "cuda" if torch.cuda.is_available() else "cpu",
    "lr": 0.001,
    "batch_size": 6,
    "num_workers": 2,
    "epochs": 10,
}

### Train a model

In [None]:
# Sanity checks on the first training sample: image shape and type, the set
# of mask values, and the mask shape. Every `train_dataset[0]` access re-runs
# the random augmentation pipeline, so the four lines may come from
# differently augmented versions of the same item.
print(train_dataset[0][0].shape)
print(type(train_dataset[0][0]))
print(np.unique(train_dataset[0][1]))
print(train_dataset[0][1].shape)

In [None]:
# Build the model described by `params` and train it; the commented-out line
# instead resumes from the checkpoint written by train_and_validate.
model = create_model(params)
# model = torch.load('./best_model.pth')
model = train_and_validate(model, train_dataset, val_dataset, params)

### Predict labels for images and visualize those predictions

Now that we have a trained model, let's predict masks for some images. Note that the `__getitem__` method of the inference dataset defined below returns not only an image but also the image's original height and width. We will use those values to resize each predicted mask from 256x256 pixels back to the original image's size.

In [None]:
class TomatosInferenceDataset(Dataset):
    """Dataset yielding `(image, (original_height, original_width))` for inference.

    Args:
        images_filenames: Image file names, relative to `images_directory`.
        images_directory: Directory containing the images.
        transform: Optional callable invoked as `transform(image=...)` that
            returns a mapping with an "image" entry.
    """

    def __init__(self, images_filenames, images_directory, transform=None):
        self.images_filenames = images_filenames
        self.images_directory = images_directory
        self.transform = transform

    def __len__(self):
        """Return the number of images in the dataset."""
        return len(self.images_filenames)

    def __getitem__(self, idx):
        """Load image `idx` as RGB and return it with its pre-transform size.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image.
        """
        image_filename = self.images_filenames[idx]
        image_path = os.path.join(self.images_directory, image_filename)
        image = cv2.imread(image_path)
        # cv2.imread returns None on failure; fail here with the offending path.
        if image is None:
            raise FileNotFoundError(f"Could not read image: {image_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # Record (height, width) before the transform resizes the image.
        original_size = tuple(image.shape[:2])
        if self.transform is not None:
            transformed = self.transform(image=image)
            image = transformed["image"]
        return image, original_size

In [None]:
# Inference pipeline: resize, normalise and convert to a tensor, with no
# random augmentation (same steps as the validation pipeline).
test_transform = A.Compose(
    [A.Resize(256, 256), A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)), ToTensorV2()]
)
test_dataset = TomatosInferenceDataset(test_images_filenames, IMAGES_DIR, transform=test_transform,)

In [None]:
# One (mask, original_height, original_width) tuple per test image.
predictions = predict(model, params, test_dataset, batch_size=1)

In [None]:
# Scale every 256x256 prediction back to the size of its source image.
# Nearest-neighbour interpolation is used so the mask stays binary.
predicted_masks = []
for predicted_256x256_mask, original_height, original_width in predictions:
    full_sized_mask = F.resize(
        predicted_256x256_mask, height=original_height, width=original_width, interpolation=cv2.INTER_NEAREST
    )
    predicted_masks.append(full_sized_mask)

In [None]:
# display_image_grid(test_images_filenames, IMAGES_DIR, MASKS_DIR, predicted_masks=predicted_masks)

In [None]:
def visualize_results(predicted_masks, test_images_filenames=test_images_filenames, samples=3, IMAGES_DIR=IMAGES_DIR, MASKS_DIR=MASKS_DIR):
  """Plot image, stored mask and predicted mask side by side for the first `samples` items.

  Args:
    predicted_masks: Predicted masks, in the same order as `test_images_filenames`.
    test_images_filenames: Image file names relative to IMAGES_DIR.
    samples: Maximum number of rows to draw.
    IMAGES_DIR: Directory containing the images.
    MASKS_DIR: Directory containing the stored masks (".JPG" replaced by ".png").
  """
  shown = len(predicted_masks[:samples])
  # squeeze=False keeps `ax` two-dimensional, so ax[i, j] also works when
  # only one row is drawn.
  figure, ax = plt.subplots(nrows=shown, ncols=3, figsize=(80, 80), squeeze=False)
  for i in range(shown):
    image_path = os.path.join(IMAGES_DIR, test_images_filenames[i])
    mask_path = os.path.join(MASKS_DIR, test_images_filenames[i].replace(".JPG", ".png"))
    print(image_path)
    print(mask_path)
    img = cv2.imread(image_path)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    real_mask = cv2.imread(mask_path)
    ax[i, 0].imshow(img)
    # Stored masks hold 0/1; scale to 0/255 so they are visible when displayed.
    ax[i, 1].imshow(real_mask*255, interpolation="nearest")
    ax[i, 2].imshow(predicted_masks[i], interpolation="nearest")
    ax[i, 0].set_title("image", fontsize=100)
    ax[i, 1].set_title("real mask", fontsize=100)
    ax[i, 2].set_title("predicted mask", fontsize=100)
    ax[i, 0].set_axis_off()
    ax[i, 1].set_axis_off()
    ax[i, 2].set_axis_off()
  plt.tight_layout()
  plt.show()

In [None]:
def visualize_contours(predicted_masks, test_images_filenames=test_images_filenames, samples=3, IMAGES_DIR=IMAGES_DIR, MASKS_DIR=MASKS_DIR):
  """Draw stored-mask and predicted-mask contours over the first `samples` images.

  Args:
    predicted_masks: Predicted masks, one per entry of `test_images_filenames`.
    test_images_filenames: Image file names relative to IMAGES_DIR.
    samples: Maximum number of rows to draw.
    IMAGES_DIR: Directory containing the images.
    MASKS_DIR: Directory containing the stored masks (".JPG" replaced by ".png").
  """
  assert len(test_images_filenames)==len(predicted_masks)
  shown = len(predicted_masks[:samples])
  # squeeze=False keeps `ax` two-dimensional, so ax[i, j] also works when
  # only one row is drawn.
  figure, ax = plt.subplots(nrows=shown, ncols=2, figsize=(80, 80), squeeze=False)
  for i in range(shown):
    img = cv2.imread(os.path.join(IMAGES_DIR, test_images_filenames[i]))
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img_for_realmask = img.copy()
    img_for_predmask = img.copy()
    real_mask = cv2.imread(os.path.join(MASKS_DIR, test_images_filenames[i].replace(".JPG", ".png")), cv2.IMREAD_GRAYSCALE)
    # findContours returns (contours, hierarchy) in OpenCV 4 and
    # (image, contours, hierarchy) in OpenCV 3; [-2] is the contour list in both.
    real_contours = cv2.findContours(image=np.uint8(real_mask * 255), mode=cv2.RETR_TREE, method=cv2.CHAIN_APPROX_NONE)[-2]
    pred_contours = cv2.findContours(image=np.uint8(predicted_masks[i] * 255), mode=cv2.RETR_TREE, method=cv2.CHAIN_APPROX_NONE)[-2]
    cv2.drawContours(image=img_for_realmask, contours=real_contours, contourIdx=-1, color=(0, 255, 0), thickness=16, lineType=cv2.LINE_AA)
    cv2.drawContours(image=img_for_predmask, contours=pred_contours, contourIdx=-1, color=(0, 255, 0), thickness=16, lineType=cv2.LINE_AA)
    ax[i, 0].imshow(img_for_realmask, interpolation="nearest")
    ax[i, 1].imshow(img_for_predmask, interpolation="nearest")
    ax[i, 0].set_title("image with real contours", fontsize=100)
    ax[i, 1].set_title("image with predicted contours", fontsize=100)
    ax[i, 0].set_axis_off()
    ax[i, 1].set_axis_off()
  plt.tight_layout()
  plt.show()


# assert len(filenames)==len(predicted_masks)

# for i in range(len(filenames[:1])):
#   img = cv2.imread(os.path.join(BAD_DIR, filenames[i]))
#   img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
#   mask = predicted_masks[i]
#   contours, hierarchy = cv2.findContours(image=np.uint8(mask * 255), mode=cv2.RETR_TREE, method=cv2.CHAIN_APPROX_NONE)
#   cv2.drawContours(image=img, contours=contours, contourIdx=-1, color=(0, 255, 0), thickness=2, lineType=cv2.LINE_AA)
#   plt.imshow(img)
#   plt.subplot()

In [None]:
# Show image / stored mask / prediction for the first three test images
# (three is the default value of `samples`).
visualize_results(predicted_masks)
  # # contours, hierarchy = cv2.findContours(predicted_masks[i], cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
  # # _, binarized = cv2.threshold(predicted_masks[i], 125, 255, cv2.THRESH_BINARY)
  # # _, contours, hierarchy = cv2.findContours(binarized, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
  # border = cv2.copyMakeBorder(predicted_masks[i], 1, 1, 1, 1, cv2.BORDER_CONSTANT, value=0 )
  # _, contours, hierarchy = cv2.findContours(border, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE, offset=(-1, -1))

  # cv2.drawContours(img, contours, -1, (0, 255, 0), 3)
  # plt.imshow(img)
  # plt.figure()

In [None]:
# Directories for the second image set and the masks generated for it.
BAD_DIR = './bad'
BAD_MASK_DIR = './bad_mask'

DATASET_PATH = '/content/gdrive/My Drive/bad_tomats.zip'
# The context manager guarantees the archive is closed; a bare
# `zip_object.close` (without parentheses) never calls the method.
with zipfile.ZipFile(file=DATASET_PATH, mode='r') as zip_object:
    zip_object.extractall(BAD_DIR)

<bound method ZipFile.close of <zipfile.ZipFile filename='/content/gdrive/My Drive/bad_tomats.zip' mode='r'>>

In [None]:
from functools import partial

# Create the output directory; exist_ok=True keeps the cell safe to re-run.
os.makedirs(BAD_MASK_DIR, exist_ok=True)

# os.listdir returns entries in arbitrary order; sorting makes the order of
# images, predictions and plots reproducible between runs.
# NOTE: this rebinds `filenames`, replacing the training file list defined earlier.
filenames = sorted(os.listdir(BAD_DIR))

# Generate masks for the second image set in parallel, reusing make_mask with
# its directories overridden.
with Pool(10) as p:
  list(tqdm(p.imap(partial(make_mask, IMAGES_DIR=BAD_DIR, MASKS_DIR=BAD_MASK_DIR), filenames), "Making masks", total=len(filenames)))


Making masks: 100%|██████████| 6/6 [00:03<00:00,  1.68it/s]


In [None]:
# Repeat the inference steps on the images in BAD_DIR: build the dataset,
# predict, resize each mask back to its source image size, and plot.
# This rebinds test_transform, test_dataset, predictions and predicted_masks.
test_transform = A.Compose(
    [A.Resize(256, 256), A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)), ToTensorV2()]
)
test_dataset = TomatosInferenceDataset(filenames, BAD_DIR, transform=test_transform,)
predictions = predict(model, params, test_dataset, batch_size=1)
predicted_masks = []
for predicted_256x256_mask, original_height, original_width in predictions:
    # Nearest-neighbour interpolation keeps the resized mask binary.
    full_sized_mask = F.resize(
        predicted_256x256_mask, height=original_height, width=original_width, interpolation=cv2.INTER_NEAREST
    )
    predicted_masks.append(full_sized_mask)
visualize_results(predicted_masks, test_images_filenames=filenames, samples=6, IMAGES_DIR=BAD_DIR, MASKS_DIR=BAD_MASK_DIR)

In [None]:
# Persist the whole model object (not just a state_dict) to Google Drive.
# NOTE(review): this is the model after the last epoch, not necessarily the
# './best_model.pth' checkpoint written during training.
torch.save(model, '/content/gdrive/MyDrive/best_model.pth')

In [None]:
# Reload the model object saved above.
# NOTE(review): torch.load unpickles the file, so only load checkpoints from a
# trusted source; confirm the installed PyTorch version still loads full
# model objects by default.
model = torch.load('/content/gdrive/MyDrive/best_model.pth')

In [None]:
# Overlay stored-mask and predicted-mask contours on up to six BAD_DIR images.
visualize_contours(predicted_masks, test_images_filenames=filenames, samples=6, IMAGES_DIR=BAD_DIR, MASKS_DIR=BAD_MASK_DIR)