## setup de l'environement

In [None]:
import splitfolders  # or import split_folders

splitfolders.ratio("../../input/celeba-dataset/img_align_celeba", output="output", seed=1337, ratio=(.6, .2, .2), group_prefix=None) # default values


In [None]:
!git clone https://github.com/aqeelanwar/MaskTheFace.git

In [None]:
!pip install dotmap imutils split-folders

In [None]:
!sed -i 's/ utils./ MaskTheFace.utils./' /kaggle/working/MaskTheFace/utils/aux_functions.py

In [None]:
import os
os.chdir('./MaskTheFace')

In [None]:
import torch
import torch.nn as nn
import torch.nn as nn
import torch.nn.functional as F

from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR
import torchvision
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
import torch.nn.functional as F
from torchvision.transforms import ToTensor, Lambda, Compose
import torch.optim as optim
import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm
from tqdm.auto import tqdm as tq
from PIL import Image

## Model - Unet

In [None]:
class double_conv(nn.Module):
    """(conv => BN => ReLU) * 2"""

    def __init__(self, in_ch, out_ch):
        super(double_conv, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_ch, out_ch, 3, padding=1),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_ch, out_ch, 3, padding=1),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        x = self.conv(x)
        return x


class inconv(nn.Module):
    def __init__(self, in_ch, out_ch):
        super(inconv, self).__init__()
        self.conv = double_conv(in_ch, out_ch)

    def forward(self, x):
        x = self.conv(x)
        return x


class down(nn.Module):
    def __init__(self, in_ch, out_ch):
        super(down, self).__init__()
        self.mpconv = nn.Sequential(nn.MaxPool2d(2), double_conv(in_ch, out_ch))

    def forward(self, x):
        x = self.mpconv(x)
        return x


