In [108]:
# Standard-library helpers for listing directories and joining dataset paths.
from os import listdir
from os.path import join

# Core numerical / ML libraries.
import numpy as np
import pandas as pd
import torch

import torchmetrics.classification.accuracy # It may be bad idea to use this for metrics, but different tests gave me not bad result.
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.optim as optim

from torchmetrics.classification import MulticlassStatScores
from torchvision.datasets import ImageFolder #M ethod that allow us to use name of directories as labels.
from torchvision.io import read_image
from torch.utils.data import DataLoader
from torchvision.transforms import transforms # I don't like albumentations library because of my classes in university...
from tqdm import tqdm

In [109]:
# Configuration constants shared by the cells below.
rootdir = "simpsons_dataset" # Dataset root directory (one sub-directory per class); change it to your local path.
rate_learning = 1e-3  # Learning rate passed to the optimizer.
epochs = 30  # Author's note: overfitting starts to show after roughly the 30th epoch with these settings; the model may be a bit more complex than needed.
classnum = 42 # Number of classes. Before training, delete the duplicate simpsons_dataset folder nested inside simpsons_dataset.
bs = 128 # Batch size; adjust it to your hardware.
k_prop = 0.8 # Presumably the train-split proportion (author's note: the test set shipped with this dataset is poor) -- TODO confirm.
wd = 1e-3 # Weight decay for weight regularization.
classlist = listdir(rootdir)  # Entries of the dataset root, in the order the OS returns them (not sorted).
counter = 20933  # Total number of images in the dataset; used as the denominator for the class weights.

dropout_rate = 0.2 # Author's note: even a small increase of this probability leads to poor convergence.
loss_list_train = []  # Per-batch training losses, appended by calculate_loss.
loss_list_test = []  # Per-batch test losses, appended by calculate_loss.

device = 'cuda:0' if torch.cuda.is_available() else 'cpu'  # Use the first GPU when available, otherwise the CPU.

In [124]:
def focused_loss():
    """
    Build per-class weights for a class-weighted cross-entropy loss.

    Each class gets the weight ``1 - n_class / n_total``, so classes with
    fewer images receive a larger weight.

    The class directories are visited in *sorted* order and non-directory
    entries are skipped. torchvision's ``ImageFolder`` assigns label indices
    from the sorted sub-directory names, so weight ``i`` must belong to the
    ``i``-th directory in that same order; plain ``listdir`` order is
    arbitrary and could attach weights to the wrong classes.

    Returns:
        torch.Tensor: 1-D float32 tensor on ``device`` with one weight per
        class directory found under ``rootdir``.

    Raises:
        ValueError: if no files are found in any class directory.
    """
    from os.path import isdir  # Local import: the file only imports `join` from os.path.

    # Same selection and ordering rule as ImageFolder: sub-directories only, sorted by name.
    class_dirs = sorted(el for el in classlist if isdir(join(rootdir, el)))
    counts = [len(listdir(join(rootdir, el))) for el in class_dirs]

    # Use the real total rather than a hard-coded image count, so the weights
    # stay correct if the dataset on disk differs from the expected one.
    total = sum(counts)
    if total == 0:
        raise ValueError(f"No files found in class directories under {rootdir!r}")

    class_weights = [1 - n / total for n in counts]
    return torch.tensor(class_weights, dtype=torch.float32, device=device)

In [117]:
def create_transformer():
    """
    Assemble the image preprocessing / augmentation pipeline.

    Steps, in order: convert to tensor, cast to float32, resize to 100x100,
    random horizontal flip (p=0.5), random rotation of up to 15 degrees,
    then normalisation with mean 0.5 and std 0.25.

    Returns:
        transforms.Compose: the composed pipeline.
    """
    steps = [
        transforms.ToTensor(),
        transforms.ConvertImageDtype(dtype=torch.float32),
        transforms.Resize([100, 100]),
        # Random augmentations.
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(15),
        transforms.Normalize(mean=[0.5], std=[0.25]),
    ]
    return transforms.Compose(steps)

