In [None]:
# Mount Google Drive at /content/drive; the absolute dataset paths used in
# the other cells of this file all live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import math
import collections
import shutil
import os
from typing import List

def mkdir_if_not_exist(path_list : List[str]):
    """Create the directory named by ``path_list`` unless it already exists.

    Args:
        path_list: Path components; they are combined with ``os.path.join``
            and any missing intermediate directories are created as well.

    Raises:
        FileExistsError: If the joined path exists but is not a directory.
    """
    # ``exist_ok=True`` replaces the former exists()-then-makedirs() pair,
    # which could fail if the directory appeared between the two calls.
    os.makedirs(os.path.join(*path_list), exist_ok=True)

def _copy_into_dir(src_file, dst_dir):
    """Copy ``src_file`` into ``dst_dir``, creating ``dst_dir`` when missing."""
    os.makedirs(dst_dir, exist_ok=True)
    shutil.copy(src_file, dst_dir)


def reorg_train_valid(data_dir, train_dir, input_dir, valid_ratio, idx_label):
    """Copy labelled training files into per-label train/valid folders.

    Every regular file in ``data_dir/train_dir`` is copied to
    ``data_dir/input_dir/train_valid/<label>`` and additionally to either
    ``.../valid/<label>`` or ``.../train/<label>``.

    Args:
        data_dir: Root directory of the dataset.
        train_dir: Sub-directory of ``data_dir`` holding the training files.
        input_dir: Sub-directory of ``data_dir`` that receives the copies.
        valid_ratio: Fraction of the *smallest* class that is held out for
            validation; the same absolute count is held out for every class.
        idx_label: Mapping from file id (file name up to the first ``.``)
            to its label.

    Raises:
        KeyError: If a file's id has no entry in ``idx_label``.
    """
    # Size of the rarest class; 0 when no labels are given at all.
    label_sizes = collections.Counter(idx_label.values())
    min_n_train_per_label = min(label_sizes.values(), default=0)
    n_valid_per_label = math.floor(min_n_train_per_label * valid_ratio)

    src_root = os.path.join(data_dir, train_dir)
    out_root = os.path.join(data_dir, input_dir)

    # Number of files already placed in the validation split, per label.
    label_count = {}
    # Sorted so that the split is reproducible; os.listdir order is arbitrary.
    for train_file in sorted(os.listdir(src_root)):
        src_path = os.path.join(src_root, train_file)
        # Sub-directories (e.g. notebook checkpoint folders) are not samples
        # and cannot be copied with shutil.copy.
        if not os.path.isfile(src_path):
            continue

        label = idx_label[train_file.split('.')[0]]
        _copy_into_dir(src_path, os.path.join(out_root, 'train_valid', label))

        # The first file of a label always goes to 'valid', so every class
        # has at least one validation sample even when the quota rounds to 0.
        if label not in label_count or label_count[label] < n_valid_per_label:
            _copy_into_dir(src_path, os.path.join(out_root, 'valid', label))
            label_count[label] = label_count.get(label, 0) + 1
        else:
            _copy_into_dir(src_path, os.path.join(out_root, 'train', label))

def reorg_dog_data(data_dir, label_file, train_dir, test_dir, input_dir, valid_ratio):
    """Reorganise the raw dataset into train/valid/test folders.

    Reads ``data_dir/label_file`` (first line is treated as a header and
    skipped; every other line must be ``id,label``), hands the resulting
    mapping to ``reorg_train_valid`` and finally copies everything found in
    ``data_dir/test_dir`` into ``data_dir/input_dir/test/unknown``.
    """
    labels_path = os.path.join(data_dir, label_file)
    with open(labels_path, 'r') as label_handle:
        rows = label_handle.readlines()[1:]

    # id -> label, one entry per CSV row
    idx_label = {}
    for row in rows:
        idx, label = row.rstrip().split(',')
        idx_label[idx] = label

    reorg_train_valid(data_dir, train_dir, input_dir, valid_ratio, idx_label)

    # Test files carry no label, so they all share the single 'unknown' class.
    mkdir_if_not_exist([data_dir, input_dir, 'test', 'unknown'])
    test_src_dir = os.path.join(data_dir, test_dir)
    test_dst_dir = os.path.join(data_dir, input_dir, 'test', 'unknown')
    for test_file in os.listdir(test_src_dir):
        shutil.copy(os.path.join(test_src_dir, test_file), test_dst_dir)

