In [None]:
! pip install bs4 lxml kaggle

In [1]:
import os
from getpass import getpass

# Kaggle API credentials are taken from the environment, or asked for
# interactively, so that no secret is ever stored in the notebook itself
# (anything written here is readable by everyone with access to the file or
# its version history).
if not os.environ.get('KAGGLE_USERNAME'):
    os.environ['KAGGLE_USERNAME'] = input('Kaggle username: ')
if not os.environ.get('KAGGLE_KEY'):
    # getpass does not echo the key, so it does not end up in the cell output.
    os.environ['KAGGLE_KEY'] = getpass('Kaggle API key: ')

In [None]:
# Download the assessment dataset archive with the Kaggle command-line tool.
# NOTE(review): presumably the CLI authenticates with the KAGGLE_USERNAME /
# KAGGLE_KEY environment variables set in the previous cell - confirm.
!kaggle datasets download -d bilalyousaf0014/ml-engineer-assessment-dataset

In [None]:
! unzip /content/ml-engineer-assessment-dataset.zip

In [2]:
import os
import torch
import torch.nn as nn
import numpy as np

from torchvision.models import resnet18, ResNet18_Weights

Custom Model:

In [3]:
class Model(nn.Module):
    """Multi-task network: a pretrained ResNet-18 backbone feeding four linear heads.

    All heads read the same flattened backbone features:

    * ``have_object`` - 1 unit, passed through a sigmoid
    * ``bbox``        - 4 units, no activation
    * ``cat_or_dog``  - 1 unit, passed through a sigmoid
    * ``specie``      - 9 units, passed through a softmax over dim 1
    """

    def __init__(self):
        super().__init__()
        # Pretrained ResNet-18 with its last two child modules removed, so the
        # backbone yields a feature map instead of classification scores.
        resnet = resnet18(weights=ResNet18_Weights.DEFAULT)
        self.backbone = nn.Sequential(*list(resnet.children())[:-2])

        # Length of the flattened feature vector every head expects.
        # NOTE(review): 512 * 7 * 7 presumably corresponds to 224x224 inputs - confirm.
        n_features = 512 * 7 * 7

        # One linear head per task.
        self.have_object = nn.Linear(n_features, 1)
        self.bbox = nn.Linear(n_features, 4)
        self.cat_or_dog = nn.Linear(n_features, 1)
        self.specie = nn.Linear(n_features, 9)

    def forward(self, input):
        """Run the backbone and all heads; return a dict of per-task outputs.

        The returned keys are ``"bbox"``, ``"object"``, ``"cat_or_dog"`` and
        ``"specie"``. ``"bbox"`` is the raw linear output; the other three are
        already activated (sigmoid, sigmoid, softmax).
        """
        # Flatten everything except the batch dimension.
        features = self.backbone(input).flatten(start_dim=1)

        return {
            "bbox": self.bbox(features),
            "object": self.have_object(features).sigmoid(),
            "cat_or_dog": self.cat_or_dog(features).sigmoid(),
            "specie": self.specie(features).softmax(dim=1),
        }

CUSTOM DATALOADER IMPLEMENTATION

In [4]:
# Entries of the training and validation splits, loaded as plain Python lists.
# NOTE(review): presumably these are image file stems (CustomDataset matches
# them against file names without extension) - confirm.
# NOTE(review): allow_pickle=True lets NumPy unpickle arbitrary Python objects,
# which can execute code while loading - only use it on files from a trusted source.
train_list = np.load('/content/assessment_dataset/train_list.npy', allow_pickle=True).tolist()
val_list = np.load('/content/assessment_dataset/val_list.npy', allow_pickle=True).tolist()

I have tweaked the XML reader slightly, because we needed a `have_object` flag and the per-image (dynamic) width and height.

In [5]:
from bs4 import BeautifulSoup

