#### **Data Augmentation using random cropping**

In [None]:
import os
import Augmentor

In [None]:
# Average number of image files per object = 1350
def augument_crop(data_dir, label, out_dir, num_transformation=1, num_of_samples=20):
    """Write randomly cropped samples of one label folder to disk.

    An Augmentor pipeline is built over ``data_dir/label`` and the sampled
    images are written to ``out_dir/label``.

    Args:
        data_dir: Directory containing one sub-folder per label.
        label: Name of the sub-folder to augment; also used as the name of
            the output sub-folder.
        out_dir: Root directory for the augmented output.
        num_transformation: Currently unused; kept so existing callers that
            pass it keep working.
        num_of_samples: Number of images to sample from the pipeline.
            Defaults to 20, the value that used to be hard-coded.
    """
    data_fp = os.path.join(data_dir, label)
    out_fp = os.path.join(out_dir, label)

    aug_pipeline = Augmentor.Pipeline(source_directory=data_fp, output_directory=out_fp)
    # The crop is applied with probability 0.5; percentage_area=0.3 is the
    # crop size passed to Augmentor (see its crop_random documentation).
    aug_pipeline.crop_random(probability=0.5, percentage_area=0.3)

    aug_pipeline.sample(int(num_of_samples))

In [None]:
# One entry per class sub-folder inside the dataset directory.
folder_names = [
    'cassette_player',
    'chain_saw',
    'church',
    'english_springer',
    'french_horn',
    'garbage_truck',
    'gas_pump',
    'golf_ball',
    'parachute',
    'tench',
]

In [None]:
# Source images are read from data_dir; the augmented copies go to an
# absolute path below the current working directory.
data_dir = "dataset/"
working_dir = os.getcwd()
out_dir = os.path.join(working_dir, "dataset_crop_random")

In [None]:
# Run the random-crop augmentation once per class folder.
# The function defined in this notebook is `augument_crop`; the previous call
# used a name that is not defined here and raised a NameError.
for folder_name in folder_names:
    augument_crop(data_dir=data_dir, label=folder_name, out_dir=out_dir)

#### **Mapping the dataset**

In [None]:
import os
from os import walk

import pandas as pd
from sklearn.model_selection import train_test_split

In [None]:
def build_data_mapping(data_path, base_dir):
    """Collect ``[absolute_file_path, label]`` pairs from a labelled folder tree.

    The label of a file is the name of the directory that directly contains it.

    Args:
        data_path: Root directory holding one sub-folder per label.
        base_dir: Directory that ``data_path`` is joined onto to build the
            absolute file paths.

    Returns:
        A list of ``[file_path, label_name]`` lists, ordered by directory and
        then by file name.
    """
    root = os.path.normpath(data_path)
    mapping = []

    for dirpath, dirnames, filenames in walk(data_path):
        # Sorting in place fixes the traversal order, so the seeded
        # train/test split built from this list is reproducible.
        dirnames.sort()

        # Files sitting directly in the root have no label folder (for
        # example the train.csv / test.csv this notebook writes there), so
        # they must not be treated as samples.
        if os.path.normpath(dirpath) == root:
            continue

        # basename/normpath is separator-agnostic, unlike splitting on "/".
        label_name = os.path.basename(os.path.normpath(dirpath))
        for fname in sorted(filenames):
            mapping.append([os.path.join(base_dir, dirpath, fname), label_name])

    return mapping


current_dir = os.path.abspath(os.getcwd())
data_path = "dataset_crop_random/" # Change the mapping directory
data_mapping = build_data_mapping(data_path, current_dir)

In [None]:
# Tabulate the collected (file path, label) pairs and preview the first rows.
mapping_columns = ["filename", "label"]
data_df = pd.DataFrame(data=data_mapping, columns=mapping_columns)
data_df.head()

In [None]:
# Class name -> integer class index; the index is the position in the list.
labels_map = {
    class_name: class_index
    for class_index, class_name in enumerate([
        'cassette_player',
        'chain_saw',
        'church',
        'english_springer',
        'french_horn',
        'garbage_truck',
        'gas_pump',
        'golf_ball',
        'parachute',
        'tench',
    ])
}

In [None]:
# 80/20 split, stratified on the label column, with a fixed seed.
label_column = data_df["label"]
train, test = train_test_split(
    data_df,
    test_size=0.2,
    stratify=label_column,
    random_state=42,
)
print(train.shape, test.shape, sep="\n")

In [None]:
# Store both splits next to the images, without the DataFrame index column.
train_csv_fp = os.path.join(data_path, "train.csv")
test_csv_fp = os.path.join(data_path, "test.csv")

train.to_csv(train_csv_fp, index=False)
test.to_csv(test_csv_fp, index=False)

#### **Modelling Using the Augmented Data**

In [None]:
import os
import pandas as pd
from PIL import Image
from datetime import datetime

import torch
import torch.nn as nn
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
import torchvision.models as models

