In [2]:
import os
# NOTE(review): absolute, machine-specific path. Every relative path used later in the
# notebook ("data/...", "results/...", "pred_val_template.csv") is resolved against this
# directory, so the notebook only runs as-is on the author's machine.
os.chdir("/Users/karinazajnullina/Jupiter/InternshipTestTask/cigarette_butt_segmentation/")

import cv2
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image

from glob import glob

from lib import *

%matplotlib inline

In [None]:
import pandas as pd

# Build a two-row example of the submission table (img_id, rle_mask), write it to
# "pred_val_template.csv" and read it back to check the round trip.
pred_masks_dict = {0: "123", 1: "456"}
df = (
    pd.DataFrame.from_dict(pred_masks_dict, orient="index", columns=["rle_mask"])
    .assign(img_id=lambda frame: frame.index)  # the dict keys double as image ids
    .loc[:, ["img_id", "rle_mask"]]
)
df.to_csv("pred_val_template.csv")
df = pd.read_csv("pred_val_template.csv", index_col=0)
df.head()

In [None]:
from torchvision import models

In [None]:
import torch
import torch.nn as nn


def convrelu(in_channels, out_channels, kernel, padding):
    """Return a two-layer block: a 2-D convolution followed by an in-place ReLU.

    Args:
        in_channels: number of channels of the convolution input.
        out_channels: number of channels produced by the convolution.
        kernel: kernel size passed to ``nn.Conv2d``.
        padding: padding passed to ``nn.Conv2d``.

    Returns:
        ``nn.Sequential`` holding the convolution at index 0 and the ReLU at index 1.
    """
    conv = nn.Conv2d(in_channels, out_channels, kernel_size=kernel, padding=padding)
    activation = nn.ReLU(inplace=True)
    return nn.Sequential(conv, activation)


class UNet(nn.Module):
    """U-Net-style segmentation network built on a pretrained ResNet-18 encoder.

    The encoder stages are slices of ``models.resnet18(pretrained=True)``. Each
    stage output is passed through a 1x1 ``convrelu`` and concatenated with the
    bilinearly upsampled decoder features. A separate full-resolution branch on
    the raw input is merged in before the final 1x1 convolution.

    Args:
        n_class: number of output channels of the final 1x1 convolution.

    NOTE(review): the skip concatenations presumably require the input height
    and width to be multiples of 32 -- confirm against the data pipeline.
    """

    def __init__(self, n_class):
        super().__init__()

        # Pretrained backbone; its children are sliced into encoder stages below.
        self.base_model = models.resnet18(pretrained=True)
        backbone_children = list(self.base_model.children())
        self.base_layers = backbone_children

        # Encoder stages with their 1x1 skip projections. Sizes are the author's
        # original annotations relative to the input x.
        self.layer0 = nn.Sequential(*backbone_children[:3])   # size=(N, 64, x.H/2, x.W/2)
        self.layer0_1x1 = convrelu(64, 64, 1, 0)
        self.layer1 = nn.Sequential(*backbone_children[3:5])  # size=(N, 64, x.H/4, x.W/4)
        self.layer1_1x1 = convrelu(64, 64, 1, 0)
        self.layer2 = backbone_children[5]                    # size=(N, 128, x.H/8, x.W/8)
        self.layer2_1x1 = convrelu(128, 128, 1, 0)
        self.layer3 = backbone_children[6]                    # size=(N, 256, x.H/16, x.W/16)
        self.layer3_1x1 = convrelu(256, 256, 1, 0)
        self.layer4 = backbone_children[7]                    # size=(N, 512, x.H/32, x.W/32)
        self.layer4_1x1 = convrelu(512, 512, 1, 0)

        # Shared x2 bilinear upsampling used by every decoder step.
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)

        # Decoder convolutions; input channels = skip channels + upsampled channels.
        self.conv_up3 = convrelu(256 + 512, 512, 3, 1)
        self.conv_up2 = convrelu(128 + 512, 256, 3, 1)
        self.conv_up1 = convrelu(64 + 256, 256, 3, 1)
        self.conv_up0 = convrelu(64 + 256, 128, 3, 1)

        # Full-resolution branch applied directly to the input image.
        self.conv_original_size0 = convrelu(3, 64, 3, 1)
        self.conv_original_size1 = convrelu(64, 64, 3, 1)
        self.conv_original_size2 = convrelu(64 + 128, 64, 3, 1)

        # Final 1x1 projection to n_class channels (no activation applied here).
        self.conv_last = nn.Conv2d(64, n_class, 1)

    def forward(self, input):
        """Run the network on ``input`` and return the raw ``conv_last`` output."""
        # Full-resolution features, merged in at the very end.
        full_res = self.conv_original_size1(self.conv_original_size0(input))

        # Encoder pass, keeping every stage output for the skip connections.
        enc0 = self.layer0(input)
        enc1 = self.layer1(enc0)
        enc2 = self.layer2(enc1)
        enc3 = self.layer3(enc2)
        enc4 = self.layer4(enc3)

        x = self.layer4_1x1(enc4)

        # Decoder: upsample, project the skip with its 1x1 conv, concatenate, fuse.
        decoder_stages = (
            (enc3, self.layer3_1x1, self.conv_up3),
            (enc2, self.layer2_1x1, self.conv_up2),
            (enc1, self.layer1_1x1, self.conv_up1),
            (enc0, self.layer0_1x1, self.conv_up0),
        )
        for skip, project, fuse in decoder_stages:
            x = self.upsample(x)
            skip = project(skip)
            x = fuse(torch.cat([x, skip], dim=1))

        # Back to input resolution and merge with the full-resolution branch.
        x = self.upsample(x)
        x = self.conv_original_size2(torch.cat([x, full_res], dim=1))

        return self.conv_last(x)

