In [None]:
import os
import sys
from collections import Counter

import cv2
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import PIL
import sklearn
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchinfo
import torchvision
from PIL import Image
from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix
from torch.utils.data import DataLoader, random_split
from torchinfo import summary
from torchvision import datasets, transforms
from tqdm.notebook import tqdm
from tqdm.version import __version__ as tqdm__version__

# Ask cuDNN to use deterministic algorithms, so GPU runs are intended to be repeatable.
torch.backends.cudnn.deterministic = True

In [None]:
# Report the runtime environment so results can be tied to exact library versions.
print("Platform:", sys.platform)
print("Python version:", sys.version)
print("---")

# (label, version) pairs; the labels are kept verbatim so the printed report is unchanged.
library_versions = [
    ("CV2 version : ", cv2.__version__),
    ("matplotlib version : ", matplotlib.__version__),
    ("numpy version : ", np.__version__),
    ("torch version : ", torch.__version__),
    ("torchinfo version : ", torchinfo.__version__),
    ("torchvision version : ", torchvision.__version__),
    ("PIL version : ", PIL.__version__),
    ("scikit-learn version: ", sklearn.__version__),
    ("tqdm version: ", tqdm__version__),
]
for version_label, version_value in library_versions:
    print(version_label, version_value)

In [None]:
# Root folder of the binary-classification data, relative to the working directory.
data_dir = os.path.join("data_p1", "data_binary")

# The training split is the "train" sub-folder of the data root.
train_dir = os.path.join(data_dir, "train")

# Names of the entries directly inside the training folder (os.listdir order is arbitrary).
labels = os.listdir(train_dir)

In [None]:
# Folder holding the "hog" class and the file names inside it.
hog_path = os.path.join(train_dir, "hog")
hog_images = os.listdir(hog_path)

print("length of hog images: ", len(hog_images))

# Same for the "blank" class.
# Fixed: this call previously read `os.path.join*train_dir, "blank")`, a syntax error.
blank_path = os.path.join(train_dir, "blank")
blank_images = os.listdir(blank_path)

print("length of blank images: ", len(blank_images))


# Take the first listed hog file as a sample and build its full path.
hog_image_name = hog_images[0]
print(hog_image_name)
print("hog image name: ", hog_image_name)

hog_image_path = os.path.join(hog_path, hog_image_name)
print(hog_image_path)


# Take the first listed blank file as a sample and build its full path.
blank_image_name = blank_images[0]
print(blank_image_name)
print("blank image name: " , blank_image_name)
blank_image_path = os.path.join(blank_path, blank_image_name)
print(blank_image_path)

In [None]:
# Open the sample hog image with PIL and print its summary repr.
hog_img_pil = Image.open(hog_image_path)
print("Hog image: ", hog_img_pil)


# Open the sample blank image the same way.
blank_img_pil = Image.open(blank_image_path)
print("Blank image: ", blank_img_pil)

In [None]:
# Final expression of the cell: the notebook displays the sample hog image.
hog_img_pil

In [None]:
# Final expression of the cell: the notebook displays the sample blank image.
blank_img_pil

In [None]:
class ConvertToRGB:
    """Callable transform that makes sure an image is in 'RGB' mode."""

    def __call__(self, img):
        """Return `img` itself when its mode is already 'RGB', otherwise `img.convert('RGB')`."""
        already_rgb = img.mode == 'RGB'
        return img if already_rgb else img.convert('RGB')

In [None]:
# Per-channel statistics for Normalize. These are the standard ImageNet values.
# NOTE(review): `mean` and `std` were used without ever being defined; replace these with
# statistics computed from this dataset if they are available.
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

# The pipeline is bound to the name `transforms` because the dataset cell passes that
# name to ImageFolder. The torchvision module is therefore reached through
# `torchvision.transforms`, so this cell still works when re-run after the rebinding.
# Fixed: a comma was missing after ToTensor(), which was a syntax error.
transforms = torchvision.transforms.Compose([
    ConvertToRGB(),
    torchvision.transforms.Resize((224, 224)),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(mean, std),
])


print(type(transforms))
print(transforms)

In [None]:
# Build the dataset from the training folder; `transforms` is applied to every loaded image.
dataset = datasets.ImageFolder(root=train_dir, transform=transforms)
print(dataset)