from imgaug import augmenters as iaa

import matplotlib.pyplot as plt
import seaborn as sns; sns.set()

In [None]:
class CustomImageDataset(Dataset):
    """Image-classification dataset driven by an annotations CSV.

    The CSV is read with pandas. Column 0 is used as the image path and the
    ``label`` column (read back positionally as column 1) is translated to an
    integer class index through ``labels_map``.
    """

    def __init__(self, annotations_file, labels_map, transform=None, target_transform=None):
        """Load the annotations and map the label names to class indices.

        Args:
            annotations_file: Path of the CSV to read.
            labels_map: Mapping from label name to integer class index.
            transform: Optional callable applied to each loaded image.
            target_transform: Optional callable applied to each label tensor.

        Raises:
            ValueError: If the CSV contains a label that ``labels_map`` does
                not cover.
        """
        self.img_labels = pd.read_csv(annotations_file)

        raw_labels = self.img_labels['label']
        mapped_labels = raw_labels.map(labels_map)
        # Unmapped names become NaN; fail here with the offending names
        # instead of later with an opaque int(nan) error during iteration.
        unmapped = raw_labels[mapped_labels.isna()].unique().tolist()
        if unmapped:
            raise ValueError(f"Labels missing from labels_map: {unmapped}")
        self.img_labels['label'] = mapped_labels

        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        """Return the number of annotated samples."""
        return len(self.img_labels)

    def _get_label(self, idx):
        """Return the class index of sample ``idx`` as an int64 tensor."""
        # nn.CrossEntropyLoss expects class-index targets as int64 (long);
        # a uint8 target is rejected.
        label = torch.tensor(int(self.img_labels.iloc[idx, 1]), dtype=torch.long)
        if self.target_transform:
            label = self.target_transform(label)
        return label

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair of sample ``idx``."""
        img_path = self.img_labels.iloc[idx, 0]
        # Force three channels regardless of how the file is stored.
        image = Image.open(img_path).convert('RGB')

        label = self._get_label(idx)

        if self.transform:
            image = self.transform(image)

        return image, label

In [None]:
class ResNetModel(nn.Module):
    """ResNet-18 (created with ``pretrained=False``) with a replaced final layer.

    The network's ``fc`` layer is swapped for a linear layer that maps 512
    features to ``num_classes`` outputs.
    """

    def __init__(self, num_classes):
        """Build the network with ``num_classes`` output units."""
        super().__init__()
        backbone = models.resnet18(pretrained=False)
        backbone.fc = nn.Linear(in_features=512, out_features=num_classes)
        # The attribute name is part of the saved state-dict keys.
        self.resnet18 = backbone

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Pass ``x`` through the wrapped network and return its output."""
        return self.resnet18(x)