if __name__ == '__main__':
    # Location of the dataset on the mounted drive and the names of its parts.
    data_dir = '/content/drive/MyDrive/ICT303/dog-breed-identification'
    label_file = 'labels.csv'
    train_dir = 'train'
    test_dir = 'test'

    # Output folder (relative to data_dir) and split settings.
    input_dir = 'train_valid_test'
    batch_size = 128
    valid_ratio = 0.1

    reorg_dog_data(data_dir, label_file, train_dir, test_dir, input_dir, valid_ratio)

In [None]:
import torch

def accuracy(outputs, labels):
    """Return the fraction of rows of ``outputs`` whose arg-max equals ``labels``.

    ``outputs`` is reduced along dimension 1; ``labels`` is compared
    element-wise against the resulting indices.
    """
    best_scores, best_classes = torch.max(outputs, dim=1)
    n_samples = labels.size(0)
    n_hits = (best_classes == labels).sum().item()
    return n_hits / n_samples

In [None]:
def accuracy(outputs, labels):
    """Compute top-1 accuracy of ``outputs`` against ``labels`` as a float."""
    # torch.max along dim 1 yields (values, indices); only indices matter.
    predicted = torch.max(outputs, 1)[1]
    matches = predicted == labels
    return matches.sum().item() / labels.size(0)

In [None]:
from pathlib import Path

import numpy as np

import torch
from PIL import Image
from torchvision import datasets, transforms, models


def accuracy(outputs, labels):
    """Share of samples for which the highest-scoring class equals the label.

    Args:
        outputs: Score tensor; the class axis is dimension 1.
        labels: Tensor of target class indices, one per row of ``outputs``.

    Returns:
        ``correct / total`` as a Python float.
    """
    top = torch.max(outputs, 1)
    correct = top.indices.eq(labels).sum().item()
    total = labels.size(0)
    return correct / total


