In [1]:
# from zipfile import ZipFile
# dataset_path = "./cnn_dataset.zip"
# with ZipFile(dataset_path, 'r') as z: 
#     z.extractall(path='./cnn_dataset')  
#     print("Done")

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os
import re
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from PIL import Image
from tqdm import tqdm
from sklearn import metrics
from sklearn.model_selection import train_test_split

import torch
from torch import nn, optim
import torch.nn.functional as F

import albumentations as A
from albumentations.pytorch import ToTensorV2

In [3]:
class Dataset:
    def __init__(self, img_paths, labels, transform=None):
        self.img_paths = img_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, index):
        img_path = self.img_paths[index]
        img = np.array(Image.open(img_path))
        label = self.labels[index]

        if self.transform is not None:
            img = self.transform(image=img)["image"]

        return img, torch.tensor(label, dtype=torch.long)

In [4]:
imgs_dir = "/content/drive/MyDrive/cnn_dataset"

img_paths, labels = [], []
for i, d in enumerate(os.scandir(imgs_dir)):
    for f in os.scandir(d.path):
        img_paths.append(f.path)
        labels.append(i) 

train_img_paths, test_img_paths, train_labels, test_labels = train_test_split(img_paths, labels, test_size=0.2, random_state=42, stratify=labels)

In [5]:
HEIGHT = 224
WIDTH = 224
NUM_WORKERS = 0
TRAIN_BATCH_SIZE = 32
TEST_BATCH_SIZE = 32

transform = A.Compose(
    [
        A.Resize(height=HEIGHT, width=WIDTH),
        A.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
            max_pixel_value=255.0,
        ),
        ToTensorV2(),
    ]
)

train_dataset = Dataset(train_img_paths, train_labels, transform)
train_data_loader = torch.utils.data.DataLoader(
    train_dataset, 
    batch_size=TRAIN_BATCH_SIZE,
    shuffle=True,
)

test_dataset = Dataset(test_img_paths, test_labels, transform)
test_data_loader = torch.utils.data.DataLoader(
    test_dataset, 
    batch_size=TEST_BATCH_SIZE,
    shuffle=False,
)

In [8]:
import torch
import torch.nn as nn