# Задача

**Требуется:** предложить модель, сегментирующую человека на фотографии.  
  
**Вход:** фотография 512x512x3.  
**Выход:** маска человека 320x240.  
**Метрика:** [Dice coefficient](https://en.wikipedia.org/wiki/S%C3%B8rensen%E2%80%93Dice_coefficient).  
  
  
Данные представляют собой набор фотографий человека и масок, определяющих положение человека на фотографии.  
Доступные данные разделены на несколько папок:  
- `train` содержит фотографии 320x240x3;
- `train_mask` содержит маски для фотографий из `train` 320x240;
- `valid` содержит фотографии 320x240x3;
- `valid_mask` содержит маски для фотографий из `valid` 320x240;
- `test` содержит фотографии 320x240x3.  
  
Для лучшей модели требуется создать 2 файла, которые необходимы для валидации Вашего решения:  
- сохраненные значения метрик на `val` в формате `pred_val_template.csv`;
- HTML-страницу с предсказаниями модели для всех картинок из `test` и папку с картинками, используемыми на этой HTML-странице.  
  
Также необходимо:
- подготовить код для проверки (докстринги, PEP8);
- создать отчет (можно прямо в ноутбуке) с описанием Вашего исследования, гипотез, анализа данных и т.п.

Примеры приведены ниже.

---

# Данные, метрики и доступные функции

Посмотрим на данные:

In [None]:
from torchsummary import summary
# Summarise a one-output-channel UNet for a 3x512x512 input via torchsummary.
# Instantiating UNet builds models.resnet18(pretrained=True), so this cell needs
# the pretrained ResNet-18 weights to be available.
summary(UNet(1), input_size=(3, 512, 512))

In [None]:
# Relative path of the training split (resolved against the working directory).
# NOTE(review): no other visible cell reads `path`; the dataset cells pass the
# literal "data/train" instead.
path = "data/train"

In [None]:
def dice_loss_own(logits, targets, smooth=1.0):
    """Soft Dice loss computed from raw logits, averaged over the batch.

    Each sample is flattened, the logits are passed through a sigmoid, and the
    per-sample score ``(2 * |P*T| + smooth) / (|P| + |T| + smooth)`` is averaged
    over the batch. The loss is ``1 - mean score``.

    The smoothing term is added once to the numerator and once to the
    denominator, which keeps the score in (0, 1]. The previous form
    ``2 * (intersection + smooth)`` doubled it, so a perfect prediction scored
    above 1 and the loss went negative.

    Args:
        logits: raw model outputs; the first dimension is the batch.
        targets: ground-truth masks with the same number of elements per sample.
        smooth: smoothing constant that also avoids division by zero on empty masks.

    Returns:
        Scalar tensor holding the loss.
    """
    num = targets.size(0)
    # torch.sigmoid replaces the deprecated F.sigmoid and needs no extra import.
    probs = torch.sigmoid(logits)
    # reshape (unlike view) also accepts non-contiguous tensors.
    m1 = probs.reshape(num, -1)
    m2 = targets.reshape(num, -1)
    intersection = (m1 * m2).sum(1)

    score = (2. * intersection + smooth) / (m1.sum(1) + m2.sum(1) + smooth)
    return 1 - score.sum() / num

In [None]:
from collections import defaultdict
import torch.nn.functional as F

def print_metrics(metrics, epoch_samples, phase):
    """Print ``metrics / epoch_samples`` under the label "d", prefixed by ``phase``.

    Args:
        metrics: a single numeric value (not a dict) to be normalised.
        epoch_samples: divisor applied to ``metrics``.
        phase: label printed before the value (e.g. "train" or "val").
    """
    per_sample = metrics / epoch_samples
    formatted = ", ".join(["{}: {:4f}".format("d", per_sample)])
    print("{}: {}".format(phase, formatted))

def train_model(model, optimizer, scheduler, num_epochs=25):
    """Train ``model`` with the Dice loss and return it with the best validation weights.

    Each epoch runs a 'train' phase and a 'val' phase over the module-level
    ``dataloaders`` dict. The loss comes from the module-level ``dice_loss_own``,
    called on the raw model outputs.

    Fixes over the previous version:
      * the epoch loss is accumulated per batch (weighted by batch size); before,
        the last batch loss was printed and an undefined ``metrics`` dict was read;
      * the best weights are actually tracked, so the final ``load_state_dict``
        no longer raises ``NameError``;
      * ``scheduler.step()`` runs after the epoch's optimizer steps, as PyTorch
        requires since 1.1;
      * ``copy`` and ``time`` are imported locally, so cell order does not matter.

    Args:
        model: network to optimise; updated in place.
        optimizer: optimizer bound to ``model``'s parameters.
        scheduler: learning-rate scheduler, stepped once per epoch.
        num_epochs: number of train/val epochs.

    Returns:
        ``model`` loaded with the weights of the lowest validation loss.
    """
    import copy
    import time

    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss = 1e10

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        since = time.time()

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            is_train = phase == 'train'
            if is_train:
                for param_group in optimizer.param_groups:
                    print("LR", param_group['lr'])
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            epoch_samples = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.float()
                optimizer.zero_grad()

                # Track gradients only in the training phase.
                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    loss = dice_loss_own(outputs.float(), labels.float())

                    if is_train:
                        loss.backward()
                        optimizer.step()

                # Weight by batch size so the epoch value is a per-sample mean.
                batch_len = inputs.size(0)
                running_loss += loss.item() * batch_len
                epoch_samples += batch_len

            if is_train:
                scheduler.step()

            epoch_loss = running_loss / max(epoch_samples, 1)
            # Printed inline: `print_metrics` is redefined later in the notebook
            # with a dict-based signature, so calling it here would be ambiguous.
            print("{}: {}: {:4f}".format(phase, "d", epoch_loss))

            if phase == 'val' and epoch_loss < best_loss:
                print("saving best model")
                best_loss = epoch_loss
                best_model_wts = copy.deepcopy(model.state_dict())

        time_elapsed = time.time() - since
        print('{:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))

    print('Best val loss: {:4f}'.format(best_loss))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model

In [None]:
from collections import defaultdict
import torch.nn.functional as F

def calc_loss(pred, target, metrics, bce_weight=0.5):
    """Apply a sigmoid to ``pred``, compute the Dice loss and accumulate it in ``metrics``.

    The loss is ``dice_loss_own(sigmoid(pred), target)``. Its value times the batch
    size is added to both ``metrics['dice']`` and ``metrics['loss']``.

    Args:
        pred: raw model outputs (the sigmoid is applied here).
        target: ground-truth masks; ``target.size(0)`` is used as the batch size.
        metrics: mutable mapping with running totals, updated in place.
        bce_weight: unused; the BCE term is disabled.

    Returns:
        The Dice loss tensor.

    NOTE(review): if the active ``dice_loss_own`` applies its own sigmoid, the
    predictions are squashed twice -- confirm which definition is in scope.
    """
    batch_len = target.size(0)
    loss = dice = dice_loss_own(F.sigmoid(pred), target)

    for key, value in (('dice', dice), ('loss', loss)):
        metrics[key] += value.data.cpu().numpy() * batch_len

    return loss

def print_metrics(metrics, epoch_samples, phase):
    """Print every entry of ``metrics`` divided by ``epoch_samples`` on one line.

    Args:
        metrics: mapping from metric name to its running total.
        epoch_samples: divisor applied to every total.
        phase: label printed before the metrics (e.g. "train" or "val").
    """
    parts = [
        "{}: {:4f}".format(name, total / epoch_samples)
        for name, total in metrics.items()
    ]
    print("{}: {}".format(phase, ", ".join(parts)))

def train_model2(model, optimizer, scheduler, num_epochs=25):
    """Train ``model`` with ``calc_loss`` and return it with the best validation weights.

    Each epoch runs a 'train' phase and a 'val' phase over the module-level
    ``dataloaders`` dict. Losses are computed and accumulated by the module-level
    ``calc_loss`` and reported with ``print_metrics``.

    Fixes over the previous version:
      * batches go to the device the model actually lives on. Before, the model
        was moved to CUDA when available while batches went to the global
        ``device``, which this notebook sets to CPU;
      * ``scheduler.step()`` runs after the epoch's optimizer steps, as PyTorch
        requires since 1.1;
      * ``copy`` and ``time`` are imported locally, so cell order does not matter.

    Args:
        model: network to optimise; moved to CUDA in place when available.
        optimizer: optimizer bound to ``model``'s parameters.
        scheduler: learning-rate scheduler, stepped once per epoch.
        num_epochs: number of train/val epochs.

    Returns:
        ``model`` loaded with the weights of the lowest validation loss.
    """
    import copy
    import time

    if torch.cuda.is_available():
        model.cuda()

    # Follow the model's own placement; fall back to CPU for parameter-less models.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss = 1e10

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        since = time.time()

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            is_train = phase == 'train'
            if is_train:
                for param_group in optimizer.param_groups:
                    print("LR", param_group['lr'])
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            metrics = defaultdict(float)
            epoch_samples = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(run_device).float()
                labels = labels.to(run_device)

                optimizer.zero_grad()

                # Track gradients only in the training phase.
                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    loss = calc_loss(outputs.float(), labels.float(), metrics)

                    if is_train:
                        loss.backward()
                        optimizer.step()

                epoch_samples += inputs.size(0)

            if is_train:
                scheduler.step()

            print_metrics(metrics, epoch_samples, phase)
            # max(..., 1) guards against an empty dataloader.
            epoch_loss = metrics['loss'] / max(epoch_samples, 1)

            if phase == 'val' and epoch_loss < best_loss:
                print("saving best model")
                best_loss = epoch_loss
                best_model_wts = copy.deepcopy(model.state_dict())

        time_elapsed = time.time() - since
        print('{:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))

    print('Best val loss: {:4f}'.format(best_loss))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model

In [None]:
def _soft_dice_loss(logits, targets, smooth=1.0):
    """Differentiable Dice loss (1 - soft Dice) over the whole batch, from raw logits."""
    probs = torch.sigmoid(logits).reshape(-1)
    flat_targets = targets.reshape(-1).to(probs.dtype)
    intersection = (probs * flat_targets).sum()
    return 1 - (2. * intersection + smooth) / (probs.sum() + flat_targets.sum() + smooth)


def train_my(model, optimizer, trainloader, num_epochs=25):
    """Minimal training loop: optimise ``model`` on ``trainloader`` with a soft Dice loss.

    Fixes over the previous version:
      * ``num_epochs`` is honoured (the loop used to be hard-coded to one epoch);
      * the loss is computed on tensors. The old ``1 - get_dice(outputs.numpy(), ...)``
        left the autograd graph, so ``loss.backward()`` could not work;
      * inputs are cast to float before the forward pass;
      * the running loss accumulates Python floats, not tensors.

    Args:
        model: network to optimise; updated in place.
        optimizer: optimizer bound to ``model``'s parameters.
        trainloader: iterable yielding ``(inputs, labels)`` batches.
        num_epochs: number of passes over ``trainloader``.
    """
    for epoch in range(num_epochs):  # loop over the dataset multiple times
        print(epoch)
        running_loss = 0.0

        for i, (inputs, labels) in enumerate(trainloader):
            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = model(inputs.float())
            loss = _soft_dice_loss(outputs, labels)
            loss.backward()
            optimizer.step()

            # print statistics
            running_loss += loss.item()
            if i % 2000 == 1999:    # print every 2000 mini-batches
                print('[%d, %5d] loss: %.3f' %
                      (epoch + 1, i + 1, running_loss / 2000))
                running_loss = 0.0

    print('Finished Training')



In [None]:
from os import listdir
import numpy as np
import torch
from torch.utils.data import Dataset
from PIL import Image
import json


class BasicDataset(Dataset):
    """Dataset of images stored in ``<imgs_path>/images`` with COCO-style annotations.

    Each item is an ``(image, mask)`` pair of tensors. The image is loaded as RGB
    and laid out channels-first. The mask comes from ``get_mask(img_id, annotations)``,
    where ``img_id`` is the integer before the first dot of the file name.

    Fixes over the previous version:
      * only files ending in ``.<img_format>`` are listed, in sorted order, so
        stray entries such as ``.DS_Store`` no longer crash ``int()`` and indexing
        is deterministic;
      * the annotation file is closed after reading;
      * every non-RGB image (not only RGBA) is converted to RGB, so grayscale or
        palette images no longer break the channel transpose;
      * the per-sample debug ``print`` of the image mode is removed.

    Args:
        imgs_path: folder containing ``images/`` and ``coco_annotations.json``.
        img_format: image file extension without the dot (e.g. "jpg").
        scale: accepted for interface compatibility; currently unused.
    """

    def __init__(self, imgs_path, img_format, scale=1):
        self.path = imgs_path
        self.img_format = img_format

        suffix = f".{img_format}"
        self.images = sorted(
            name
            for name in listdir(f"{self.path}/images")
            if name.endswith(suffix) and not name.startswith(".")
        )
        with open(f"{self.path}/coco_annotations.json", "r") as annotations_file:
            self.annotations = json.load(annotations_file)

    def __len__(self):
        """Return the number of image files found for this split."""
        return len(self.images)

    def __getitem__(self, i):
        """Return the ``i``-th ``(image, mask)`` pair as tensors."""
        file_name = self.images[i]
        img_id = int(file_name.split(".")[0])

        with Image.open(f"{self.path}/images/{file_name}") as opened_img:
            # np.array copies the pixels, so the tensor is writable and does not
            # depend on the closed file.
            img = np.array(opened_img.convert('RGB')).transpose(-1, 0, 1)
        mask = get_mask(img_id, self.annotations)

        return torch.from_numpy(np.ascontiguousarray(img)), torch.from_numpy(mask)

In [None]:
from torch.utils.data import DataLoader

batch_size = 1

# One shuffled, single-process DataLoader per split. The train split is read as
# "jpg" files and the val split as "png" files.
dataloaders = {
    split: DataLoader(
        BasicDataset(root, extension),
        batch_size=batch_size,
        shuffle=True,
        num_workers=0,
    )
    for split, root, extension in (
        ("train", "data/train", "jpg"),
        ("val", "data/val", "png"),
    )
}

In [None]:
# Single output channel for the network.
num_class = 1
# NOTE(review): `model` is re-created by the training cells below, so this
# instance is not the one that gets trained.
model = UNet(num_class)

In [None]:
import torch
import torch.optim as optim
from torch.optim import lr_scheduler
import time



# Fresh one-channel UNet for this training run.
model = UNet(1)
# NOTE(review): `train_model` never reads `device`, so this assignment has no
# effect on this run.
device = torch.device('cpu')

optimizer_ft = optim.Adam(model.parameters(), lr=1e-4)

# NOTE(review): step_size=30 exceeds num_epochs=20 below, so the learning rate
# is never decayed during this run.
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=30, gamma=0.1)


# Rebinds `model` to whatever train_model returns.
model = train_model(model, optimizer_ft, exp_lr_scheduler, num_epochs=20)

In [None]:
import torch
import torch.optim as optim
from torch.optim import lr_scheduler
import time
import copy


# Fresh one-channel UNet for this training run.
model = UNet(1)
# `train_model2` reads this module-level `device` when moving each batch.
device = torch.device('cpu')

optimizer_ft = optim.Adam(model.parameters(), lr=1e-4)

# NOTE(review): step_size=30 exceeds num_epochs=20 below, so the learning rate
# is never decayed during this run.
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=30, gamma=0.1)



# Rebinds `model` to whatever train_model2 returns.
model = train_model2(model, optimizer_ft, exp_lr_scheduler, num_epochs=20)

In [None]:
def calc_loss(pred, target, metrics, bce_weight=0.5):
    """Compute the Dice loss of ``pred`` against ``target`` and accumulate it in ``metrics``.

    ``pred`` is passed to ``dice_loss_own`` unchanged (no sigmoid is applied here).
    The loss value times the batch size is added to both ``metrics['dice']`` and
    ``metrics['loss']``.

    Args:
        pred: predictions handed straight to ``dice_loss_own``.
        target: ground-truth masks; ``target.size(0)`` is used as the batch size.
        metrics: mutable mapping with running totals, updated in place.
        bce_weight: unused; the BCE term is disabled.

    Returns:
        The Dice loss tensor.
    """
    batch_len = target.size(0)
    loss = dice = dice_loss_own(pred, target)

    for key, value in (('dice', dice), ('loss', loss)):
        metrics[key] += value.data.cpu().numpy() * batch_len

    return loss

In [None]:
def dice_loss_own(pred, target, smooth = 1.):
    """Dice loss summed over dimensions 2 and 3, averaged over the remaining ones.

    For every (dim 0, dim 1) entry the loss is
    ``1 - (2 * sum(pred * target) + smooth) / (sum(pred) + sum(target) + smooth)``.
    The mean of those values is returned.

    The leftover debug ``print(pred, target)`` was removed: it dumped both full
    tensors on every call, i.e. on every training batch.

    Args:
        pred: predictions with at least four dimensions; no sigmoid is applied here.
        target: ground truth with the same shape as ``pred``.
        smooth: smoothing constant that also avoids division by zero on empty masks.

    Returns:
        Scalar tensor holding the mean loss.
    """
    spatial_dims = (2, 3)
    intersection = (pred * target).sum(dim=spatial_dims)
    denominator = pred.sum(dim=spatial_dims) + target.sum(dim=spatial_dims) + smooth

    loss = 1 - (2. * intersection + smooth) / denominator

    return loss.mean()

In [None]:
# Fetch the sample at index 5 of the validation split as a quick dataset check.
BasicDataset("data/val", "png")[5]

In [None]:
# Sanity loop over the validation loader with an untrained, freshly built model.
model = UNet(1)

# Running totals that calc_loss updates in place.
metrics = defaultdict(float)
for inputs, labels in dataloaders['val']:
    outputs = model(inputs.float())
    outputs = torch.sigmoid(outputs)
    print(outputs.shape)
    # The labels are compared against themselves here, so this prints the loss
    # of a perfect prediction rather than the model's loss.
    print(calc_loss(labels.float(), labels.float(), metrics))
    print(outputs.shape)
    print(labels.shape)
    # Disabled: loss of the actual model outputs.
#     print(calc_loss(outputs.float(), labels.float(), metrics))
    

In [None]:
def dice_loss_own(probs, targets, smooth=1.):
    """Negative soft Dice score over all elements of the batch taken together.

    Both inputs are flattened and the score
    ``(2 * sum(probs * targets) + smooth) / (sum(probs) + sum(targets) + smooth)``
    is computed once for the whole batch. Its negation is returned, so minimising
    the result maximises the Dice score.

    The smoothing term is added once to the numerator and once to the
    denominator. The previous form ``2 * (intersection + smooth)`` doubled it, so
    identical masks scored above 1 (1.2 for the 2x2 example used in this notebook).

    TODO (from the original author): switch to a per-sample (batched) loss.

    Args:
        probs: predicted probabilities; no sigmoid is applied here.
        targets: ground truth with the same number of elements as ``probs``.
        smooth: smoothing constant that also avoids division by zero on empty masks.

    Returns:
        Scalar tensor equal to ``-score``.
    """
    # reshape (unlike view) also accepts non-contiguous tensors.
    m1 = probs.reshape(-1)
    m2 = targets.reshape(-1)
    intersection = (m1 * m2).sum()

    score = (2. * intersection + smooth) / (m1.sum() + m2.sum() + smooth)
    return -score


In [None]:
# Identical 1x1x2x2 integer masks: a toy "perfect prediction" for checking the loss.
probs=torch.tensor([[[[1, 0], [0, 1]]]])
targets=torch.tensor([[[[1, 0], [0, 1]]]])

In [None]:
# Loss of the toy pair defined above, where `probs` and `targets` are identical.
print(dice_loss_own(probs, targets))

---

# Результаты

Пример файла для изображений из `data/valid`:  
_Каждую предсказанную маску для изображения из `valid` необходимо закодировать и записать в показанный ниже файл, который служит примером, именно в таком виде нужно будет представить результат Вашего лучшего решения на данных из `valid`._

In [None]:
# Load the provided example of the validation submission file and preview it.
pred = pd.read_csv("data/pred_val_template.csv")
pred.head()

Для данных из `test` требуется создать html страницу + картинки для нее.  
Это можно сделать с помощью функции `get_html`, как показано ниже.

In [3]:
# Example of building the HTML report: the ground-truth masks of the validation
# images stand in for model predictions.
with open("data/val/coco_annotations.json", "r") as annotations_file:
    # Context manager closes the file; the bare json.load(open(...)) leaked it.
    val_annotations = json.load(annotations_file)

paths_to_imgs = sorted(glob("data/val/images/*"))
# basename is separator-agnostic, unlike splitting on "/".
img_ids = [int(os.path.basename(path).split(".")[0]) for path in paths_to_imgs]
# Iterate ids in the same order as the paths. Re-sorting them numerically could
# misalign masks and images when file names are not zero-padded, because the
# paths are sorted lexicographically.
pred_masks = [get_mask(img_id, val_annotations) for img_id in img_ids]

_ = get_html(paths_to_imgs, pred_masks, path_to_save="results/example")

В папке `results` создался файл `example.html` и папка `examples` с используемыми картинками.