class Validator():
    """Score a model on a validation image folder and export test predictions.

    The validation images are loaded with ``datasets.ImageFolder``; the class
    names discovered there are also used as the column header of the CSV
    written by ``test_on_folder``.
    """

    def __init__(self, val_folder: Path, val_interval: int = 2) -> None:
        """Set up the device and the validation dataset.

        Args:
            val_folder: Folder with one sub-folder per class.
            val_interval: Stored as ``self.val_interval``; the training loop
                reads it to decide after which epochs to validate.
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.val_folder = val_folder
        self.val_interval = val_interval
        self._prepare_data()

    def _prepare_data(self):
        """Build the deterministic eval transform and the ImageFolder dataset."""
        self.transform = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            # NOTE(review): presumably the ImageNet channel statistics
            # expected by the pretrained backbone - confirm.
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

        self.dataset = datasets.ImageFolder(self.val_folder, transform=self.transform)

    def validate_model(self, model: torch.nn.Module, criterion, batch_size: int = 32):
        """Evaluate ``model`` on the validation dataset.

        Puts the model in eval mode (it is not switched back) and runs it
        under ``torch.no_grad()``.

        Args:
            model: Network to evaluate; must already live on ``self.device``.
            criterion: Loss callable invoked as ``criterion(outputs, labels)``.
            batch_size: Validation batch size.

        Returns:
            Fraction of correctly classified validation samples, or ``0.0``
            if the dataset is empty.
        """
        assert model is not None, "Model is not defined"

        # No shuffling: order does not affect the metric and a fixed order
        # keeps the logged output reproducible.
        dataloader = torch.utils.data.DataLoader(self.dataset, batch_size=batch_size, shuffle=False)

        model.eval()
        n_correct = 0
        n_seen = 0
        running_loss = 0.0
        last_outputs = None
        with torch.no_grad():
            for batch_i, (inputs, labels) in enumerate(dataloader):
                inputs = inputs.to(self.device)
                labels = labels.to(self.device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)

                # Count samples rather than averaging per-batch accuracies,
                # so a short final batch is not over-weighted.
                _, predicted = torch.max(outputs, 1)
                n_correct += (predicted == labels).sum().item()
                n_seen += labels.size(0)

                running_loss += loss.item()
                last_outputs = outputs

                if (batch_i + 1) % 10 == 0:
                    print(f"Val Step [{batch_i + 1}/{len(dataloader)}], Loss: {running_loss / 10:.4f} Accuracy: {n_correct / n_seen:.4f}")
                    running_loss = 0.0

        if n_seen == 0:
            print("Validation dataset is empty; reporting accuracy 0.0")
            return 0.0

        val_metric = n_correct / n_seen
        print(f"Validation accuracy: {val_metric:.4f}")
        # Show the class probabilities of one sample as a quick sanity check.
        sample_probs = torch.nn.functional.softmax(last_outputs[0], dim=0).detach().cpu().numpy()
        print("last output", self._prediction_to_csv_str(sample_probs))

        return val_metric

    def test_on_folder(self, model: torch.nn.Module, test_folder: Path, output_file: Path, batch_size: int = 64):
        """Predict class probabilities for every file in ``test_folder``.

        Writes a CSV to ``output_file`` with header ``id,<class names...>``
        and one row per image (``id`` is the file name without extension).
        If ``output_file`` already exists nothing is computed.

        Args:
            model: Network to run; must already live on ``self.device``.
            test_folder: Flat folder of image files; sub-directories are
                ignored.
            output_file: Destination CSV path.
            batch_size: Number of images per forward pass.
        """
        if output_file.exists():
            print(f"Output file {output_file} already exists. Exiting...")
            return

        print(f"Testing model on {test_folder}")

        # Collect files up front: the batching below then cannot lose a
        # trailing partial batch, and the row order is reproducible.
        img_paths = sorted(p for p in test_folder.iterdir() if p.is_file())
        csv_lines = ["id," + ",".join(self._get_class_names()) + "\n"]

        # Inference must not use training-mode layers or track gradients;
        # the caller's train/eval mode is restored afterwards.
        was_training = model.training
        model.eval()
        try:
            with torch.no_grad():
                for start in range(0, len(img_paths), batch_size):
                    batch_paths = img_paths[start:start + batch_size]
                    batch = torch.stack([self._load_image(p) for p in batch_paths])
                    batch = batch.to(self.device)
                    probs = torch.nn.functional.softmax(model(batch), dim=1).cpu().numpy()

                    for path, prob in zip(batch_paths, probs):
                        csv_lines.append(f"{path.stem},{self._prediction_to_csv_str(prob)}\n")
        finally:
            model.train(was_training)

        with open(output_file, "w") as f:
            f.writelines(csv_lines)

        print(f"Saved predictions to {output_file}")

    def _load_image(self, img_path: Path):
        """Open one image, force 3-channel RGB and apply the eval transform."""
        # ImageFolder converts to RGB when loading; do the same here so
        # grayscale/RGBA files do not break the 3-channel Normalize step.
        with Image.open(img_path) as img:
            return self.transform(img.convert("RGB"))

    def _get_class_names(self):
        """Return the class names found in the validation folder."""
        return self.dataset.classes

    def _prediction_to_csv_str(self, pred_probs: np.ndarray, sep=",") -> str:
        """Format probabilities with four decimals, joined by ``sep``.

        Negative values are written as their absolute value.
        """
        return sep.join(f"{abs(class_prob):.4f}" for class_prob in pred_probs)


In [None]:
from pathlib import Path
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import datasets, transforms, models



class Trainer():
    """Fine-tune a ResNet-50 classifier on an ImageFolder-style dataset."""

    def __init__(self, train_folder : Path, save_folder : Path, n_epochs : int, batch_size : int, augmetation_transforms) -> None:
        """Load the dataset and build the model.

        Args:
            train_folder: Folder with one sub-folder per class.
            save_folder: Folder that receives ``best_model.pt``.
            n_epochs: Number of passes over the training data.
            batch_size: Training batch size (also passed to the validator).
            augmetation_transforms: Optional sequence of transforms that are
                applied after resize/crop and before tensor conversion;
                ``None`` or an empty sequence disables augmentation.
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.train_folder = train_folder
        self.save_folder = save_folder
        self.n_epochs = n_epochs
        self.batch_size = batch_size
        self.augmetation_transforms = augmetation_transforms
        self._prepare_data()
        self._prepare_model()

    def _prepare_data(self):
        """Compose the training transform pipeline and create the dataset."""
        transform_steps = [
            transforms.Resize(256),
            transforms.CenterCrop(224)
        ]

        # Augmentations act on the cropped image, before ToTensor/Normalize.
        if self.augmetation_transforms:
            transform_steps.extend(self.augmetation_transforms)

        transform_steps.append(transforms.ToTensor())
        transform_steps.append(transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]))

        transform = transforms.Compose(transform_steps)

        self.dataset = datasets.ImageFolder(self.train_folder, transform=transform)

    def _prepare_model(self):
        """Create a ResNet-50 with default weights and a new output layer."""
        self.model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
        # Replace the final layer so its width matches the dataset's classes.
        num_classes = len(self.dataset.classes)
        self.model.fc = nn.Linear(self.model.fc.in_features, num_classes)
        self.model = self.model.to(self.device)

    def load_model_from_file(self, file : Path):
        """Load a state dict saved by ``train`` into ``self.model``.

        ``map_location`` remaps the stored tensors onto ``self.device`` so a
        checkpoint written on a GPU can also be loaded on a CPU-only runtime.
        """
        state_dict = torch.load(file, map_location=self.device)
        self.model.load_state_dict(state_dict)
        self.model = self.model.to(self.device)

    def train(self, validator : "Validator"):
        """Run the training loop and keep the best-scoring checkpoint.

        After every ``validator.val_interval`` epochs the model is scored
        with ``validator.validate_model``; whenever the score improves, the
        state dict is written to ``save_folder / "best_model.pt"``. If
        ``n_epochs`` is smaller than ``validator.val_interval`` no validation
        happens and no checkpoint is written.

        Args:
            validator: Object providing ``val_interval`` and
                ``validate_model(model, criterion, batch_size=...)``.
        """
        dataloader = torch.utils.data.DataLoader(self.dataset, batch_size=self.batch_size, shuffle=True)

        criterion = nn.CrossEntropyLoss()
        # SGD with momentum is the active choice; optim.AdamW / optim.Adam
        # with lr=0.001 are drop-in alternatives.
        optimizer = optim.SGD(self.model.parameters(), lr=0.001, momentum=0.9)

        # -inf guarantees that the first validation always writes a
        # checkpoint, even if its accuracy is exactly 0.
        best_val_score = float("-inf")
        for epoch in range(self.n_epochs):
            running_loss = 0.0
            # validate_model() leaves the model in eval mode, so training
            # mode is re-enabled at the start of every epoch.
            self.model.train()
            for i, (inputs, labels) in enumerate(dataloader):
                inputs = inputs.to(self.device)
                labels = labels.to(self.device)

                optimizer.zero_grad()

                outputs = self.model(inputs)
                loss = criterion(outputs, labels)

                loss.backward()
                optimizer.step()

                running_loss += loss.item()

                if (i + 1) % 10 == 0:
                    print(f"Epoch [{epoch + 1}/{self.n_epochs}], Step [{i + 1}/{len(dataloader)}], Loss: {running_loss / 10:.4f}")
                    running_loss = 0.0

            if (epoch + 1) % validator.val_interval == 0:
                val_score = validator.validate_model(self.model, criterion, batch_size=self.batch_size)

                if val_score > best_val_score:
                    best_val_score = val_score
                    self.save_folder.mkdir(parents=True, exist_ok=True)
                    torch.save(self.model.state_dict(), self.save_folder / "best_model.pt")
                    print("Best model saved!")

        print("Training complete!")