def read_xml_file(path):
    """Parse one annotation XML file into a flat label dictionary.

    Args:
        path: Path of the XML file. The species is derived from the file
            name: everything before its last underscore.

    Returns:
        dict with the keys
        ``have_object`` (bool, True when the file contains a ``<name>`` tag),
        ``cat_or_dog`` (text of the first ``<name>`` tag, ``""`` when absent),
        ``xmin``/``ymin``/``xmax``/``ymax`` (ints, ``0`` when the tag is absent),
        ``specie`` (str), ``width`` and ``height`` (ints).

    Raises:
        FileNotFoundError: if ``path`` does not exist.
        AttributeError: if the file has no ``<width>`` or ``<height>`` tag.
    """
    with open(path, 'r') as f:
        data = f.read()
    bs_data = BeautifulSoup(data, 'xml')

    def _int_tag(tag, default=0):
        # Integer content of the first <tag>, or `default` if the tag is missing.
        node = bs_data.find(tag)
        return int(node.text) if node is not None else default

    # A file without a <name> tag describes an image with no annotated object.
    # Placeholders ("" / 0) are returned instead of None so that the result
    # can still be batched by a DataLoader.
    name_node = bs_data.find("name")
    have_object = name_node is not None

    return {
        "have_object": have_object,
        "cat_or_dog": name_node.text if have_object else "",
        "xmin": _int_tag("xmin"),
        "ymin": _int_tag("ymin"),
        "xmax": _int_tag("xmax"),
        "ymax": _int_tag("ymax"),
        "specie": "_".join(path.split(os.sep)[-1].split("_")[:-1]),
        "width": int(bs_data.find("width").text),
        "height": int(bs_data.find("height").text)
    }

The data loader below includes several tweaks:

1.   A check for missing XML files, which otherwise caused errors
2.   A dictionary of label values for each sample
3.   Pre-processing of the raw image



In [6]:
from PIL import Image

import torchvision.transforms as transforms


class CustomDataset():
  """In-memory dataset of preprocessed images paired with their label dictionaries.

  Every file stem found in ``<dataset_path>/images`` that is also listed in
  ``images_list`` is loaded once: its labels come from
  ``<dataset_path>/labels/<stem>.xml`` and its pixels from
  ``<dataset_path>/images/<stem>.jpg``. Stems whose XML or JPG file is missing
  are skipped. All images are preprocessed and kept in memory by ``__init__``.

  Args:
    dataset_path: Root folder containing the ``images`` and ``labels`` folders.
    images_list: Iterable of file stems (hashable, e.g. str) to include.
    train: Stored on the instance as ``self.train``; not used by this class.
  """

  def __init__(self, dataset_path, images_list, train=False):
    self.data_source = dataset_path
    self.images_list = images_list
    self.train = train

    image_folder_path = os.path.join(dataset_path, "images")
    label_folder_path = os.path.join(dataset_path, "labels")

    # Resize to 224x224, convert to a tensor and normalize each channel.
    self.preprocess = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    wanted = set(images_list)  # O(1) membership test instead of scanning the list per file
    loaded = set()             # stems already handled, so no sample is added twice

    self.samples = []
    for path in os.listdir(image_folder_path):
      name = path.split(os.sep)[-1].split(".")[0]
      if name not in wanted or name in loaded:
        continue
      loaded.add(name)

      try:
        xml_data = read_xml_file(os.path.join(label_folder_path, name + ".xml"))
        # The context manager closes the file handle as soon as the pixels
        # have been converted into an independent RGB image.
        with Image.open(os.path.join(image_folder_path, name + ".jpg")) as raw_image:
          image = raw_image.convert("RGB")
      except FileNotFoundError:
        # Missing annotation or missing image: skip this sample.
        continue

      image = self.preprocess(image)

      # NOTE: the box coordinates are in pixels of the ORIGINAL image
      # (width x height below), not of the resized 224x224 tensor.
      labels = {
          "have_object": xml_data["have_object"],
          "cat_or_dog": xml_data["cat_or_dog"],
          "specie": xml_data["specie"],
          "bbox": [xml_data["xmin"], xml_data["ymin"], xml_data["xmax"], xml_data["ymax"]],
          "width": xml_data["width"],
          "height": xml_data["height"],
          "xmin": xml_data["xmin"],
          "ymin": xml_data["ymin"],
          "xmax": xml_data["xmax"],
          "ymax": xml_data["ymax"]
      }
      self.samples.append((image, labels))

  def __len__(self):
    """Number of loaded samples."""
    return len(self.samples)

  def __getitem__(self, index):
    """Return the ``(image_tensor, labels_dict)`` pair stored at ``index``."""
    image, label = self.samples[index]
    return image, label

The test function below also has several additions:

1.   Helper code converts tensors to NumPy arrays and then to plain Python lists, so that the values can be accumulated across batches and handled or serialized easily.
2.   Each prediction output is post-processed. For the binary outputs, values are compared against a threshold of 0.5: values greater than 0.5 become 1 and all other values become 0.
3. For the multi-class output, torch.argmax() is applied along the second dimension (dim=1) to obtain the index of the class with the highest probability.



