Author: Tristan Bailey

Import Libraries

In [None]:
import torch
import os
import matplotlib.pyplot as plt
import torch.utils.data as data
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import torch.nn as nn
import torchvision.models as models
import torch.optim as optim

import logging
logging.basicConfig(level=logging.INFO)

Verify System Has CUDA Capability

In [None]:
# Query CUDA support once; later cells branch on `has_cuda` to decide whether
# the model and batches are moved to the GPU.
has_cuda = torch.cuda.is_available()
if not has_cuda:
    print("CUDA is not available.")
else:
    print("CUDA is available!")
    print("Will utilize GPU acceleration")

In [None]:
# Class index -> display name, used for plot titles.
# NOTE(review): assumes these indices line up with the class indices that
# ImageFolder assigns to the data folders - confirm against the folder names.
labels_map = dict(enumerate(("Bump", "Crack", "Plain", "Pot Hole", "Speed Bump")))

# Model / training hyper-parameters.
num_classes = 5
num_epochs = 20
batch_size = 32

# A checkpoint is written every `checkpoint_interval` epochs into `checkpoint_dir`.
checkpoint_interval = 1
checkpoint_dir = "checkpoints"

# File name of the final saved weights.
# NOTE(review): the name mentions "10batch" while batch_size is 32 - confirm.
model_name = "Resnet_10batch_grayscale.pth"

Training Data Directory

In [None]:
# All locations are resolved relative to the directory the notebook runs in.
cwd = os.getcwd()

# Name of the folder that holds one sub-folder of images per class.
data_dir_name = "training_data"
data_path = os.path.join(cwd, data_dir_name)

# Sibling folders produced by the "Data Selection" cells.
training_data_path = f"{data_path}_grayscale_train"
testing_data_path = f"{data_path}_grayscale_test"

# Destination of the final weights and of the per-epoch checkpoints.
model_path = os.path.join(cwd, model_name)
checkpoint_dir = os.path.join(os.getcwd(), checkpoint_dir)
print(training_data_path)

# Create the checkpoint folder on first use.
checkpoint_dir_missing = not os.path.exists(checkpoint_dir)
if checkpoint_dir_missing:
    os.makedirs(checkpoint_dir)

Data Selection (ONLY DO ONCE)

In [None]:
#ONLY RUN ONCE, needed to clean names for pytorch
from PIL import Image
import os

# Sibling directory of `data_path` that receives the grayscale copies.
grayscale_data_path = data_path + "_grayscale"
os.makedirs(grayscale_data_path, exist_ok=True)

# Mirror every sub-directory of `data_path` under `grayscale_data_path`.
for subdir, _, filenames in os.walk(data_path):
    # Build the destination from the path *relative to the root* so that only
    # the root prefix is swapped (a plain str.replace would rewrite every
    # occurrence of the root string inside the path).
    new_subdir = os.path.normpath(
        os.path.join(grayscale_data_path, os.path.relpath(subdir, data_path))
    )
    os.makedirs(new_subdir, exist_ok=True)

    for filename in filenames:
        source_file = os.path.join(subdir, filename)
        if not os.path.isfile(source_file):
            continue

        # Replace only the final extension. Using str.replace on the extension
        # corrupts names that have no extension (replace("", ...) inserts the
        # suffix between every character) or that contain it more than once.
        new_filename = os.path.splitext(filename)[0] + "_grayscale.png"

        try:
            # The context manager closes the source file handle; convert()
            # returns a new, fully loaded image that stays usable afterwards.
            with Image.open(source_file) as opened:
                working = opened
                # Palette images with transparency go through RGBA first.
                # This check must run BEFORE the grayscale conversion: after
                # convert('L') the mode is always "L" and it could never fire.
                if opened.mode == "P" and "transparency" in opened.info:
                    working = opened.convert("RGBA")
                img = working.convert('L')
        except OSError as exc:
            # Files Pillow cannot read as images are reported and skipped
            # instead of aborting the whole conversion run.
            logging.warning("Skipping %s: %s", source_file, exc)
            continue

        # Save the grayscale image as PNG in the mirrored directory.
        img.save(os.path.join(new_subdir, new_filename))

In [None]:
#ONLY RUN ONCE, needed to get test and train set

import random
import shutil

