In [31]:
import copy
import itertools
import os
import time
from tempfile import TemporaryDirectory


# 3rd party modules
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import timm
import torch
import torchvision
from PIL import Image
from sklearn.model_selection import train_test_split
from torch import nn, optim
from torch.optim import lr_scheduler
from torch.utils.data import DataLoader, Dataset, Subset
from torchvision import datasets, models, transforms
from torchvision.io import read_image
from torch.quantization import quantize_dynamic
from tqdm import tqdm

Step 1: Load the Trained Model


In [32]:
def initialize_model(output_classes=6):
    """
    Initializes the ResNet-50 model pre-trained on ImageNet (just convolutional layers) and implements the final fully connected layer.

    Parameters:
    - output_classes (int): The number of classes for the final output layer.

    Returns:
    - model (torch.nn.Module): The modified model based on the pre-trained Resnet-50 feature representation.
    """

    # Load a pre-trained ResNet-50 model
    model = torchvision.models.resnet50(weights='IMAGENET1K_V1')

    # Freeze all the parameters in the model to prevent them from being updated during training
    for param in model.parameters():
        param.requires_grad = False

    # Get the number of input features to the final fully connected layer
    in_features = model.fc.in_features # 2048 input features (from the last feature mapping layer of Resnet)

    # Replace the final fully connected layer with a new one that has the desired number of output classes
    model.fc = nn.Sequential(
        nn.Linear(in_features, 512),  # Reduce dimension from in_features to 512
        nn.ReLU(),                    # Apply ReLU activation function
        nn.Linear(512, 6)  # Final layer with 'output_classes' number of outputs
    )

    return model

def load_model(model_path, output_classes=6):
    # Initialize the model structure
    model = initialize_model(output_classes)
    

    # Load the trained model weights
    saved_contents = torch.load(model_path)
    state_dict = saved_contents["state_dict"]
    model.load_state_dict(state_dict)

    return model

model_path = 'RESNET50_best_model.pth'  # Adjust the path as needed
model = load_model(model_path)


Step 2: Apply Quantization


In [33]:
model.eval()  # Ensure the model is in evaluation mode before quantization

# Apply dynamic quantization
quantized_model = quantize_dynamic(
    model, 
    {nn.Linear},  # Specify the layer types to quantize
    dtype=torch.qint8  # Specify the quantization data type
)

# Move the quantized model to the appropriate device if you are using GPU for evaluation
device = torch.device("cpu")  # Quantized models are typically used on CPUs
quantized_model.to(device)


ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

Step 3: Evaluate the Quantized Model


In [38]:
def prepare_data(data_dir):
    """
    Prepares the dataset for training/testing by organizing image paths and labels.

    Parameters:
    - data_dir (str): Directory containing the dataset, organized into subdirectories
      for each category.

    Returns:
    - tuple: Two lists containing the image file paths and corresponding labels.
    """

    # Define the categories of the dataset
    categories = ['Trash', 'Plastic', 'Paper', 'Metal', 'Glass', 'Cardboard']

    # Lists to hold the paths of the images and their labels
    image_paths = []
    labels = []  # Numerical labels: 0 for Trash, 1 for Plastic, etc.

    # Enumerate over categories to label the images accordingly
    for label, category in enumerate(categories):
            try:
                # Construct the directory path for the current category
                category_dir = os.path.join(data_dir, category)

                # Iterate through each file in the category directory
                for file in os.listdir(category_dir):
                    # Check if the file is an image
                    if file.endswith('.jpg') or file.endswith('.png'):
                        image_paths.append(os.path.join(category_dir, file))
                        labels.append(label)  # Assign the label to this image
            except Exception as e:
                print(f"Failed to process category {category} due to error: {e}")
                continue
    return image_paths, labels


