In [None]:
import pathlib
import torch
import torchvision.models as models
import torch.nn as nn
import os

In [None]:
# Resolve the directory this notebook lives in. `__file__` only exists when the
# code runs as a script/module; inside a Jupyter kernel it raises NameError, so
# fall back to the kernel's working directory.
# NOTE(review): assumes the kernel is started in the notebook's folder — confirm.
try:
    filepath = pathlib.Path(__file__).parent.resolve()
except NameError:
    filepath = pathlib.Path.cwd().resolve()

# Folder that receives the modified branch / backbone checkpoints.
edited_components_dir = os.path.join(filepath, "..", "edited_components")

# exist_ok=True keeps the cell re-runnable without a separate existence check.
os.makedirs(edited_components_dir, exist_ok=True)

# YOLO

For our object detection we are using YOLOv8. We are actually using 2 separate models for person and face detection. To apply the unified backbone to the YOLOv8 model branches, we need to modify the YOLOv8 model to accept the backbone's output as its input. To do that, we need to remove the image input layers from the YOLOv8 model and replace them with the backbone's output. We will work on the backbone's output later; for now, we simply need to remove the image input layers from the YOLOv8 model and add a small adapter layer in their place.

In [28]:
from ultralytics import YOLO

In [None]:
# Locate the stock YOLOv8-nano checkpoint relative to this notebook.
# `__file__` is undefined inside a Jupyter kernel (NameError), so fall back to
# the kernel's working directory.
# NOTE(review): assumes the kernel is started in the notebook's folder — confirm.
try:
    filepath = pathlib.Path(__file__).parent.resolve()
except NameError:
    filepath = pathlib.Path.cwd().resolve()
model_path = os.path.join(filepath, "..", "component_models", "yolov8n.pt")
yolo_model = YOLO(model_path)

In [50]:
# Dump the module tree so we can see which layers consume the raw image.
print(yolo_model)

YOLO(
  (model): DetectionModel(
    (model): Sequential(
      (0): Conv(
        (conv): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(16, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (1): Conv(
        (conv): Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(32, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (2): C2f(
        (cv1): Conv(
          (conv): Conv2d(32, 32, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn): BatchNorm2d(32, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
          (act): SiLU(inplace=True)
        )
        (cv2): Conv(
          (conv): Conv2d(48, 32, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn): BatchNorm2d(32, eps=0.001, momentum=0.03, affine=True, track_running_s

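Printing out YOLO shows that the first two layers are the ones that take in the raw image: two stride-2 convolutions that turn the 3-channel image into a 32-channel feature map. We can remove these two layers and use the rest of the network to process the backbone's features instead.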

In [31]:
class CustomYOLO(nn.Module):
    """YOLOv8 branch that consumes backbone feature maps instead of raw images.

    The first two layers of the wrapped detection model are dropped and replaced
    by a 1x1 convolution that projects the backbone features to the 32 channels
    the third YOLO layer expects.

    Args:
        yolo_model: Loaded ``ultralytics.YOLO`` object; its layer list is read
            from ``yolo_model.model.model``.
        backbone_channels: Channel count of the incoming feature map.
            NOTE(review): the other branches in this notebook assume 2048
            backbone channels — confirm 512 is the intended default here.
    """

    def __init__(self, yolo_model, backbone_channels=512):
        super(CustomYOLO, self).__init__()

        # Adapter to match YOLO's expected features (32 channels)
        self.adapter = nn.Conv2d(backbone_channels, 32, kernel_size=1)

        # Remove the first two layers of YOLO, which are for processing input
        self.yolo = nn.Sequential(*list(yolo_model.model.model)[2:])

        # Indices of the layers whose outputs are read again further down the
        # network (skip connections). Empty when the model does not expose them.
        self._saved_outputs = set(getattr(yolo_model.model, "save", ()))

    def forward(self, backbone_features):
        """Run the adapter and the remaining YOLO layers.

        YOLOv8 is not a pure chain: each layer carries ``f`` (index/indices of
        the layers it reads from, ``-1`` meaning "previous layer") and ``i``
        (its own index). Calling ``self.yolo(x)`` directly would hand a single
        tensor to the multi-input layers, so the routing is replayed here.
        Layers without ``f``/``i`` are treated as plain sequential layers.

        Args:
            backbone_features: Feature map with ``backbone_channels`` channels.

        Returns:
            Whatever the final YOLO layer returns.
        """
        x = self.adapter(backbone_features)

        cache = {}  # original layer index -> output kept for later layers
        for position, layer in enumerate(self.yolo):
            # Two stem layers were removed, hence the +2 when no index is stored.
            index = getattr(layer, "i", position + 2)
            source = getattr(layer, "f", -1)

            if source != -1:
                if isinstance(source, int):
                    x = cache[source if source >= 0 else index + source]
                else:
                    x = [
                        x if j == -1 else cache[j if j >= 0 else index + j]
                        for j in source
                    ]

            x = layer(x)
            if index in self._saved_outputs:
                cache[index] = x
        return x

In [None]:
# Build the person-detection branch from the stock YOLOv8n checkpoint. The
# checkpoint is resolved from `filepath`, the same root the save below uses,
# so the cell does not depend on the kernel's working directory.
model_path = os.path.join(filepath, "..", "component_models", "yolov8n.pt")
yolo_model = YOLO(model_path)
person_detect_branch = CustomYOLO(yolo_model)
# torch.save pickles the whole module: loading it later needs the CustomYOLO
# class definition to be available.
torch.save(person_detect_branch, os.path.join(filepath, "..", "edited_components", "custom_yolo.pth"))

In [None]:
# Build the face-detection branch from the YOLOv8n face checkpoint. The
# checkpoint is resolved from `filepath`, the same root the save below uses,
# so the cell does not depend on the kernel's working directory.
model_path = os.path.join(filepath, "..", "component_models", "yolov8n-face.pt")
yolo_face_model = YOLO(model_path)
face_detect_branch = CustomYOLO(yolo_face_model)
# torch.save pickles the whole module: loading it later needs the CustomYOLO
# class definition to be available.
torch.save(face_detect_branch, os.path.join(filepath, "..", "edited_components", "custom_yolo_face.pth"))

# AdaFace

To recognize faces we are using AdaFace. This is a similar process to YOLO, where we need to remove the image input layers from the model and replace them with the backbone's output. To do this, we have copied the AdaFace model PyTorch code and modified it to accept the backbone's output as an input. We apply the same forward pass as the original code to keep the same functionality.

In [34]:
import net_adaface

AdaFace has stored the weights of the model as a state dict. Due to this, we need to load the weights of the model into the original model first and then transfer the weights to the modified model.

In [35]:
# Resolve the checkpoint from `filepath`, like the other component models,
# instead of relying on the kernel's working directory.
ada_face_model_path = os.path.join(filepath, "..", "component_models", "adaface_ir50_ms1mv2.ckpt")
ada_face_model = net_adaface.build_model('ir_50')
# map_location="cpu" lets the checkpoint load on machines that lack the device
# it was saved from.
# NOTE: torch.load unpickles the file — only load checkpoints from a trusted source.
statedict = torch.load(ada_face_model_path, map_location="cpu")['state_dict']
# Keep only the entries under the 'model.' prefix and strip that prefix so the
# keys match the bare network built above.
model_statedict = {key[len('model.'):]: val for key, val in statedict.items() if key.startswith('model.')}
ada_face_model.load_state_dict(model_statedict)

<All keys matched successfully>

In [36]:
class CustomAdaFace(nn.Module):
    """AdaFace recognition branch fed by backbone feature maps.

    Reuses the ``body`` and ``output_layer`` of an existing AdaFace network and
    puts a 1x1 convolution in front of them in place of the original input stage.

    Args:
        ada_face_model: AdaFace network exposing ``body`` and ``output_layer``.
        backbone_channels: Channel count of the incoming feature map. The
            adapter used to hard-code 2048 and ignore this argument; the default
            is now 2048 so default construction builds the same layer as before,
            while an explicit value is honoured.
    """

    def __init__(self, ada_face_model: "net_adaface.Backbone", backbone_channels=2048):
        super(CustomAdaFace, self).__init__()

        # Adapter projecting the backbone features to 64 channels.
        # NOTE(review): 64 is presumably the channel count the AdaFace body
        # expects from its removed input layer — confirm against net_adaface.
        self.adapter = nn.Conv2d(backbone_channels, 64, kernel_size=1)

        # Save the body and output
        self.body = ada_face_model.body
        self.output_layer = ada_face_model.output_layer

    def forward(self, backbone_features):
        """Embed the features and L2-normalise the result.

        Args:
            backbone_features: Feature map with ``backbone_channels`` channels.

        Returns:
            Tuple ``(output, norm)``: the embedding divided by its L2 norm along
            dim 1, and that norm (kept with ``keepdim=True``).
        """
        x = self.adapter(backbone_features)

        for module in self.body:
            x = module(x)

        x = self.output_layer(x)
        norm = torch.norm(x, 2, 1, True)
        output = torch.div(x, norm)

        return output, norm

In [None]:
# Wrap the weight-loaded AdaFace network in the adapter branch and persist the
# whole module object.
face_rec_branch = CustomAdaFace(ada_face_model=ada_face_model)
torch.save(
    obj=face_rec_branch,
    f=os.path.join(filepath, "..", "edited_components", "custom_ada_face.pth"),
)

# ViT Pose

ViT Pose is a model that detects human poses. The model is available on Hugging Face in the transformers library. This is very useful, but we need to take a slightly different approach to use it. Instead of copying the model over to a custom version, we create a new model that simply has the adapter layer followed by the ViT Pose model. We then replace the patch embedding layer with an identity layer (`nn.Identity`) so that it is simply skipped. This makes it a much simpler process than the previous two models.

In [38]:
from transformers import AutoProcessor, VitPoseForPoseEstimation

# Prefer the GPU when PyTorch can see one; otherwise run on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [39]:
class CustomVitPose(nn.Module):
    """ViTPose branch fed by backbone feature maps.

    A small convolutional adapter maps the backbone features to the ViTPose
    hidden size, and the model's own patch-embedding layer is replaced by
    ``nn.Identity`` so the adapter output is used in its place.

    Args:
        vit_pose_model: Hugging Face ViTPose model. It is modified in place:
            ``backbone.embeddings.patch_embeddings`` becomes ``nn.Identity``.
        backbone_channels: Channel count of the incoming feature map.
    """

    def __init__(self, vit_pose_model: "VitPoseForPoseEstimation", backbone_channels=2048):
        super(CustomVitPose, self).__init__()

        # Get the hidden size from the model's config
        hidden_size = vit_pose_model.backbone.config.hidden_size

        # nn.LayerNorm([hidden_size]) normalises the LAST axis, which for an
        # (N, C, H, W) feature map is the width and fails unless W == hidden_size.
        # A single-group GroupNorm is the layer-norm equivalent for channel-first
        # maps and keeps one weight/bias per channel.
        self.adapter = nn.Sequential(
            nn.Conv2d(backbone_channels, hidden_size, kernel_size=1),
            nn.GroupNorm(1, hidden_size),
            nn.ReLU(),
            nn.Conv2d(hidden_size, hidden_size, kernel_size=3, padding=1),
            nn.GroupNorm(1, hidden_size),
        )

        self.vit_pose = vit_pose_model
        # NOTE(review): the replaced layer presumably returned a flattened
        # (batch, tokens, hidden) sequence — confirm whether the adapter output
        # needs the same reshaping before the rest of the embeddings run.
        self.vit_pose.backbone.embeddings.patch_embeddings = nn.Identity()

    def forward(self, backbone_features):
        """Adapt the backbone features and pass them to the ViTPose model.

        Args:
            backbone_features: Feature map with ``backbone_channels`` channels.

        Returns:
            The wrapped ViTPose model's output for the adapted features.
        """
        x = self.adapter(backbone_features)
        return self.vit_pose(x)


In [None]:
# Fetch the pretrained ViTPose checkpoint onto the selected device, wrap it in
# the adapter branch, and persist the whole module object.
hf_vit_pose_model = VitPoseForPoseEstimation.from_pretrained(
    "usyd-community/vitpose-base-simple",
    device_map=device,
)
pose_detect_branch = CustomVitPose(vit_pose_model=hf_vit_pose_model)
torch.save(
    obj=pose_detect_branch,
    f=os.path.join(filepath, "..", "edited_components", "custom_vit_pose.pth"),
)

# Backbone

For our backbone we chose Resnet50. We chose Resnet50 because it is a well-known and well-tested backbone that has been used in many object detection models. This class copies the Resnet50 model format, except for the last layer. Instead of a classifier output, we have branches depending on the task. The task is passed into the forward function, and only the relevant branch's connection is returned. The full multitask branch is not processed at this stage, but due to the separate architectures of YOLOv8, AdaFace, and ViT Pose, we need the backbone to have separate outputs to provide valid inputs to each model.

As opposed to downloading the backbone via `download_models.ipynb`, we can instead take the simpler approach of using the `torchvision` library to download the model. This is done by calling `torchvision.models.resnet50` and requesting the pretrained weights, which downloads the model and loads those weights. We can then modify the model to have the desired outputs.

In [41]:
# `pretrained=True` is deprecated in recent torchvision releases; the weights
# enum below names the same ImageNet-1k V1 checkpoint that flag mapped to.
resnet_model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)



In [51]:
# Dump the torchvision ResNet module tree to see the layer names reused below.
print(resnet_model)

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [52]:
from transformers import AutoImageProcessor, ResNetForImageClassification

# Load the Hugging Face ResNet-50 classifier; in this notebook it is only
# printed below (presumably to compare its layout with torchvision's — confirm).
model = ResNetForImageClassification.from_pretrained("microsoft/resnet-50")

In [53]:
# Dump the Hugging Face ResNet module tree for inspection.
print(model)

ResNetForImageClassification(
  (resnet): ResNetModel(
    (embedder): ResNetEmbeddings(
      (embedder): ResNetConvLayer(
        (convolution): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
        (normalization): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (activation): ReLU()
      )
      (pooler): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    )
    (encoder): ResNetEncoder(
      (stages): ModuleList(
        (0): ResNetStage(
          (layers): Sequential(
            (0): ResNetBottleNeckLayer(
              (shortcut): ResNetShortCut(
                (convolution): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
                (normalization): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
              )
              (layer): Sequential(
                (0): ResNetConvLayer(
                  (convolution): Conv2d(64

As stated above, we copy the structure of Resnet50, but modify the last layer to have inputs for the individual branches.

In [43]:
class MultiTaskResNetFeatureExtractor(nn.Module):
    """Shared ResNet trunk used as the backbone for every task branch.

    Reuses the stem, the four residual stages and the average pool of an
    existing ResNet; the classifier layer is not copied. Per-task adaptation is
    not done here — each branch module owns its own adapter.

    Args:
        original_model: ResNet-style model exposing ``conv1``, ``bn1``,
            ``relu``, ``maxpool``, ``layer1`` .. ``layer4`` and ``avgpool``.
            The layers are shared with it, not copied.
    """

    def __init__(self, original_model):
        super(MultiTaskResNetFeatureExtractor, self).__init__()

        # Define the layers of the original model
        self.conv1 = original_model.conv1
        self.bn1 = original_model.bn1
        self.relu = original_model.relu
        self.maxpool = original_model.maxpool
        self.layer1 = original_model.layer1
        self.layer2 = original_model.layer2
        self.layer3 = original_model.layer3
        self.layer4 = original_model.layer4
        self.avgpool = original_model.avgpool

    def forward(self, x, current_task=None):
        """Run the shared trunk.

        Args:
            x: Input batch for ``conv1``.
            current_task: Accepted for interface compatibility with callers that
                pass the active task; it does not affect the computation, so it
                is optional.

        Returns:
            The output of ``avgpool`` applied to the last residual stage.
            NOTE(review): if ``avgpool`` is a global average pool, the spatial
            size is collapsed here — confirm the branches expect that.
        """
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)

        return x


In [44]:
# Wrap the pretrained ResNet and persist it next to the other edited
# components. The path is built from `filepath` like every other save in this
# notebook, so it does not depend on the kernel's working directory.
feature_extractor = MultiTaskResNetFeatureExtractor(resnet_model)
torch.save(feature_extractor, os.path.join(filepath, "..", "edited_components", "resnet_feature_extractor.pth"))

# Combined Model

Now that we have all of the branches and the backbone of the model, we can combine them into a single multi-task model.

In [None]:
class CombinedModel(nn.Module):
    """Multi-task model: one shared backbone plus one branch per task.

    The active task is stored in ``current_task`` (initially
    ``'person_detection'``) and changed with :meth:`set_task`; ``forward`` runs
    the backbone and then only the branch belonging to that task.

    Args:
        backbone: Module called as ``backbone(x, task)``.
        yolo_face: Branch for ``'face_detection'``.
        yolo_person: Branch for ``'person_detection'``.
        ada_face: Branch for ``'face_identification'``.
        vit_pose: Branch for ``'pose_estimation'``.

    Raises:
        ValueError: If any of the modules is ``None``.
    """

    def __init__(self, backbone, yolo_face, yolo_person, ada_face, vit_pose):
        super(CombinedModel, self).__init__()

        if any(m is None for m in [backbone, yolo_face, yolo_person, ada_face, vit_pose]):
            raise ValueError("All models must be provided")

        self.current_task = 'person_detection'
        self.backbone = backbone
        self.yolo_face = yolo_face
        self.yolo_person = yolo_person
        self.ada_face = ada_face
        self.vit_pose = vit_pose

    def set_task(self, task_name):
        """Select which branch ``forward`` runs.

        Raises:
            ValueError: If ``task_name`` is not one of the supported tasks.
        """
        supported_tasks = ['face_detection', 'person_detection', 'pose_estimation', 'face_identification']
        if task_name not in supported_tasks:
            raise ValueError(f"Task {task_name} not supported. Available tasks: {', '.join(supported_tasks)}")
        self.current_task = task_name

    def forward(self, x):
        """Run the backbone and the branch of the current task.

        ``torch.where`` is not usable for this dispatch: it needs a tensor
        condition (the task comparison is a Python bool) and all of its
        arguments are evaluated first, which would run every branch on every
        call. Plain Python branching runs exactly one branch.

        Returns:
            The selected branch's output.

        Raises:
            ValueError: If ``current_task`` was set to an unsupported value
                without going through :meth:`set_task`.
        """
        task = self.current_task
        if task == 'pose_estimation':
            branch = self.vit_pose
        elif task == 'person_detection':
            branch = self.yolo_person
        elif task == 'face_detection':
            branch = self.yolo_face
        elif task == 'face_identification':
            branch = self.ada_face
        else:
            raise ValueError(f"Task {task} not supported.")

        # Get features from backbone for the task
        features = self.backbone(x, task)
        return branch(features)

In [None]:
# Assemble the backbone and the four task branches, then persist the whole
# multi-task module object.
combined_model = CombinedModel(
    backbone=feature_extractor,
    yolo_face=face_detect_branch,
    yolo_person=person_detect_branch,
    ada_face=face_rec_branch,
    vit_pose=pose_detect_branch,
)
torch.save(
    obj=combined_model,
    f=os.path.join(filepath, "..", "edited_components", "combined_model.pth"),
)