In [110]:
# Model class
class NeuralNetwork(nn.Module):
    """
    Plain CNN classifier, without tricks from ResNet, MobileNet or Inception.

    The feature extractor is a flat ``nn.Sequential`` of six identical blocks
    (Conv3x3 -> ReLU -> MaxPool2x2 -> BatchNorm), one per consecutive pair in
    ``__channels_list__``. The head flattens the feature map and applies three
    linear layers with dropout before the last one.

    Each block halves the spatial size, so a 100x100 input (the size produced
    by ``create_transformer``) shrinks 100 -> 50 -> 25 -> 12 -> 6 -> 3 -> 1.
    The first linear layer therefore expects exactly ``__channels_list__[-1]``
    features; other input sizes that do not reduce to 1x1 will not fit.

    Author's notes: the convolution bias speeds up convergence but may add a
    little overfitting; without BatchNorm the model converges 3-4x slower.

    NOTE(review): there is no non-linearity between the linear layers of the
    head, so they compose into a single affine map. Left as is to keep the
    architecture and state_dict layout unchanged.

    Args:
        num_classes: size of the output layer. Defaults to the notebook-level
            ``classnum`` when omitted.
        dropout: dropout probability in the head. Defaults to the
            notebook-level ``dropout_rate`` when omitted.
    """
    # In/out channels of the convolution blocks. Expand it if GPU memory allows.
    __channels_list__ = [3, 16, 32, 64, 128, 256, 512]

    def __init__(self, num_classes=None, dropout=None):
        super().__init__()

        # Fall back to the notebook configuration only when not given explicitly,
        # so the class can also be used on its own.
        if num_classes is None:
            num_classes = classnum
        if dropout is None:
            dropout = dropout_rate

        def _conv_block(in_channels, out_channels):
            """One feature-extractor stage: Conv3x3 -> ReLU -> MaxPool2x2 -> BatchNorm."""
            return nn.Sequential(
                nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=(3, 3), padding=1, bias=True),
                nn.ReLU(),
                nn.MaxPool2d(2, 2),
                nn.BatchNorm2d(out_channels),
            )

        channels = self.__channels_list__
        # Kept as a plain list attribute for backward compatibility; the layers
        # are registered as sub-modules through `conv_stack` below.
        self.__conv_list__ = [
            _conv_block(c_in, c_out) for c_in, c_out in zip(channels[:-1], channels[1:])
        ]

        # Flatten the blocks into one Sequential (layer indices 0..23), which is
        # the same layout the previous `extend`-based construction produced.
        self.conv_stack = nn.Sequential(
            *(layer for block in self.__conv_list__ for layer in block)
        )

        self.linear_stack = nn.Sequential(
            nn.Flatten(),
            nn.Linear(channels[-1], 120),
            nn.Linear(120, 84),
            nn.Dropout(dropout),
            nn.Linear(84, num_classes),
        )

    def forward(self, x):
        """Run a batch of images through the convolution stack and the linear head; returns the head's output."""
        x = self.conv_stack(x)
        return self.linear_stack(x)


In [126]:
# The dataset contains 20933 images in total.
# With k_prop = 0.8 this gives roughly 16.7k images for training and 4.2k for testing, which is enough for our purpose.
def load_and_split():
    """
    Load the image folder and randomly split it into train and test subsets.

    The train share is taken from the configured ``k_prop`` instead of a
    hard-coded 0.8, and the split uses explicit integer lengths so the two
    parts always add up to the dataset size.

    NOTE(review): the transform is attached to the underlying ImageFolder, so
    the random flip / rotation from ``create_transformer`` is applied to the
    test subset as well.

    Returns:
        tuple: ``(train, test)`` subsets of the dataset.
    """
    data = ImageFolder(rootdir, transform=create_transformer())
    n_train = int(k_prop * len(data))
    n_test = len(data) - n_train
    train, test = torch.utils.data.random_split(data, [n_train, n_test])
    return train, test

# Wrap both subsets in shuffling DataLoaders that yield batches of `bs` images.
def dataload(train, test):
    """
    Create the train and test DataLoaders.

    Both loaders use batch size ``bs`` and shuffle the samples. Without
    shuffling, images would pass through the pipeline in dataset order, which
    is bad when the dataset is sorted.

    Returns:
        tuple: ``(trainloader, testloader)``.
    """
    return tuple(
        DataLoader(subset, batch_size=bs, shuffle=True)
        for subset in (train, test)
    )

In [None]:
def model_create(weights):
    """
    Create the model, optimizer, loss function and per-class statistics metric.

    Args:
        weights: per-class weight tensor passed to the cross-entropy loss.
            Author's rationale: the dataset is clearly imbalanced, and class
            weighting is a common remedy.

    Returns:
        tuple: ``(model, optimizer, loss, stats)`` where ``stats`` is a
        ``MulticlassStatScores`` object configured with ``average=None``
        (one set of scores per class).
    """
    # The CNN, moved to the target device.
    network = NeuralNetwork().to(device)
    criterion = nn.CrossEntropyLoss(weight=weights)
    adamw = optim.AdamW(network.parameters(), lr=rate_learning, weight_decay=wd)
    # Metric object that accumulates per-class statistics.
    per_class_stats = MulticlassStatScores(num_classes=classnum, average=None).to(device)
    return network, adamw, criterion, per_class_stats

