In [None]:
import os
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
from datetime import datetime
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import random
import data_utils.data_loading as data_load
from data_utils.cifar10_testset import CIFAR10Testset

# Single seed shared by every random number generator used in this notebook.
random_seed = 42

# Seed Python's `random`, PyTorch (generic and CUDA entry points) and NumPy,
# in that order, with the same value.
for _seed_fn in (random.seed, torch.manual_seed, torch.cuda.manual_seed, np.random.seed):
    _seed_fn(random_seed)

# Ask cuDNN for deterministic kernels and switch off its benchmark-based
# algorithm selection.
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [None]:
class ModelTrainer:
    """Trains a classifier and records per-epoch loss and accuracy.

    On construction the train/validation loaders are obtained through
    ``data_load.load_train_data`` and the output directories ``../nets``,
    ``../loss`` and ``../accuracy`` are created when missing.
    """

    def __init__(self, batch_size: int, device: str, resize=224):
        """Store the settings, build the loaders and create output folders.

        Args:
            batch_size: Batch size forwarded to ``data_load.load_train_data``.
            device: Device the model and every batch are moved to.
            resize: Size handed to ``transforms.Resize``.
        """
        self.batch_size = batch_size
        self.device = device
        self.trainloader, self.valloader = self.load_data(resize)
        # exist_ok avoids the check-then-create race of os.path.exists().
        for directory in ('../nets', '../loss', '../accuracy'):
            os.makedirs(directory, exist_ok=True)

    def load_data(self, resize):
        """Build the preprocessing pipeline and return the project's loaders.

        Args:
            resize: Size handed to ``transforms.Resize``.

        Returns:
            Whatever ``data_load.load_train_data`` returns; ``__init__``
            unpacks it as ``(trainloader, valloader)``.
        """
        transform = transforms.Compose([
            transforms.Resize(resize),
            transforms.ToTensor(),
            # Channel-wise mean / std commonly used with ImageNet-pretrained models.
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])

        return data_load.load_train_data(transform, self.batch_size)

    def train_one_epoch(self, model, optimizer, loss_criterion, epoch: int, log_interval: int = 200):
        """Run one optimisation pass over ``self.trainloader``.

        Loss and accuracy are averaged over windows of ``log_interval``
        batches and printed after every complete window.

        Args:
            model: Network to optimise; switched to train mode.
            optimizer: Optimizer stepping the model parameters.
            loss_criterion: Callable ``(outputs, labels) -> loss tensor``.
            epoch: Zero-based epoch index, used only for the log line.
            log_interval: Number of batches per reporting window.

        Returns:
            ``(loss, accuracy)`` of the last complete window. When the epoch
            ends before a single window completes, the average over all
            batches seen is returned instead of ``(0.0, 0.0)``.
        """
        model.train()

        window_loss = 0.0
        window_accuracy = 0.0
        window_batches = 0
        last_loss = 0.0
        last_accuracy = 0.0
        window_reported = False

        for i, data in enumerate(self.trainloader):
            inputs, labels = data[0].to(self.device), data[1].to(self.device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = loss_criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # detach() instead of the legacy .data attribute.
            predicted = outputs.detach().argmax(dim=1)

            window_loss += loss.item()
            window_accuracy += (labels == predicted).sum().item() / len(labels)
            window_batches += 1

            # Drop the references to this batch before the next one is loaded.
            del inputs, labels, outputs

            if window_batches == log_interval:
                last_loss = window_loss / log_interval
                last_accuracy = window_accuracy / log_interval
                print(f'[epoch: {epoch + 1}, batches: {i - log_interval + 2:5d} - {i + 1:5d}] train loss: {last_loss:.3f}, train accuracy: {last_accuracy:.3f}')
                window_loss = 0.0
                window_accuracy = 0.0
                window_batches = 0
                window_reported = True

        # Short epochs never fill a window; report what was seen rather than zeros.
        if not window_reported and window_batches > 0:
            last_loss = window_loss / window_batches
            last_accuracy = window_accuracy / window_batches
            print(f'[epoch: {epoch + 1}, batches: {1:5d} - {window_batches:5d}] train loss: {last_loss:.3f}, train accuracy: {last_accuracy:.3f}')

        return last_loss, last_accuracy

    def train_model(self, model, optimizer, loss_criterion, number_of_epochs: int, name: str):
        """Train for several epochs, validating and checkpointing after each.

        After every epoch the loss/accuracy history so far is written to
        ``../loss`` and ``../accuracy`` as CSV (row 0 train, row 1
        validation) and the model state is saved to ``../nets`` whenever the
        validation loss improves. Both curves are plotted at the end.

        Args:
            model: Network to train; moved to ``self.device``.
            optimizer: Optimizer stepping the model parameters.
            loss_criterion: Callable ``(outputs, labels) -> loss tensor``.
            number_of_epochs: Number of epochs to run.
            name: Tag embedded in all output file names.

        Returns:
            Path of the best model state file.

        Raises:
            ValueError: If the validation loader yields no batches.
        """
        model.to(self.device)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        run_id = f'cifar_{name}_{timestamp}'
        best_modelstate_path = f'../nets/{run_id}'
        # inf guarantees the first finite validation loss produces a checkpoint.
        best_vloss = float('inf')
        loss = np.empty((2, number_of_epochs))
        accuracy = np.empty((2, number_of_epochs))

        for epoch in range(number_of_epochs):

            loss[0, epoch], accuracy[0, epoch] = self.train_one_epoch(model, optimizer, loss_criterion, epoch)

            model.eval()

            running_vloss = 0.0
            running_vaccuracy = 0.0
            val_batches = 0
            with torch.no_grad():
                for vdata in self.valloader:
                    vinputs, vlabels = vdata[0].to(self.device), vdata[1].to(self.device)
                    voutputs = model(vinputs)
                    vpredicted = voutputs.argmax(dim=1)
                    # .item() keeps the accumulator a Python float so it can be
                    # stored in the NumPy history and compared with best_vloss.
                    running_vloss += loss_criterion(voutputs, vlabels).item()
                    running_vaccuracy += (vlabels == vpredicted).sum().item() / len(vlabels)
                    val_batches += 1
                    del vinputs, vlabels, voutputs

            if val_batches == 0:
                raise ValueError('validation loader yielded no batches')

            loss[1, epoch] = avg_vloss = running_vloss / val_batches
            accuracy[1, epoch] = avg_vaccuracy = running_vaccuracy / val_batches
            print(f'[epoch: {epoch + 1}] validation loss: {avg_vloss:.3f}, validation accuracy: {avg_vaccuracy:.3f}')
            # Persist the history after every epoch so an interrupted run keeps its curves.
            np.savetxt(f'../loss/{run_id}.csv', loss[:, :(epoch + 1)], delimiter=',')
            np.savetxt(f'../accuracy/{run_id}.csv', accuracy[:, :(epoch + 1)], delimiter=',')

            if avg_vloss < best_vloss:
                best_vloss = avg_vloss
                torch.save(model.state_dict(), best_modelstate_path)

        print('Finished Training')
        self.visualize_loss(number_of_epochs, loss)
        self.visualize_accuracy(number_of_epochs, accuracy)
        return best_modelstate_path

    def _plot_history(self, number_of_epochs, history, ylabel):
        """Plot the train (row 0) and validation (row 1) curves of ``history``."""
        epochs = range(1, number_of_epochs + 1)
        plt.plot(epochs, history[0, :], marker='o')
        plt.plot(epochs, history[1, :], marker='o')
        plt.legend(['train', 'validation'])
        plt.xlabel('epoch')
        plt.ylabel(ylabel)
        plt.xticks(epochs)
        plt.show()

    def visualize_loss(self, number_of_epochs, loss):
        """Show train and validation loss per epoch.

        Args:
            number_of_epochs: Number of epochs on the x axis.
            loss: Array with train values in row 0 and validation in row 1.
        """
        self._plot_history(number_of_epochs, loss, 'loss')

    def visualize_accuracy(self, number_of_epochs, accuracy):
        """Show train and validation accuracy per epoch.

        Args:
            number_of_epochs: Number of epochs on the x axis.
            accuracy: Array with train values in row 0 and validation in row 1.
        """
        self._plot_history(number_of_epochs, accuracy, 'accuracy')

In [None]:
class ModelTester:
    """Runs a trained model over the test loader and writes predictions to CSV.

    On construction the test loader is obtained through
    ``data_load.load_test_data`` and the ``../tests`` directory is created
    when missing.
    """

    def __init__(self, batch_size: int, device: str, resize=224):
        """Store the settings, build the test loader and create ``../tests``.

        Args:
            batch_size: Batch size forwarded to ``data_load.load_test_data``.
            device: Device the model and every batch are moved to.
            resize: Size handed to ``transforms.Resize``.
        """
        self.batch_size = batch_size
        self.device = device
        self.testloader, self.number_of_images = self.load_data(resize)
        os.makedirs('../tests', exist_ok=True)

    def load_data(self, resize: int):
        """Build the preprocessing pipeline and return the project's test data.

        Args:
            resize: Size handed to ``transforms.Resize``.

        Returns:
            Whatever ``data_load.load_test_data`` returns; ``__init__``
            unpacks it as ``(testloader, number_of_images)``.
        """
        transform = transforms.Compose([
            transforms.Resize(resize),
            transforms.ToTensor(),
            # Channel-wise mean / std commonly used with ImageNet-pretrained models.
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])

        return data_load.load_test_data(transform, self.batch_size)

    def test_model(self, model, best_modelstate_path: str):
        """Load a saved state into ``model`` and export its test predictions.

        The CSV has an ``id`` index starting at 1 and a ``label`` column with
        class names looked up in ``CIFAR10Testset.label_number_to_class``. It
        is written next to the checkpoint's name with ``nets`` in the
        directory part replaced by ``tests`` and ``.csv`` appended.

        Args:
            model: Network whose architecture matches the saved state.
            best_modelstate_path: Path of a state dict saved with ``torch.save``.
        """
        # map_location lets a checkpoint saved on GPU load on a CPU-only machine.
        model.load_state_dict(torch.load(best_modelstate_path, map_location=self.device))
        model.to(self.device)
        # Inference must not depend on the mode the caller left the model in.
        model.eval()

        predicted_classes = []
        with torch.no_grad():
            for data in self.testloader:
                images = data[0].to(self.device)
                predicted = model(images).argmax(dim=1).tolist()
                predicted_classes.extend(
                    CIFAR10Testset.label_number_to_class[predicted_label]
                    for predicted_label in predicted
                )

        # Size the index from the predictions actually produced so the frame
        # is consistent even if the loader yields a different image count.
        total = len(predicted_classes)

        # Swap the directory only; replacing on the whole path would also
        # rewrite a model name that happens to contain 'nets'.
        directory, filename = os.path.split(best_modelstate_path)
        output_path = os.path.join(directory.replace('nets', 'tests'), filename + '.csv')

        pd.DataFrame({'label': predicted_classes}, index=range(1, total + 1)).to_csv(output_path, index_label='id')

In [None]:
# Experiment: pretrained ResNet-18 with frozen backbone, Adam (lr=0.001).
use_cuda = torch.cuda.is_available()
device = torch.device('cuda:0' if use_cuda else 'cpu')
number_of_epochs = 15
batch_size = 32

trainer = ModelTrainer(batch_size, device)

# Load the pretrained network and switch off gradients for all its weights.
model = torchvision.models.resnet18(pretrained=True)
for param in model.parameters():
    param.requires_grad_(False)

# Swap the final fully connected layer for a fresh 10-output layer; it is
# created after the freeze loop, so that loop does not touch it.
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 10)