In [None]:
def create_dir(dir_name):
    """Create ``dir_name`` (including missing parents) if it does not exist.

    Calling this for a directory that already exists is a no-op.

    Args:
        dir_name: Path of the directory to create.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(dir_name, exist_ok=True)

In [None]:
def dump_model(training_id, model_dir, model):
    """Save ``model``'s state dict as ``<training_id>_resnet18_od.pth`` in ``model_dir``.

    ``model_dir`` is created first when it does not exist yet.
    """
    checkpoint_name = f"{training_id}_resnet18_od.pth"
    create_dir(dir_name=model_dir)

    checkpoint_fp = os.path.join(model_dir, checkpoint_name)
    state = model.state_dict()
    torch.save(state, checkpoint_fp)

In [None]:
# Class name -> integer class index used as the training target.
labels_map = dict(
    cassette_player=0,
    chain_saw=1,
    church=2,
    english_springer=3,
    french_horn=4,
    garbage_truck=5,
    gas_pump=6,
    golf_ball=7,
    parachute=8,
    tench=9,
)

In [None]:
# Annotation CSVs written by the mapping step of this notebook. They live in
# the random-crop output folder, which matches the "random_crop" training id.
train_annotation_fp = "dataset_crop_random/train.csv"
test_annotation_fp = "dataset_crop_random/test.csv"

model_dir = "./models"
# datetime.now().timestamp() is the real POSIX timestamp. Calling timestamp()
# on a naive utcnow() value interprets it as local time and skews the result
# by the UTC offset.
training_id = f"random_crop_{datetime.now().timestamp()}" #change the training id

# Each run gets its own log folder, named after the training id.
log_dir = os.path.join("./log_dir", training_id)
create_dir(log_dir)

In [None]:
# Every image is resized to the (224, 224) input size expected by ResNet-18
# and then converted to a tensor.
feature_transform = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.ToTensor(),
])

# Both splits share the label mapping and the image transform.
shared_dataset_kwargs = dict(labels_map=labels_map, transform=feature_transform)
train_dataset = CustomImageDataset(annotations_file=train_annotation_fp, **shared_dataset_kwargs)
test_dataset = CustomImageDataset(annotations_file=test_annotation_fp, **shared_dataset_kwargs)

# Wrap the datasets in shuffling mini-batch loaders.
batch_size = 16
train_dataloader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=True)

In [None]:
# Fetch one batch and report the shapes of its feature and label tensors.
first_batch = next(iter(train_dataloader))
train_features, train_labels = first_batch
print(f"Feature batch shape: {train_features.size()}")
print(f"Labels batch shape: {train_labels.size()}")

In [None]:
# Plot a 5x5 grid of training samples with their class index as the caption.
# The loader is iterated once and consecutive images are taken from it, rather
# than building a new iterator (and loading a whole batch) for every tile.
grid_rows, grid_cols = 5, 5
max_tiles = grid_rows * grid_cols

plt.figure(figsize=(10,10))
tiles_drawn = 0
for batch_features, batch_labels in train_dataloader:
    for sample_image, sample_label in zip(batch_features, batch_labels):
        plt.subplot(grid_rows, grid_cols, tiles_drawn + 1)
        plt.xticks([])
        plt.yticks([])
        plt.grid(False)
        # Move axis 0 to the end: imshow wants the channel axis last.
        plt.imshow(sample_image.numpy().transpose((1, 2, 0)))
        plt.xlabel(sample_label.tolist())
        tiles_drawn += 1
        if tiles_drawn == max_tiles:
            break
    if tiles_drawn == max_tiles:
        break
plt.show()

In [None]:
# Hyper-parameters of the run.
num_class = 10
learning_rate = 1e-03
num_epoch = 20
save_model = True

# Use the GPU when one is available.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

model = ResNetModel(num_classes=num_class)

# Cross-entropy loss, optimised with Adam.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)

In [None]:
# Train for num_epoch epochs, recording the mean batch loss and the sample
# accuracy of every epoch.
model.train()
model.to(device)

epoch_ls, total_loss, total_accuracy = [], [], []

for epoch in range(num_epoch):

    running_loss = 0
    num_correct = 0
    num_seen = 0

    for inputs, labels in train_dataloader:
        # Move the batch to the same device as the model.
        inputs, labels = inputs.to(device), labels.to(device)

        # Clear the gradients left over from the previous step.
        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Accumulate the epoch statistics.
        running_loss += loss.item()

        predicted = torch.max(outputs, 1).indices
        num_correct += (predicted == labels).sum().item()
        num_seen += labels.size(0)

        loss.backward()
        optimizer.step()

    # Loss is averaged over batches, accuracy over individual samples.
    epoch_loss = running_loss / len(train_dataloader)
    epoch_accuracy = num_correct / num_seen

    epoch_ls.append(epoch)
    total_loss.append(epoch_loss)
    total_accuracy.append(epoch_accuracy)

    print(f"Current epoch: {epoch} | Training Accuracy: {epoch_accuracy} | Training Loss: {epoch_loss}")

if save_model:
    dump_model(training_id, model_dir, model)

In [None]:
# Evaluate the trained model on the test split.
# Switch to inference mode first. Otherwise the model is still in training
# mode, so its batch-normalisation layers normalise with per-batch statistics
# and keep updating their running averages from the test data.
model.eval()

correct = 0
total = 0

# No gradients are needed for evaluation.
with torch.no_grad():
    for images, labels in test_dataloader:
        images = images.to(device)
        labels = labels.to(device)

        outputs = model(images)
        # The predicted class is the index of the largest output per sample.
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

# Accuracy as a whole-number percentage (floor division).
test_acc = 100 * correct // total
print(f'Test Accuracy: {test_acc}%')

In [None]:
# One row per epoch: (epoch, loss, accuracy). Saved to the run's log folder.
metric_rows = list(zip(epoch_ls, total_loss, total_accuracy))
train_metrics_df = pd.DataFrame(metric_rows, columns=["epoch", "loss", "accuracy"])

train_metrics_fp = os.path.join(log_dir, 'training_metrics.csv')
train_metrics_df.to_csv(train_metrics_fp, index=False)
train_metrics_df

In [None]:
# Whole-number test accuracy in percent, from the evaluation counters.
test_acc = (100 * correct) // total

In [None]:
# Single-row table holding the test accuracy, saved to the run's log folder.
test_metrics_df = pd.DataFrame(data=[test_acc], columns=["accuracy"])

test_metrics_fp = os.path.join(log_dir, 'test_metrics.csv')
test_metrics_df.to_csv(test_metrics_fp, index=False)
test_metrics_df

In [None]:
# Reshape to long format (one row per epoch/metric pair) so that loss and
# accuracy are drawn as separate lines.
long_metrics_df = pd.melt(train_metrics_df, id_vars=["epoch"])
sns.lineplot(data=long_metrics_df, x="epoch", y="value", hue="variable")

### Deprecated Code

In [None]:
# img = train_features[0].numpy().transpose((1, 2, 0))
# label = train_labels[0]
# plt.imshow(img)
# plt.show()
# print(f"Label: {label}")