# Final expression of the cell: the class names found by ImageFolder.
dataset.classes

Show that the only distinct class labels in `im` are 0 and 1. Use a set to collect them.

In [None]:
# ImageFolder keeps its samples in `imgs`; element 1 of each entry is the class index.
im = dataset.imgs
print(im[0])


# Collecting every sample's class index into a set leaves only the distinct labels.
# Fixed: `set{...}` is not valid Python; a set comprehension is written `{... for ...}`.
distinct_classes = {x[1] for x in im}
print("Distinct classes: ", distinct_classes)

Data Splitting

In [None]:
# A seeded generator makes the random split repeatable across runs.
g = torch.Generator()
g.manual_seed(42)

# 80% of the samples go to training, 20% to validation.
train_dataset, val_dataset = random_split(dataset, [0.8, 0.2], generator=g)
# Fixed: `f "..."` (space between the prefix and the quote) is a syntax error.
print(f"length of train dataset: {len(train_dataset)}")

It's good to explore the data. We'll create a visualization to show the breakdown of the two classes. The function below goes through the dataset and counts how many images are in each class.

In [None]:
def class_counts(dataset):
    """Count how many samples of each class `dataset` contains.

    Args:
        dataset: An iterable of samples whose element 1 is the class index. It must
            either expose ``class_to_idx`` itself or wrap a dataset that does under
            the ``.dataset`` attribute (as the splits from ``random_split`` do).

    Returns:
        pandas.Series indexed by class name with the number of samples per class
        (0 for a class that never occurs).
    """
    # Fixed: the function took no parameter although every caller passes a dataset,
    # and it read `.dataset` off the global ImageFolder instead of the argument.
    # Iterating the dataset yields every sample, so each one is loaded once.
    c = Counter(x[1] for x in tqdm(dataset))
    # Use the wrapped dataset when there is one, otherwise the dataset itself.
    base_dataset = getattr(dataset, "dataset", dataset)
    class_to_index = base_dataset.class_to_idx
    return pd.Series({cat: c[idx] for cat, idx in class_to_index.items()})

In [None]:
# Per-class sample counts for the training split.
train_counts = class_counts(train_dataset)
# Final expression of the cell: the notebook displays the counts.
train_counts

In [None]:
# Horizontal bar chart of the training class counts, smallest class first.
train_counts.sort_values().plot(kind='barh')

In [None]:
# Per-class sample counts for the validation split.
val_counts = class_counts(val_dataset)
# Only the final expression of a cell is displayed automatically, so the bare
# `val_counts` that used to sit here showed nothing. Print it explicitly instead.
print(val_counts)
# Horizontal bar chart of the validation class counts, smallest class first.
val_counts.sort_values().plot(kind='barh')

Creating a Data Loader

In [None]:
# Number of samples per batch, shared by both loaders.
batch_size = 32

# Seeded generator handed to both loaders.
g = torch.Generator()
g.manual_seed(42)

# Training batches are shuffled.
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    generator=g,
)
print(f"length of train loader: {len(train_loader)}")

# Validation batches keep the dataset order.
val_loader = DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=False,
    generator=g,
)
print(f"length of validation loader: {len(val_loader)}")


In [None]:
# Pull a single batch from the training loader to inspect it.
data_iter = iter(train_loader)
# NOTE: this rebinds `labels`, which earlier held the directory listing of train_dir.
images, labels = next(data_iter)

image_shape = images.shape
print("Images shape: ", image_shape)

label_shape = labels.shape
print("Labels shape: ", label_shape)

In [None]:
# Final expression of the cell: the label tensor of the batch fetched above.
labels

<b>Building a Shallow Neural Network</b>

In [None]:
# Example: each image is a 3D tensor, so it has to be flattened to 1D before it can
# be fed to a fully connected layer.
flatten = nn.Flatten()

# Apply the flatten layer to the sample batch fetched earlier.
tensor_flatten = flatten(images)

Using sequential network

In [None]:
# Spatial size of the inputs; matches the Resize((224, 224)) step of the transform pipeline.
height = 224
# Fixed: this was assigned to `weight`, leaving the `width` used below undefined.
width = 224

# Flatten -> Linear(3*height*width, 512) -> ReLU -> Linear(512, 128) -> ReLU.
# The factor 3 is the number of channels of the RGB input.
model = nn.Sequential(
    nn.Flatten(),
    nn.Linear(3 * height * width, 512),
    nn.ReLU(),
    nn.Linear(512, 128),
    nn.ReLU()
)

