# Project description

В данном задании вам предстоит осуществить путешествие в мир Спрингфилда,
где вы сможете познакомиться со всеми любимыми персонажами Симпсонов.

Основным заданием будет обучить классификатор на основе сверточных сетей,
чтобы научиться отличать всех жителей Спрингфилда.
# Dataset description
Обучающая и тестовая выборка состоят из отрывков из мультсериала Симпсоны.
Каждая картинка представлена в формате jpg с необходимой меткой - названием
персонажа, изображенного на ней. Тест был поделен на приватную и публичную
часть в соотношении 95/5.

В тренировочном датасете примерно по 1000 картинок на каждый класс,
но они отличаются размером.

Метки классов представлены в виде названий папок, в которых лежат картинки.

# Table of contents:
1. [__Data preparation__](#data_preparation)
2. [__Create models__](#Create_models)

# <a name='data_preparation'>1. Data preparation</a>

In [None]:
from copy import deepcopy
from tqdm import tqdm
from PIL import Image
import numpy as np
from pathlib import Path
import os

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.datasets import DatasetFolder, ImageFolder
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

import matplotlib.pyplot as plt
%matplotlib inline

# Compute device shared by the model and every batch: the GPU when CUDA is
# available, otherwise the CPU.
DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
# Look at the list of classes: every entry of the training directory is a
# folder named after the character whose pictures it contains.
os.listdir('Data/train')

In [None]:
# Index the training images with torchvision's ImageFolder, which takes the
# class of each image from the name of the folder it is stored in.
dataset = ImageFolder('Data/train/')


In [None]:
# Look at the images: draw a 3x3 grid of randomly picked training samples,
# each titled with a human-readable character name.
np.random.seed(42)  # fixed seed so the same nine pictures are drawn each run

fig, ax = plt.subplots(nrows=3, ncols=3, figsize=(8, 8),
                       sharey=True, sharex=True)

for fig_x in ax.flatten():
    random_characters = np.random.choice(len(dataset), 1)[0]
    im, label = dataset[random_characters]
    # Folder names use underscores: 'some_name' -> 'Some Name'.
    img_label = " ".join(word.capitalize()
                         for word in dataset.classes[label].split('_'))
    # The subplots share both axes, so every picture is brought to the same
    # square 224x224 size (the original (224, 244) was a typo).
    im = im.resize((224, 224))
    fig_x.imshow(im)
    fig_x.set_title(img_label)
    fig_x.grid(False)

In [None]:
# Create custom DataLoader
class MyDataLoader:
    """Minimal batch loader over an indexable dataset of ``(sample, label)`` pairs.

    The loader can be used in three ways:

    * ``loader[i]`` returns the ``i``-th batch for the current index order;
    * ``for X, y in loader`` walks over all batches once, reshuffling the
      index order first when ``shuffle`` is true;
    * ``next(loader)`` returns consecutive batches of one such pass and
      raises ``StopIteration`` at its end (the following call starts a new
      pass).

    Args:
        data: indexable object; ``data[i]`` must return ``(sample, label)``.
        indices: positions inside ``data`` that belong to this loader.
        batch_size: number of samples per batch (the last batch may be
            smaller).
        transformer: callable applied to every sample; it must return a
            tensor so that the samples of a batch can be stacked. Defaults
            to ``transforms.ToTensor()``.
        shuffle: reshuffle the index order at the start of every pass.

    Raises:
        TypeError: if ``shuffle`` is not a bool or ``batch_size`` is not an
            int.
        ValueError: if ``batch_size`` is smaller than 1.
    """

    def __init__(self, data, indices: list, batch_size: int, transformer=None, shuffle=False):
        if not isinstance(shuffle, bool):
            raise TypeError(
                f'shuffle should be bool type, not {type(shuffle)}')
        # bool is a subclass of int, so it has to be rejected explicitly.
        if isinstance(batch_size, bool) or not isinstance(batch_size, int):
            raise TypeError(
                f'batch_size should be type int, not {type(batch_size)}')
        if batch_size < 1:
            raise ValueError(
                f'batch_size should be positive, not {batch_size}')

        self.shuffle = shuffle
        self.batch_size = batch_size
        # Private copy: shuffling must not reorder the caller's sequence.
        self.indices = list(indices)
        self.data = data
        self.data_len = len(self.indices)
        self.len_ = int(np.ceil(self.data_len / batch_size))
        # State of the pass driven through next(loader), if any.
        self._iterator = None

        self.transformer = transformer
        if transformer is None:
            self.transformer = transforms.ToTensor()

    def __len__(self):
        """Return the number of batches in one pass."""
        return self.len_

    def __getitem__(self, index):
        """Return batch number ``index`` as ``(X_batch, y_batch)``.

        ``X_batch`` stacks the transformed samples along a new first
        dimension; ``y_batch`` is built with ``torch.Tensor`` from the
        labels.

        Raises:
            IndexError: if ``index`` is outside ``[0, len(self))``.
        """
        if not 0 <= index < self.len_:
            raise IndexError(
                f'batch index {index} is out of range for {self.len_} batches')
        start_index = index * self.batch_size
        end_index = min(self.data_len, start_index + self.batch_size)
        X_batch = []
        y_batch = []
        for batch_index in self.indices[start_index:end_index]:
            X, y = self.data[batch_index]
            X_batch.append(self.transformer(X))
            y_batch.append(y)
        # torch.stack also handles a single sample (result has batch size 1).
        return torch.stack(X_batch), torch.Tensor(y_batch)

    def __iter__(self):
        """Yield every batch once, reshuffling first if ``shuffle`` is set."""
        if self.shuffle:
            np.random.shuffle(self.indices)
        for n_batch in range(self.len_):
            yield self[n_batch]

    def __next__(self):
        """Return the next batch of the current pass.

        A new pass (including the optional reshuffle) is started lazily on
        the first call and again after the previous pass was exhausted.
        """
        if self._iterator is None:
            self._iterator = iter(self)
        try:
            return next(self._iterator)
        except StopIteration:
            self._iterator = None
            raise

In [None]:
# Split the sample positions of `dataset` in two steps:
#   1) 75% train+validation / 25% test,
#   2) the train+validation part again 75% train / 25% validation.
# No random_state is passed, so the split follows NumPy's global RNG state.
train_val_indices, test_indices = train_test_split(
    np.arange(len(dataset)), train_size=0.75
)

train_indices, val_indices = train_test_split(
    train_val_indices, train_size=0.75
)


# <a name='Create_models'>2. Create models</a>

In [None]:
def train(model, optimizer, train_data, val_data, loss_func,
          metric=None, epoch_count=10, scheduler=None):
    """Run the training / validation loop and remember the best model.

    Every epoch processes the training loader (mode ``'T'``) and then the
    validation loader (mode ``'V'``) through ``do_epoch``; a loader that is
    ``None`` is skipped.

    Args:
        model: network to optimise; switched between train and eval mode
            with ``model.train(...)``.
        optimizer: optimizer forwarded to ``do_epoch``.
        train_data: iterable of training batches, or ``None``.
        val_data: iterable of validation batches, or ``None``.
        loss_func: loss callable forwarded to ``do_epoch``.
        metric: optional callable forwarded to ``do_epoch``; a larger value
            is treated as better when selecting the best model.
        epoch_count: number of epochs to run.
        scheduler: optional scheduler whose ``step(value)`` is called once
            per validation epoch.

    Returns:
        ``(history_info, best_model_param)``: per-epoch losses and metrics
        under the keys ``'Tloss'``, ``'Tmetric'``, ``'Vloss'``, ``'Vmetric'``,
        and a deep copy of the ``state_dict`` with the highest validation
        metric. The second item stays ``{}`` when there is no metric, no
        validation data, or the validation metric never exceeds 0.
    """
    # Train history
    history_info = {'Tloss': [], 'Tmetric': [],
                    'Vloss': [], 'Vmetric': []}
    # Best validation score and the matching model parameters
    best_score = 0.
    best_model_param = {}

    # Use the loaders that were passed in, not same-named notebook globals.
    datasets = {}
    if train_data is not None:
        datasets['T'] = train_data
    if val_data is not None:
        datasets['V'] = val_data

    with tqdm(total=epoch_count) as progress_bar:
        for epoch in range(epoch_count):
            description = f'Epoch: {epoch+1:3}| '
            for mode, data in datasets.items():
                is_validation = mode == 'V'

                model.train(mode == 'T')
                epoch_loss, epoch_metric = do_epoch(
                    model, optimizer, loss_func, data, mode, metric)
                history_info[mode + 'loss'].append(epoch_loss)
                history_info[mode + 'metric'].append(epoch_metric)

                # Show the loss in the progress bar
                description += mode + f'loss {epoch_loss:7.4}| '

                # If a metric is given, show it as well
                if metric is not None:
                    description += mode + f'metric {epoch_metric:7.4}| '

                    # Keep the parameters with the best validation metric
                    if is_validation and best_score < epoch_metric:
                        best_score = epoch_metric
                        best_model_param = deepcopy(model.state_dict())

                # Scheduler step: without a metric `epoch_metric` is a
                # constant 0.0, so fall back to the validation loss.
                if scheduler is not None and is_validation:
                    scheduler.step(
                        epoch_metric if metric is not None else epoch_loss)

            progress_bar.set_description(description)
            progress_bar.update()

    return history_info, best_model_param

In [None]:
def do_epoch(model, optimizer, loss_func, data_loader, mode='T', metric=None):
    """Process every batch of ``data_loader`` once.

    Args:
        model: network applied to each batch.
        optimizer: optimizer used for the parameter update in train mode.
        loss_func: callable ``loss_func(predictions, targets)``.
        data_loader: iterable of ``(X, y)`` batches that supports ``len()``.
        mode: ``'T'`` to train (backward pass + optimizer step) or ``'V'``
            to only evaluate.
        metric: optional callable ``metric(y_true, scores)`` receiving NumPy
            arrays; its per-batch values are averaged.

    Returns:
        ``(mean_loss, mean_metric)`` averaged over the number of batches;
        ``(0.0, 0.0)`` for an empty loader. ``mean_metric`` is 0.0 when no
        metric is given.

    Raises:
        ValueError: if ``mode`` is neither ``'T'`` nor ``'V'``.
    """
    if mode not in ('T', 'V'):
        raise ValueError('mode can be "T" - Train or "V" - Validate')
    is_train = mode == 'T'

    # History
    epoch_loss = 0.
    epoch_metric = 0.

    for X, y in data_loader:
        X_tens = torch.as_tensor(X, dtype=torch.float, device=DEVICE)
        y_tens = torch.as_tensor(y, dtype=torch.long, device=DEVICE)

        # Build the autograd graph only when it is needed for backward().
        with torch.set_grad_enabled(is_train):
            predict = model(X_tens).squeeze(dim=-1)
            loss = loss_func(predict, y_tens)
        epoch_loss += loss.item()

        # backward
        if is_train:
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # metric calculation
        if metric is not None:
            # .cpu() is a no-op for tensors that already live on the CPU.
            # NOTE(review): only the score of class 1 is handed to the
            # metric, which looks like a binary-classification convention;
            # confirm it suits the metric used for the multi-class task.
            probability = predict[:, 1].detach().cpu().numpy()
            try:
                epoch_metric += metric(y.numpy(), probability)
            except ValueError as e:
                # Best effort: report the failing batch and keep going.
                print(e)
                print(y)

    n_batches = len(data_loader)
    if n_batches == 0:
        return epoch_loss, epoch_metric
    return epoch_loss / n_batches, epoch_metric / n_batches

In [None]:
# light model

class Model(nn.Module):
    """Small classifier: one convolution, ReLU, dropout and a linear layer.

    Args:
        in_channel: number of channels of the input images.
        out_channel: number of feature maps produced by the convolution.
        kernel_size: convolution kernel size (int or ``(height, width)``).
        dropout: dropout probability applied after the ReLU.
        n_classes: number of output scores per image.
        input_size: ``(height, width)`` of the input images. It fixes the
            size of the linear layer, so inputs of another size fail in
            ``forward``. Defaults to ``(224, 224)``.

    Raises:
        ValueError: if the kernel is larger than ``input_size``.
    """

    def __init__(self, in_channel, out_channel, kernel_size, dropout, n_classes,
                 input_size=(224, 224)):
        super().__init__()
        self.snn = nn.Conv2d(in_channel, out_channel, kernel_size)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=dropout)
        self.flatten = nn.Flatten()

        # The convolution uses stride 1 and no padding, so each spatial
        # dimension shrinks by (kernel - 1); e.g. 224 -> 222 for a 3x3 kernel.
        kernel_h, kernel_w = self.snn.kernel_size
        out_h = input_size[0] - kernel_h + 1
        out_w = input_size[1] - kernel_w + 1
        if out_h < 1 or out_w < 1:
            raise ValueError(
                f'kernel {self.snn.kernel_size} does not fit into '
                f'input of size {tuple(input_size)}')
        self.linear = nn.Linear(out_h * out_w * out_channel, n_classes)

    def forward(self, inputs):
        """Return the raw class scores of shape ``(batch, n_classes)``."""
        result = self.dropout(self.relu(self.snn(inputs)))

        return self.linear(self.flatten(result))

In [None]:
# Target size every image is resized to before it is fed to the model.
IM_SIZE = (224, 224)

# Preprocessing: resize -> tensor -> per-channel normalisation.
train_transformer = transforms.Compose([
    transforms.Resize(IM_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])

In [None]:
# Hyper-parameters of the light model.
in_channel, out_channel, kernel_size = 3, 5, 3
dropout = 0.3
output_dim = len(dataset.classes)  # one output per class folder

# Build the network and move its parameters to the selected device.
model = Model(in_channel=in_channel,
              out_channel=out_channel,
              kernel_size=kernel_size,
              dropout=dropout,
              n_classes=output_dim)
model = model.to(DEVICE)

In [None]:
optimizer = torch.optim.Adam(model.parameters())

# The model ends in a plain nn.Linear, i.e. it returns raw scores (logits).
# nn.NLLLoss expects log-probabilities, so it would be fed the wrong
# quantity; nn.CrossEntropyLoss applies log-softmax to the logits itself.
loss_func = nn.CrossEntropyLoss().to(DEVICE)

# Halve the learning rate when the monitored value stops improving.
# NOTE(review): the default mode of ReduceLROnPlateau is 'min'; confirm that
# this matches the value passed to scheduler.step() (a loss should decrease,
# an accuracy-like metric should increase and would need mode='max').
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, factor=0.5, patience=20,
    threshold=0.001, cooldown=20, verbose=True)

In [None]:
# Create data loaders
# All three splits use the same deterministic resize + normalise pipeline.
# Without it the loader falls back to a bare ToTensor(): images of different
# sizes cannot be stacked into one batch and would not match the 224x224
# input the model's linear layer is sized for.
train_loader = MyDataLoader(dataset, train_indices, 64,
                            train_transformer, True)
val_loader = MyDataLoader(dataset, val_indices, 64, train_transformer)
test_loader = MyDataLoader(dataset, test_indices, 64, train_transformer)

In [None]:
%%time
# Short two-epoch run: no validation loader and no metric are passed.
history, best_param = train(
    model, optimizer, train_loader, None, loss_func,
    metric=None, epoch_count=2, scheduler=scheduler,
)