best_state_path = trainer.train_model(
    model,
    optim.Adam(model.parameters(), lr=0.001),
    nn.CrossEntropyLoss(),
    number_of_epochs,
    'resnet18_Adam001_freeze',
)

# Reload the best checkpoint and write the test-set predictions.
tester = ModelTester(batch_size, device)
tester.test_model(model, best_state_path)

In [None]:
# Experiment: pretrained ResNet-18, all layers trainable, Adam (lr=0.001).
use_cuda = torch.cuda.is_available()
device = torch.device('cuda:0' if use_cuda else 'cpu')
number_of_epochs = 15
batch_size = 32

trainer = ModelTrainer(batch_size, device)

# Load the pretrained network and replace its final fully connected layer
# with a fresh 10-output layer.
model = torchvision.models.resnet18(pretrained=True)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 10)

best_state_path = trainer.train_model(
    model,
    optim.Adam(model.parameters(), lr=0.001),
    nn.CrossEntropyLoss(),
    number_of_epochs,
    'resnet18_Adam001',
)

# Reload the best checkpoint and write the test-set predictions.
tester = ModelTester(batch_size, device)
tester.test_model(model, best_state_path)

In [None]:
# Experiment: pretrained EfficientNet-B0 with frozen backbone,
# SGD (lr=0.001, momentum=0.9).
use_cuda = torch.cuda.is_available()
device = torch.device('cuda:0' if use_cuda else 'cpu')
number_of_epochs = 15
batch_size = 32

