# NDT Global - Technical Assessment Machine Learning Engineer

In [2]:
import os
import cv2
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
import random
import matplotlib.pyplot as plt
from torch.utils.data.sampler import SubsetRandomSampler
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
from PIL import Image

#import tensorflow as tf
#from tensorflow.keras import layers, models
#from tensorflow.keras.preprocessing.image import ImageDataGenerator
import matplotlib.pyplot as plt


In [3]:
# Check which GPU backend PyTorch can see (CUDA, or Apple's Metal backend "MPS").
# TensorFlow is not imported in this notebook, so the check goes through torch.
mps_backend = getattr(torch.backends, "mps", None)  # attribute is absent on older torch builds
mps_available = mps_backend is not None and mps_backend.is_available()
num_gpus = torch.cuda.device_count() if torch.cuda.is_available() else int(mps_available)
print("Num GPUs Available: ", num_gpus)

# CUDA-only flag: code that moves tensors with `.cuda()` must be guarded by it.
train_on_gpu = torch.cuda.is_available()


Num GPUs Available:  1


Define a custom dataset class (to be wrapped in a DataLoader) optimised for data augmentation with transformations

In [None]:
##################################################################
# CustomImageDataset() :: Custom dataset class to load images    #
##################################################################
class CustomImageDataset(Dataset):
    """Image-folder dataset yielding ``(image, label)`` pairs.

    Expected layout: ``img_dir/<class_name>/<image file>``. Every
    sub-directory of ``img_dir`` is one class. Labels are consecutive
    integers (0, 1, ...) assigned in sorted class-name order, so the mapping
    is identical on every run and for every instance built on the same folder.

    Attributes:
        img_dir: Root folder that was scanned.
        transform: Optional callable applied to each PIL image.
        classes: Sorted list of class (sub-directory) names.
        class_to_idx: Mapping from class name to integer label.
        image_paths: File path of every sample, grouped by class.
        labels: Integer label of every sample, parallel to ``image_paths``.
    """

    def __init__(self, img_dir, transform=None):
        self.img_dir = img_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []

        # Filter to directories BEFORE numbering: stray files in the root
        # (e.g. ".DS_Store") must not consume a label id, otherwise labels
        # can skip values and exceed the number of classes.
        self.classes = sorted(
            entry for entry in os.listdir(img_dir)
            if os.path.isdir(os.path.join(img_dir, entry))
        )
        self.class_to_idx = {name: idx for idx, name in enumerate(self.classes)}

        for class_name, label in self.class_to_idx.items():
            class_folder = os.path.join(img_dir, class_name)
            # Sorted so that sample indices (and therefore seeded splits) are reproducible.
            for img_file in sorted(os.listdir(class_folder)):
                img_path = os.path.join(class_folder, img_file)
                # Hidden files and nested folders are not images; skip them.
                if img_file.startswith(".") or not os.path.isfile(img_path):
                    continue
                self.image_paths.append(img_path)
                self.labels.append(label)

    def __len__(self):
        """Return the number of samples found under ``img_dir``."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load sample ``idx`` as RGB, apply ``transform`` if set, return ``(image, label)``."""
        img_path = self.image_paths[idx]
        label = self.labels[idx]

        # convert() returns a fully loaded copy, so the file can be closed right away.
        with Image.open(img_path) as img:
            image = img.convert("RGB")

        if self.transform:
            image = self.transform(image)

        return image, label



