In [None]:
import copy
import os
import random

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.backends.cudnn as cudnn
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torch.optim import lr_scheduler
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, models, transforms

In [None]:
!pip install albumentations==0.4.6 -q
import albumentations as A
from albumentations.pytorch import ToTensorV2

In [None]:
# Mount Google Drive at /content/drive so files stored there can be accessed.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
#cd to Dataset dir, structure as in readme
%cd drive/MyDrive/Dataset

In [None]:
#try out different augmentations here, list of options at https://github.com/albumentations-team/albumentations

# Preprocessing shared by every split: resize to a fixed square, normalise with
# the same per-channel statistics, then convert to a torch tensor.
IMAGE_SIZE = 64
NORM_MEAN = (0.485, 0.456, 0.406)
NORM_STD = (0.229, 0.224, 0.225)

# Random augmentations (shift/scale/rotate, multiplicative noise) belong to the
# training pipeline only.
train_transforms = A.Compose(
    [
        A.Resize(IMAGE_SIZE, IMAGE_SIZE),
        A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.05, rotate_limit=360, p=0.5),
        A.MultiplicativeNoise(multiplier=[0.5,2], per_channel=True, p=0.2),
        A.Normalize(mean=NORM_MEAN, std=NORM_STD),
        ToTensorV2(),
    ]
)

# Validation is deterministic: random augmentation here would make the
# validation accuracy (and therefore best-model selection) noisy between epochs.
val_transforms = A.Compose(
    [
        A.Resize(IMAGE_SIZE, IMAGE_SIZE),
        A.Normalize(mean=NORM_MEAN, std=NORM_STD),
        ToTensorV2(),
    ]
)

# Test images get the same deterministic preprocessing.
test_transforms = A.Compose(
    [
        A.Resize(IMAGE_SIZE, IMAGE_SIZE),
        A.Normalize(mean=NORM_MEAN, std=NORM_STD),
        ToTensorV2(),
    ]
)

In [None]:
#change this to the number of classes in your dataset
#(it is used as the output size of the final linear layer of the network)

NUM_OF_CLASSES = 3

In [None]:
train_data_path = 'train'
valid_data_path = 'valid'
test_data_path = 'test'

# One sub-directory of the training folder per class. os.listdir returns entries
# in arbitrary order, so sort them to get the same class -> index mapping on
# every run (a saved model is only meaningful with the mapping it was trained
# with). Plain files are skipped so they do not become bogus classes.
classes = sorted(
    entry for entry in os.listdir(train_data_path)
    if os.path.isdir(os.path.join(train_data_path, entry))
)


def _list_files(root):
    """Return the path of every file found anywhere below ``root``."""
    return [os.path.join(dirpath, fname) for dirpath, _, fnames in os.walk(root) for fname in fnames]


train_image_paths = _list_files(train_data_path)
valid_image_paths = _list_files(valid_data_path)
random.shuffle(train_image_paths)
random.shuffle(valid_image_paths)
# The test list is intentionally left in directory-walk order.
test_image_paths = _list_files(test_data_path)

print('train_image_path example: ', train_image_paths[0])
print('test_image_path example: ', test_image_paths[0])
print('class example: ', classes[0])
print("Train size: {}\nValid size: {}\nTest size: {}".format(len(train_image_paths), len(valid_image_paths), len(test_image_paths)))

In [None]:
# Two lookup tables between class names and integer labels; a class's label is
# its position in `classes`.
class_to_idx = {name: position for position, name in enumerate(classes)}
idx_to_class = dict(enumerate(classes))

print(f'Class to index mapping: {class_to_idx}')

In [None]:
#your custom dataset class def

class CustomDataset(Dataset):
    """Image dataset that reads files from disk and labels them by parent folder.

    Each item is loaded with OpenCV, converted from BGR to RGB, labelled with
    ``class_to_idx[<name of the directory containing the file>]`` and, when a
    transform is set, passed through it.

    Args:
        image_paths: sequence of image file paths laid out as
            ``.../<class name>/<file>``.
        transform: optional callable invoked as ``transform(image=image)`` that
            returns a mapping with an ``'image'`` entry (the albumentations
            calling convention). ``None`` or ``False`` disables transformation.
    """

    def __init__(self, image_paths, transform=None):
        self.image_paths = image_paths
        # ``False`` used to be the default; keep accepting it as "no transform"
        # so it is never called like a function.
        self.transform = None if transform is False else transform

    def __len__(self):
        """Number of images in the dataset."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the image at position ``idx``.

        Raises:
            OSError: if the file is missing or cannot be decoded as an image.
            KeyError: if the parent directory name is not a known class.
        """
        image_filepath = self.image_paths[idx]
        image = cv2.imread(image_filepath)
        # cv2.imread signals failure by returning None instead of raising.
        if image is None:
            raise OSError(f'Missing or unreadable image file: {image_filepath}')
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # The class name is the directory that directly contains the file;
        # os.path handles the platform's path separator.
        class_name = os.path.basename(os.path.dirname(image_filepath))
        label = class_to_idx[class_name]

        if self.transform is not None:
            image = self.transform(image=image)['image']

        return image, label

