# **Part III - Final Stage**
### **Load libraries** 

In [1]:

# https://www.datatechnotes.com/2024/10/how-to-use-vgg-model-with-pytorch.html
import torch
import torch.nn as nn
from torchvision import models, transforms
from torchvision.transforms import functional

import numpy as np

from PIL import Image
from part1_BoundingBoxExtraction import extractImagesPath
import os

# Retrieve the main path: the current working directory, symlinks resolved.
# `__name__` is a module name ("__main__"), not a file path, so resolving it
# only ever produced the working directory by coincidence; ask for it directly.
dir_path = os.path.realpath(os.getcwd())

# Device configuration: use the GPU when CUDA is available, otherwise the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


### **Load pictures path dataset**

In [2]:
def actorFromFileName(fileName:str) -> str:
    """Return the actor label encoded in a preprocessed picture file name.

    The ".jpg" suffix is removed, everything after the first "-" is dropped,
    then leading/trailing digits and underscores are stripped.
    """
    return fileName.removesuffix(".jpg").split("-")[0].strip("0123456789_")


def listPicturesIn(folderName:str) -> list[list[str]]:
    """List the pictures stored in the `folderName` sub-directory of `dir_path`.

    Returns:
        A two-element list [paths, actors]: the full path of every file in
        the folder and, at the same position, the actor label read from the
        file name.
    """
    folder = os.path.join(dir_path, folderName)
    picturesPathDS = [[], []]

    # os.listdir returns entries in arbitrary order; sorting keeps the dataset
    # order (and therefore the seeded train/test split) stable between runs.
    for fileName in sorted(os.listdir(folder)):
        picturePath = os.path.join(folder, fileName)

        # Skip sub-directories (e.g. ".ipynb_checkpoints"): they are not pictures
        if not os.path.isfile(picturePath):
            continue

        picturesPathDS[0].append(picturePath)
        picturesPathDS[1].append(actorFromFileName(fileName))

    return picturesPathDS


# Extract picture path without preprocessing
rawPicturesPathDS:list[list[str]] = extractImagesPath()
# Turn the second list into actor labels: drop the ".jpg" suffix, then strip
# leading/trailing digits and underscores
rawPicturesPathDS[1] = [name.removesuffix(".jpg").strip("0123456789_") for name in rawPicturesPathDS[1]]

# Extract picture path with YOLO preprocessing
workingPicturesPathDS:list[list[str]] = listPicturesIn("working")

# Extract picture path with YOLO plus RetinaFace preprocessing
workingFacePicturesPathDS:list[list[str]] = listPicturesIn("working_faces")

### **Load pictures from the datasets**

In [3]:
# Every picture is resized to 224x224 before being stored
resize = transforms.Resize((224, 224))


def loadPictures(picturePaths:list[str]) -> list[torch.Tensor]:
    """Load pictures from disk as resized float tensors.

    Args:
        picturePaths: paths of the image files to read.

    Returns:
        One tensor per path, in the same order, with 3 channels, values
        normalised between 0 and 1 and a spatial size of 224x224.
    """
    pictures = []
    for picturePath in picturePaths:
        # The context manager closes the file handle as soon as the pixels are read
        with Image.open(picturePath) as img:
            # Force 3 channels: grayscale, palette or RGBA files would otherwise
            # give tensors that can neither be stacked with the others nor fed
            # to a Conv2d layer expecting 3 input channels.
            pixels = functional.pil_to_tensor(img.convert("RGB"))

        # Convert the picture into tensor of float with normalised values between 0 and 1
        pictures.append(resize(pixels.float()/255.))
    return pictures


# Unprocessed dataset
rawPicture = [loadPictures(rawPicturesPathDS[0]), rawPicturesPathDS[1]]
print("Raw pictures loaded")

# Partially preprocessed dataset
workingPicture = [loadPictures(workingPicturesPathDS[0]), workingPicturesPathDS[1]]
print("YOLO pictures loaded")

# Fully preprocessed dataset
workingFacePicture = [loadPictures(workingFacePicturesPathDS[0]), workingFacePicturesPathDS[1]]
print("YOLO and Retina pictures loaded")

Raw pictures loaded
YOLO pictures loaded
YOLO and Retina pictures loaded


#### **Get the pretrained model structure**

In [4]:
# Display the layer structure of torchvision's VGG16 (used as the reference
# for the hand-written copy). `pretrained=True` is deprecated in favour of the
# `weights` enum; IMAGENET1K_V1 is the checkpoint `pretrained=True` selected.
models.vgg16(weights=models.VGG16_Weights.IMAGENET1K_V1).eval()



VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1

#### **Recreate VGG structure**


In [5]:
# https://www.digitalocean.com/community/tutorials/vgg-from-scratch-pytorch

class VGG16(nn.Module):
    """Hand-written VGG16: a stack of 3x3 convolutions split into five
    max-pooled stages, an adaptive 7x7 average pooling and a three-layer
    fully connected classifier ending with `num_classes` outputs.
    """

    def __init__(self, num_classes:int):
        super().__init__()

        # Output channels of each convolution; "M" stands for a 2x2 max-pooling
        layout = (64, 64, "M",
                  128, 128, "M",
                  256, 256, 256, "M",
                  512, 512, 512, "M",
                  512, 512, 512, "M")

        layers = []
        in_channels = 3
        for entry in layout:
            if entry == "M":
                layers.append(nn.MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False))
                continue
            # Each convolution is followed by its own in-place ReLU
            layers.append(nn.Conv2d(in_channels, entry, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)))
            layers.append(nn.ReLU(inplace=True))
            in_channels = entry
        self.features = nn.Sequential(*layers)

        # 512 channels x 7 x 7 (the avgpool output size) = 25088 input features
        hidden = 4096
        self.classifier = nn.Sequential(
            nn.Linear(in_features=512 * 7 * 7, out_features=hidden, bias=True),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5, inplace=False),
            nn.Linear(in_features=hidden, out_features=hidden, bias=True),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5, inplace=False),
            nn.Linear(in_features=hidden, out_features=num_classes, bias=True)
        )

        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))

    def forward(self, x):
        """Return the class scores for a batch of pictures `x`."""
        pooled = self.avgpool(self.features(x))
        # Keep the batch dimension, flatten everything else for the classifier
        return self.classifier(torch.flatten(pooled, 1))

# Sorted so that every actor keeps the same class index from one kernel
# session to the next: the iteration order of a set of strings is not stable
# across Python processes, which made the label encoding change between runs.
classes = sorted(set(rawPicturesPathDS[1]))
num_classes = len(classes)

# Actor name -> class index, so each lookup avoids a linear scan of `classes`
classIndex = {actor: index for index, actor in enumerate(classes)}

# replace classes name with numbers, stored at position 2 of each dataset.
# Slice assignment keeps exactly one label list even when the cell is re-run
# (a plain append would stack a new copy at every execution).
rawPicture[2:] = [[classIndex[actor] for actor in rawPicture[1]]]
workingPicture[2:] = [[classIndex[actor] for actor in workingPicture[1]]]
workingFacePicture[2:] = [[classIndex[actor] for actor in workingFacePicture[1]]]

VGG is far too slow to run on our machines. As a consequence, we used a smaller model.

In [19]:
class PseudoVGG(nn.Module):
    """Reduced VGG-style network: three stages of two 3x3 convolutions, each
    closed by a 2x2 max-pooling, then an adaptive 7x7 average pooling and a
    two-layer classifier ending with `num_classes` outputs.
    """

    def __init__(self, num_classes:int):
        super().__init__()

        def conv_relu(in_channels:int, out_channels:int) -> list:
            # A 3x3 convolution that keeps the spatial size, plus its ReLU
            return [
                nn.Conv2d(in_channels, out_channels, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
                nn.ReLU(inplace=True),
            ]

        def pool() -> nn.MaxPool2d:
            # Halves the spatial resolution
            return nn.MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)

        self.features = nn.Sequential(
            *conv_relu(3, 64), *conv_relu(64, 64), pool(),
            *conv_relu(64, 128), *conv_relu(128, 128), pool(),
            *conv_relu(128, 128), *conv_relu(128, 128), pool(),
        )

        # 128 channels x 7 x 7 (the avgpool output size) input features
        self.classifier = nn.Sequential(
            nn.Linear(128 * 7 * 7, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )

        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))

    def forward(self, x):
        """Return the class scores for a batch of pictures `x`."""
        pooled = self.avgpool(self.features(x))
        # Keep the batch dimension, flatten everything else for the classifier
        return self.classifier(torch.flatten(pooled, 1))

#### **Create training and test set**

To preserve the proportion of pictures per class, we hold out 25% of the pictures of EACH actor for testing. If we took 25% of the whole dataset at random instead, some actors might end up only in the test set and never appear in the training set.


In [6]:
# Random number generator created from a fixed seed
seed = 0
rng = np.random.RandomState(seed)

# Share of each actor's pictures drawn for the training set
trainProportion = 0.75