print("model type: ", type(model))
print("model structure:")
print(model)

In [None]:
# Output layer: maps the 128 hidden features to 2 outputs, one per class.
output_layer = nn.Linear(128, 2)

# NOTE(review): append() changes `model` in place, so running this cell twice stacks a
# second Linear(128, 2) on top of a 2-feature output; re-create `model` before re-running.
model.append(output_layer)

In [None]:
# Choose where the model and tensors will live: the GPU when CUDA is available,
# otherwise the CPU. Fixed: `device` was used here without ever being defined.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using {device} device.")

# Move the model's parameters to the selected device.
model.to(device)
print(model)

In [None]:
# Layer-by-layer summary for an input of `batch_size` images of shape 3 x height x width.
summary(model, input_size=(batch_size, 3,height, width))

<b>The loss function measures how well our model does for a given set of model parameters</b>. The chosen loss function, cross-entropy, is pretty standard. It's the same loss function used for simpler machine learning models such as logistic regression. Note that this function expects the input to be logits from our model.

In [None]:
# Cross-entropy loss; it is given the model's raw logits, not probabilities.
loss_fn = nn.CrossEntropyLoss()

We also need an <b>optimizer</b>. This will adjust the model's parameters to try to minimize the loss function. We've chosen the Adam optimizer, a popular optimizer. The Adam optimizer is a gradient based optimizer like stochastic gradient descent. The Adam optimizer has additional features that make it less likely to get stuck in a local minimum. It converges to a better state faster than standard stochastic gradient descent. The optim.Adam class is initialized with the model parameters through model.parameters. An optional argument is the learning rate lr. This controls how large the step sizes are in gradient descent. Keeping the default value will be fine for our purposes. We've explicitly specified the default value in this case.

In [None]:
# Adam over all model parameters. The learning rate is Adam's default (0.001), written
# out explicitly. Fixed: it was 0.01, which contradicted the accompanying text saying
# the default value is used.
optimizer = optim.Adam(model.parameters(), lr=0.001)

Now we'll create a function called train_epoch that encapsulates the training process. The code is daunting at first but we're going to break it down. The function accepts

<br>model: The PyTorch model we built with a specific architecture
<br>optimizer: The optimizer that will be used to best adjust the model weights
<br>loss_fn: The loss function that the optimizer is trying to minimize
<br>data_loader: The DataLoader object for the training dataset that makes it easy to iterate over batches
<br>device: The device where we're going to place the tensors

In [None]:
def train_epoch(model, optimizer, loss_fn, data_loader, device='cpu'):
    """Train `model` for one full pass over `data_loader`.

    Args:
        model: The network to train; it is switched to training mode.
        optimizer: Optimizer that updates the model's parameters.
        loss_fn: Callable ``loss_fn(output, targets)`` returning a scalar loss.
        data_loader: Iterable of ``(inputs, targets)`` batches exposing ``.dataset``.
        device: Device the batch tensors are moved to before the forward pass.

    Returns:
        float: the sum of ``batch loss * batch size`` over all batches, divided by
        ``len(data_loader.dataset)``.
    """
    # We'll report the loss function's average value at the end of the epoch.
    training_loss = 0.0

    # The train method only sets the model in training mode. No training has happened.
    model.train()

    # Iterate over all batches in the training set to complete one epoch.
    # Fixed: everything below zero_grad() used to sit outside the loop, so no batch
    # was actually trained on inside the iteration.
    for inputs, targets in tqdm(data_loader, desc="Training", leave=False):
        # Clear gradients left over from the previous batch.
        optimizer.zero_grad()

        # Move the images and labels of this batch to the specified device.
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Forward pass: obtain the logits and compute the loss from them.
        output = model(inputs)
        loss = loss_fn(output, targets)

        # Backward pass computes the gradients of the loss with respect to the
        # trainable weights; the optimizer then takes a step to adjust them.
        loss.backward()
        optimizer.step()

        # Weight the batch loss by the batch size so a smaller final batch is
        # averaged correctly. Fixed: this used the builtin `input`, not `inputs`.
        training_loss += loss.item() * inputs.size(0)

    # Average loss per sample over the whole epoch.
    return training_loss / len(data_loader.dataset)