class VGG13(nn.Module):
    def __init__(self):
        super(VGG13, self).__init__()
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.conv4 = nn.Conv2d(128, 128, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv5 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.conv6 = nn.Conv2d(256, 256, kernel_size=3, padding=1)
        self.conv7 = nn.Conv2d(256, 256, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv8 = nn.Conv2d(256, 512, kernel_size=3, padding=1)
        self.conv9 = nn.Conv2d(512, 512, kernel_size=3, padding=1)
        self.conv10 = nn.Conv2d(512, 512, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv11 = nn.Conv2d(512, 512, kernel_size=3, padding=1)
        self.conv12 = nn.Conv2d(512, 512, kernel_size=3, padding=1)
        self.conv13 = nn.Conv2d(512, 512, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(512*7*7, 4096)
        self.fc2 = nn.Linear(4096, 4096)
        self.fc3 = nn.Linear(4096, 1000)
        
    def forward(self, x):
        x = self.conv1(x)
        x = nn.ReLU()(x)
        x = self.conv2(x)
        x = nn.ReLU()(x)
        x = self.pool(x)
        x = self.conv3(x)
        x = nn.ReLU()(x)
        x = self.conv4(x)
        x = nn.ReLU()(x)
        x = self.pool(x)
        x = self.conv5(x)
        x = nn.ReLU()(x)
        x = self.conv6(x)
        x = nn.ReLU()(x)
        x = self.conv7(x)
        x = nn.ReLU()(x)
        x = self.pool(x)
        x = self.conv8(x)
        x = nn.ReLU()(x)
        x = self.conv9(x)
        x = nn.ReLU()(x)
        x = self.conv10(x)
        x = nn.ReLU()(x)
        x = self.pool(x)
        x = self.conv11(x)
        x = nn.ReLU()(x)
        x = self.conv12(x)
        x = nn.ReLU()(x)
        x = self.conv13(x)
        x = nn.ReLU()(x)
        x = self.pool(x)
        x = x.view(-1, 512*7*7)
        x = self.fc1(x)
        x = nn.ReLU()(x)
        x = nn.Dropout(p=0.5)(x)
        x = self.fc2(x)
        x = nn.ReLU()(x)
        x = nn.Dropout(p=0.5)(x)
        x = self.fc3(x)
        x = nn.Softmax(dim=1)(x)
        return x


In [9]:
def train(train_data_loader, model, criterion, optimizer, epoch):
    model.train()
    losses = []

    train_progress_bar = tqdm(enumerate(train_data_loader), total=len(train_data_loader), leave=True)
    for batch_idx, (imgs, labels) in train_progress_bar:
        imgs = imgs.to(DEVICE)
        labels = labels.to(DEVICE)
        outputs = model(imgs)
        loss = criterion(outputs, labels)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        train_progress_bar.set_description(f"Epoch [{epoch}/{epochs-1}]")
        train_progress_bar.set_postfix(train_loss=loss.item())
        losses.append(loss.item())
        
    train_progress_bar.close()
    return sum(losses) / len(losses)

def evaluate(test_data_loader, model, criterion):
    model.eval()
    losses = []
    predictions = []
    targets = []

    test_progress_bar = tqdm(enumerate(test_data_loader), total=len(test_data_loader), leave=True)
    for _, (imgs, labels) in test_progress_bar:
        imgs = imgs.to(device=DEVICE)
        labels = labels.to(DEVICE)

        with torch.no_grad():
            outputs = model(imgs)
            loss = criterion(outputs, labels)
            losses.append(loss.item())

        predictions.append(torch.softmax(outputs, dim=1).detach().cpu())
        targets.append(labels.detach().cpu())

    predictions = torch.cat(predictions, dim=0).numpy().argmax(axis=1)
    targets = torch.cat(targets, dim=0).view(-1).numpy()

    accuracy = metrics.accuracy_score(targets, predictions)
    test_progress_bar.close()
    return sum(losses)/len(losses), accuracy

In [10]:
def save_checkpoint(model, optimizer, filename="model.pth.tar"):
    print("=>Saving Checkpoint...")
    checkpoint = {"state_dict": model.state_dict(), "optimizer": optimizer.state_dict()}
    torch.save(checkpoint, filename)

def load_checkpoint(checkpoint_path, model, optimizer):
    print("=>Loading Checkpoint...")
    checkpoint = torch.load(checkpoint_path, map_location=DEVICE)
    model.load_state_dict(checkpoint["state_dict"])
    optimizer.load_state_dict(checkpoint["optimizer"])

In [None]:
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
LOAD_MODEL = False
learning_rate = 1e-4
epochs = 1

model = VGG13()
model = model.to(DEVICE)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

if LOAD_MODEL: 
    load_checkpoint(f"model.pth.tar", model, optimizer)

train_losses, test_losses, train_accuracies, test_accuracies = [], [], [], []
acc = -float("inf")
for epoch in range(epochs):
    train_loss = train(train_data_loader, model, criterion, optimizer, epoch)  
    _, train_accuracy = evaluate(train_data_loader, model, criterion) 
    test_loss, test_accuracy = evaluate(test_data_loader, model, criterion)

    if test_accuracy > acc:
        save_checkpoint(model, optimizer, filename=f"model.h5")
        acc = test_accuracy

    train_losses.append(train_loss)
    test_losses.append(test_loss)
    train_accuracies.append(train_accuracy)
    test_accuracies.append(test_accuracy)
    print(f"Epoch: {epoch}, Loss: {train_loss}, train_accuracy: {train_accuracy}, test_accuracy: {test_accuracy}")

Epoch [0/0]:  71%|███████   | 530/751 [3:21:49<1:31:00, 24.71s/it, train_loss=6.5]

In [None]:
def get_smooth(arr):
    res = []
    s = 0
    for ind, ele in enumerate(arr):
        s += ele
        res.append(s/(ind+1))
    return res

In [None]:
plt.plot(range(epochs), get_smooth(train_accuracies), label='train_accuracy')
plt.plot(range(epochs), get_smooth(test_accuracies), label='test_accuracy')
plt.legend(loc='upper right')
print("Accuracyrelu is: ", acc)

In [None]:
plt.plot(range(epochs), get_smooth(train_losses), label='train_loss')
plt.plot(range(epochs), get_smooth(test_losses), label='test_loss')
plt.legend(loc='upper right')
print("Accuracyrelu is: ", acc)