In [25]:
import torchmetrics
from sklearn.preprocessing import LabelEncoder

def test(model):
  """Evaluate ``model`` on the validation split and return three F1 scores.

  The validation set is built from ``val_list``. The model is switched to
  evaluation mode and run without gradient tracking; its previous
  training/evaluation mode is restored before returning.

  * object:     scored on every sample.
  * cat_or_dog: scored only on samples whose label says an object is present.
  * specie:     scored only on samples whose label says an object is present.

  Class names are mapped to indices by sorting the distinct names found in the
  validation set, once, so the index of a class does not depend on which
  samples share a batch. The bounding-box output is not scored.

  Args:
    model: Module whose forward pass returns a dict with the keys
      ``"object"``, ``"cat_or_dog"`` and ``"specie"``.

  Returns:
    Tuple ``(score_object, score_cat_or_dog, score_specie)``. A score is
    ``torch.tensor(0.0)`` when there was no sample to score it on.
  """

  def post_process_object(x):
    return torch.where(x > 0.5, 1.0, 0.0).squeeze(1) # values greater than 0.5 become 1.0, others 0.0

  def post_process_cat_or_dog(x):
    return torch.where(x > 0.5, 1.0, 0.0).squeeze(1)

  def post_process_specie(x):
    return torch.argmax(x, dim=1) # index of the class with the highest probability

  def __tnl(x):
    return (x.detach().cpu().numpy()).tolist()

  def __score(metric, predictions, targets):
    # An F1 score over zero samples is not meaningful; report 0 instead of failing.
    if not targets:
      return torch.tensor(0.0)
    return metric(torch.tensor(predictions), torch.tensor(targets))

  val_dataset = CustomDataset("/content/assessment_dataset", images_list=val_list)
  val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

  metric_object = torchmetrics.F1Score(task="binary")
  metric_cat_or_dog = torchmetrics.F1Score(task="binary")
  metric_specie = torchmetrics.F1Score(task="multiclass", num_classes=9)

  # Fixed name -> index mappings (alphabetical order, the same order a
  # LabelEncoder fitted on the whole set would produce).
  # NOTE(review): this only matches the indices used during training if the
  # training data contains the same set of class names - confirm.
  annotated = [label for _, label in val_dataset.samples if label["have_object"]]
  cat_or_dog_index = {
      name: idx for idx, name in enumerate(sorted({label["cat_or_dog"] for label in annotated}))
  }
  specie_index = {
      name: idx for idx, name in enumerate(sorted({label["specie"] for label in annotated}))
  }

  output_list = {"object": [], "cat_or_dog": [], "specie": []}
  labels_list = {"object": [], "cat_or_dog": [], "specie": []}

  # Run on whatever device the model already lives on. Only the image batch is
  # moved; the labels contain strings and lists, which cannot be moved.
  device = next(model.parameters()).device
  was_training = model.training
  model.eval()
  try:
    with torch.no_grad():
      for inputs, labels in val_loader:
        outputs = model(inputs.to(device))

        has_object = [bool(flag) for flag in __tnl(labels["have_object"])]
        output_list["object"].extend(__tnl(post_process_object(outputs["object"])))
        labels_list["object"].extend(int(flag) for flag in has_object)

        predicted_cat_or_dog = __tnl(post_process_cat_or_dog(outputs["cat_or_dog"]))
        predicted_specie = __tnl(post_process_specie(outputs["specie"]))

        # Decide per sample (not per batch) whether class labels exist.
        for position, flag in enumerate(has_object):
          if not flag:
            continue
          output_list["cat_or_dog"].append(predicted_cat_or_dog[position])
          labels_list["cat_or_dog"].append(cat_or_dog_index[labels["cat_or_dog"][position]])
          output_list["specie"].append(predicted_specie[position])
          labels_list["specie"].append(specie_index[labels["specie"][position]])
  finally:
    # Give the model back in the mode the caller had it in.
    model.train(was_training)

  score_object = __score(metric_object, output_list["object"], labels_list["object"])
  score_cat_or_dog = __score(metric_cat_or_dog, output_list["cat_or_dog"], labels_list["cat_or_dog"])
  score_specie = __score(metric_specie, output_list["specie"], labels_list["specie"])

  return score_object, score_cat_or_dog, score_specie # one score per evaluated output

In [11]:
from torch.utils.data import DataLoader

In [40]:
import torch
from torch import nn
from torch.optim import Adam
from sklearn.preprocessing import LabelEncoder