trainer = ModelTrainer(batch_size, device)

# Load the pretrained network and switch off gradients for all its weights.
model = torchvision.models.efficientnet_b0(pretrained=True)
for param in model.parameters():
    param.requires_grad_(False)

# Swap classifier[1] for a fresh 10-output linear layer; it is created after
# the freeze loop, so that loop does not touch it.
num_ftrs = model.classifier[1].in_features
model.classifier[1] = nn.Linear(num_ftrs, 10)

best_state_path = trainer.train_model(
    model,
    optim.SGD(model.parameters(), lr=0.001, momentum=0.9),
    nn.CrossEntropyLoss(),
    number_of_epochs,
    'efficientnetb0_SGD0019_freeze',
)

# Reload the best checkpoint and write the test-set predictions.
tester = ModelTester(batch_size, device)
tester.test_model(model, best_state_path)

In [None]:
# Experiment: pretrained EfficientNet-B0, all layers trainable,
# SGD (lr=0.001, momentum=0.9).
use_cuda = torch.cuda.is_available()
device = torch.device('cuda:0' if use_cuda else 'cpu')
number_of_epochs = 15
batch_size = 32

trainer = ModelTrainer(batch_size, device)

# Load the pretrained network and replace classifier[1] with a fresh
# 10-output linear layer.
model = torchvision.models.efficientnet_b0(pretrained=True)
num_ftrs = model.classifier[1].in_features
model.classifier[1] = nn.Linear(num_ftrs, 10)

best_state_path = trainer.train_model(
    model,
    optim.SGD(model.parameters(), lr=0.001, momentum=0.9),
    nn.CrossEntropyLoss(),
    number_of_epochs,
    'efficientnetb0_SGD0019',
)

# Reload the best checkpoint and write the test-set predictions.
tester = ModelTester(batch_size, device)
tester.test_model(model, best_state_path)