def findIndices(i:int, pictureList:list) -> list[int]:
    """Return, in increasing order, the positions of `pictureList` whose value equals `i`."""
    positions = []
    for position, actor in enumerate(pictureList):
        if actor == i:
            positions.append(position)
    return positions

def splitActorPictures(actorClass:int, dataset:list, trainingSet:list, testSet:list) -> None:
    """Distribute the pictures of one actor between a training and a test set.

    Args:
        actorClass: class index of the actor to process.
        dataset: [pictures, actor names, class indices]; only positions 0 and
            2 are read.
        trainingSet: [pictures, class indices] lists, extended in place.
        testSet: [pictures, class indices] lists, extended in place.

    `trainProportion` of the actor's pictures (rounded down) are drawn at
    random with `rng` for the training set; the actor's remaining pictures go
    to the test set.
    """
    # Find all the indices for an actor in the dataset
    actorIndices = findIndices(actorClass, dataset[2])
    if not actorIndices:
        # This actor has no picture in this dataset: nothing to distribute
        return

    # Get random indices out of them
    trainCount = int(trainProportion*len(actorIndices))
    chosen = set(rng.choice(actorIndices, trainCount, replace=False).tolist())

    # Only this actor's own pictures are considered. Scanning the whole
    # dataset for "indices not chosen" would put every other actor's pictures
    # (training ones included) in the test set, once per actor.
    for i in actorIndices:
        target = trainingSet if i in chosen else testSet
        target[0].append(dataset[0][i])  # the picture
        target[1].append(dataset[2][i])  # the corresponding actor class


# Build the training and test set for each
rawPictureTrainingSet = [[], []]
rawPictureTestSet = [[], []]

workingPictureTrainingSet = [[], []]
workingPictureTestSet = [[], []]

workingFacePictureTrainingSet = [[], []]
workingFacePictureTestSet = [[], []]

# (dataset, training set, test set) for: raw, YOLO, YOLO plus retina
splits = [
    (rawPicture, rawPictureTrainingSet, rawPictureTestSet),
    (workingPicture, workingPictureTrainingSet, workingPictureTestSet),
    (workingFacePicture, workingFacePictureTrainingSet, workingFacePictureTestSet),
]

# Loop through all actor
for actorClass in range(num_classes):
    for dataset, trainingSet, testSet in splits:
        splitActorPictures(actorClass, dataset, trainingSet, testSet)

# Every picture must end up in exactly one of the two sets
for dataset, trainingSet, testSet in splits:
    assert len(trainingSet[0]) == len(trainingSet[1])
    assert len(testSet[0]) == len(testSet[1])
    assert len(trainingSet[0]) + len(testSet[0]) == len(dataset[0])

#### **Train the model on YOLO + Retina**


In [21]:
# Seed torch too, so weight initialisation and batch shuffling are repeatable
torch.manual_seed(seed)

model = PseudoVGG(num_classes).to(device)

# Hyper-parameters
num_epochs = 20
learning_rate = 0.005
batch_size = 8

trainImages, trainLabels = workingFacePictureTrainingSet
N = len(trainImages)
if N == 0:
    # Fail early with a clear message instead of a division by zero below
    raise ValueError("The YOLO + Retina training set is empty")

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, weight_decay = 0.005, momentum = 0.9)


def makeBatch(indices) -> tuple:
    """Stack the training pictures and labels at `indices` into two tensors.

    Both tensors are placed on `device`, where the model lives; leaving them
    on the CPU makes the forward pass fail as soon as `device` is a GPU.
    """
    images = torch.stack([trainImages[j] for j in indices]).to(device)
    labels = torch.tensor([trainLabels[j] for j in indices], device=device)
    return images, labels


for epoch in range(num_epochs):
    # ===== Training =====
    model.train()
    perm = torch.randperm(N)  # new random picture order at every epoch

    running_loss = 0.0

    for i in range(0, N, batch_size):
        images, labels = makeBatch(perm[i:i+batch_size].tolist())

        outputs = model(images)
        loss = criterion(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # The loss is a batch mean: weight it by the batch size so the epoch
        # average stays exact when the last batch is smaller
        running_loss += loss.item() * labels.size(0)

    # ===== Validation (on training set) =====
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for i in range(0, N, batch_size):
            images, labels = makeBatch(range(i, min(i+batch_size, N)))

            outputs = model(images)
            _, predicted = torch.max(outputs, 1)

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    acc = 100 * correct / total
    loss_avg = running_loss / total

    print(
        f"Epoch [{epoch+1}/{num_epochs}] "
        f"Loss: {loss_avg:.4f} "
        f"Train Acc: {acc:.2f}%"
    )


KeyboardInterrupt: 