def train(epochs, model_weights):
  """Train a fresh ``Model`` on the training split and evaluate it after every epoch.

  Each epoch's mean batch loss is printed. Whenever it is lower than the best
  value seen so far in this call, the weights are saved to
  ``/content/Model<epoch>.pth``. ``test(model)`` is run and printed after
  every epoch.

  Loss terms (the class and box terms only use samples labelled as having an object):
    * have_object: binary cross-entropy on the sigmoid output.
    * cat_or_dog:  binary cross-entropy on the sigmoid output.
    * specie:      negative log-likelihood on the log of the softmax output.
    * bbox:        mean squared error over all four coordinates, plus one mean
                   squared error term per coordinate.

  Args:
    epochs: Number of passes over the training data.
    model_weights: Path of a state dict to resume from, or ``None`` to start
      from the pretrained backbone with freshly initialised heads.
  """
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

  # Initialize Model and Optimizer
  model = Model()
  if model_weights is not None:
    # map_location lets a checkpoint saved on a GPU be loaded on a CPU-only machine.
    model.load_state_dict(torch.load(model_weights, map_location="cpu"))
  model = model.to(device)
  optimizer = Adam(model.parameters())

  # Initialize Loss Functions
  have_object_loss = nn.BCELoss()  # the head outputs a sigmoid probability
  cat_or_dog_loss = nn.BCELoss()   # one sigmoid output: cross-entropy over a single class would always be 0
  specie_loss = nn.NLLLoss()       # the head already applies softmax, so take log + NLL instead of CrossEntropyLoss
  bbox_loss = nn.MSELoss()         # regression on the box coordinates

  # Best (lowest) epoch loss seen so far; the weights are saved on every improvement.
  best_loss = float('inf')

  training_dataset = CustomDataset("/content/assessment_dataset", images_list=train_list)
  training_loader = DataLoader(training_dataset, batch_size=64, shuffle=True)

  # Fixed name -> index mappings built once from the whole training set
  # (alphabetical order), so a class keeps the same index in every batch.
  annotated = [label for _, label in training_dataset.samples if label["have_object"]]
  cat_or_dog_index = {
      name: idx for idx, name in enumerate(sorted({label["cat_or_dog"] for label in annotated}))
  }
  specie_index = {
      name: idx for idx, name in enumerate(sorted({label["specie"] for label in annotated}))
  }

  def train_one_epoch(epoch_index, tb_writer):
    """Run one pass over the training data and return the mean loss per batch."""
    model.train()  # evaluation between epochs may have left the model in eval mode
    total_loss = 0.
    batch_count = 0

    for inputs, labels in training_loader:
      # Only tensors can be moved to the device; the labels also contain
      # lists of strings, so they are converted and moved individually below.
      inputs = inputs.to(device)

      optimizer.zero_grad()

      # Make predictions for this batch
      outputs = model(inputs)

      # Object presence is learned from every sample.
      has_object = labels["have_object"].to(device).bool()
      loss = have_object_loss(outputs["object"], has_object.float().unsqueeze(1))

      if has_object.any():
        # Names of samples without an object are placeholders; they get index
        # 0 here and are removed again by the mask.
        cat_or_dog_target = torch.tensor(
            [float(cat_or_dog_index.get(name, 0)) for name in labels["cat_or_dog"]],
            device=device,
        ).unsqueeze(1)
        specie_target = torch.tensor(
            [specie_index.get(name, 0) for name in labels["specie"]],
            dtype=torch.long,
            device=device,
        )
        # The loader yields bbox as 4 tensors of length batch; stack and
        # transpose to (batch, 4) so it lines up with the model output.
        # NOTE(review): these targets are in pixels of the original image while
        # the network input is resized to 224x224 - confirm this is intended.
        bbox_target = torch.stack(labels["bbox"]).float().transpose(0, 1).to(device)

        bbox_pred = outputs["bbox"][has_object]
        bbox_target = bbox_target[has_object]

        loss = loss + cat_or_dog_loss(outputs["cat_or_dog"][has_object], cat_or_dog_target[has_object])

        # clamp_min avoids log(0) when a probability underflows.
        specie_log_probs = torch.log(outputs["specie"][has_object].clamp_min(1e-12))
        loss = loss + specie_loss(specie_log_probs, specie_target[has_object])

        loss = loss + bbox_loss(bbox_pred, bbox_target)
        # Per-coordinate terms (xmin, ymin, xmax, ymax), each compared with
        # the matching column of the targets.
        for coord in range(4):
          loss = loss + bbox_loss(bbox_pred[:, coord], bbox_target[:, coord])

      loss.backward()
      optimizer.step()

      total_loss += loss.item()
      batch_count += 1

    # Mean over all batches of the epoch (0.0 for an empty loader).
    return total_loss / max(batch_count, 1)

  for i in range(epochs):
    epoch_loss = train_one_epoch(i, None)
    print(f' Epoch {i} Loss : {epoch_loss}')

    if epoch_loss < best_loss:  # new best epoch: keep its weights
      best_loss = epoch_loss
      file_path = os.path.join('/content', 'Model' + str(i) + '.pth')
      torch.save(model.state_dict(), file_path)

    metrics = test(model)
    print(metrics)