# Destination roots: siblings of the grayscale folder.
train_path = grayscale_data_path + "_train"
test_path = grayscale_data_path + "_test"
os.makedirs(train_path, exist_ok=True)
os.makedirs(test_path, exist_ok=True)

# Dedicated, seeded generator: together with the sorted traversal below, the
# same set of files is always split the same way. With an unseeded generator a
# second run could copy a file into the *other* split as well, leaving the
# same image in both train and test.
split_rng = random.Random(42)
train_fraction = 0.9

for subdir, dirnames, filenames in os.walk(grayscale_data_path):
    # Sorting in place fixes the order in which os.walk descends.
    dirnames.sort()

    # Derive both destinations from the path relative to the source root
    # rather than by string replacement, which would also rewrite any other
    # "_train" occurring elsewhere in the absolute path.
    rel_subdir = os.path.relpath(subdir, grayscale_data_path)
    new_subdir_base = os.path.normpath(os.path.join(train_path, rel_subdir))
    new_subdir_test = os.path.normpath(os.path.join(test_path, rel_subdir))
    os.makedirs(new_subdir_base, exist_ok=True)
    os.makedirs(new_subdir_test, exist_ok=True)

    for filename in sorted(filenames):
        file_path = os.path.join(subdir, filename)

        # Roughly 90% of the files go to train, the rest to test.
        if split_rng.random() < train_fraction:
            new_file_path = os.path.join(new_subdir_base, filename)
        else:
            new_file_path = os.path.join(new_subdir_test, filename)

        # Copy the file contents without loading the whole file into memory.
        shutil.copyfile(file_path, new_file_path)

Data Augmentation and Transforms

In [None]:
# Approach 2 - for RGB baseline

# Deterministic preprocessing only (no random augmentation in this pipeline):
# resize, centre-crop to 224, convert to tensor, normalise the three channels.
# NOTE(review): the statistics below are presumably the ImageNet channel
# mean/std matching the pretrained weights - confirm.
rgb_mean = [0.485, 0.456, 0.406]
rgb_std = [0.229, 0.224, 0.225]

rgb_steps = [
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(rgb_mean, rgb_std),
]
transform = transforms.Compose(rgb_steps)

In [None]:
# Approach 3

# Random geometric augmentation applied first.
geometric_augmentations = [
    transforms.Resize(256),
    transforms.RandomRotation(degrees=30),
    transforms.RandomHorizontalFlip(),
    transforms.RandomResizedCrop(224),
]

# Collapse to a single channel, convert to tensor, normalise with mean 0.5 / std 0.5.
to_normalized_gray_tensor = [
    transforms.Grayscale(num_output_channels=1),
    transforms.ToTensor(),
    transforms.Normalize([0.5], [0.5]),
]

# With probability 0.5, add a small translate/scale jitter on the tensor.
random_affine_jitter = transforms.RandomApply(
    [transforms.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.9, 1.1), shear=None)],
    p=0.5,
)

transform = transforms.Compose(
    geometric_augmentations + to_normalized_gray_tensor + [random_affine_jitter]
)


Load Dataset

In [None]:
# Approach 2 & 3

# Build the training dataset from the class sub-folders under
# `training_data_path`, applying whichever `transform` cell was run last.
train_dataset = datasets.ImageFolder(root=training_data_path, transform=transform)

# Use the configured `batch_size` instead of a second hard-coded literal so the
# config cell stays the single place to change it.
train_loader = data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)

Display Samples from Dataset

In [None]:
# Approach 2 - for RGB baseline

from torchvision.transforms.functional import to_pil_image

cols, rows = 3, 3
figure = plt.figure(figsize=(8, 8))

# Inverse of the training normalisation: Normalize(-mean/std, 1/std) maps a
# normalised tensor x back to x * std + mean.
norm_mean = [0.485, 0.456, 0.406]
norm_std = [0.229, 0.224, 0.225]
inverse_transform = transforms.Compose([
    transforms.Normalize(
        [-m / s for m, s in zip(norm_mean, norm_std)],
        [1 / s for s in norm_std],
    ),
])

# Fill a rows x cols grid with randomly chosen training samples.
for position in range(1, cols * rows + 1):
    sample_idx = torch.randint(len(train_dataset), size=(1,)).item()
    img, label = train_dataset[sample_idx]

    # Undo the normalisation, then go back to a PIL image shown in grayscale.
    img = to_pil_image(inverse_transform(img)).convert('L')

    ax = figure.add_subplot(rows, cols, position)
    ax.set_title(labels_map[label])
    ax.axis("off")
    ax.imshow(img, cmap="gray")