The tqdm function that is wrapped around the data_loader will give us a progress bar that fills up as we process the data. It's not necessary for the training process. It's just there to reassure us that something is actually happening!

We'll train the model for one epoch. The train_epoch function returns the average training loss, cross-entropy, for the epoch.

In [None]:
# Train for a single epoch and report the average loss returned by train_epoch.
loss_value = train_epoch(model, optimizer, loss_fn, train_loader, device)
print(f"The average loss during the training epoch was {loss_value:.2f}.")

In [None]:
def predict(model, data_loader, device='cpu'):
    """Return the class probabilities the model assigns to every sample in `data_loader`.

    Args:
        model: The network to evaluate; it is switched to evaluation mode.
        data_loader: Iterable of ``(inputs, targets)`` batches; targets are ignored.
        device: Device the inputs are moved to and the result is built on.

    Returns:
        Tensor with the softmax (over dim 1) of the model output for each batch,
        concatenated along dim 0 in the order the loader yields them. An empty
        1-D tensor when the loader yields nothing.
    """
    # Empty starting tensor; it also serves as the result when there are no batches.
    empty_start = torch.tensor([]).to(device)
    batch_probabilities = []

    # Evaluation mode, the counterpart of the training mode set in train_epoch.
    model.eval()

    # No gradients are needed for inference, so skip tracking them.
    with torch.no_grad():
        for batch_inputs, _ in tqdm(data_loader, desc='Predicting', leave=False):
            logits = model(batch_inputs.to(device))
            # Softmax turns the logits of each row into probabilities.
            batch_probabilities.append(F.softmax(logits, dim=1))

        # Join all batches row-wise, still under no_grad like the original running cat.
        all_probs = torch.cat([empty_start, *batch_probabilities], dim=0)

    return all_probs

In [None]:
# Class probabilities for the training set.
# NOTE: train_loader was created with shuffle=True, so the rows follow the order of
# this particular pass over the loader.
probabilities_train = predict(model, train_loader, device)
print(probabilities_train.shape)

torch.Size([2553, 2])

In [None]:
# Class probabilities for the validation set (val_loader keeps the dataset order).
probabilities_val = predict(model, val_loader, device)
print(probabilities_val.shape)

In [None]:
# Sanity check: add up the probabilities in the first row of the validation output.
total_probability = probabilities_val[0].sum()
print(f"Sum of probabilities: {total_probability.item()}")

torch.Size([638, 2])

To make a prediction from these probabilities, we predict the class with the highest probability for each row. This can be done with the torch.argmax function, like so:

In [None]:
# Predicted class for each training row: the index of the highest probability.
predictions_train = torch.argmax(probabilities_train, dim=1)

print(f"Predictions shape: {predictions_train.shape}")
print(f"First 10 predictions: {predictions_train[:10]}")

In [None]:
# Predicted class for each validation row: the index of the highest probability.
predictions_val = torch.argmax(probabilities_val, dim=1)

print(f"Predictions shape: {predictions_val.shape}")
print(f"First 10 predictions: {predictions_val[:10]}")

Compute the accuracy of the predictions on the training set

In [None]:
# train_loader reshuffles on every pass, so labels gathered by iterating it again do not
# line up row-for-row with predictions made during an earlier pass. Predictions and
# targets are therefore both taken from a fixed-order loader over the same dataset.
train_eval_loader = DataLoader(train_loader.dataset, batch_size=batch_size, shuffle=False)
probabilities_train = predict(model, train_eval_loader, device)
predictions_train = torch.argmax(probabilities_train, dim=1)

# True labels, in the same order as the predictions above.
targets_train = torch.cat([batch_labels for _, batch_labels in train_eval_loader]).to(device)
is_correct_train = torch.eq(predictions_train, targets_train)
total_correct_train = torch.sum(is_correct_train).item()
# Fraction of training samples whose predicted class matches the label.
accuracy_train =  total_correct_train / len(train_loader.dataset)

print(f"Accuracy on the training data: {accuracy_train}")

Accuracy on the training data: 0.65

In [None]:
# True validation labels; val_loader is not shuffled, so they line up with predictions_val.
targets_val = torch.cat([batch_labels for _, batch_labels in val_loader]).to(device)
is_correct_val = torch.eq(predictions_val, targets_val)
# Fixed: .item() was missing, which left the count (and the accuracy) as a tensor
# instead of a plain number, unlike the training-set computation.
total_correct_val = torch.sum(is_correct_val).item()
accuracy_val = total_correct_val / len(val_loader.dataset)