class up(nn.Module):
    def __init__(self, in_ch, out_ch, bilinear=True):
        super(up, self).__init__()

        if bilinear:
            self.up = nn.Upsample(scale_factor=2, mode="nearest", align_corners=True)
        else:
            self.up = nn.ConvTranspose2d(in_ch // 2, in_ch // 2, 2, stride=2)

        self.conv = double_conv(in_ch, out_ch)

    def forward(self, x1, x2):
        x1 = self.up(x1)

        # input is CHW
        diffY = x2.size()[2] - x1.size()[2]
        diffX = x2.size()[3] - x1.size()[3]

        x1 = F.pad(x1, (diffX // 2, diffX - diffX // 2, diffY // 2, diffY - diffY // 2))
        
        x = torch.cat([x2, x1], dim=1)
        return self.conv(x)


class outconv(nn.Module):
    def __init__(self, in_ch, out_ch):
        super(outconv, self).__init__()
        self.conv = nn.Conv2d(in_ch, out_ch, 1)

    def forward(self, x):
        x = self.conv(x)
        return x


class UNet(nn.Module):
    def __init__(self, n_channels, n_classes):
        super(UNet, self).__init__()
        self.inc = inconv(n_channels, 64)
        self.down1 = down(64, 128)
        self.down2 = down(128, 256)
        self.down3 = down(256, 512)
        self.down4 = down(512, 512)
        self.up1 = up(1024, 256, False)
        self.up2 = up(512, 128, False)
        self.up3 = up(256, 64, False)
        self.up4 = up(128, 64, False)
        self.outc = outconv(64, n_classes)

    def forward(self, x):
        x1 = self.inc(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        x5 = self.down4(x4)
        x = self.up1(x5, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)
        x = self.outc(x)
        return torch.sigmoid(x)

In [None]:
LOAD = False
PATH = '../model.pth'
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = UNet(3,1).float()
if LOAD :
    model.load_state_dict(torch.load(PATH,map_location=device))

model.to(device)

## data loader

In [None]:
#@title Default title text
import torch
from skimage.io import imread
from torch.utils import data
import os
from MaskTheFace.utils.aux_functions import mask_image , download_dlib_model
import argparse
import dlib
import torchvision.transforms.functional as TF
import random
from torchvision import datasets, transforms


class AddGaussianNoise(object):
    def __init__(self, mean=0., std=1.):
        self.std = std
        self.mean = mean
        
    def __call__(self, tensor):
        return tensor + torch.randn(tensor.size()) * self.std + self.mean
    
    def __repr__(self):
        return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)

class Args():
    def __init__(self,mask_types=['surgical'],pattern='',color='#0473e2',color_weight = 0.5):
        self.mask_types = mask_types
        self.mask_type = mask_types[0] #'surgical', 'N95', 'KN95', 'cloth', 'gas'
        self.pattern = pattern
        self.color = color
        self.color_weight = color_weight
    verbose = False
    code = None
    path_to_dlib_model = "dlib_models/shape_predictor_68_face_landmarks.dat"
    if not os.path.exists(path_to_dlib_model):
        download_dlib_model()
    predictor = dlib.shape_predictor(path_to_dlib_model)
    detector = dlib.get_frontal_face_detector()

class dataset(data.Dataset):
    def __init__(self,src_image,args,train='train'):   # initial logic happens like transform
        self.src_image = src_image
        self.image_paths = os.listdir(src_image)
        self.args = args
        self.train = train
        print('number of images:',self.__len__())
        
    def transform(self, image, mask):
        image = image[:, :, ::-1]
        topil = transforms.ToPILImage()
        image , mask = topil(image), topil(mask)
        
        resize = transforms.Resize(size=(128,128))
        image, mask = resize(image), resize(mask)

        if random.random() > 0.5:
            image, mask= TF.hflip(image), TF.hflip(mask)

        # Transform to tensor
        image, mask = TF.to_tensor(image), TF.to_tensor(mask)
        return image, mask
        
        
    def __getitem__(self, index):
        if self.train == 'validation' :
            index += int(len(self.image_paths) * 0.6)
        elif self.train == 'test' :
            index += int(len(self.image_paths) * 0.8)
        src = os.path.join(self.src_image,self.image_paths[index])
        args.mask_type = random.choice(self.args.mask_types)
        masked,masktype,mask,original = mask_image(src,self.args)
        if len(masked) == 0 or len(mask) == 0 :
            return None
        image, mask = self.transform(masked[0], mask[0])
        return image, mask

    def __len__(self):  # return count of sample we have
        if self.train == 'train':
            return int(len(self.image_paths) * 0.6) 
        return int(len(self.image_paths) * 0.2)
               

In [None]:
def my_collate(batch):
    len_batch = len(batch) # original batch length
    batch = list(filter (lambda x:x is not None, batch)) # filter out all the Nones
    if len_batch > len(batch): # if there are samples missing just use existing members, doesn't work if you reject every sample in a batch
        diff = len_batch - len(batch)
        for i in range(diff):
            batch = batch + batch[:diff]
    return torch.utils.data.dataloader.default_collate(batch)

In [None]:
!ls /kaggle/input

In [None]:
args = Args()

train_dataset = dataset('/kaggle/input/celeba-dataset/img_align_celeba/img_align_celeba',args,'train')
val_dataset   = dataset('/kaggle/input/celeba-dataset/img_align_celeba/img_align_celeba',args,'validation')
batch_size = 8
num_workers = 2
trainloader = DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers,collate_fn = my_collate
)
valloader = DataLoader(
    val_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers,collate_fn = my_collate
)

In [None]:
x,y = train_dataset.__getitem__(0)
print(x.shape,y.shape)
x,y = val_dataset.__getitem__(0)
print(x.shape,y.shape)
y.mean()

## Training 

In [None]:
def dice_coef_metric(pred, label):
    intersection = 2.0 * (pred * label).sum()
    union = pred.sum() + label.sum()
    if pred.sum() == 0 and label.sum() == 0:
        return 1.
    return intersection / union
def dice_coef_loss(pred, label):
    smooth = 1.0
    intersection = 2.0 * (pred * label).sum() + smooth
    union = pred.sum() + label.sum() + smooth
    return 1 - (intersection / union)
def bce_dice_loss(pred, label):
    dice_loss = dice_coef_loss(pred, label)
    bce_loss = nn.BCELoss()(pred, label)
    return dice_loss + bce_loss
def compute_iou(model, loader, threshold=0.3):
    valloss = 0
    with torch.no_grad():
        for step, (data, target) in enumerate(loader):
            data = data.to(device)
            target = target.to(device)

            outputs = model(data)
            out_cut = np.copy(outputs.data.cpu().numpy())
            out_cut[np.nonzero(out_cut < threshold)] = 0.0
            out_cut[np.nonzero(out_cut >= threshold)] = 1.0

            loss = dice_coef_metric(out_cut, target.data.cpu().numpy())
            valloss += loss

    return valloss / step

In [None]:
def train_model(train_loader, val_loader, loss_func, optimizer, scheduler, num_epochs):
    
    loss_history = []
    train_history = []
    val_history = []
    
    for epoch in range(num_epochs):
        model.train()
        
        losses = []
        train_iou = []
        
        for i, (image, mask) in enumerate(tqdm(train_loader)):
            image = image.to(device)
            mask = mask.to(device)
            outputs = model(image)
            out_cut = np.copy(outputs.data.cpu().numpy())
            out_cut[np.nonzero(out_cut < 0.5)] = 0.0
            out_cut[np.nonzero(out_cut >= 0.5)] = 1.0            
            
            train_dice = dice_coef_metric(out_cut, mask.data.cpu().numpy())
            loss = loss_func(outputs, mask)
            losses.append(loss.item())
            train_iou.append(train_dice)
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
                
        val_mean_iou = compute_iou(model, val_loader)
        scheduler.step(val_mean_iou)
        loss_history.append(np.array(losses).mean())
        train_history.append(np.array(train_iou).mean())
        val_history.append(val_mean_iou)
        
        print('Epoch : {}/{}'.format(epoch+1, num_epochs))
        print('loss: {:.3f} - dice_coef: {:.3f} - val_dice_coef: {:.3f}'.format(np.array(losses).mean(),
                                                                               np.array(train_iou).mean()
                                                                               ,val_mean_iou))
    return loss_history, train_history, val_history
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'max', patience=3)


In [None]:
train_loss_list, valid_loss_list, dice_score_list = train_model(trainloader,valloader,bce_dice_loss,optimizer,scheduler,3)
#torch.save(model.state_dict(), './path')

In [None]:
torch.save(model.state_dict(), '/kaggle/working/model.pth')

## ploting training 

In [None]:
plt.figure(figsize=(10,10))
plt.plot(train_loss_list,  marker='o', label="Training Loss")
plt.plot(valid_loss_list,  marker='o', label="Validation Loss")
plt.ylabel('loss', fontsize=22)
plt.legend()
plt.show()

In [None]:
plt.figure(figsize=(12,5))
plt.subplot(1,2,1)
plt.plot(h.history['loss']);
plt.plot(h.history['val_loss']);
plt.title("SEG Model focal tversky Loss");
plt.ylabel("focal tversky loss");
plt.xlabel("Epochs");
plt.legend(['train', 'val']);

plt.subplot(1,2,2)
plt.plot(h.history['tversky']);
plt.plot(h.history['val_tversky']);
plt.title("SEG Model tversky score");
plt.ylabel("tversky Accuracy");
plt.xlabel("Epochs");
plt.legend(['train', 'val']);

In [None]:
def fast_display(*img2dlist):
    plt.figure(figsize=(16,8))
    nbimg = len(img2dlist)
    cols = min (9,nbimg)
    rows = (nbimg // cols) +1
    for ii, img2d in enumerate(img2dlist):
        plt.subplot(rows,cols,1+ii)
        plt.imshow(img2d)
    plt.show()
   

## Testing

In [None]:
x,y = train_dataset.__getitem__(0)
for i, (data, target) in enumerate(trainloader):
    if True:
        data = data.cuda()
    output = ((model(data))[0]).cpu().detach().numpy()
    if i == 3:
        break
x = data.cpu().detach().numpy() 
y = target.cpu().detach().numpy() 
#x = np.moveaxis(x, 0, 2)
print(x.shape ,x.dtype)
print(y.shape ,y.dtype)
print(output.shape ,output.dtype)
fast_display(x[0][0] ,y[0][0],output[0] )

In [None]:
x = np.moveaxis(x[0], 0, 2)
print(x.shape ,x.dtype)
print(y.shape ,y.dtype)
print(output.shape ,output.dtype)
fast_display(x ,y[0][0],output[0] )


In [None]:
from PIL import Image
import requests
from io import BytesIO

url = 'https://www.google.com/url?sa=i&url=https%3A%2F%2Fwww.federationdesdiabetiques.org%2Ffederation%2Factualites%2Fcoronavirus-covid-19-les-masques-et-les-gants-pour-qui-et-quand-les-porter&psig=AOvVaw0HF-UZhAecG_nUcRVzB4UB&ust=1638364198971000&source=images&cd=vfe&ved=0CAsQjRxqFwoTCKC7gLWUwPQCFQAAAAAdAAAAABAD'
#img = Image.open(requests.get(url, stream=True).raw)
requests.get(url, stream=True).raw

In [None]:
fast_display(img)

In [None]:

traindataset = dataset('','../data/img_align_celeba',args)

In [None]:
x,y = traindataset.__getitem__(101)
fast_display(x[0],y[0])