plt.show()

In [None]:
# Approach 3
from torchvision.transforms.functional import to_pil_image

cols, rows = 3, 3
figure = plt.figure(figsize=(8, 8))

# Inverse of Normalize([0.5], [0.5]): Normalize(-mean/std, 1/std) maps a
# normalised tensor x back to x * std + mean.
gray_mean, gray_std = 0.5, 0.5
inverse_transform = transforms.Compose([
    transforms.Normalize([-gray_mean / gray_std], [1 / gray_std]),
])

# Fill a rows x cols grid with randomly chosen training samples.
for position in range(1, cols * rows + 1):
    sample_idx = torch.randint(len(train_dataset), size=(1,)).item()
    img, label = train_dataset[sample_idx]

    # Undo the normalisation, then go back to a PIL image shown in grayscale.
    img = to_pil_image(inverse_transform(img)).convert('L')

    ax = figure.add_subplot(rows, cols, position)
    ax.set_title(labels_map[label])
    ax.axis("off")
    ax.imshow(img, cmap="gray")

plt.show()


Import Generic Pre-Trained Model

In [None]:
# Start from a ResNet-18 initialised with the IMAGENET1K_V1 weights.
pretrained_weights = models.ResNet18_Weights.IMAGENET1K_V1
resnet = models.resnet18(weights=pretrained_weights)

# Swap the final fully connected layer for a fresh linear head that outputs
# `num_classes` scores; its input width is taken from the layer it replaces.
head_in_features = resnet.fc.in_features
resnet.fc = nn.Linear(head_in_features, num_classes)

Optimizer

In [None]:
# Approach 2 & 3

# Cross-entropy loss over the class scores.
criterion = nn.CrossEntropyLoss()

# Move the model to the GPU (when present) before handing its parameters
# to the optimizer.
if has_cuda:
    resnet = resnet.cuda()

# SGD with momentum over all model parameters.
optimizer = optim.SGD(resnet.parameters(), lr=0.001, momentum=0.9)

In [None]:
# Approach 3
def convert_to_3channel(inputs):
    """Tile a batch three times along dimension 1 (the channel axis).

    The tensor is repeated with factors ``(1, 3, 1, 1)``, so a single-channel
    batch of shape ``(N, 1, H, W)`` comes back as ``(N, 3, H, W)`` with the
    same image copied into every channel. No check is made that the input
    really has one channel; dimension 1 is simply tripled.
    """
    channel_repeats = (1, 3, 1, 1)
    return inputs.repeat(*channel_repeats)

Fine-Tune Model

In [None]:
# Approach 2 RGB Baseline

# Training loop: one pass over `train_loader` per epoch, reporting the mean
# batch loss and the accuracy measured on the training batches themselves.
for epoch in range(num_epochs):
    # Put the model in training mode explicitly. If an evaluation cell
    # (which calls resnet.eval()) was run earlier in this kernel, the
    # batch-norm layers would otherwise stay frozen in eval mode.
    resnet.train()

    running_loss = 0.0
    correct_predictions = 0
    total_predictions = 0

    for inputs, labels in train_loader:
        if has_cuda:
            inputs, labels = inputs.cuda(), labels.cuda()

        optimizer.zero_grad()

        outputs = resnet(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Calculate accuracy: the predicted class is the highest-scoring output.
        _, predicted_labels = torch.max(outputs, 1)
        correct_predictions += (predicted_labels == labels).sum().item()
        total_predictions += labels.size(0)

    epoch_loss = running_loss / len(train_loader)
    epoch_accuracy = correct_predictions / total_predictions * 100
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.2f}%")

    # Checkpoint the model
    if (epoch + 1) % checkpoint_interval == 0:
        # One name for both the file and the log line. The on-disk name keeps
        # the 0-based "epoch_<n>.pth" form that the evaluation cells look for;
        # previously the message reported a different name than was written.
        checkpoint_filename = f"epoch_{epoch}.pth"
        torch.save(resnet.state_dict(), os.path.join(checkpoint_dir, checkpoint_filename))
        print(f"Checkpoint saved: {checkpoint_filename}")
        # Append the training accuracy of this epoch to a running text log.
        with open("resnet_accuracy.txt", "a") as f:
            f.write(f"Epoch {epoch + 1}: Accuracy = {epoch_accuracy:.2f}%\n")
