In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from PIL import Image
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score
import os
import random
from tqdm import tqdm

In [None]:
def seed_everything(seed):
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    os.environ['PYTHONHASHSEED']=str(seed)

seed_everything(42)

In [None]:
os.chdir('C:\\Users\\Daniel\\Desktop\\Grad school\\UWaterloo Masters\\Winter 2024\\STAT923\\Final Project\\STAT932_Final_Project\\Images')
data = pd.read_csv('data.csv')
data.head()

In [None]:
X_train, X_test, y_train, y_test = train_test_split(data['images'], data['labels'], 
                                                    test_size = 0.2, stratify= data['labels'],
                                                    random_state=42)
print(X_train.shape)
print(y_train.shape)
print(X_test.shape)
print(y_test.shape)

In [None]:
def preproc_images(image_paths, label_images, target_size = (80,80)):
    images = []
    labels = []
    idx = 0
    for img_path in image_paths:
        with Image.open(img_path) as img:
            img_resized = img.resize(target_size)
            #img_gray = img_resized.convert('L')
            img_array = np.array(img_resized) / 255.0
            images.append(img_array)
            labels.append(label_images[idx])
        idx += 1
    images = np.array(images)
    labels = np.array(labels)
    return images, labels

os.chdir('C:\\Users\\Daniel\\Desktop\\Grad school\\UWaterloo Masters\\Winter 2024\\STAT923\\Final Project\\STAT932_Final_Project\\Images\\DrivImages')
train_images, train_labels = preproc_images(list(X_train), list(y_train))
test_images, test_labels = preproc_images(list(X_test), list(y_test))

In [None]:
data_gen = ImageDataGenerator(
    rotation_range=20,
    zoom_range=0.2,
    shear_range=0.2,
)

labels_to_augment = [1, 2]

In [None]:
augmented_train_images = []
augmented_train_labels = []

augmentations_per_image = 18


for img, label in zip(train_images, train_labels):
    if label in labels_to_augment:
        img = img.reshape((1,) + img.shape)
        idx = 0
        augment = data_gen.flow(img, batch_size = 1)
        for batch in augment:
            augmented_image = batch[0]
            augmented_train_images.append(augmented_image)
            augmented_train_labels.append(label)
            idx += 1

            if idx == 5:
                plt.imshow(augmented_image, cmap='gray')
                plt.axis('off') 
                plt.show()

            if idx >= augmentations_per_image:
                break
            
    else:
        augmented_train_images.append(img)
        augmented_train_labels.append(label)


augmented_train_images = np.array(augmented_train_images)
augmented_train_labels = np.array(augmented_train_labels)

In [None]:
augmented_train_images_reshaped = augmented_train_images.reshape(augmented_train_images.shape[0], -1)
test_images_reshaped = test_images.reshape(test_images.shape[0], -1)
print(augmented_train_images_reshaped.shape)
print(test_images_reshaped.shape)

We can define our data loader this time for CNN, so the images, instead of being in the dimension of (n, h x w x c), where h is height, w is width, c is number of channels, the images will be in the form of (n, c,h, w)

In [None]:
class ImageDataset(Dataset):
    def __init__(self, image_data, label_data, transform=None):
        self.image_data = image_data
        self.label_data = label_data
        self.transform = transform

    def __len__(self):
        return len(self.image_data)

    def __getitem__(self, idx):
        feature_row = self.image_data[idx,]
        label_row = self.label_data[idx,]

        h = 80
        w = 80
        num_channels = 3
    
        image  = torch.tensor(feature_row, dtype = torch.float)
        label = torch.tensor(label_row, dtype = torch.long)
        image = image.reshape(num_channels, h,w)

        if self.transform:
            image = self.transform(image)

        return image, label

In [None]:
train_dataset = ImageDataset(image_data= augmented_train_images_reshaped, label_data = augmented_train_labels)
test_dataset = ImageDataset(image_data = test_images_reshaped, label_data = test_labels)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)

We can create a simple CNN class

In [None]:
os.environ['CUDA'] = '1'

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

In [None]:
class CNN(nn.Module):
    def __init__(self, num_classes=3):
        super(CNN, self).__init__()
        
        # Convolutional layer block 1
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(64)
        
        # Convolutional layer block 2
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(128)
        
        # Convolutional layer block 3
        self.conv3 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(256)
        
         # Convolutional layer block 4
        self.conv4 = nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(512)
        
        # Convolutional layer block 5
        self.conv5 = nn.Conv2d(in_channels=512, out_channels=1024, kernel_size=3, padding=1)
        self.bn5 = nn.BatchNorm2d(1024)
        
        # Max pooling
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        
        # Fully connected layers
        self.fc1 = nn.Linear(1024 * 2 * 2,1024) 
        self.fc2 = nn.Linear(1024, num_classes)
        
        # Dropout layer
        self.dropout = nn.Dropout(0.2)

    def forward(self, x):
        x = self.pool(F.relu(self.bn1(self.conv1(x))))
        x = self.pool(F.relu(self.bn2(self.conv2(x))))
        x = self.pool(F.relu(self.bn3(self.conv3(x))))
        x = self.pool(F.relu(self.bn4(self.conv4(x))))
        x = self.pool(F.relu(self.bn5(self.conv5(x))))

        x = x.view(-1,  1024 * 2 * 2)

        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        
        return x

In [None]:
class EarlyStopping:
    def __init__(self, patience=3, verbose=False, delta=0, path='checkpoint.pt'):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = float('inf')
        self.delta = delta
        self.path = path

    def __call__(self, val_loss, model):
        score = -val_loss

        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        '''Saves model when validation loss decreases.'''
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

In [None]:
model = CNN()
model.to(device)

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001) 

In [None]:
cnn_wts_path = "C:\\Users\\Daniel\\Desktop\\Grad school\\UWaterloo Masters\\Winter 2024\\STAT923\\Final Project\\STAT932_Final_Project\\CNN\\CNN_checkpoint_saved.pt"
early_stopping = EarlyStopping(patience=10, verbose=True, path = cnn_wts_path)

num_epochs = 100
for epoch in range(100):
    model.train()
    total_loss = 0.0
    test_loss = 0.0

    for image, labels in tqdm(train_loader):
        image = image.cuda().to(device)
        labels = labels.cuda().to(device)

        optimizer.zero_grad()

        outputs = model(image) 
        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    model.eval()

    with torch.no_grad():
        for images, labels in test_loader:
            images = images.cuda().to(device)
            labels = labels.cuda().to(device)

            outputs = model(images)
            loss = criterion(outputs, labels)
            test_loss += loss.item()
        
    test_loss /= len(test_loader)
    early_stopping(test_loss, model)
    if early_stopping.early_stop:
        print("Early stopping")
        break

    print(f'Epoch [{epoch+1}/{100}], Loss: {total_loss/len(train_loader):.4f}')

In [None]:
# Loading best weight after experimentation
cnn_best_path = "C:\\Users\\Daniel\\Desktop\\Grad school\\UWaterloo Masters\\Winter 2024\\STAT923\\Final Project\\STAT932_Final_Project\\CNN\\CNN_checkpoint.pt"
model.load_state_dict(torch.load(cnn_best_path))

In [None]:
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        images = images.cuda().to(device)
        labels = labels.cuda().to(device)

        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print(f'Accuracy of the network on the test images: {100 * correct / total}%')