In [None]:
def calculate_loss(target, labels, loss_func, mode="train"):
    """
    Compute the loss with ``loss_func`` and record its scalar value.

    Args:
        target: first argument passed to ``loss_func`` (callers pass the
            model output here).
        labels: second argument passed to ``loss_func``.
        loss_func: callable returning the loss.
        mode: ``"train"`` appends the value to ``loss_list_train``; any other
            value appends it to ``loss_list_test``.

    Returns:
        The loss object returned by ``loss_func``.
    """
    batch_loss = loss_func(target, labels)
    history = loss_list_train if mode == "train" else loss_list_test
    history.append(batch_loss.item())
    return batch_loss

In [None]:
# One epoch: a training pass followed by a loss-only pass over the test loader.
def run_model(model, optim, trainloader, testloader, loss_func):
    """
    Train the model for one epoch, then record the loss on the test loader.

    Losses of both passes are recorded through ``calculate_loss``.

    Args:
        model: network to train.
        optim: optimizer updating ``model``'s parameters.
        trainloader: iterable of ``(data, labels)`` training batches.
        testloader: iterable of ``(data, labels)`` evaluation batches.
        loss_func: loss callable forwarded to ``calculate_loss``.

    Returns:
        None
    """
    # Training mode: dropout is active and BatchNorm updates its running statistics.
    model.train()
    for data, labels in tqdm(trainloader):
        data, labels = data.to(device), labels.to(device)  # Move the batch to the target device.
        # Reset gradients; otherwise they accumulate across batches, which
        # effectively enlarges the batch (a known trick, but not wanted here).
        optim.zero_grad()
        target = model(data)  # Forward pass.
        loss = calculate_loss(target, labels, loss_func)

        loss.backward()
        optim.step()

    # Evaluation mode: dropout is off and BatchNorm uses its running statistics.
    # eval() alone does not stop autograd from recording the graph, so gradient
    # tracking is disabled explicitly to save memory and time.
    model.eval()
    with torch.no_grad():
        for data, labels in tqdm(testloader):
            data, labels = data.to(device), labels.to(device)

            target = model(data)

            calculate_loss(target, labels, loss_func, mode="test")

    return None

In [None]:
# Per-class accuracy from the TP / FP / TN / FN counts gathered by MulticlassStatScores.
def test_class_acc(model, testloader, stats):
    """
    Compute per-class accuracy ``(tp + tn) / (tp + fp + tn + fn)``.

    Args:
        model: network to evaluate.
        testloader: iterable of ``(data, labels)`` batches.
        stats: torchmetrics ``MulticlassStatScores`` metric. Its state is
            updated in place and is not reset here, so counts from earlier
            calls on the same object are included.

    Returns:
        torch.Tensor: CPU tensor of accuracies, with NaNs (zero denominators)
        replaced by 0.
    """
    model.eval()
    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        for data, labels in testloader:
            data, labels = data.to(device), labels.to(device)
            target = model(data)
            stats.update(target, labels)  # Accumulate the per-class counts.

    # Public API instead of the private `_final_state()`: compute() returns a
    # tensor whose last dimension is [tp, fp, tn, fn, support].
    tp, fp, tn, fn, _support = stats.compute().cpu().unbind(dim=-1)
    class_acc = torch.nan_to_num(torch.div(tp + tn, tp + fp + tn + fn))

    return class_acc

In [130]:
def main():
    """
    Run the full pipeline: load data, build the model, train, evaluate.

    Changes from the previous version:
      * the per-class accuracy is returned instead of being discarded;
      * training always runs when ``main()`` is called (an
        ``if __name__ == "__main__"`` check inside the function could skip
        training and evaluate an untrained model);
      * the global loss histories are cleared first, so re-running the cell
        does not mix losses from different runs.

    Returns:
        torch.Tensor: per-class accuracy computed by ``test_class_acc``.
    """
    train, test = load_and_split()
    train_loader, test_loader = dataload(train, test)

    class_weights = focused_loss()
    model, optimizer, loss, statistic = model_create(class_weights)

    # A new model is trained on every call, so start from empty histories.
    loss_list_train.clear()
    loss_list_test.clear()

    for _ in range(epochs):
        run_model(model, optimizer, train_loader, test_loader, loss)

    accuracy = test_class_acc(model, test_loader, statistic)
    return accuracy


In [131]:
# Run the whole pipeline defined in main().
main()

100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 131/131 [01:17<00:00,  1.70it/s]
  3%|█████                                                                                                                                                                   | 1/33 [00:00<00:23,  1.39it/s]


KeyboardInterrupt: 