print("Training complete.")


In [None]:
# Approach 3

# Training loop: one pass over `train_loader` per epoch, reporting the mean
# batch loss and the accuracy measured on the training batches themselves.
for epoch in range(num_epochs):
    # Put the model in training mode explicitly. If an evaluation cell
    # (which calls resnet.eval()) was run earlier in this kernel, the
    # batch-norm layers would otherwise stay frozen in eval mode.
    resnet.train()

    running_loss = 0.0
    correct_predictions = 0
    total_predictions = 0

    for inputs, labels in train_loader:
        if has_cuda:
            inputs, labels = inputs.cuda(), labels.cuda()

        # The loader yields single-channel batches; the network expects three.
        inputs = convert_to_3channel(inputs)
        optimizer.zero_grad()
        outputs = resnet(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Calculate accuracy: the predicted class is the highest-scoring output.
        _, predicted_labels = torch.max(outputs, 1)
        correct_predictions += (predicted_labels == labels).sum().item()
        total_predictions += labels.size(0)

    epoch_loss = running_loss / len(train_loader)
    epoch_accuracy = correct_predictions / total_predictions * 100
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.2f}%")

    # Checkpoint the model
    if (epoch + 1) % checkpoint_interval == 0:
        # One name for both the file and the log line. The on-disk name keeps
        # the 0-based "epoch_<n>.pth" form that the evaluation cells look for;
        # previously the message reported a different name than was written.
        checkpoint_filename = f"epoch_{epoch}.pth"
        torch.save(resnet.state_dict(), os.path.join(checkpoint_dir, checkpoint_filename))
        print(f"Checkpoint saved: {checkpoint_filename}")
        # Append the training accuracy of this epoch to a running text log.
        with open("resnet_accuracy.txt", "a") as f:
            f.write(f"Epoch {epoch + 1}: Accuracy = {epoch_accuracy:.2f}%\n")
print("Training complete.")

Save Model as Binary

In [None]:
# Write only the learned parameters (the state dict) to `model_path`.
final_state = resnet.state_dict()
torch.save(final_state, model_path)
print(f"Model saved as {model_name} to {model_path}")

Evaluation Transform

In [None]:
# Evaluation preprocessing: the same resize / grayscale / normalise steps as
# training, but with a fixed centre crop and no random augmentation.
eval_steps = [
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.Grayscale(num_output_channels=1),
    transforms.ToTensor(),
    transforms.Normalize([0.5], [0.5]),
]
transform = transforms.Compose(eval_steps)

# Test images are read from the class sub-folders under `testing_data_path`.
test_dataset = datasets.ImageFolder(root=testing_data_path, transform=transform)
test_loader = data.DataLoader(test_dataset, batch_size=4, shuffle=True, num_workers=4)

Loading A Model

In [None]:
# Load the saved model: rebuild the architecture (no pretrained weights) with
# the same `num_classes`-wide head, then fill in the stored parameters.
resnet = models.resnet18(weights=None)
resnet.fc = nn.Linear(resnet.fc.in_features, num_classes)

# NOTE(review): with num_epochs = 20 the training cells write epoch_0.pth ..
# epoch_19.pth - confirm that "epoch_29.pth" is the checkpoint intended here.
checkpoint_name = "epoch_29.pth"

# Kept in its own variable so the global `model_path` (the destination of the
# "Save Model" cell) is not overwritten by running this cell.
# Prefer the file next to the notebook (original behaviour); otherwise look in
# `checkpoint_dir`, which is where the training loops write their checkpoints.
eval_checkpoint_path = checkpoint_name
if not os.path.exists(eval_checkpoint_path):
    eval_checkpoint_path = os.path.join(checkpoint_dir, checkpoint_name)

# map_location="cpu" lets a checkpoint saved from a CUDA model load on a
# machine without a GPU; the model is moved to the GPU afterwards if available.
resnet.load_state_dict(torch.load(eval_checkpoint_path, map_location="cpu"))

# If using GPU, move the model to GPU
if has_cuda:
    resnet = resnet.cuda()

Testing Model (Accuracy)