class CustomDataset(Dataset):
    """
    A custom dataset class that extends PyTorch's Dataset class for image loading and preprocessing.

    Attributes:
    - image_paths (list): List of paths to the images.
    - labels (list): List of labels corresponding to the images.
    - transform (callable, optional): Optional transform to be applied on a sample.
    """

    def __init__(self, image_paths, labels, transform=None):
        """
        Initializes the dataset with images and labels.

        Parameters:
        - image_paths (list): List of paths to the images.
        - labels (list): List of labels for the images.
        - transform (callable, optional): Optional transform to apply on images.
        """

        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """
        Returns the total number of samples in the dataset.
        """

        return len(self.image_paths)

    def __getitem__(self, index):
        """
        Retrieves an image and its label from the dataset at the specified index.

        Parameters:
        - index (int): Index of the image and label to return.

        Returns:
        - tuple: A tuple containing the image and its label.
        """

        image_path = self.image_paths[index]
    
        # Print the image path for debugging
        # print(f"Accessing image: {image_path}")

        # Check if the file exists
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"The file {image_path} does not exist.")

        # Attempt to open the image file
        try:
            image = Image.open(image_path).convert("RGB")
        except Exception as e:
            # If an error occurs, print the error and the problematic file path
            print(f"Error opening image at {image_path}: {e}")
            raise

        if self.transform:
            image = self.transform(image)

        label = torch.tensor(self.labels[index], dtype=torch.long)
        return image, label

def evaluate_model(model, dataloader, loss_fun, device):
    """
    Evaluates the model's performance on a given dataset.

    Parameters:
    - model: The neural network model to evaluate.
    - dataloader: The DataLoader containing the dataset for evaluation.
    - loss_fun: The loss function used to compute the model's loss.
    - device: The device (CPU or CUDA) on which the computations will be performed.

    Returns:
    - A tuple of average loss and accuracy over the dataset.
    """

    model.eval()  # Set the model to evaluation mode    test_loss = 0.0
    val_acc = 0.0
    val_loss = 0.0

    with torch.no_grad():  # No gradients needed for evaluation
        for x, y in dataloader:
            x, y = x.to(device), y.to(device)  # Move data to the same device as the model

            pred = model(x)
            _, predicted_classes = torch.max(pred, 1)
            correct_predictions = (predicted_classes == y).float()

            loss = loss_fun(pred, y.long())  # Ensure consistent data type
            val_loss += loss.item()
            val_acc += correct_predictions.sum().item() / y.size(0)

        val_loss /= len(dataloader)
        val_acc /= len(dataloader)

    return val_loss, val_acc

In [39]:
### TEMPORARY
folder_path = '../../data/dataset-resized' # My local path
image_paths, labels = prepare_data(folder_path)
batch_size = 32
# Split the data into training and test sets
train_paths, test_paths, train_labels, test_labels = train_test_split(
    image_paths, labels, test_size=0.15, random_state=0)

# Split the data into training, validation, and test sets
train_paths, val_paths, train_labels, val_labels = train_test_split(
    train_paths, train_labels, test_size=0.175, random_state=0)  # 0.175 x 0.85 ~= 0.15

# Normalization parameters found to be optimal for CIFAR dataset; may need adjustment for other datasets
normalization = ((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))

# Define the training data transformations
training_transform = transforms.Compose([
    transforms.Resize(256, antialias=True),  # Resize images to 256x256, with antialiasing
    transforms.RandomCrop(224),  # Crop randomly to 224x224 for data augmentation
    transforms.RandomRotation(20),  # Rotate images up to 20 degrees for data augmentation
    transforms.RandomHorizontalFlip(0.1),  # Horizontally flip images with a probability of 0.1 for data augmentation
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),  # Randomly adjust color settings
    transforms.RandomAdjustSharpness(sharpness_factor=2, p=0.1),  # Randomly adjust sharpness for data augmentation
    transforms.ToTensor(),  # Convert images to tensors
    transforms.Normalize(*normalization),  # Normalize images using the specified mean and std dev
])

# Define the validation and test data transformations (simpler than training transformations)
transform = transforms.Compose([
    transforms.Resize(256, antialias=True),  # Resize images to 256x256, with antialiasing
    transforms.CenterCrop(224),  # Center crop images to 224x224
    transforms.ToTensor(),  # Convert images to tensors
    transforms.Normalize(*normalization),  # Normalize images using the specified mean and std dev
])

# Create dataset objects for training, validation, and testing
train_dataset = CustomDataset(train_paths, train_labels, transform=training_transform)
val_dataset = CustomDataset(val_paths, val_labels, transform=transform)
test_dataset = CustomDataset(test_paths, test_labels, transform=transform)

train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

In [40]:
# Assuming you have a function to create your dataloaders
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Evaluate the model
loss_fun = nn.CrossEntropyLoss()
test_loss, test_acc = evaluate_model(quantized_model, test_dataloader, loss_fun, device)

print(f"Quantized Model Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")


Quantized Model Test Loss: 0.5621, Test Accuracy: 0.8341