In [None]:
##########################################################################################
# split_data() ::  Function to split data into training, validation and test sets        #
##########################################################################################
def split_data(dataset, test_split=0.1, val_split=0.1, shuffle=True, seed=37):
    """Split the indices of ``dataset`` into train, validation and test lists.

    Args:
        dataset: Any sized object; only ``len(dataset)`` is used.
        test_split: Fraction of ALL samples reserved for testing, in [0, 1].
        val_split: Fraction of the samples REMAINING after the test split
            that is reserved for validation, in [0, 1].
        shuffle: Shuffle the indices before slicing.
        seed: Seed for the shuffle; the same seed gives the same split.

    Returns:
        Tuple ``(train_indices, val_indices, test_indices)`` of disjoint
        index lists that together cover ``range(len(dataset))``.

    Raises:
        ValueError: If a split fraction lies outside [0, 1].
    """
    for name, fraction in (("test_split", test_split), ("val_split", val_split)):
        if not 0.0 <= fraction <= 1.0:
            raise ValueError(f"{name} must be in [0, 1], got {fraction!r}")

    dataset_size = len(dataset)
    indices = list(range(dataset_size))

    # Split sizes are rounded down; validation is taken from what is left after the test split.
    test_size = int(np.floor(test_split * dataset_size))
    val_size = int(np.floor(val_split * (dataset_size - test_size)))

    if shuffle:
        # A private generator gives the same permutation as seeding the global
        # `random` module, without resetting RNG state used elsewhere.
        random.Random(seed).shuffle(indices)

    test_indices = indices[:test_size]
    remaining_indices = indices[test_size:]

    val_indices = remaining_indices[:val_size]
    train_indices = remaining_indices[val_size:]

    return train_indices, val_indices, test_indices

Function to train a model

In [None]:
#####################################################################################################################
#  train_model() :: Training function                                                                               #
#      patience   :: Training will stop if no significant improvement takes place in "patience" epochs.             #
#      threshold  :: Relative improvement required for validation acc or loss to reset the patience counter.        #
#####################################################################################################################
def train_model(model, train_loader, valid_loader, criterion, optimizer, num_epochs, scheduler,
                patience=35, threshold=0.02, save_path="./Model/Simple-CNN/", save_name="model.pth"):
    """Train ``model`` with per-epoch validation, checkpointing and early stopping.

    Args:
        model: Network to optimise. Batches are moved to the device its
            parameters live on, so move the model first if a GPU is wanted.
        train_loader: Iterable of ``(images, labels)`` training batches.
        valid_loader: Iterable of ``(images, labels)`` validation batches.
        criterion: Loss callable ``criterion(output, labels)`` returning a scalar tensor.
        optimizer: Optimiser holding the model's parameters.
        num_epochs: Maximum number of epochs.
        scheduler: Object whose ``step(valid_loss)`` is called once per epoch.
        patience: Stop after this many consecutive epochs without a significant improvement.
        threshold: Relative improvement of validation loss OR accuracy
            (measured against the last significant value) that counts as
            significant and resets the patience counter.
        save_path: Directory for the checkpoint (created if missing).
        save_name: Checkpoint file name.

    Returns:
        ``(train_losses, valid_losses, train_accuracies, valid_accuracies)``:
        one entry per completed epoch; losses are per-sample means and
        accuracies are percentages.

    Raises:
        ValueError: If a loader yields no samples.

    Side effects:
        Writes ``model.state_dict()`` to ``save_path/save_name`` every time
        the validation loss reaches a new minimum, prints per-epoch
        statistics, and leaves the model in evaluation mode.
    """
    # Follow the model's device instead of relying on a global flag.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    os.makedirs(save_path, exist_ok=True)
    save_full_path = os.path.join(save_path, save_name)

    valid_loss_min = float("inf")                # Best validation loss so far (drives checkpointing)
    stop_loss_ref = float("inf")                 # Loss baseline for early stopping
    valid_accuracy_max = 0.0                     # Accuracy baseline for early stopping
    train_losses, valid_losses = [], []          # Store loss values for plotting
    train_accuracies, valid_accuracies = [], []  # Store accuracy values for plotting
    no_improvement_epochs = 0                    # Patience counter

    for epoch in range(1, num_epochs + 1):
        train_loss, train_accuracy = _run_epoch(model, train_loader, criterion, device, optimizer)
        valid_loss, valid_accuracy = _run_epoch(model, valid_loader, criterion, device)

        train_losses.append(train_loss)
        valid_losses.append(valid_loss)
        train_accuracies.append(train_accuracy)
        valid_accuracies.append(valid_accuracy)

        # Print training/validation statistics
        print(f'\nEpoch: {epoch} \n\tTraining Loss: {train_loss:.6f} \tValidation Loss: {valid_loss:.6f} \n\tTraining Accuracy: {train_accuracy:.2f}% \tValidation Accuracy: {valid_accuracy:.2f}%')

        # Early-stopping bookkeeping. Improvements are measured against the
        # baselines from BEFORE this epoch; overwriting the best loss first
        # would make the measured gain zero and the patience counter would
        # never reset.
        if stop_loss_ref == float("inf"):
            # The first finite validation loss always counts as progress.
            loss_improved = valid_loss < stop_loss_ref
        else:
            loss_improved = (stop_loss_ref - valid_loss) > threshold * abs(stop_loss_ref)
        accuracy_improved = (valid_accuracy - valid_accuracy_max) > threshold * valid_accuracy_max

        # Checkpoint on ANY new minimum, even one below the early-stopping threshold.
        if valid_loss < valid_loss_min:
            print(f'Validation loss decreased ({valid_loss_min:.6f} --> {valid_loss:.6f}). Saving model ...')
            torch.save(model.state_dict(), save_full_path)
            valid_loss_min = valid_loss

        # Baselines move only on significant gains, so a series of tiny
        # improvements cannot postpone early stopping forever.
        if loss_improved:
            stop_loss_ref = valid_loss
        if accuracy_improved:
            valid_accuracy_max = valid_accuracy
        if loss_improved or accuracy_improved:
            no_improvement_epochs = 0
        else:
            no_improvement_epochs += 1

        # Adjust learning rate based on validation loss
        scheduler.step(valid_loss)

        if no_improvement_epochs >= patience:
            print(f'Early stopping: No improvement in validation for {patience} epochs.')
            break

    return train_losses, valid_losses, train_accuracies, valid_accuracies


