In [None]:
import os
import cv2
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn import preprocessing

### Loading data from dataset to numpy arrays

In [None]:
# Root directory of the dataset (one sub-directory per animal class is read from it)
dataset_dir = './datasets/animals/animals/'

# Names of the animal classes to load
animal_classes = [
    'butterfly',
    'cat',
    'chicken',
    'cow',
    'dog',
    'elephant',
    'horse',
    'sheep',
    'spider',
    'squirrel',
]

In [None]:
images = []
labels = []

# Collecting every readable image of every class together with its class name
for class_name in animal_classes:
    class_dir = os.path.join(dataset_dir, class_name)

    # os.listdir returns entries in arbitrary order; sorting keeps the sample
    # order (and therefore the seeded train/test split) reproducible
    for image_file in sorted(os.listdir(class_dir)):
        image_path = os.path.join(class_dir, image_file)

        # cv2.imread returns None instead of raising when a file cannot be
        # decoded, which would make cv2.resize fail below
        image = cv2.imread(image_path)
        if image is None:
            print(f'Skipping unreadable file: {image_path}')
            continue

        # Changing the image size
        image = cv2.resize(image, (224, 224))

        # Convert BGR to RGB
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Adding the image and its corresponding label to the output lists
        images.append(image)
        labels.append(class_name)

In [None]:
# Encoding the string class names as integer ids; the encoder is fitted only once
le = preprocessing.LabelEncoder()
targets = le.fit_transform(labels)

# Converting the list of images to a single numpy array
images = np.array(images)

# `labels` is rebound from class names to their integer ids; the copy keeps it
# an array independent of `targets`, as it was before
labels = targets.copy()

### Exploring dataset

In [None]:
import matplotlib.pyplot as plt
import random

In [None]:
# Both splits hold out 20% of their input and share the same seed
split_options = dict(test_size=0.2, random_state=42)

# First split: test set versus the rest
X_train, X_test, y_train, y_test = train_test_split(images, labels, **split_options)
# Second split: validation set taken out of the remaining training data
X_train, X_valid, y_train, y_valid = train_test_split(X_train, y_train, **split_options)

In [None]:
# Short summary of the three splits
for caption, value in (
    ("Training examples : ", len(X_train)),
    ("Validation examples : ", len(X_valid)),
    ("Testing examples : ", len(X_test)),
    ("Image data shape : ", X_train.shape),
    ("Total classes : ", len(set(y_test))),
):
    print(caption, value)

In [None]:
def draw_images_examples(image_array, grid_x, grid_y, title):
    """Draw a grid of images picked at random (with replacement) from ``image_array``.

    Args:
        image_array: non-empty indexable collection of images; every item must
            provide ``.squeeze()``.
        grid_x: number of columns in the grid (also the figure width).
        grid_y: number of rows in the grid (also the figure height).
        title: title shown above the whole figure.
    """
    fig = plt.figure(figsize=(grid_x, grid_y))
    fig.suptitle(title, fontsize=20)

    for i in range(1, grid_y * grid_x + 1):
        # randrange excludes the upper bound; random.randint(0, len(...)) could
        # return len(image_array) itself and raise an IndexError
        index = random.randrange(len(image_array))
        image = image_array[index].squeeze()

        plt.subplot(grid_y, grid_x, i)
        plt.imshow(image)

In [None]:
plt.rcParams.update({'font.size': 14})

# Histogram of the encoded class ids in the training set, one bin per class
fig = plt.figure(figsize=(12,6))
n, bins, patches = plt.hist(y_train, len(set(y_train)))
plt.xlabel('Class number')
plt.ylabel('Amount of samples')
plt.title('Training sample class distribution')

draw_images_examples(X_train, 15, 3, 'Examples of images from training set')

# Picking the class of a random training sample; randrange excludes the upper
# bound, whereas random.randint(0, len(y_train)) could index past the end
class_number = random.randrange(len(y_train))
example_class = y_train[class_number]

# Keeping only the training images of the chosen class
X_train_one_label = X_train[y_train == example_class]
draw_images_examples(X_train_one_label, 15, 3, f'Examples of images of the class - {example_class}')

As we can see, the training set is class-imbalanced: dogs and spiders have more samples than the other classes.
To make the dataset more balanced, we can either cut the extra samples from the over-represented classes
or use data augmentation to add samples to the under-represented ones.
TODO: implement one of these balancing strategies.

### Model training

In this investigation I chose convolutional neural networks (CNNs), as they are the most widely used, efficient, and classic models for image classification.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch.utils.data.dataset import Dataset
from torch.utils.data import DataLoader
from tqdm import tqdm
import time

