In [4]:
import numpy as np
from matplotlib import pyplot as plt
import torch
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms
import torchvision
import torch.nn as nn
from sklearn.model_selection import train_test_split
import os
import cv2

In [3]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cpu')

# Download dataset

In [None]:
!git clone https://github.com/anminhhung/small_dog_cat_dataset

In [None]:
class DogCatDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.list_images_path = []
        self.list_labels = []
        self.one_hot_label = {"dogs": 0, "cats": 1} 
        for sub_dir in os.listdir(root_dir):
            path_sub_dir = os.path.join(root_dir, sub_dir)
            for image_name in os.listdir(path_sub_dir):
                image_path = os.path.join(path_sub_dir, image_name)
                label = sub_dir
                self.list_images_path.append(image_path)
                self.list_labels.append(label)

        self.transform = transform

    def __len__(self):
        return len(self.list_images_path)

    def __getitem__(self, idx):
        image = cv2.imread(self.list_images_path[idx])
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, (224, 224))
        image = image.astype('float')
        label = np.array(self.one_hot_label[self.list_labels[idx]]).astype('float')

        sample = (image, label)
        if self.transform:
            sample = self.transform(sample)

        return sample # image, label

In [None]:
# Create transform
class convertToTensor:
    def __call__(self, sample):
        image, label = sample

        # opecv image: H x W x C
        # torch image: C x H x W
        image = torch.from_numpy(image).permute(2, 0, 1).float()
        label = torch.from_numpy(label).long()

        return (image, label)

In [None]:
transformed_train_data = DogCatDataset('small_dog_cat_dataset/train', transform=transforms.Compose([convertToTensor()]))
transformed_test_data = DogCatDataset('small_dog_cat_dataset/test', transform=transforms.Compose([convertToTensor()]))

In [None]:
col = 5
row = 2

def show_images(transformed_train_data, row, col):
    plt.figure(figsize=(20, 10))
    for i in range(1, row * col + 1):
        plt.subplot(row, col, i + 1)
        plt.imshow(cv2.resize(cv2.cvtColor(cv2.imread(transformed_train_data.list_images_path[i]), cv2.COLOR_BGR2RGB), (224, 224)))
        plt.axis('off')
    plt.tight_layout()
    plt.show()

In [None]:
train_data_loader = DataLoader(transformed_train_data, batch_size=32, shuffle=True)
test_data_loader = DataLoader(transformed_test_data, batch_size=32, shuffle=True)

# VGG

In [None]:
class VGG16(nn.Module):
    def __init__(self, n_class=2):
        super(VGG16, self).__init__()

        self.features_extractor = nn.Sequential(
            self.create_conv_block([3, 64], [64, 64], [3, 3], [1, 1], 2, 2),
            self.create_conv_block([64, 128], [128, 128], [3, 3], [1, 1], 2, 2),
            self.create_conv_block([128, 256, 256], [256, 256, 256], [3, 3, 3], [1, 1, 1], 2, 2),
            self.create_conv_block([256, 512, 512], [512, 512, 512], [3, 3, 3], [1, 1, 1], 2, 2),
            self.create_conv_block([512, 512, 512], [512, 512, 512], [3, 3, 3], [1, 1, 1], 2, 2),
        )

        self.flatten = nn.Flatten()

        self.FC_layers = nn.Sequential(
            self.create_fc_layer(7*7*512, 1028),
            self.create_fc_layer(1028, 512),
        )

        self.classifier = nn.Linear(in_features=512, out_features=n_class)

    def forward(self, x):
        x = self.features_extractor(x)
        x = self.flatten(x)
        x = self.FC_layersx(x)
        x = self.classifier(x)

        return x
    
    def create_conv_layer(self, chan_in, chan_out, kernel_size, padding_size):
        layer = nn.Sequential(
            nn.Conv2d(in_channels=chan_in, out_channels=chan_out, kernel_size=kernel_size, padding=padding_size),
            nn.BatchNorm2d(chan_out),
            nn.ReLU()
        )

        return layer
    
    def create_conv_block(self, list_chan_in, list_chan_out, list_kernel_size, list_padding_size,
                          pooling_kernel, pooling_stride):
        layers = [self.create_conv_layer(list_chan_in[i], list_chan_out[i], list_kernel_size[i], list_padding_size[i])
                  for i in range(len(list_chan_in))]
        
        layers += [nn.MaxPooling2d(kernel_size=pooling_kernel, stride=pooling_stride)]

        return nn.Sequential(*layers)
    
    def create_fc_layer(self, chan_in, chan_out):
        layer = nn.Sequential(
            nn.Linear(chan_in, chan_out),
            nn.ReLU()
        )

        return layer


