In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# for dirname, _, filenames in os.walk('/kaggle/input'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when 
# you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [2]:
import warnings
warnings.filterwarnings("ignore")

# HW Lesson 5

1. Необходимо подготовить датасет https://www.kaggle.com/olekslu/makeup-lips-segmentation-28k-samples для обучения модели на сегментацию губ

2. Обучить модель на выбор из [segmentation_models_pytorch](https://segmentation-modelspytorch.readthedocs.io/en/latest/index.html)

In [3]:
! pip install --quiet segmentation_models_pytorch

In [4]:
!pip install --quiet torchsummary

In [5]:
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn import functional as F
from torch import nn
from torchsummary import summary
import torchvision

import cv2
from PIL import Image
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from pathlib import Path

import segmentation_models_pytorch as smp
from segmentation_models_pytorch import Linknet
from segmentation_models_pytorch.encoders import get_preprocessing_fn
import segmentation_models_pytorch.utils as smp_utils



In [6]:
# Locations inside the Kaggle input directory: image folder, mask folder, CSV index.
ROOT_PATH = Path('/kaggle/input/makeup-lips-segmentation-28k-samples/set-lipstick-original/')
IMG_PATH, MSK_PATH, DF_PATH = (
    ROOT_PATH.joinpath(entry) for entry in ('720p', 'mask', 'list.csv')
)

# Encoder name shared by the preprocessing function and the model.
BACKBONE = 'resnet50'

In [7]:
# Load the sample index and compare its size with the number of files on disk.
df = pd.read_csv(DF_PATH)
print(f'Shape {df.shape}')
# Images are counted in IMG_PATH and masks in MSK_PATH so each label reports its own folder.
print(f'Images in folder - {len(os.listdir(IMG_PATH))}\nMasks in folder - {len(os.listdir(MSK_PATH))}')

In [8]:
# Preview the first five rows of the index table.
df.head(n=5)

In [9]:
# Hold out the test rows first; test_size=0.25 spells out scikit-learn's documented default.
train_val_df, test_df = train_test_split(df, test_size=0.25, random_state=42)

# Then carve a validation part (20 %) out of the remaining rows.
train_df, val_df = train_test_split(
    train_val_df,
    test_size=0.2,
    random_state=42,
)

train_val_df.head(n=2)

# Make dataset

In [10]:
class LipsDataset(torch.utils.data.Dataset):
    """Map-style dataset returning ``(image, mask)`` tensor pairs.

    Every item is read with OpenCV, converted from BGR to RGB, resized to
    256x256, cast to ``data_type`` and converted with
    ``torchvision.transforms.ToTensor``. Mask values are divided by 255.

    Args:
        mask_path: directory that holds the mask files.
        img_path: directory that holds the image files.
        df: table with a ``filename`` column (image file names) and a ``mask``
            column (mask file names). The frame is copied, so the caller's
            object is never modified.
        preprocessing: optional callable applied to the resized RGB image
            array. When it is falsy the image is divided by 255 instead.

    Raises:
        OSError: from ``__getitem__`` when OpenCV cannot load a file.
    """
    IMG = 'filename'
    MSK = 'mask'
    data_type = 'float32' #'uint8'

    def __init__(self, mask_path: Path, img_path: Path,
                 df: pd.DataFrame, preprocessing=None):
        self.mask_path = mask_path
        self.img_path = img_path
        # Private copy: the columns below are overwritten with full paths, and the
        # frame handed in is typically a slice of a larger table.
        self.df = df.copy()
        self.preprocessing = preprocessing

        tqdm.pandas()
        print('Check if is sended imgage exist...')
        self.df[self.IMG] = self.df[self.IMG]\
            .progress_apply(lambda f: img_path / f if os.path.exists(img_path / f) else np.nan)

        print('Check if is sended mask exist...')
        self.df[self.MSK] = self.df[self.MSK]\
            .progress_apply(lambda f: mask_path / f if os.path.exists(mask_path / f) else np.nan)
        # Rows whose image or mask file is missing were marked NaN above; drop them.
        self.df = self.df.dropna(axis=0)

    def __len__(self):
        """Return the number of rows left after the missing-file filter."""
        return len(self.df)

    @staticmethod
    def _read_rgb(path):
        """Load ``path`` with OpenCV and return it as a 256x256 RGB array."""
        pixels = cv2.imread(str(path))
        if pixels is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise OSError(f'cv2.imread could not load {path}')
        pixels = cv2.cvtColor(pixels, cv2.COLOR_BGR2RGB)
        return cv2.resize(pixels, (256, 256))

    def __getitem__(self, indx):
        """Return the ``(image, mask)`` tensors for the row at position ``indx``."""
        # read data from folders
        # https://tproger.ru/translations/opencv-python-guide/
        row = self.df.iloc[indx]

        image = self._read_rgb(row[self.IMG]).astype(self.data_type)

        # NOTE(review): the mask keeps three channels, while the model in this
        # notebook is built with classes=1 - confirm the loss/metric are meant
        # to broadcast over the channel axis.
        mask = self._read_rgb(row[self.MSK])
        mask = (mask / 255).astype(self.data_type)

        if self.preprocessing:
            image = self.preprocessing(image)
        else:
            # normalize to [0, 1]
            image = image / 255.0

        # The explicit cast fixes the dtype of the returned tensors.
        # NOTE(review): ToTensor is expected to leave float arrays unscaled -
        # confirm against the torchvision documentation.
        to_tensor = torchvision.transforms.ToTensor()
        return to_tensor(image.astype(self.data_type)),\
                to_tensor(mask.astype(self.data_type))

In [11]:
def img_show(**images) -> None:
    """Draw the given image tensors side by side in a single row.

    Each keyword argument becomes one panel: the keyword is used as the panel
    title and the value has its first axis moved to the last position
    (``permute(1, 2, 0)``) before it is passed to ``plt.imshow``.
    """
    total = len(images)
    plt.figure(figsize=(14, 5))

    for position, (title, tensor) in enumerate(images.items(), start=1):
        plt.subplot(1, total, position)
        plt.imshow(tensor.permute(1, 2, 0))
        plt.title(title)

    plt.show()

In [12]:
# Input normalisation matching the ImageNet-pretrained weights of the chosen encoder.
transforms = get_preprocessing_fn(BACKBONE, pretrained='imagenet')

In [13]:
# One dataset per split, built in train -> val -> test order with identical settings.
train_dataset, val_dataset, test_dataset = (
    LipsDataset(mask_path=MSK_PATH,
                img_path=IMG_PATH,
                df=split_df,
                preprocessing=transforms)
    for split_df in (train_df, val_df, test_df)
)

In [14]:
# Sanity check: draw one random training index and display that image/mask pair.
k = torch.randint(low=0, high=len(train_dataset), size=(1, 1)).item()
print(f'Image {k} will be shown below.')

image, mask = train_dataset[k]
img_show(image=image, true_mask=mask)

In [15]:
# Only the training batches are shuffled; validation and test keep dataset order.
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=4, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=4, num_workers=2)