In [None]:
# Using the GPU when one is available, otherwise the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def to_device(X, y):
    """Move one batch to ``device``.

    Args:
        X: tensor with the batch inputs.
        y: batch labels; a tensor, or any sequence/array accepted by
            ``torch.as_tensor`` (tuple, list, numpy array).

    Returns:
        ``(X, y)`` on ``device``, with ``y`` cast to ``torch.int64``.
    """
    # Converting the labels of THIS batch; the global `targets` array holds the
    # labels of the whole dataset and must not be used here
    if not torch.is_tensor(y):
        y = torch.as_tensor(y)
    return X.to(device), y.to(device, dtype=torch.int64)

In [None]:
class TrafficSignDataset(Dataset):
    """Dataset pairing every sample of ``X`` with the label at the same index of ``y``.

    An optional ``transform`` callable is applied to the sample (never to the
    label) each time an item is fetched.
    """

    def __init__(self, X, y, transform=None):
        self.X = X
        self.y = y
        self.transform = transform
        # The dataset length is taken from the labels
        self.count = len(y)

    def __len__(self):
        return self.count

    def __getitem__(self, index):
        sample = self.X[index]
        prepared = sample if self.transform is None else self.transform(sample)
        return prepared, self.y[index]

In [None]:
class WrappedDataLoader:
    """Iterable wrapper that passes every batch of ``loader`` through ``func``.

    Each batch is unpacked into positional arguments, i.e. ``func(*batch)``.
    """

    def __init__(self, loader, func):
        self.func = func
        self.loader = loader

    def __iter__(self):
        # Lazily transforming the batches one by one
        yield from (self.func(*batch) for batch in self.loader)

    def __len__(self):
        # Same number of batches as the wrapped loader
        return len(self.loader)