In [None]:
# Train for 50 epochs, starting without a saved checkpoint.
train(epochs=50, model_weights=None)

Below is the visualize function. It works as follows:

*   It takes an input path, an output path, and a weight path
*   The output function returns { "has_object": True, "cat_or_dog": "cat", "specie": "persian", "xmin": 10, "ymin": 10, "xmax": 10, "ymax": 10 }
* Images have the extension jpg or jpeg
* The output folder contains the images with the bounding boxes drawn on them



In [54]:
import os
import torch
import torchvision.transforms as transforms
from PIL import Image
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from PIL import Image, ImageDraw, ImageFont
from pathlib import Path

def visualize_images(input_folder, weight_file, output_folder, border_width=100, show=True):
    """Draw the predicted bounding box on every JPEG image of a folder.

    For each ``.jpg`` / ``.jpeg`` file in ``input_folder`` the model's ``bbox``
    output is drawn on the image in red together with the file stem, the
    result is saved under the same file name in ``output_folder`` and,
    optionally, displayed with matplotlib. Other files are ignored.

    Args:
        input_folder: Folder containing the images to process.
        weight_file: Path of a saved ``Model`` state dict.
        output_folder: Folder receiving the annotated images (created if missing).
        border_width: Number of pixels by which the drawn rectangle is enlarged
            on every side of the predicted box.
        show: When True, also display each annotated image with matplotlib.

    NOTE(review): the predicted values are used directly as pixel coordinates
    of the original (un-resized) image - confirm this matches how the model
    was trained.
    """
    os.makedirs(output_folder, exist_ok=True)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model = Model()
    # map_location lets a checkpoint saved on a GPU be loaded on a CPU-only machine.
    model.load_state_dict(torch.load(weight_file, map_location=device))
    model.to(device)
    model.eval()

    # Define the image transformation
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    for filename in sorted(os.listdir(input_folder)):
        file_name, extension = os.path.splitext(filename)
        image_path = os.path.join(input_folder, filename)

        # Only JPEG files are images we know how to handle; skip anything else
        # (sub-folders, annotation files, ...) instead of failing on it.
        if extension.lower() not in (".jpg", ".jpeg") or not os.path.isfile(image_path):
            continue

        # The context manager releases the file handle once the pixels have
        # been converted into an independent RGB image.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")

        image_tensor = transform(image).unsqueeze(0).to(device)

        with torch.no_grad():
            output = model(image_tensor)

        draw = ImageDraw.Draw(image)
        # Plain Python floats: PIL's drawing functions do not take tensors.
        for xmin, ymin, xmax, ymax in output['bbox'].cpu().tolist():
            # Sort each axis so the rectangle is valid even when the model
            # predicts a "max" coordinate smaller than the "min" one.
            left, right = sorted((xmin - border_width, xmax + border_width))
            top, bottom = sorted((ymin - border_width, ymax + border_width))
            draw.rectangle([left, top, right, bottom], outline='red', width=2)
            draw.text((xmin, ymin - 20), text=file_name, fill='red')

        # Save the image with its bounding box drawn on it.
        image.save(os.path.join(output_folder, filename))

        if show:
            plt.imshow(image)
            plt.show()

    print("Visualization complete.")

In [None]:
Output = "/content/Output" #output folder; visualize_images creates it if it does not exist
Images = "/content/assessment_dataset/images" #folder with the images to run the model on
weight_file = "/content/Model14.pth" #checkpoint file; train() names its checkpoints Model<epoch>.pth, so this one must have been saved at epoch 14

# Run the trained model over every image in the folder and visualize the predicted boxes.
visualize_images(Images,weight_file,Output)