In [None]:

from pathlib import Path

from torchvision import transforms

# Augmentations applied to training images only. RandomRotation(degrees=20)
# and CenterCrop(15) are intentionally left disabled.
data_augementation_steps = [
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.2, hue=0.1),
]

# Input/output locations on the mounted drive.
train_folder = Path("/content/drive/MyDrive/ICT303/dog-breed-identification/train_valid_test/train")
save_folder = Path("/content/drive/MyDrive/ICT303/dog-breed-identification")
val_folder = Path("/content/drive/MyDrive/ICT303/dog-breed-identification/train_valid_test/valid")
best_model_file = Path("/content/drive/MyDrive/ICT303/dog-breed-identification/best_model.pt")
test_folder = Path("/content/drive/MyDrive/ICT303/dog-breed-identification/test")
output_file = Path("test_results.csv")

trainer = Trainer(
    train_folder=train_folder,
    save_folder=save_folder,
    n_epochs=10,
    batch_size=128,
    augmetation_transforms=data_augementation_steps,
)
# val_interval=1: validate after every epoch.
validator = Validator(val_folder=val_folder, val_interval=1)

print("start training ...")
trainer.train(validator)

# Reload the best checkpoint before predicting on the test images.
trainer.load_model_from_file(best_model_file)
validator.test_on_folder(trainer.model, test_folder=test_folder, output_file=output_file)

