In [None]:
import torch.nn as nn
import torch.utils.data
import torchvision as tv
import torch.backends.cudnn

import numpy as np
import random

import matplotlib.pyplot as plt
import seaborn as sns

import os
from tqdm import tqdm

In [None]:
# One seed shared by every random number generator the notebook relies on.
SEED = 0

# Seed Python's `random`, NumPy, PyTorch and PyTorch's CUDA generator,
# in that order.
for _set_seed in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
    _set_seed(SEED)

# Ask cuDNN to restrict itself to deterministic algorithms.
torch.backends.cudnn.deterministic = True

In [None]:
# Keep cuDNN autotuning off: benchmark mode times several convolution
# algorithms and may pick a different one on each run, which would undo the
# seeding and `cudnn.deterministic = True` configured for reproducibility.
# Set this to True only if speed matters more than repeatable results.
torch.backends.cudnn.benchmark = False

In [None]:
# Preprocessing applied to every image of every split: resize to 224x224,
# convert to a tensor and normalise each channel with the ImageNet mean/std
# that the pretrained torchvision weights were trained with.
transforms = tv.transforms.Compose([
    tv.transforms.Resize((224, 224)),
    tv.transforms.ToTensor(),
    tv.transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Root directory that holds one sub-directory per split.
dataset_path = './Data/animals-10_splited/'

# Names of the split sub-directories under `dataset_path`.
train_dir = 'train'
val_dir = 'val'
test_dir = 'test'
# `test_fir` is the original (misspelled) name; it is kept as an alias because
# other cells still refer to it.
test_fir = test_dir

In [None]:
# import splitfolders


# splitfolders.ratio(
#     dataset_path,
#     'splited_animals',
#     ratio=(0.65, 0.2, 0.15),
#     seed=SEED,
#     group_prefix=None
# )

In [None]:
# Build one ImageFolder per split (train, val, test — in that order); all three
# share the same preprocessing pipeline.
train_dataset, val_dataset, test_dataset = (
    tv.datasets.ImageFolder(root=dataset_path + split_dir, transform=transforms)
    for split_dir in (train_dir, val_dir, test_fir)
)

In [None]:
# Number of samples in the train, validation and test datasets.
tuple(len(split) for split in (train_dataset, val_dataset, test_dataset))

In [None]:
# reduction_ratio = 5
#
# train_dataset = torch.utils.data.Subset(
#     train_dataset,
#     np.random.choice(
#         len(train_dataset),
#         int(len(train_dataset) / reduction_ratio),
#         replace=False
#     ),
# )
#
# val_dataset = torch.utils.data.Subset(
#     val_dataset,
#     np.random.choice(
#         len(val_dataset),
#         int(len(val_dataset) / reduction_ratio),
#         replace=False
#     )
# )

In [None]:
# len(train_dataset), len(val_dataset)

In [None]:
# Count the files of every class across the train, validation and test splits.
# `num_classes` maps class name -> total number of files in that class.
num_classes = dict()
for split_dir in (train_dir, val_dir, test_fir):
    split_path = os.path.join(dataset_path, split_dir)
    # Sorted listing keeps the insertion order (and therefore the order of
    # equally sized classes below) independent of the file system.
    for class_name in sorted(os.listdir(split_path)):
        class_path = os.path.join(split_path, class_name)
        # Skip stray files (e.g. `.DS_Store`) that sit next to the class folders.
        if not os.path.isdir(class_path):
            continue
        # `.get` lets a class that is missing from an earlier split still be counted.
        num_classes[class_name] = num_classes.get(class_name, 0) + len(os.listdir(class_path))

# Largest class first.
num_classes = dict(sorted(num_classes.items(), key=lambda item: item[1], reverse=True))

In [None]:
def add_value_label(x_list, y_list):
    """Write each value of ``y_list`` as a text label on the current plot.

    One label is drawn per entry of ``x_list`` (which is used only for its
    length). The label for index ``k`` is placed at x = ``k`` and
    y = ``y_list[k] / 2`` and shows ``y_list[k]`` in white, bold, centred text.
    """
    label_style = dict(ha='center', fontweight='bold', fontsize=9, color='white')

    for position in range(len(x_list)):
        value = y_list[position]
        plt.text(position, value / 2, value, **label_style)

In [None]:
# Bar chart of the per-class totals stored in `num_classes`.
plt.figure(figsize=(8, 3))

plt.bar(
    list(num_classes.keys()),
    list(num_classes.values()),
    # One colour per bar: the palette size follows the number of classes
    # instead of being hard-coded.
    color=sns.color_palette('magma', len(num_classes)),
    width=0.6
)

plt.title('Class distribution')
plt.xlabel('Class')
plt.ylabel('Number of images')

# Print the count inside every bar.
add_value_label(list(num_classes.keys()), list(num_classes.values()))

In [None]:
# Pie chart of the share of every class in `num_classes`.
pie_labels = num_classes.keys()
pie_sizes = num_classes.values()

plt.figure(figsize=(6, 6))

plt.pie(
    pie_sizes,
    labels=pie_labels,
    colors=sns.color_palette('bright', 10),
    autopct='%1.1f%%',
    startangle=90
)

# The trailing semicolon hides the Legend repr in the cell output.
plt.legend(shadow=True, bbox_to_anchor=(0, 1));

In [None]:
batch_size = 32

# One DataLoader per split. Only the training loader shuffles; the validation
# and test loaders keep shuffling off.
train_loader, val_loader, test_loader = (
    torch.utils.data.DataLoader(dataset=split, batch_size=batch_size, shuffle=reshuffle)
    for split, reshuffle in (
        (train_dataset, True),
        (val_dataset, False),
        (test_dataset, False),
    )
)

In [None]:
# Number of batches per epoch in the train, validation and test loaders.
tuple(len(loader) for loader in (train_loader, val_loader, test_loader))

In [None]:
def count_parameters(model):
    """Return the number of scalar values in the trainable parameters of ``model``.

    Only parameters with ``requires_grad`` set are counted, so frozen layers
    do not contribute to the total.
    """
    trainable_total = 0
    for param in model.parameters():
        if param.requires_grad:
            trainable_total += param.numel()
    return trainable_total

In [None]:
def train_model(model, loss, optimizer, scheduler, num_epochs):
    """Train ``model`` and validate it after every epoch.

    Every epoch consists of a training pass over the module-level
    ``train_loader`` followed by a validation pass over the module-level
    ``val_loader``; batches are moved to the module-level ``device``.
    After each pass one line with the average loss and accuracy of that
    phase is printed, and the scheduler is stepped once per epoch.

    Args:
        model: network to optimise; called as ``model(inputs)``.
        loss: criterion called as ``loss(preds, labels)``.
        optimizer: optimizer stepped after every training batch.
        scheduler: learning-rate scheduler stepped at the end of every epoch.
        num_epochs: number of epochs to run.

    Returns:
        None. Results are reported through ``print``.
    """
    for epoch in range(num_epochs):
        for phase in ['train', 'val']:
            is_train = phase == 'train'
            if is_train:
                dataloader = train_loader
                model.train()
            else:
                dataloader = val_loader
                model.eval()

            # Sample-level accumulators, kept as plain Python numbers so no
            # tensors (or their device memory) are held across batches.
            loss_sum = 0.
            correct = 0
            seen = 0

            for inputs, labels in tqdm(dataloader):
                inputs = inputs.to(device)
                labels = labels.to(device)

                if is_train:
                    optimizer.zero_grad()

                # Gradients are only tracked while training.
                with torch.set_grad_enabled(is_train):
                    preds = model(inputs)
                    loss_value = loss(preds, labels)

                    if is_train:
                        loss_value.backward()
                        optimizer.step()

                batch_len = labels.size(0)
                # Weight the batch loss by the batch size so a smaller last
                # batch does not skew the average. This assumes `loss`
                # returns the mean over the batch (the default reduction of
                # nn.CrossEntropyLoss) — adjust if another reduction is used.
                loss_sum += loss_value.item() * batch_len
                correct += (preds.argmax(dim=1) == labels).sum().item()
                seen += batch_len

            # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
            epoch_loss = loss_sum / max(seen, 1)
            epoch_acc = correct / max(seen, 1)

            print(f'Epoch {epoch+1}/{num_epochs} [{phase}]; '
                  f'Loss: {epoch_loss:.4f}; '
                  f'Accuracy: {epoch_acc:.4f};', flush=True)

        scheduler.step()

In [None]:
def evaluate_model(model, loss):
    """Evaluate ``model`` on the module-level ``test_loader`` and print the result.

    The model is switched to evaluation mode, every batch is moved to the
    module-level ``device`` and processed without gradient tracking. The
    average loss and the accuracy over all test samples are printed.

    Args:
        model: network to evaluate; called as ``model(inputs)``.
        loss: criterion called as ``loss(preds, labels)``.

    Returns:
        None. Results are reported through ``print``.
    """
    model.eval()

    # Sample-level accumulators kept as plain Python numbers.
    loss_sum = 0.
    correct = 0
    seen = 0

    with torch.no_grad():
        for inputs, labels in tqdm(test_loader):
            inputs = inputs.to(device)
            labels = labels.to(device)

            preds = model(inputs)
            loss_value = loss(preds, labels)

            batch_len = labels.size(0)
            # Weight by batch size so a smaller last batch does not skew the
            # average. Assumes `loss` returns the batch mean (the default
            # reduction of nn.CrossEntropyLoss) — adjust for other reductions.
            loss_sum += loss_value.item() * batch_len
            correct += (preds.argmax(dim=1) == labels).sum().item()
            seen += batch_len

    # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
    epoch_loss = loss_sum / max(seen, 1)
    epoch_acc = correct / max(seen, 1)

    print(f'Loss: {epoch_loss:.4f}; '
          f'Accuracy: {epoch_acc:.4f};', flush=True)

In [None]:
class VGG19Animals10(nn.Module):
    """ImageNet-pretrained VGG19 whose classifier is replaced by a small two-layer head.

    Args:
        teach_from_begin: when falsy (the default) every pretrained parameter
            is frozen and only the new head is trainable; when truthy the
            whole network is trainable.
        linear_neurons: width of the hidden layer of the new head.
        out_features: number of output classes (defaults to 10, as before).
    """

    def __init__(self, teach_from_begin=False, linear_neurons=100, out_features=10):
        super().__init__()

        self.model = tv.models.vgg19(weights='IMAGENET1K_V1')

        # Freeze the pretrained weights before the new head is attached, so
        # the head created below keeps requires_grad=True.
        if not teach_from_begin:
            for param in self.model.parameters():
                param.requires_grad = False

        # Size of the flattened feature vector fed to the classifier, read
        # from the layer being replaced (25088 for torchvision's VGG19).
        in_features = self.model.classifier[0].in_features

        self.model.classifier = nn.Sequential(
            nn.Linear(in_features, linear_neurons),
            nn.LeakyReLU(),
            nn.Linear(linear_neurons, out_features)
        )

    def forward(self, x):
        """Run ``x`` through the wrapped VGG19 and return its output."""
        return self.model(x)

In [None]:
# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# Pretrained VGG19 wrapper with a 100-unit hidden layer in its new head.
model = VGG19Animals10(linear_neurons=100, teach_from_begin=False)

loss = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1.0e-3)
# step_size=1 applies the scheduler's decay after every epoch.
# NOTE(review): `verbose` is reported as deprecated by recent PyTorch
# releases — confirm against the installed version.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer=optimizer, step_size=1, verbose=True)

# Move the network and the criterion to the selected device.
model = model.to(device)
loss = loss.to(device)

In [None]:
# Display the number of trainable (requires_grad) parameter values in `model`.
count_parameters(model)

In [None]:
# train_model(model, loss, optimizer, scheduler, num_epochs=3)

In [None]:
# Checkpoint file: the (commented-out) save cell writes the model's state dict
# here and the load cell reads it back.
path_model = './Pretrained/VGG19_Animals-10.pth'

In [None]:
# torch.save(model.state_dict(), path_model)

In [None]:
# Restore the weights stored at `path_model` into `model`.
# `map_location=device` remaps the saved tensors onto the device selected
# above, so a checkpoint written on a GPU also loads on a CPU-only machine.
# NOTE(review): torch.load unpickles the file — only load checkpoints that
# come from a trusted source.
model.load_state_dict(torch.load(path_model, map_location=device))

In [None]:
# evaluate_model(model, loss)