def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader``: train if ``optimizer`` is given, otherwise evaluate.

    Returns ``(mean loss per sample, accuracy in percent)``.
    """
    training = optimizer is not None
    model.train(training)  # train(False) is equivalent to eval()

    loss_sum, correct, seen = 0.0, 0, 0
    with torch.set_grad_enabled(training):
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)

            if training:
                optimizer.zero_grad()
            output = model(images)
            loss = criterion(output, labels)
            if training:
                loss.backward()
                optimizer.step()

            batch_size = labels.size(0)
            # The criterion returns a batch mean; weight it so the epoch value is a per-sample mean.
            loss_sum += loss.item() * batch_size
            correct += (output.argmax(dim=1) == labels).sum().item()
            seen += batch_size

    if seen == 0:
        raise ValueError("The data loader yielded no samples.")
    return loss_sum / seen, 100 * correct / seen

Function to create model
Adjust for the number of classes

In [None]:
###############################################
#  My_CNN() :: 4-block CNN image classifier   #
###############################################
class My_CNN(nn.Module):
    """Small CNN classifier: four conv blocks, global average pooling, three linear layers.

    Each conv block is ``Conv2d(3x3, padding=1) -> BatchNorm2d -> ReLU ->
    MaxPool2d(2)``, so every block halves the spatial size while the channel
    count grows 3 -> 32 -> 64 -> 128 -> 256. Global average pooling reduces
    the feature map to one value per channel, which makes the classifier
    head independent of the input resolution (the input must be at least
    16x16 to survive the four pooling steps).

    Args:
        num_classes: Number of output logits. Defaults to 10.
    """

    def __init__(self, num_classes=10):
        super().__init__()

        # First Convolutional Layer
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1)  # padding preserves size
        self.bn1 = nn.BatchNorm2d(32)

        # Second Convolutional Layer
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)

        # Third Convolutional Layer
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)

        # Fourth Convolutional Layer
        self.conv4 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(256)

        # Global Average Pooling (instead of Flatten): one value per channel
        self.global_avg_pool = nn.AdaptiveAvgPool2d((1, 1))

        # Fully Connected Layers
        self.fc1 = nn.Linear(256, 128)  # input size = channels of the last conv layer
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, num_classes)  # one logit per class

        # Dropout Layer
        self.dropout = nn.Dropout(p=0.5)

    def forward(self, x):
        """Map a ``(batch, 3, H, W)`` image batch to ``(batch, num_classes)`` raw logits."""
        conv_blocks = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
            (self.conv4, self.bn4),
        )
        for conv, bn in conv_blocks:
            x = F.relu(bn(conv(x)))
            x = F.max_pool2d(x, kernel_size=2, stride=2)  # halve height and width

        x = self.global_avg_pool(x)  # (batch, 256, 1, 1)
        x = torch.flatten(x, 1)      # (batch, 256)

        # Classifier head with dropout after each hidden layer
        x = self.dropout(F.relu(self.fc1(x)))
        x = self.dropout(F.relu(self.fc2(x)))
        return self.fc3(x)  # raw logits; no softmax applied here
    
    
# Instantiate the network with its default arguments and print its layer summary.
model = My_CNN()
print(model)

In [None]:
############################################################################
# plot_images_from_loader() :: Used to test the impact of transformations 
############################################################################
def plot_images_from_loader(loader):
    """Show up to six images from the first batch of ``loader`` in a 2x3 grid.

    Args:
        loader: Iterable whose first item is an ``(images, labels)`` batch.
            Each image is transposed from channel-first to channel-last
            before plotting.

    Batches with fewer than six images leave the remaining grid cells empty.
    """
    # Get a batch of images and labels
    images, labels = next(iter(loader))
    num_images = len(images)

    # Set up a 2x3 subplot
    _, axes = plt.subplots(2, 3, figsize=(10, 7))

    for i, ax in enumerate(axes.flat):
        if i < num_images:
            # detach()/cpu() make the conversion work for any tensor, not just plain CPU ones.
            img = images[i].detach().cpu().numpy().transpose(1, 2, 0)
            ax.imshow(img)
            ax.set_title(f'Label: {labels[i].item()}')
        ax.axis('off')  # Hide axis (also blanks unused cells)

    plt.tight_layout()
    plt.show()
    
########################################################
# plot_accuracy() :: training and validation accuracy  #
########################################################
def plot_accuracy(train_accuracies, valid_accuracies):
    """Plot training (red) and validation (blue) accuracy against the epoch number.

    Both arguments are per-epoch sequences of percentages. The x-axis length
    is taken from ``train_accuracies``, so runs cut short by early stopping
    plot correctly.
    """
    x_axis = range(1, len(train_accuracies) + 1)
    curves = (
        (train_accuracies, 'r', 'Training Accuracy'),
        (valid_accuracies, 'b', 'Validation Accuracy'),
    )
    for values, colour, legend_text in curves:
        plt.plot(x_axis, values, colour, label=legend_text)

    plt.title('Training and Validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy (%)')
    plt.legend()
    plt.show()

################################################
# plot_loss() :: training and validation loss  #
################################################
def plot_loss(train_losses, valid_losses):
    """Plot training (red) and validation (blue) loss against the epoch number.

    Both arguments are per-epoch sequences; the x-axis length is taken from
    ``train_losses``.
    """
    x_axis = range(1, len(train_losses) + 1)
    curves = (
        (train_losses, 'r', 'Training Loss'),
        (valid_losses, 'b', 'Validation Loss'),
    )
    for values, colour, legend_text in curves:
        plt.plot(x_axis, values, colour, label=legend_text)

    plt.title('Training and Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

In [None]:
####################################################################################
# test_model() :: Evaluates the performance of the model on the given test_loader  #
####################################################################################
def test_model(model, test_loader, criterion):
    """Evaluate ``model`` on ``test_loader`` and report loss and accuracy.

    The model is evaluated with whatever weights it currently holds; load a
    saved checkpoint with ``model.load_state_dict(...)`` beforehand if a
    specific one should be tested.

    Args:
        model: Network to evaluate. Batches are moved to the device its
            parameters live on.
        test_loader: Iterable of ``(images, labels)`` batches.
        criterion: Loss callable ``criterion(output, labels)`` returning a scalar tensor.

    Returns:
        ``(test_loss, test_accuracy)``: mean loss per sample and accuracy in
        percent. The same values are printed.

    Raises:
        ValueError: If ``test_loader`` yields no samples.
    """
    # Follow the model's device instead of relying on a global flag.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    # Set the model to evaluation mode
    model.eval()

    test_loss = 0.0
    correct_test = 0
    total_test = 0

    # Turn off gradients for test evaluation
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)

            # Forward pass: compute predictions
            output = model(images)

            # The criterion returns a batch mean; weight it by the batch size.
            loss = criterion(output, labels)
            test_loss += loss.item() * labels.size(0)

            # Count correct predictions (class with the highest logit)
            predicted = output.argmax(dim=1)
            correct_test += (predicted == labels).sum().item()
            total_test += labels.size(0)

    if total_test == 0:
        raise ValueError("The test loader yielded no samples.")

    # Per-sample averages over everything actually evaluated
    test_loss = test_loss / total_test
    test_accuracy = 100 * correct_test / total_test

    print(f'\nTest Loss: {test_loss:.6f} \nTest Accuracy: {test_accuracy:.2f}%')
    return test_loss, test_accuracy

In [None]:
# Debug switch. Nothing in the visible notebook reads it — TODO confirm it is still needed.
Debug = True

## Data preparation

In [None]:
####################
#   Load the data  #
####################


# Root folder of the images (one sub-folder per class)
data_dir = 'path_to_train_data'

# Mini-batch size shared by all loaders; reduce if memory issues. Try out 16, 32, and 64
batch_size = 32

# Training pipeline: resize, then random flip/rotation as augmentation
train_transform = transforms.Compose([
    transforms.Resize((150, 150)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(20),
    transforms.ToTensor(),
])

# Validation/test pipeline: resize only, no augmentation
no_transform = transforms.Compose([
    transforms.Resize((150, 150)),
    transforms.ToTensor(),
])

# Three views of the SAME folder that differ only in their transform;
# the index split below decides which samples each loader actually draws.
train_dataset, val_dataset, test_dataset = (
    CustomImageDataset(data_dir, transform=pipeline)
    for pipeline in (train_transform, no_transform, no_transform)
)

# Disjoint index lists for the three splits
train_indices, val_indices, test_indices = split_data(
    train_dataset,
    test_split=0.1,
    val_split=0.1,
)

# One sampler per split restricts each loader to its own indices
train_sampler, val_sampler, test_sampler = map(
    SubsetRandomSampler,
    (train_indices, val_indices, test_indices),
)

train_loader = DataLoader(
    train_dataset, sampler=train_sampler, batch_size=batch_size, num_workers=4
)
val_loader = DataLoader(
    val_dataset, sampler=val_sampler, batch_size=batch_size, num_workers=4
)
test_loader = DataLoader(
    test_dataset, sampler=test_sampler, batch_size=batch_size, num_workers=4
)

# Confirm the splits
print(f"Number of training samples: {len(train_indices)}")
print(f"Number of validation samples: {len(val_indices)}")
print(f"Number of test samples: {len(test_indices)}")


In [None]:
# Visual sanity check: display a few samples from the training loader (augmentations applied).
plot_images_from_loader(train_loader)

## Model training

In [None]:
#########################
# Train hyperparameters
#########################

# Loss function: categorical cross-entropy
criterion = nn.CrossEntropyLoss()

# Optimiser: plain stochastic gradient descent.
# Alternative to try: torch.optim.Adam(model.parameters(), lr=0.0005)
optimizer = optim.SGD(params=model.parameters(), lr=0.01)

# Halve the learning rate when the validation loss has not improved for 5 epochs
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.5,
    patience=5,
)

# Upper bound on the number of training epochs
n_epochs = 40

# NOTE: patience (100) exceeds n_epochs (40), so early stopping cannot trigger in this run.
train_losses, valid_losses, train_accuracies, valid_accuracies = train_model(
    model=model,
    train_loader=train_loader,
    valid_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=n_epochs,
    scheduler=scheduler,
    patience=100,
    threshold=0.01,
    save_path="./Model/Test_CNN/",
    save_name="model_01.pth",
)

## Visualizations

In [None]:
# Accuracy curves of the training run above (one point per completed epoch).
plot_accuracy(train_accuracies, valid_accuracies)

In [None]:
# Loss curves of the training run above (one point per completed epoch).
plot_loss(train_losses, valid_losses)

In [None]:
# Final evaluation on the held-out test split.
# NOTE(review): `model` holds the weights of the LAST epoch here; the checkpoint written by
# train_model is not reloaded before this call — confirm whether the saved weights should be tested.
test_model(model, test_loader, criterion)