In [16]:
# Use the first CUDA device when one is available, otherwise fall back to the CPU.
device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
device

In [17]:
# Linknet with an ImageNet-pretrained encoder and a single sigmoid output channel.
model = smp.Linknet(
    encoder_name=BACKBONE,
    encoder_weights='imagenet',
    classes=1,
    activation='sigmoid',
)
model = model.to(device)

In [18]:
# Dice loss is optimised; IoU is only reported.
criterion = smp_utils.losses.DiceLoss()
metric = [smp_utils.metrics.IoU()]

optimizer = torch.optim.Adam(model.parameters(), lr=5e-3)

In [19]:
# path = '../input/lips-seg-weights/best_weights.pth'
# model.load_state_dict(torch.load(path))

In [20]:
# Runner for one training pass over a loader (uses the optimizer).
train_epoch = smp_utils.train.TrainEpoch(
    model=model,
    loss=criterion,
    metrics=metric,
    optimizer=optimizer,
    device=device,
    verbose=True,
)

# Runner for one evaluation pass; it takes no optimizer.
valid_epoch = smp_utils.train.ValidEpoch(
    model=model,
    loss=criterion,
    metrics=metric,
    device=device,
)

# Train

In [21]:
# Training schedule and early-stop threshold.
epochs = 1
score_break_value = 0.9

# Best validation IoU seen so far and where the checkpoints are written.
max_score = 0
PATH_MODEL = './best_model.pth'
PATH_WEIGHTS = './best_weights.pth'

for epoch in range(1, epochs + 1):
    print(f'Current epoch - {epoch} from {epochs}')
    train_score = train_epoch.run(train_loader)
    valid_score = valid_epoch.run(val_loader)
    print('Next step.\n\n')

    # Checkpoint whenever the validation IoU improves on the best value so far.
    if valid_score['iou_score'] > max_score:
        max_score = valid_score['iou_score']
        torch.save(model.state_dict(), PATH_WEIGHTS)
        torch.save(model, PATH_MODEL)
        print('Model saved!')

    # Stop early once the best score exceeds the threshold.
    if max_score > score_break_value:
        break

# Test

In [23]:
# Evaluate on the held-out test loader with the validation runner.
print('Test score - {}'.format(valid_epoch.run(test_loader)))

In [25]:
# Show image / ground-truth mask / prediction for the first test batch only.
# Switch to inference mode explicitly so this cell does not depend on an earlier
# cell having left the model in eval mode.
model.eval()
with torch.no_grad():  # no autograd graph is needed for visualisation
    for image, mask in test_loader:
        for i in range(image.shape[0]):
            # Add a batch axis for the forward pass, then strip it again.
            predict = model(image[i][None].to(device))[0, ...].cpu()
            img_show(image=image[i], true_mask=mask[i], predict=predict)
        break  # only the first batch is displayed