In [21]:
import os
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(font_scale=1.4, style='whitegrid')

import torch
import torchvision
from torchvision import models, datasets, transforms
from torch import nn
import torch.nn.functional as F
from torch.optim import lr_scheduler
from torch.utils.data import Dataset, DataLoader


from tqdm import tqdm_notebook, trange

from sklearn.preprocessing import LabelEncoder
from pathlib import Path
import pickle
from PIL import Image

In [2]:
device = torch.device('cuda') if torch.cuda.is_available else torch.device('cpu')
device

device(type='cuda')

In [15]:
def imshow(inp, title=None):
    """Imshow for Tensor."""
    inp = inp.numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)
    plt.figure(figsize=(8, 8))
    plt.axis('off')
    plt.imshow(inp)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)


def visualize_model(model, num_images=6):
    images_so_far = 0
    fig = plt.figure()

    for i, data in enumerate(dataloaders['val']):
        inp, lab = data
        inp, lab = inp.to(device), lab.to(device)

        outputs = model(inp)
        _, preds = torch.max(outputs.data, 1)

        for j in range(inp.size()[0]):
            images_so_far += 1
            imshow(inp.cpu().data[j], class_name[preds[j]])

            if images_so_far == num_images:
                return

In [4]:
DATA = ['train', 'val', 'test']
RESCALE_SIZE = 224

In [9]:
class SimpsonsDataset(Dataset):
    def __init__(self, files, mode):
        super().__init__()
        self.files = sorted(files)
        self.mode = mode

        if self.mode not in DATA:
            print(f'{self.mode} is not correct')
            raise NameError

        self.len_ = len(self.files)
        self.label_encoder = LabelEncoder()

        if self.mode != 'test':
            self.labels = [path.parent.name for path in self.files]
            self.label_encoder.fit(self.labels)

            with open('label_encoder.pkl', 'wb') as le_dump_file:
                pickle.dump(self.label_encoder, le_dump_file)

    def __len__(self):
        return self.len_

    def load_sample(self, file):
        image = Image.open(file)
        image.load()
        return image

    def __getitem__(self, item):
        transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])
        x = self.load_sample(self.files[item])
        x = self._prepare_sample(x)
        x = np.array(x/255, dtype='float32')
        x = transform(x)

        if self.mode == 'test':
            return x
        else:
            label = self.labels[item]
            label_id = self.label_encoder.transform([label])
            y = label_id.item()
            return x, y

    def _prepare_sample(self, image):
        image = image.resize((RESCALE_SIZE, RESCALE_SIZE))
        return np.array(image)

In [10]:
TRAIN_DIR = Path('Simpsons/train/simpsons_dataset')
TEST_DIR = Path('Simpsons/testset')

train_val_files = sorted(list(TRAIN_DIR.rglob('*.jpg')))
test_files = sorted(list(TEST_DIR.rglob('*.jpg')))

In [61]:
from sklearn.model_selection import train_test_split

train_val_labels = [path.parent.name for path in train_val_files]
train_files, val_files = train_test_split(train_val_files, test_size=0.25, stratify=train_val_labels)

In [18]:
val_dataset = SimpsonsDataset(val_files, mode='val')
train_dataset = SimpsonsDataset(train_files, mode='train')
test_dataset = SimpsonsDataset(test_files, mode='test')

In [103]:
dataloaders = {
    'train': torch.utils.data.DataLoader(train_dataset, batch_size=128, shuffle=True, num_workers=3),
    'val': torch.utils.data.DataLoader(val_dataset, batch_size=128, shuffle=True, num_workers=3)
}
dataset_size = {'train': len(train_dataset), 'val': len(val_dataset)}
class_name = np.array(np.unique(train_val_labels))

{'train': <torch.utils.data.dataloader.DataLoader at 0x22511e6bcd0>,
 'val': <torch.utils.data.dataloader.DataLoader at 0x22511e6baf0>}

In [106]:
def fit(model, criterion, optimizer, exp_lr_scheduler, num_epochs=30):
    losses = {'train': [], 'val': []}
    best_model_wts = model.state_dict()
    best_acc = 0.0

    for epoch in tqdm_notebook(num_epochs):
        for phase in ['train', 'val']:
            if phase == 'train':
                exp_lr_scheduler.step()
                model.train(True)
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for xb, yb, in tqdm_notebook(dataloaders[phase]):
                xb, yb = xb.to(device), yb.to(device)

                if phase == 'train':
                    optimizer.zero_grad()

                if phase == 'val':
                    with torch.no_grad():
                        outputs = model(xb)
                else:
                    outputs = model(xb)

                preds = torch.argmax(outputs, -1)
                loss = criterion(outputs, yb)

                if phase == 'train':
                    loss.backward()
                    optimizer.step()

                running_loss += loss.item()
                running_corrects += int(torch.sum(preds == yb.data))

            epoch_loss = running_loss / dataset_size[phase]
            epoch_acc = running_corrects / dataset_size[phase]

            losses[phase].append(epoch_loss)
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = model.state_dict()
    model.load_state_dict(best_model_wts)
    return losses, model

In [107]:
def predict(model, test_loader):
    with torch.no_grad():
        logits = []

        for inp in test_loader:
            inp = inp.to(device)
            model.eval()
            outputs = model(inp).cpu()
            logits.append(outputs)

    probs = F.softmax(torch.cat(logits), dim=-1).numpy()
    return probs

In [109]:
class ConvBlock(nn.Module):
    def __init__(self, in_chan: int, out_chan: int, kernel_size, **kwargs):
        super().__init__()
        self.conv1 = nn.Conv2d(in_chan, out_chan, kernel_size, **kwargs)
        self.pool = nn.MaxPool2d(kernel_size, stride=2)
        self.bn = nn.BatchNorm2d(out_chan, eps=0.0001)

    def forward(self, x):
        x = self.bn(self.pool(F.relu(self.conv1(x))))
        return x

In [111]:
class Conv1Layer(nn.Module):
    """The first&second layer of the schema Layers_for_Simpsons_net.drawio"""
    def __init__(self, in_chan, conv_layer):
        super().__init__()
        if conv_layer is None:
            conv_layer = ConvBlock

        self.branch3x3_1 = conv_layer(in_chan, 8, kernel_size=3)
        self.branch3x3_2 = conv_layer(8, 16, kernel_size=3)

        self.branch4x4_1 = conv_layer(in_chan, 8, kernel_size=4)
        self.branch4x4_2 = conv_layer(8, 16, kernel_size=3)

        self.branch5x5_1 = conv_layer(in_chan, 8, kernel_size=5)
        self.branch5x5_2 = conv_layer(8, 16, kernel_size=5)

        self.branch7x7_1 = conv_layer(in_chan, 8, kernel_size=7)
        self.branch7x7_2 = conv_layer(8, 16, kernel_size=5)

    def _forward(self, x):
        branch3x3 = self.branch3x3_2(self.branch3x3_1(x))
        branch4x4 = self.branch4x4_2(self.branch4x4_1(x))
        branch5x5 = self.branch5x5_2(self.branch5x5_1(x))
        branch7x7 = self.branch7x7_2(self.branch7x7_1(x))

        outputs = [branch3x3, branch4x4, branch5x5, branch7x7]
        return outputs

    def forward(self, x):
        outputs = self._forward(x)
        return torch.cat(outputs, 1)