In [None]:
!pip install torchmetrics

Collecting torchmetrics
  Downloading torchmetrics-1.6.0-py3-none-any.whl.metadata (20 kB)
Collecting lightning-utilities>=0.8.0 (from torchmetrics)
  Downloading lightning_utilities-0.11.9-py3-none-any.whl.metadata (5.2 kB)
Downloading torchmetrics-1.6.0-py3-none-any.whl (926 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m926.4/926.4 kB[0m [31m14.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading lightning_utilities-0.11.9-py3-none-any.whl (28 kB)
Installing collected packages: lightning-utilities, torchmetrics
Successfully installed lightning-utilities-0.11.9 torchmetrics-1.6.0


In [None]:
import io
import ast
import torch
import pickle
import numpy as np
from PIL import Image

import torch.nn as nn
import torch.optim as optim
from torchvision import models
from torchvision import transforms
from torch.utils.data import random_split, DataLoader, TensorDataset

from torchmetrics.classification import MultilabelPrecision, MultilabelRecall, MultilabelF1Score

In [None]:
# Evaluation settings used by the cells below.
batch_size = 32                                  # samples per evaluation batch
model_path = "new_best_model.pt"                 # saved model file to evaluate
test_data_pickle = "test_preprocessed_data.pkl"  # pickled, preprocessed test records

In [None]:
# Quick structural look at the pickled test set.
# NOTE: pickle.load can execute arbitrary code - only open files you trust.
with open(test_data_pickle, 'rb') as f:
  data = pickle.load(f)

# The file is only needed for loading; inspect the object after it is closed.
print(f"Type of data: {type(data)}")

if isinstance(data, dict):
  print(f"Number of keys: {len(data)}")
  print("Sample keys:", list(data.keys())[:5])

  if data:  # an empty dict has no first entry to show
    first_value = next(iter(data.values()))
    if isinstance(first_value, dict):
      # Replace raw byte payloads (e.g. encoded images) with their size so the
      # cell output stays readable instead of dumping kilobytes of binary data.
      preview = {
          key: f"<{len(value)} bytes>" if isinstance(value, (bytes, bytearray)) else value
          for key, value in first_value.items()
      }
    else:
      preview = first_value
    print("Sample value of the first key:", preview)

Type of data: <class 'dict'>
Number of keys: 1249
Sample keys: ['00025761_004.png', '00009689_023.png', '00020398_032.png', '00008594_002.png', '00009889_028.png']
Sample value of the first key: {'image_data': b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C\x00\x08\x06\x06\x07\x06\x05\x08\x07\x07\x07\t\t\x08\n\x0c\x14\r\x0c\x0b\x0b\x0c\x19\x12\x13\x0f\x14\x1d\x1a\x1f\x1e\x1d\x1a\x1c\x1c $.\' ",#\x1c\x1c(7),01444\x1f\'9=82<.342\xff\xc0\x00\x0b\x08\x00\xe0\x00\xe0\x01\x01\x11\x00\xff\xc4\x00\x1f\x00\x00\x01\x05\x01\x01\x01\x01\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\xff\xc4\x00\xb5\x10\x00\x02\x01\x03\x03\x02\x04\x03\x05\x05\x04\x04\x00\x00\x01}\x01\x02\x03\x00\x04\x11\x05\x12!1A\x06\x13Qa\x07"q\x142\x81\x91\xa1\x08#B\xb1\xc1\x15R\xd1\xf0$3br\x82\t\n\x16\x17\x18\x19\x1a%&\'()*456789:CDEFGHIJSTUVWXYZcdefghijstuvwxyz\x83\x84\x85\x86\x87\x88\x89\x8a\x92\x93\x94\x95\x96\x97\x98\x99\x9a\xa2\xa3\xa4\xa5\xa6\xa7\xa8\xa9

In [None]:
def load_test_data(original_data_pickle, batch_size, target_size=(224, 224)):
  """Build a non-shuffled test DataLoader from a pickled dataset.

  The pickle must contain a dict; each value is a record with the keys
  ``image_data`` (encoded image bytes), ``image_label`` (a string holding a
  Python list literal of integer labels), ``age`` and ``gender``.

  Args:
    original_data_pickle: Path of the pickle file to read.
    batch_size: Number of samples per batch produced by the returned loader.
    target_size: (height, width) every image is resized to.

  Returns:
    A DataLoader over a TensorDataset yielding ``(images, demographics, labels)``
    batches in the original record order (no shuffling).

  Raises:
    ValueError: If the pickle contains no records.
  """
  images = []
  demographics = []
  labels = []

  resize_transform = transforms.Compose([
      transforms.Resize(target_size),
      transforms.ToTensor()
  ])

  # NOTE: pickle.load can execute arbitrary code - only load trusted files.
  with open(original_data_pickle, 'rb') as f:
    data = pickle.load(f)

  for item in data.values():
    # Images are stored as encoded bytes: decode, force single-channel
    # grayscale ('L'), then resize and convert to a (1, H, W) tensor so that
    # all samples can be stacked.
    image = Image.open(io.BytesIO(item['image_data'])).convert('L')
    image = resize_transform(image)

    # Labels are stored as the string form of a list, e.g. "[0, 1, 0, ...]".
    label = np.array(ast.literal_eval(item['image_label']), dtype=int)

    # Age and gender are used as stored; any scaling/encoding is assumed to
    # have been done by the upstream preprocessing pipeline.
    # NOTE(review): torch.cat below needs `gender` to be at least 1-D (e.g. a
    # one-hot list); a bare scalar would fail - confirm against the pipeline.
    age = torch.tensor([item['age']], dtype=torch.float32)
    gender = torch.tensor(item['gender'], dtype=torch.float32)

    images.append(image)
    demographics.append(torch.cat([age, gender]))
    labels.append(torch.tensor(label, dtype=torch.long))

  if not images:
    # torch.stack on an empty list fails with an unhelpful message.
    raise ValueError(f"No records found in {original_data_pickle!r}")

  # Stack per-sample tensors into batched tensors:
  #   images:       (num_samples, channels, height, width)
  #   demographics: (num_samples, num_features)
  #   labels:       (num_samples, num_labels)
  images = torch.stack(images)
  demographics = torch.stack(demographics)
  labels = torch.stack(labels)

  test_dataset = TensorDataset(images, demographics, labels)
  # Honor the caller's batch size (it was previously hard-coded to 32).
  test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

  print(f" samples: {len(test_dataset)}")

  return test_loader

In [None]:
class CustomResNet18(nn.Module):
    """ResNet-18 image backbone combined with a small demographics branch.

    Grayscale images go through a ResNet-18 whose first convolution takes one
    channel and whose final classification layer is removed. Demographic
    features go through a Linear -> ReLU -> Dropout block. Both feature
    vectors are concatenated and mapped to ``num_classes`` raw logits.

    Args:
        demographic_fc_size: Output width of the demographics branch.
        num_demographics: Number of input demographic features.
        num_classes: Number of output logits.
    """

    def __init__(self, demographic_fc_size, num_demographics, num_classes=15):
        super().__init__()

        backbone = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)

        # The stock ResNet stem expects 3-channel RGB input; replace it with a
        # freshly initialised single-channel convolution for grayscale images.
        backbone.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)

        # Keep every child module except the last one (the final fully
        # connected classifier), so the backbone emits pooled features.
        feature_layers = list(backbone.children())[:-1]
        self.resnet = nn.Sequential(*feature_layers)

        # Branch that embeds the demographic inputs (age + gender).
        self.demographics_fc = nn.Sequential(
            nn.Linear(num_demographics, demographic_fc_size),
            nn.ReLU(),
            nn.Dropout(0.5),
        )

        # 512 is the ResNet-18 feature width; the demographics embedding is
        # appended to it before classification.
        self.fc = nn.Linear(512 + demographic_fc_size, num_classes)

    def forward(self, images, demographics):
        """Return raw class logits for a batch of images and demographics."""
        image_features = self.resnet(images)
        # Collapse the trailing dimensions into one feature vector per sample.
        image_features = image_features.view(image_features.size(0), -1)

        demographic_features = self.demographics_fc(demographics)

        fused = torch.cat([image_features, demographic_features], dim=1)
        return self.fc(fused)

In [None]:
def evaluate_model(test_loader, model, criterion, precision_metric, recall_metric, f1_metric, confidence= 0.3):
    """Evaluate a multi-label model on a test loader and print a summary.

    Args:
        test_loader: Iterable yielding ``(inputs, demographics, labels)`` batches.
        model: Callable as ``model(inputs, demographics)`` returning raw logits.
        criterion: Loss called as ``criterion(logits, labels.float())``.
        precision_metric, recall_metric, f1_metric: Metric objects exposing
            ``reset()``, ``update(preds, target)`` and ``compute()``. They are
            reset at the start of the evaluation.
        confidence: Sigmoid-probability threshold at or above which a label is
            predicted as present.

    Returns:
        ``(test_accuracy, precision, recall, f1_score)``. Accuracy is a
        percentage computed element-wise over every (sample, label) entry.
        The printed loss is the mean of the per-batch loss values.

    Raises:
        ValueError: If ``test_loader`` yields no samples.
    """
    # Send batches to wherever the model already lives; if the model exposes
    # no parameters, fall back to CUDA when available, else CPU.
    parameters = getattr(model, "parameters", None)
    first_param = next(iter(parameters()), None) if callable(parameters) else None
    if first_param is not None:
        device = first_param.device
    else:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    model.eval()
    for metric in (precision_metric, recall_metric, f1_metric):
        metric.reset()

    correct = 0
    total = 0
    test_loss = 0.0
    num_batches = 0  # counted manually so loaders without __len__ also work

    with torch.no_grad():
        for inputs, demographics, labels in test_loader:
            inputs = inputs.to(device)
            demographics = demographics.to(device)
            labels = labels.to(device)

            outputs = model(inputs, demographics)
            test_loss += criterion(outputs, labels.float()).item()

            # Logits -> probabilities -> hard 0/1 predictions per label.
            probabilities = torch.sigmoid(outputs)
            predicted = (probabilities >= confidence).int()

            correct += (predicted == labels).sum().item()
            total += labels.numel()

            precision_metric.update(predicted, labels)
            recall_metric.update(predicted, labels)
            f1_metric.update(predicted, labels)

            num_batches += 1

    if num_batches == 0 or total == 0:
        # Avoids an opaque ZeroDivisionError in the averages below.
        raise ValueError("test_loader yielded no samples; nothing to evaluate")

    test_accuracy = 100 * correct / total
    precision = precision_metric.compute().item()
    recall = recall_metric.compute().item()
    f1_score = f1_metric.compute().item()
    avg_test_loss = test_loss / num_batches

    print(f'Test Loss: {avg_test_loss:.4f}')
    print(f'Test Accuracy: {test_accuracy:.2f}%')
    print(f'Test Precision: {precision:.4f}, Recall: {recall:.4f}, F1-score: {f1_score:.4f}')

    return test_accuracy, precision, recall, f1_score

In [None]:
def main(model_path: str, test_data_pickle: str, batch_size: int, num_classes: int, device=None):
  """Load a fully pickled model and evaluate it on the pickled test set.

  Args:
    model_path: Path to a model saved with ``torch.save(model, path)``
      (the whole module object, not a state_dict).
    test_data_pickle: Path to the pickled test records.
    batch_size: Batch size handed to ``load_test_data``.
    num_classes: Number of labels for the multilabel metrics.
    device: Accepted for interface compatibility but not used; the device is
      always auto-selected (CUDA when available, otherwise CPU), so passing a
      different device has no effect.

  Returns:
    The test accuracy (percentage) reported by ``evaluate_model``.
  """
  device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

  print("Loading the best model for evaluation...")

  # The checkpoint holds the entire pickled module (architecture + weights),
  # so it is restored directly instead of via load_state_dict. The model class
  # must be defined in the session for unpickling to succeed.
  #
  # If the checkpoint had instead been written with
  # torch.save(model.state_dict(), path), it would have to be loaded as:
  #   model = CustomResNet18(demographic_fc_size, num_demographics, num_classes)
  #   model.load_state_dict(torch.load(model_path, map_location=device))
  # To tell the two apart, inspect type(torch.load(model_path, ...)):
  # a CustomResNet18 means the whole model was saved, a dict means a state_dict.
  # (".pt" and ".pth" are treated identically by PyTorch.)
  #
  # weights_only=False is required to unpickle a full module: from PyTorch 2.6
  # the default is True, which rejects arbitrary pickled objects.
  # WARNING: this executes pickle code - only load checkpoints you trust.
  model = torch.load(model_path, map_location=device, weights_only=False)
  model.to(device)
  model.eval()

  test_loader = load_test_data(test_data_pickle, batch_size)

  # Macro-averaged multilabel metrics, kept on the same device as the model.
  precision_metric = MultilabelPrecision(num_labels= num_classes, average='macro').to(device)
  recall_metric = MultilabelRecall(num_labels= num_classes, average='macro').to(device)
  f1_metric = MultilabelF1Score(num_labels= num_classes, average='macro').to(device)

  criterion = nn.BCEWithLogitsLoss()
  test_accuracy, precision, recall, f1_score= evaluate_model(test_loader, model, criterion, precision_metric, recall_metric, f1_metric)

  print(f"Test Accuracy of the best model: {test_accuracy:.4f}")

  return test_accuracy

Loading the best model for evaluation...


  model = torch.load(model_path, map_location=device)


 samples: 1249
Test Loss: 0.3347
Test Accuracy: 85.91%
Test Precision: 0.2640, Recall: 0.1678, F1-score: 0.1803
Test Accuracy of the best model: 85.9087