# Fixed the "valdiation" typo in the printed message.
print(f"Accuracy on the validation data: {accuracy_val}")

In [None]:
def score(model, data_loader, loss_fn, device="cpu"):
    """Evaluate `model` on `data_loader` and return ``(average loss, accuracy)``.

    Args:
        model: The network to evaluate; it is switched to evaluation mode.
        data_loader: Iterable of ``(inputs, targets)`` batches exposing ``.dataset``.
        loss_fn: Callable ``loss_fn(output, targets)`` returning a scalar loss.
        device: Device the batch tensors are moved to.

    Returns:
        Tuple of the size-weighted loss sum and the number of correct predictions,
        each divided by ``len(data_loader.dataset)``.
    """
    # Running totals, accumulated batch by batch.
    loss_sum = 0
    n_correct = 0

    # Evaluation mode, the counterpart of the training mode set in train_epoch.
    model.eval()

    # Gradients are not needed for scoring, so skip tracking them.
    with torch.no_grad():
        for batch_inputs, batch_targets in tqdm(data_loader, desc="Scoring", leave=False):
            batch_inputs = batch_inputs.to(device)
            batch_targets = batch_targets.to(device)

            # Forward pass, then the loss weighted by the number of samples in the batch.
            logits = model(batch_inputs)
            batch_loss = loss_fn(logits, batch_targets)
            loss_sum += batch_loss.item() * batch_inputs.size(0)

            # A prediction is correct when the highest-scoring class equals the target.
            predicted = torch.argmax(logits, dim=1)
            n_correct += (predicted == batch_targets).sum().item()

    n_samples = len(data_loader.dataset)
    return loss_sum / n_samples, n_correct / n_samples

In [None]:
# Loss and accuracy on the training set, computed batch by batch with score().
loss_train, accuracy_train = score(model, train_loader, loss_fn, device)
print(f"Training accuracy from score function: {accuracy_train}")

In [None]:
# Loss and accuracy on the validation set, computed batch by batch with score().
# Fixed: the comma between the two targets was missing (syntax error), the call scored
# train_loader instead of val_loader, and the message was labelled "Training".
loss_val, accuracy_val = score(model, val_loader, loss_fn, device)
print(f"Validation accuracy from score function: {accuracy_val}")

In [None]:
def train(model, optimizer, loss_fn, train_loader, val_loader, epochs=20, device="cpu"):
    """Train for `epochs` epochs, printing validation metrics after each one.

    Args:
        model: The network to train.
        optimizer: Optimizer passed through to ``train_epoch``.
        loss_fn: Loss function passed to both ``train_epoch`` and ``score``.
        train_loader: Loader handed to ``train_epoch``.
        val_loader: Loader handed to ``score``.
        epochs: Number of epochs to run; they are numbered from 1 in the output.
        device: Device passed through to ``train_epoch`` and ``score``.

    Returns:
        None. One progress line is printed per epoch.
    """
    last_epoch = epochs + 1
    for epoch in range(1, last_epoch):
        # One pass over the training data; keep its average loss.
        training_loss = train_epoch(model, optimizer, loss_fn, train_loader, device)

        # Evaluate on the validation data after the update.
        validation_metrics = score(model, val_loader, loss_fn, device)
        validation_loss, validation_accuracy = validation_metrics

        progress_line = (
            f"Epoch: {epoch}, Training Loss: {training_loss:.2f}, "
            f"Validation Loss: {validation_loss:.2f}, Validation Accuracy: {validation_accuracy:.2f}"
        )
        print(progress_line)

In [None]:
# Train for 5 epochs; validation loss and accuracy are printed after each one.
train(model, optimizer, loss_fn, train_loader, val_loader, epochs=5, device=device)

In [None]:
# NOTE(review): this rebinds `model`, replacing the network trained above. Nothing in the
# cells shown here writes this checkpoint file, so confirm where it comes from.
# SECURITY: weights_only=False lets torch.load unpickle arbitrary objects, which can
# execute code while loading. Only load checkpoints from a trusted source.
model = torch.load("model/trained_model.pth", weights_only=False)