In [None]:
# Measure classification accuracy of the loaded model over `test_loader`.
# eval() switches layers such as batch-norm to inference behaviour.
resnet.eval()

correct = 0
total = 0
# Gradients are not needed for scoring.
with torch.no_grad():
    for inputs, labels in test_loader:
        if has_cuda:
            inputs, labels = inputs.cuda(), labels.cuda()

        inputs = convert_to_3channel(inputs)
        outputs = resnet(inputs)
        # Index of the highest score along dim 1 is the predicted class.
        predicted = outputs.data.max(1)[1]
        total += len(labels)
        correct += int((predicted == labels).sum())

# Report the percentage of correctly classified test images (%d truncates).
accuracy_pct = 100 * correct / total
print("Accuracy on test data: %d %%" % accuracy_pct)

Testing Model (Samples)

In [None]:
# Display the first image of each of the first 20 test batches together with
# its true and predicted class labels.
from torchvision.transforms.functional import to_pil_image

figure = plt.figure(figsize=(16, 16), facecolor='white')
cols, rows = 5, 4

# Inference only: make sure the model is in eval mode and skip autograd.
resnet.eval()
with torch.no_grad():
    for i, (inputs, labels) in enumerate(test_loader):
        if i == cols * rows:
            break

        if has_cuda:
            inputs = inputs.cuda()
            labels = labels.cuda()
        sample = inputs[0]  # extract a single image from the batch

        # The evaluation transform normalises with mean 0.5 / std 0.5, so the
        # tensor lies in [-1, 1]. Map it back to [0, 1] before building the
        # PIL image; converting the normalised values directly yields a
        # wrongly rendered picture.
        display_tensor = (sample * 0.5 + 0.5).clamp(0, 1)
        image = to_pil_image(display_tensor.squeeze().cpu())
        image = image.convert('L')   # Convert the image to grayscale mode

        # Predict the label for this single image (batch of one, 3 channels).
        output = resnet(convert_to_3channel(sample.unsqueeze(0)))
        _, predicted = torch.max(output, 1)

        ax = figure.add_subplot(rows, cols, i + 1)
        ax.set_title("True:" + labels_map[labels[0].item()] + "\nPredicted:" + labels_map[predicted[0].item()])
        ax.axis("off")
        ax.imshow(image, cmap="gray")
plt.show()

Checkpoint Model Comparison Graph

In [None]:
import matplotlib.pyplot as plt
import re

# Raw string so that \d and \. are regex escapes rather than (invalid) Python
# string escapes. Matches the "epoch_<n>.pth" names written by the training loops.
checkpoint_pattern = re.compile(r"epoch_(\d+)\.pth")

# epoch number parsed from the file name -> test accuracy in percent
scores = {}
for model_filename in os.listdir(checkpoint_dir):
    match = checkpoint_pattern.search(model_filename)
    if not match:
        continue
    epoch = int(match.group(1))

    # Local names are used for the path and the model so that the global
    # `model_path` / `resnet` set by earlier cells are not overwritten here.
    checkpoint_path = os.path.join(checkpoint_dir, model_filename)

    # Rebuild the architecture and load this checkpoint's parameters.
    # map_location="cpu" allows CUDA-saved checkpoints to load without a GPU.
    checkpoint_model = models.resnet18(weights=None)
    checkpoint_model.fc = nn.Linear(checkpoint_model.fc.in_features, num_classes)
    checkpoint_model.load_state_dict(torch.load(checkpoint_path, map_location="cpu"))

    # If using GPU, move the model to GPU
    if has_cuda:
        checkpoint_model = checkpoint_model.cuda()

    # Set the model to evaluation mode
    checkpoint_model.eval()

    # Evaluation loop over the whole test loader
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            if has_cuda:
                inputs, labels = inputs.cuda(), labels.cuda()

            inputs = convert_to_3channel(inputs)
            outputs = checkpoint_model(inputs)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    scores[epoch] = (100 * correct / total)

# Order the results by epoch number for plotting.
epoch_numbers = sorted(scores)
accuracies = [scores[epoch_number] for epoch_number in epoch_numbers]

# Plot test accuracy against the epoch number taken from each file name.
plt.plot(epoch_numbers, accuracies)
plt.xlabel("Checkpoint epoch")
plt.ylabel("Accuracy (%)")
plt.show()