In [None]:
train_dataset = CustomDataset(train_image_paths,train_transforms)
# Validation uses its own pipeline (val_transforms) rather than the training one.
valid_dataset = CustomDataset(valid_image_paths,val_transforms)
test_dataset = CustomDataset(test_image_paths,test_transforms)

In [None]:
# Fetch the sample once: every dataset access re-reads the file and re-applies
# the (random) transforms, so indexing twice does the work twice.
sample_image, sample_label = train_dataset[49]
print('The shape of tensor for 50th image in train dataset: ', sample_image.shape)
print('The label for 50th image in train dataset: ', sample_label)

In [None]:
def visualize_augmentations(dataset, idx=0, samples=10, cols=5, random_img=False):
    """Plot a grid of augmented images drawn from ``dataset``.

    A deep copy of the dataset is used with the ``A.Normalize`` and
    ``ToTensorV2`` steps removed from its transform, so the images stay
    displayable arrays; the dataset passed in is not modified.

    Args:
        dataset: dataset whose ``transform`` is an iterable of albumentations
            transforms and whose items are ``(image, label)`` pairs.
        idx: index of the image to show repeatedly (ignored when
            ``random_img`` is true).
        samples: number of panels to draw.
        cols: number of columns in the grid.
        random_img: when true, a random index is drawn for every panel.
    """
    dataset = copy.deepcopy(dataset)
    dataset.transform = A.Compose([t for t in dataset.transform if not isinstance(t, (A.Normalize, ToTensorV2))])
    # Round up so a sample count that is not a multiple of `cols` still fits.
    rows = max(1, -(-samples // cols))

    # squeeze=False always yields a 2-D array of axes, even for a 1x1 grid.
    figure, ax = plt.subplots(nrows=rows, ncols=cols, figsize=(12,8), squeeze=False)
    axes = ax.ravel()
    for i in range(samples):
        if random_img:
            # Draw from the dataset actually being shown; every index,
            # including 0, is eligible.
            idx = np.random.randint(len(dataset))
        image, lab = dataset[idx]
        axes[i].imshow(image)
        axes[i].set_axis_off()
        axes[i].set_title(idx_to_class[lab])

    # Hide the grid cells that did not receive an image.
    for spare in axes[samples:]:
        spare.set_axis_off()

    plt.tight_layout(pad=1)
    plt.show()

In [None]:
# Any index of the dataset (0 included) is a valid starting image.
visualize_augmentations(train_dataset, np.random.randint(len(train_dataset)), random_img=True)

In [None]:
# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Batches of 32 for every split; only the test loader keeps a fixed order.
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
valid_loader = DataLoader(dataset=valid_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=False)

In [None]:
# Per-split lookup tables keyed by 'train' / 'valid' / 'test'.
image_datasets = dict(train=train_dataset, valid=valid_dataset, test=test_dataset)
dataloaders = dict(train=train_loader, valid=valid_loader, test=test_loader)
dataset_sizes = {split: len(split_dataset) for split, split_dataset in image_datasets.items()}

In [None]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=12,
                loaders=None, run_device=None):
    """Train ``model`` and return it loaded with its best-validation weights.

    Every epoch runs a 'train' phase (forward, backward, optimizer step)
    followed by a 'valid' phase (forward only, gradients disabled). The
    scheduler is stepped once per epoch, after the training phase. The weights
    of the epoch with the highest validation accuracy are kept and loaded back
    into the model before it is returned.

    Args:
        model: network to train; expected to already live on the run device.
        criterion: loss called as ``criterion(outputs, labels)``.
        optimizer: optimizer updating ``model``'s parameters.
        scheduler: learning-rate scheduler with a ``step()`` method.
        num_epochs: number of epochs to run.
        loaders: optional mapping with 'train' and 'valid' entries, each an
            iterable of ``(inputs, labels)`` batches. Defaults to the
            notebook-level ``dataloaders``.
        run_device: device the batches are moved to. Defaults to the
            notebook-level ``device``.

    Returns:
        The same ``model`` object, holding the best-validation weights.

    Raises:
        ValueError: if a phase yields no samples (accuracy would be undefined).
    """
    # Fall back to the notebook-level objects when no override is supplied.
    if loaders is None:
        loaders = dataloaders
    if run_device is None:
        run_device = device

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        for phase in ['train', 'valid']:
            is_training = phase == 'train'
            # train(False) is equivalent to eval().
            model.train(is_training)

            running_loss = 0.0
            running_corrects = 0
            samples_seen = 0

            for inputs, labels in loaders[phase]:
                inputs = inputs.to(run_device)
                labels = labels.to(run_device)

                # Gradients only exist (and need clearing) while training.
                if is_training:
                    optimizer.zero_grad()

                with torch.set_grad_enabled(is_training):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if is_training:
                        loss.backward()
                        optimizer.step()

                # Accumulate plain Python numbers; the loss is a per-batch
                # mean, so weight it by the batch size.
                batch_size = inputs.size(0)
                running_loss += loss.item() * batch_size
                running_corrects += (preds == labels).sum().item()
                samples_seen += batch_size

            if samples_seen == 0:
                raise ValueError(f"The '{phase}' loader produced no samples")

            if is_training:
                scheduler.step()

            # Divide by the samples actually processed in this phase.
            epoch_loss = running_loss / samples_seen
            epoch_acc = running_corrects / samples_seen

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            if phase == 'valid' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    print(f'Best val Acc: {best_acc:.4f}')

    model.load_state_dict(best_model_wts)
    return model

In [None]:
def visualize_model(model, num_images=6, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    """Show the model's predictions for the first ``num_images`` validation images.

    Batches are read from ``dataloaders['valid']``, moved to the device the
    model's parameters live on, and each image is drawn in a two-column grid
    titled with the predicted class name. The model's train/eval mode is
    restored on exit, including when an error occurs.

    Args:
        model: classifier returning one score per class for each input.
        num_images: number of images to display; nothing is done when it is
            not positive.
        mean: per-channel mean used when the inputs were normalised; used to
            undo the normalisation for display.
        std: per-channel standard deviation used when the inputs were
            normalised.
    """
    if num_images <= 0:
        return

    was_training = model.training
    model.eval()
    images_so_far = 0
    fig = plt.figure()
    # Round up so an odd num_images still fits into the two-column grid.
    rows = -(-num_images // 2)

    # Send inputs to wherever the model is, so a CPU model also works when the
    # notebook-level `device` is a GPU.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else device

    # Shaped (C, 1, 1) to broadcast over a (C, H, W) image.
    mean_t = torch.tensor(mean).view(-1, 1, 1)
    std_t = torch.tensor(std).view(-1, 1, 1)

    try:
        with torch.no_grad():
            for inputs, _ in dataloaders['valid']:
                inputs = inputs.to(model_device)

                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)

                images = inputs.cpu()
                for j in range(images.size(0)):
                    images_so_far += 1
                    ax = plt.subplot(rows, 2, images_so_far)
                    ax.axis('off')
                    ax.set_title(f'predicted: {classes[preds[j].item()]}')
                    # Undo the normalisation and move channels last (H, W, C),
                    # clamping to the [0, 1] range imshow expects for floats.
                    picture = (images[j] * std_t + mean_t).clamp(0, 1).permute(1, 2, 0)
                    plt.imshow(picture.numpy())

                    if images_so_far == num_images:
                        return
    finally:
        model.train(mode=was_training)

In [None]:
#load a pretrained model, replace with a final layer
#change training parameters here if needed

# Loss for multi-class classification on raw class scores.
criterion = nn.CrossEntropyLoss()

# Pretrained ResNet-18 whose final fully connected layer is swapped for one
# with NUM_OF_CLASSES outputs, then moved to the training device.
model_ft = models.resnet18(pretrained=True)
num_ftrs = model_ft.fc.in_features
model_ft.fc = nn.Linear(in_features=num_ftrs, out_features=NUM_OF_CLASSES)
model_ft = model_ft.to(device)

# SGD over all parameters; the learning rate is multiplied by 0.1 every 7 scheduler steps.
optimizer_ft = optim.SGD(params=model_ft.parameters(), lr=0.001, momentum=0.9)
exp_lr_scheduler = lr_scheduler.StepLR(optimizer=optimizer_ft, step_size=7, gamma=0.1)

In [None]:
# Fine-tune for 50 epochs; train_model returns the model with its best weights loaded.
model_ft = train_model(
    model=model_ft,
    criterion=criterion,
    optimizer=optimizer_ft,
    scheduler=exp_lr_scheduler,
    num_epochs=50,
)

In [None]:
# Persist only the weights (state_dict) of the trained model.
model_name = 'model_trained.pth'
torch.save(obj=model_ft.state_dict(), f=model_name)

In [None]:
#loading the saved model

# Build the bare architecture: the checkpoint supplies every weight, so there is
# no need to download the pretrained ones first.
model_loaded = models.resnet18()
num_ftrs = model_loaded.fc.in_features
model_loaded.fc = nn.Linear(num_ftrs, NUM_OF_CLASSES)
# Load onto the CPU first so the checkpoint can be read on any machine.
model_loaded.load_state_dict(torch.load(model_name, map_location='cpu'))
# Move to the device the batches are sent to and switch to inference mode.
model_loaded = model_loaded.to(device).eval()

In [None]:
# Show the reloaded model's predictions on a few validation images.
visualize_model(model_loaded)