This code is a script for training a deep learning model on a dog breed identification dataset. It uses the ResNet-50 architecture, a popular convolutional neural network (CNN) known for its good performance on a variety of image classification tasks.

# Breakdown of the code:

First, the functions reorg_train_valid and reorg_dog_data, which reorganize the data into train, validation, and test sets, are defined. The function reorg_dog_data takes the data directory, the label file, the train and test directories, the output directory, and a validation ratio as input. It separates the data into train, validation, and test folders based on the provided ratio.

Then the Validator class is defined to handle the validation process. It takes the validation folder (created by reorganizing the data) and a validation interval as input. It uses torchvision's ImageFolder dataset to load the images and apply the transformations required for validation.

The test_on_folder method in the Validator class is used for testing the model on the test dataset. It loads the test images, performs inference, and saves the predictions to a CSV file.

The Trainer class is defined to handle the training process. It takes the train folder, save folder, number of epochs, batch size, and optional data augmentation transformations as input. It uses the ResNet-50 architecture and adapts the last fully connected layer to match the number of classes in the dataset.
The training loop iterates through the dataset for the specified number of epochs, using cross-entropy loss and stochastic gradient descent (SGD) as the optimizer.

A list of data augmentation transformations is defined, such as random horizontal and vertical flips and color jitter. These augmentations create variations of the input data during training, which can lead to better generalization and performance.
I chose the ones that gave better results than the others after trying them out.

The script creates an instance of the Trainer class and an instance of the Validator class. It then performs the training using the trainer.train(validator) method, which iterates through the dataset and trains the ResNet-50 model.
After training, the best model is loaded back from the saved file (best_model.pt) using the trainer.load_model_from_file method.
The validator.test_on_folder method is used to test the trained model on the test dataset, and the results are saved to test_results.csv.