In [None]:
class LeNet(nn.Module):
    """LeNet-style CNN: two (5x5 convolution -> ReLU -> 2x2 max-pool) stages
    followed by three fully connected layers.

    Args:
        gray: if True the network takes 1-channel input, otherwise 3-channel.
        num_classes: number of output scores; defaults to 10, the number of
            animal classes in this notebook.
        input_size: ``(height, width)`` of the input images; defaults to the
            224x224 size the images are resized to. It determines the input
            width of the first fully connected layer.

    Raises:
        ValueError: if ``input_size`` is too small to survive both conv/pool stages.
    """

    def __init__(self, gray=False, num_classes=10, input_size=(224, 224)):
        super(LeNet, self).__init__()
        input_channels = 1 if gray else 3
        self.conv1 = nn.Conv2d(input_channels, 6, 5)
        self.conv2 = nn.Conv2d(6, 16, 5)

        # The flattened feature count depends on the image size, so it is
        # derived from it instead of being hard-coded
        height, width = input_size
        pooled_h = self._pooled_size(height)
        pooled_w = self._pooled_size(width)
        if pooled_h < 1 or pooled_w < 1:
            raise ValueError(f"input_size {input_size} is too small for LeNet")
        self.flat_features = 16 * pooled_h * pooled_w

        self.fc1 = nn.Linear(self.flat_features, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, num_classes)

    @staticmethod
    def _pooled_size(size):
        """Spatial size left after two (5x5 unpadded conv -> 2x2 max-pool) stages."""
        for _ in range(2):
            size = (size - 4) // 2
        return size

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        # Flattening everything except the batch dimension, so the batch size
        # is preserved whatever the feature count is
        x = x.flatten(1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

In [None]:

def loss(model, loss_func, X, y, optimizer=None):
    """Compute the loss of one batch and, when an optimizer is given, do one optimization step.

    Args:
        model: callable mapping the batch inputs ``X`` to predictions.
        loss_func: callable ``loss_func(predictions, y)`` returning a scalar tensor.
        X: batch inputs.
        y: batch targets.
        optimizer: optional optimizer; when provided the loss is back-propagated,
            one step is taken and the gradients are reset.

    Returns:
        ``(loss_value, batch_size)`` - the loss as a Python float and ``len(X)``.
    """
    # A single forward pass per batch (the former debug prints ran a second one)
    batch_loss = loss_func(model(X), y)

    if optimizer is not None:
        batch_loss.backward()
        optimizer.step()
        # Resetting the gradients so that they do not accumulate into the next batch
        optimizer.zero_grad()

    return batch_loss.item(), len(X)

In [None]:
def validate(model, loss_func, X, y):
    """Score one batch without updating the model.

    Returns:
        ``(loss_value, n_correct, batch_size)`` - the loss as a float, the
        number of samples whose highest-scoring class equals the target, and ``len(X)``.
    """
    scores = model(X)
    batch_loss = loss_func(scores, y)

    # Predicted class = index of the highest score along dim 1
    predicted = scores.argmax(dim=1)
    n_correct = (predicted == y.view(*predicted.shape)).sum()

    return batch_loss.item(), n_correct.item(), len(X)

In [None]:
def evaluate(model, loss_func, loader):
    """Evaluate ``model`` on every batch of ``loader`` and print the averaged metrics.

    Args:
        model: model to evaluate; it is switched to evaluation mode.
        loss_func: loss callable forwarded to ``validate``.
        loader: iterable yielding ``(X, y)`` batches.

    Returns:
        ``(test_loss, test_accuracy)`` - the loss averaged over all samples
        (batch losses weighted by batch size) and the accuracy in percent.

    Raises:
        ValueError: if ``loader`` yields no batches.
    """
    model.eval()

    # No gradients are needed for evaluation
    with torch.no_grad():
        validated_batches = [validate(model, loss_func, X, y) for X, y in loader]

    if not validated_batches:
        raise ValueError("evaluate() needs at least one batch, but the loader is empty")

    losses, corrects, nums = zip(*validated_batches)
    total = sum(nums)
    # Weighting every batch loss by its batch size, since batches can differ in size
    test_loss = float(np.dot(losses, nums)) / total
    test_accuracy = sum(corrects) / total * 100

    print(f"Test loss: {test_loss:.5f}\t"
          f"Test accuracy: {test_accuracy:.3f}%")

    return test_loss, test_accuracy

In [None]:
def training_plots(losses_arr):
    """Plot the accuracy curve and the train/validation loss curves.

    Args:
        losses_arr: sequence whose entries are indexed as
            ``(train loss, validation loss, accuracy in %)``; presumably one
            entry per epoch as returned by ``fit`` - confirm against ``fit``.
    """
    # The number of epochs is taken from the data itself rather than from the
    # global `n_epochs`, and the curves are plotted against 1-based epoch
    # numbers so that the tick labels line up with the points
    epochs = range(1, len(losses_arr) + 1)
    even_epochs = [epoch for epoch in epochs if epoch % 2 == 0]

    plt.figure(figsize=(8,6))
    plt.plot(epochs, [x[2] for x in losses_arr])
    plt.ylabel('Accuracy in %')
    plt.xlabel('Epochs')
    plt.xticks(even_epochs)
    plt.show()

    plt.figure(figsize=(8,6))
    plt.plot(epochs, [x[0] for x in losses_arr], label='train loss')
    plt.plot(epochs, [x[1] for x in losses_arr], label='validation loss')
    plt.legend(loc="upper right")
    plt.ylabel('Losses')
    plt.xlabel('Epochs')
    plt.xticks(even_epochs)
    plt.show()

In [None]:
def train(model, criterion, optimizer, n_epochs, transforms, saving_model_path=None, batch_size=64):
    """Fit ``model`` on the training split, evaluate it on the test split and plot the curves.

    Reads the module-level splits ``X_train``/``y_train``, ``X_valid``/``y_valid``
    and ``X_test``/``y_test``.

    Args:
        model: network to train.
        criterion: loss function.
        optimizer: optimizer bound to the parameters of ``model``.
        n_epochs: number of epochs passed to ``fit``.
        transforms: transform applied to every sample of all three splits
            (inside this function the name shadows the ``torchvision.transforms`` module).
        saving_model_path: optional model name; when given, the state dict is
            saved to ``model_path(saving_model_path)``.
        batch_size: number of samples per batch for all three loaders.
    """

    def make_loader(X, y, shuffle):
        """Build a device-moving loader over one data split."""
        dataset = TrafficSignDataset(X, y, transform=transforms)
        return WrappedDataLoader(DataLoader(dataset, batch_size=batch_size, shuffle=shuffle), to_device)

    # Only the training data is shuffled
    train_loader = make_loader(X_train, y_train, shuffle=True)
    valid_loader = make_loader(X_valid, y_valid, shuffle=False)
    test_loader = make_loader(X_test, y_test, shuffle=False)

    print('\nFitting nn model')
    start_time = time.time()

    losses_arr = fit(n_epochs, model, criterion, optimizer, train_loader, valid_loader)
    print(f'Fit time: {time.time() - start_time} s')

    # NOTE(review): 'model.pt' is presumably the checkpoint written by fit();
    # fit() is not visible here - confirm that it saves to this path
    check_point = torch.load('model.pt', map_location=device)
    model.load_state_dict(check_point)

    evaluate(model, criterion, test_loader)

    if saving_model_path is not None:
        print('Saving model')
        torch.save(model.state_dict(), model_path(saving_model_path))

    training_plots(losses_arr)

In [None]:
# X_train is a stack of resized RGB images, so its axes are
# (samples, height, width, channels) - not (batch, channels, height, width)
n_samples, height, width, channels = X_train.shape

model = LeNet().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
n_epochs = 20


def model_path(name):
    """Return the file path under which the model called ``name`` is saved."""
    return f"./datasets/animals/animals/{name}.model"

In [None]:
# Training the baseline model; ToTensor is the per-sample transform
train(
    model,
    criterion,
    optimizer,
    n_epochs,
    transforms=transforms.ToTensor(),
    saving_model_path='base_model',
)