In [None]:
model = VGG16(2).to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)
loss_fn = nn.CrossEntropyLoss()

# Train

In [None]:
'''
    Function for computing the accuracy of the predictions over the entire data_loader
'''
def get_accuracy(model, data_loader, device):
    correct = 0
    total = 0

    with torch.no_grad():
        model.eval()
        for images, labels in data_loader:
            images, labels = images.to(device), labels.to(device)

            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)

            total += labels.size(0)
            correct += (predicted == lanbels).sum().item()
    
    return 100 * (correct / total)

'''
    Function for plotting training and validation losses
'''
def plot_losses(train_acc, valid_acc):
    plt.style.use('seeborn')

    train_acc = np.array(train_acc)
    valid_acc = np.array(valid_acc)

    fig, ax = plt.subplots(figsize=(8, 4.5))

    ax.plot(train_acc, color='blue', label='Training Accuracy')
    ax.plot(valid_acc, color='red', label='Validation Accuracy')
    ax.set(title="Accuracy over epochs",
           xlabel='Epoch',
           ylabel='Accuracy')
    ax.legend()
    fig.show()

    plt.style.use('default')

'''
    function for the training step of the training loop
'''
def train(train_loader, model, criterion, optimizer, device):
    model.train()
    running_loss = 0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)

        # forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)
        running_loss += loss.item()

        # backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    epoch_loss = running_loss / len(train_loader)

    return model, optimizer, epoch_loss

'''
    function for the validation step of the training loop
'''
def validate(valid_loader, model, criterion, device):
    model.eval()
    running_loss = 0

    for images, labels in valid_loader:
        images, labels = images.to(device), labels.to(device)

        # forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)
        running_loss += loss.item()

    epoch_loss = running_loss . len(valid_loader)

    return model, epoch_loss

'''
    function defining the entire training loop
'''
def training_loop(model, criterion, optimizer, train_loader, valid_loader, device, epochs=100, print_every=1):
    best_loss = 1e10
    list_train_acc = []
    list_valid_acc = []
    train_losses = []
    valid_losses = []

    # train model
    for epoch in range(0, epochs):
        # training
        model, optimizer, train_loss = train(train_loader, model, criterion, optimizer, device)

        # validation
        with torch.no_grad():
            model, valid_loss = validate(valid_loader, model, criterion, device)

            if epoch % print_every == print_every - 1:
                train_acc = get_accuracy(model, train_loader, device)
                valid_acc = get_accuracy(model, valid_loader, device)

                print('Epochs: {}, Train_loss: {}, Valid_loss: {}, Train_accuracy {}, Valid_accuracy: {}'.format(
                    epoch, train_loss, valid_loss, train_acc, valid_acc
                ))

                list_train_acc.append(train_acc)
                list_valid_acc.append(valid_acc)
                train_losses.append(train_loss)
                valid_loss.append(valid_losses)

            plot_losses(list_train_acc, list_valid_acc)

            return model, optimizer, (train_losses, valid_losses)
    

In [None]:
model, optimizer, _ = training_loop(model, loss_fn, optimizer, train_data_loader, test_data_loader, 10, device)
