In [1]:
import warnings

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim import SGD
from torch.utils.data import DataLoader, random_split
from torchinfo import summary
from torchvision import transforms, datasets

In [2]:
# Fix the RNG seed so that PyTorch's random operations are repeatable.
seed = 42
torch.manual_seed(seed)
# Apply the same seed to every CUDA device as well.
torch.cuda.manual_seed_all(seed)

# Device used for the model and its tensors: the GPU when PyTorch can see
# one, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

print(device)


cpu


In [3]:
class DepthwiseSeparableConv2d(nn.Module):
    """Depthwise-separable 2D convolution followed by BatchNorm and PReLU.

    A regular convolution is factorised into two steps:

    1. a depthwise convolution (``groups == in_channels``) that filters each
       input channel independently, and
    2. a pointwise 1x1 convolution that linearly combines the channels at
       every spatial location and maps ``in_channels`` to ``out_channels``.

    The pointwise output is batch-normalised (to stabilise the inputs of the
    activation) and then passed through a PReLU, i.e. ``x`` for ``x >= 0``
    and ``alpha * x`` for ``x < 0`` with a learnable ``alpha``.

    Args:
        in_channels: number of channels of the input feature map.
        out_channels: number of channels produced by the pointwise conv.
        kernel_size: kernel size of the depthwise convolution.
        stride: stride of the depthwise convolution.
        padding: padding of the depthwise convolution.
    """

    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, padding=1):
        super().__init__()

        # Spatial filtering: one filter per input channel, no bias.
        self.depthwise_conv = nn.Conv2d(
            in_channels,
            in_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            groups=in_channels,
            bias=False,
        )

        # Channel mixing / projection with a 1x1 kernel, no bias.
        self.pointwise_conv = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=1,
            bias=False,
        )

        # Normalisation sits between the pointwise conv and the activation.
        self.bn = nn.BatchNorm2d(out_channels)

        # Learnable-slope ReLU.
        self.relu = nn.PReLU()

    def forward(self, x):
        """Apply depthwise conv -> pointwise conv -> BatchNorm -> PReLU to ``x``."""
        spatially_filtered = self.depthwise_conv(x)
        channel_mixed = self.pointwise_conv(spatially_filtered)
        normalised = self.bn(channel_mixed)
        return self.relu(normalised)

# See InvertedResidualBlock for the layer-by-layer explanation; this block follows the same conv -> depthwise conv -> conv pattern but does not add the input back to the output.
class BottleneckBlock(nn.Module):
    """Bottleneck block without a residual connection.

    Pipeline::

        1x1 conv (in_channels -> in_channels // 2) -> BN -> PReLU
        -> depthwise-separable 3x3 conv (carries ``stride``) -> BN -> PReLU
        -> 1x1 conv (in_channels // 2 -> in_channels * expansion) -> BN

    Args:
        in_channels: number of channels of the input feature map.
        out_channels: nominal output width. The layers are sized from
            ``in_channels * expansion``, so this value does not influence the
            block; a ``UserWarning`` is emitted when the two disagree so the
            mismatch is not silent.
        expansion: multiplier applied to ``in_channels`` to obtain the number
            of output channels.
        stride: stride of the depthwise convolution (spatial down-sampling).
    """

    def __init__(self, in_channels, out_channels, expansion, stride=1):
        super().__init__()

        expanded_channels = in_channels * expansion
        if out_channels != expanded_channels:
            # The argument was silently ignored before; keep the behaviour but
            # tell the caller which width the block really produces.
            warnings.warn(
                f"BottleneckBlock ignores out_channels={out_channels}; the block "
                f"outputs in_channels * expansion = {expanded_channels} channels.",
                stacklevel=2,
            )

        reduced_channels = in_channels // 2

        # Channel reduction (1x1 conv).
        self.conv1 = nn.Conv2d(in_channels, reduced_channels, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(reduced_channels)
        # One PReLU instance (one learnable slope) shared by both activations.
        self.relu = nn.PReLU()

        # Spatial filtering. DepthwiseSeparableConv2d already ends with its
        # own BatchNorm + PReLU; bn2 and relu are applied on top of that.
        self.depthwise_conv = DepthwiseSeparableConv2d(
            reduced_channels, reduced_channels, kernel_size=3, stride=stride, padding=1
        )
        self.bn2 = nn.BatchNorm2d(reduced_channels)

        # Linear projection to the output width (no activation afterwards).
        self.conv3 = nn.Conv2d(reduced_channels, expanded_channels, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(expanded_channels)

        # No shortcut connection: an empty module that forward() never calls.
        # The attribute is kept so the module tree stays the same.
        self.shortcut = nn.Sequential()

    def forward(self, x):
        """Return the block output for ``x``; the input is not added back."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.depthwise_conv(out)))
        out = self.bn3(self.conv3(out))
        return out

 

class InvertedResidualBlock(nn.Module):
    """Inverted residual block: narrow -> wide -> narrow, plus a shortcut.

    Pipeline::

        1x1 conv (in_channels -> in_channels * expansion) -> BN -> PReLU
        -> depthwise-separable 3x3 conv (carries ``stride``) -> BN -> PReLU
        -> 1x1 conv (in_channels * expansion -> out_channels) -> BN
        -> + shortcut(x)

    The shortcut is the identity (an empty ``nn.Sequential``) when
    ``stride == 1`` and ``in_channels == out_channels``. Otherwise it is a
    strided 1x1 convolution followed by BatchNorm, so that the input matches
    the main branch for the element-wise addition.

    Args:
        in_channels: number of channels of the input feature map.
        out_channels: number of channels of the block output.
        expansion: channel multiplier of the expansion stage.
        stride: stride of the depthwise convolution (and of the projection
            shortcut, when one is created).
    """

    def __init__(self, in_channels, out_channels, expansion, stride=1):
        super().__init__()

        # --- Expansion: narrow -> wide -------------------------------------
        # A 1x1 convolution lifts the low-dimensional input to a
        # higher-dimensional space better suited to the non-linearity.
        expanded_channels = in_channels * expansion
        self.conv1 = nn.Conv2d(in_channels, expanded_channels, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(expanded_channels)

        # MobiFace uses PReLU: x for x >= 0, alpha * x for x < 0, with a
        # learnable alpha. The same instance serves both activations.
        self.relu = nn.PReLU()

        # --- Wide -> wide: spatial filtering of the expanded tensor --------
        self.depthwise_conv = DepthwiseSeparableConv2d(
            expanded_channels, expanded_channels, kernel_size=3, stride=stride, padding=1
        )
        self.bn2 = nn.BatchNorm2d(expanded_channels)

        # --- Projection: wide -> narrow (linear, no activation) ------------
        self.conv3 = nn.Conv2d(expanded_channels, out_channels, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_channels)

        # --- Shortcut / skip connection ------------------------------------
        # Gives gradients a direct path around the block. Identity by
        # default; a projection is needed whenever the main branch changes
        # the spatial size (stride != 1) or the channel count.
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )

    def forward(self, x):
        """Return ``main_branch(x) + shortcut(x)``."""
        # Expansion.
        out = self.relu(self.bn1(self.conv1(x)))

        # Depthwise-separable convolution.
        out = self.relu(self.bn2(self.depthwise_conv(out)))

        # Linear projection (no activation).
        out = self.bn3(self.conv3(out))

        # Residual addition of the (possibly projected) input.
        out += self.shortcut(x)

        return out

class MobiFace(nn.Module):
    """MobiFace-style network mapping a batch of RGB images to 512-d vectors.

    Layout (attribute names in parentheses):

    * stem: 3x3 stride-2 conv (``conv1``) -> BN -> PReLU, then a
      depthwise-separable conv (``depthwise_conv``) -> BN -> PReLU;
    * three stages, each a stride-2 ``BottleneckBlock`` followed by an
      ``InvertedResidualBlock`` (64, 128 and 256 channels respectively);
    * head: 1x1 conv to 512 channels (``conv3``) -> BN -> global average
      pooling over the spatial axes -> ``fc`` (512 -> 512).

    One PReLU instance (``relu``) is shared by both stem activations.
    """

    def __init__(self):
        super().__init__()

        # Stem.
        self.conv1 = nn.Conv2d(
            in_channels=3, out_channels=64, kernel_size=3, stride=2, padding=1, bias=False
        )
        self.bn1 = nn.BatchNorm2d(64)
        self.depthwise_conv = DepthwiseSeparableConv2d(
            in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=1
        )
        self.bn2 = nn.BatchNorm2d(64)

        # Stage 1: 64 channels.
        self.bottleneck_block1 = BottleneckBlock(
            in_channels=64, out_channels=64, expansion=1, stride=2
        )
        self.residual_block1 = InvertedResidualBlock(
            in_channels=64, out_channels=64, expansion=2
        )

        # Stage 2: 64 -> 128 channels.
        self.bottleneck_block2 = BottleneckBlock(
            in_channels=64, out_channels=128, expansion=2, stride=2
        )
        self.residual_block2 = InvertedResidualBlock(
            in_channels=128, out_channels=128, expansion=2
        )

        # Stage 3: 128 -> 256 channels.
        self.bottleneck_block3 = BottleneckBlock(
            in_channels=128, out_channels=256, expansion=2, stride=2
        )
        self.residual_block3 = InvertedResidualBlock(
            in_channels=256, out_channels=256, expansion=2
        )

        # Head.
        self.conv3 = nn.Conv2d(
            in_channels=256, out_channels=512, kernel_size=1, stride=1, bias=False
        )
        self.bn3 = nn.BatchNorm2d(512)
        self.fc = nn.Linear(in_features=512, out_features=512)
        self.relu = nn.PReLU()

    def forward(self, x):
        """Return the ``fc`` output for the image batch ``x``."""
        # Stem: conv -> BN -> PReLU, then depthwise-separable conv -> BN -> PReLU.
        features = self.relu(self.bn1(self.conv1(x)))
        features = self.relu(self.bn2(self.depthwise_conv(features)))

        # Each stage is a bottleneck block followed by an inverted residual block.
        stages = (
            self.bottleneck_block1,
            self.residual_block1,
            self.bottleneck_block2,
            self.residual_block2,
            self.bottleneck_block3,
            self.residual_block3,
        )
        for stage in stages:
            features = stage(features)

        # Head: widen to 512 channels, then average over height and width
        # (global average pooling) before the fully connected layer.
        features = self.bn3(self.conv3(features))
        pooled = features.mean(dim=[2, 3])
        return self.fc(pooled)

# Build the network with freshly initialised weights.
model = MobiFace()

# Dump the module tree (layer names, types and hyper-parameters) for inspection.
print(model)



MobiFace(
  (conv1): Conv2d(3, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (depthwise_conv): DepthwiseSeparableConv2d(
    (depthwise_conv): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=64, bias=False)
    (pointwise_conv): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
    (bn): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): PReLU(num_parameters=1)
  )
  (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (bottleneck_block1): BottleneckBlock(
    (conv1): Conv2d(64, 32, kernel_size=(1, 1), stride=(1, 1), bias=False)
    (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): PReLU(num_parameters=1)
    (depthwise_conv): DepthwiseSeparableConv2d(
      (depthwise_conv): Conv2d(32, 32, kernel_size=(3, 3), stri

In [4]:
def count_parameters(model):
    """Return the number of trainable scalar parameters in ``model``.

    Only parameters whose ``requires_grad`` flag is set are counted.
    """
    trainable = 0
    for param in model.parameters():
        if param.requires_grad:
            trainable += param.numel()
    return trainable

# Walk over every named parameter tensor, printing its element count
# followed by the running sum of all tensors seen so far.
total = 0
for name, param in model.named_parameters():
    print(f"Layer: {name}, Parameters: {param.numel()}")
    total = total + param.numel()
    print("total ", total)

Layer: conv1.weight, Parameters: 1728
total  1728
Layer: bn1.weight, Parameters: 64
total  1792
Layer: bn1.bias, Parameters: 64
total  1856
Layer: depthwise_conv.depthwise_conv.weight, Parameters: 576
total  2432
Layer: depthwise_conv.pointwise_conv.weight, Parameters: 4096
total  6528
Layer: depthwise_conv.bn.weight, Parameters: 64
total  6592
Layer: depthwise_conv.bn.bias, Parameters: 64
total  6656
Layer: depthwise_conv.relu.weight, Parameters: 1
total  6657
Layer: bn2.weight, Parameters: 64
total  6721
Layer: bn2.bias, Parameters: 64
total  6785
Layer: bottleneck_block1.conv1.weight, Parameters: 2048
total  8833
Layer: bottleneck_block1.bn1.weight, Parameters: 32
total  8865
Layer: bottleneck_block1.bn1.bias, Parameters: 32
total  8897
Layer: bottleneck_block1.relu.weight, Parameters: 1
total  8898
Layer: bottleneck_block1.depthwise_conv.depthwise_conv.weight, Parameters: 288
total  9186
Layer: bottleneck_block1.depthwise_conv.pointwise_conv.weight, Parameters: 1024
total  10210
La

In [5]:
# One random 3-channel 112x112 image, used only to check tensor shapes.
dummy_input = torch.randn(1, 3, 112, 112)

# Print the initial input size
print(f"Initial Input Size: {dummy_input.shape}")

# This is a shape sanity check, not training: run it in eval mode and without
# autograd so the random input neither updates the BatchNorm running
# statistics nor builds a computation graph. The previous train/eval mode is
# restored afterwards, even if the forward pass raises.
was_training = model.training
model.eval()
try:
    with torch.no_grad():
        out = model(dummy_input)
finally:
    model.train(was_training)

# Print the final output size
print(f"Final Output Size: {out.shape}")

Initial Input Size: torch.Size([1, 3, 112, 112])
Final Output Size: torch.Size([1, 512])


In [6]:
# Layer-by-layer report (output shapes and parameter counts) for one
# 3-channel 112x112 input.
summary(model, input_size=(1, 3, 112, 112))

Layer (type:depth-idx)                        Output Shape              Param #
MobiFace                                      [1, 512]                  --
├─Conv2d: 1-1                                 [1, 64, 56, 56]           1,728
├─BatchNorm2d: 1-2                            [1, 64, 56, 56]           128
├─PReLU: 1-3                                  [1, 64, 56, 56]           1
├─DepthwiseSeparableConv2d: 1-4               [1, 64, 56, 56]           --
│    └─Conv2d: 2-1                            [1, 64, 56, 56]           576
│    └─Conv2d: 2-2                            [1, 64, 56, 56]           4,096
│    └─BatchNorm2d: 2-3                       [1, 64, 56, 56]           128
│    └─PReLU: 2-4                             [1, 64, 56, 56]           1
├─BatchNorm2d: 1-5                            [1, 64, 56, 56]           128
├─PReLU: 1-6                                  [1, 64, 56, 56]           (recursive)
├─BottleneckBlock: 1-7                        [1, 64, 28, 28]           --
│  

In [7]:
# Preprocessing applied to every image: resize to the 112x112 resolution
# used for the network input, then convert the image to a tensor.
image_size = (112, 112)
preprocessing_steps = [
    transforms.Resize(image_size),
    transforms.ToTensor(),
]
transform = transforms.Compose(preprocessing_steps)

In [8]:
# Root folder of the image dataset.
# NOTE(review): this is an absolute, machine-specific path; it has to be
# edited before the notebook can run anywhere else.
root_dir = "C:\\Users\\mathe\\OneDrive\\Área de Trabalho\\master\\TFM\\dataset\\faces_webface_112x112\\images"

# Smaller directory for quick tests:
#root_dir= "C:\\Users\\mathe\\OneDrive\\Área de Trabalho\\master\\TFM\\dataset\\faces_webface_112x112\\small_sample"

# Image-folder dataset; every sample goes through `transform`.
dataset = datasets.ImageFolder(root_dir, transform=transform)

# Shuffled loader over the complete, unsplit dataset.
dataloader = DataLoader(dataset, shuffle=True, batch_size=128, num_workers=0)

In [9]:
# Random 60 % / 20 % / remainder split into training, validation and test
# subsets. The test subset takes whatever is left after rounding down the
# other two, so the three sizes always add up to the dataset length.
num_samples = len(dataset)
train_size = int(0.6 * num_samples)
val_size = int(0.2 * num_samples)
test_size = num_samples - (train_size + val_size)

split_lengths = [train_size, val_size, test_size]
train_dataset, val_dataset, test_dataset = random_split(dataset, split_lengths)

In [10]:
# Report how many samples ended up in the training split.
print("train size",len(train_dataset))

train size 294373


In [11]:
# Mini-batch size shared by the three loaders.
batch_size = 64

# Only the training loader shuffles; validation and test keep a fixed order.
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=0)
# The loader must wrap the test subset itself. It previously received
# `test_size` (an int), which cannot be iterated as a dataset.
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0)

In [12]:
# Number of full passes over the training set performed by the training loop.

num_epochs = 25

In [15]:
# Move the model's parameters and buffers to `device` (GPU when available, otherwise CPU).
model = model.to(device)



In [16]:
# Switch the model to evaluation mode. As the last expression of the cell, the
# returned module is echoed as the cell output. The training cell calls
# model.train() again at the start of every epoch.
model.eval()

MobiFace(
  (conv1): Conv2d(3, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (depthwise_conv): DepthwiseSeparableConv2d(
    (depthwise_conv): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=64, bias=False)
    (pointwise_conv): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
    (bn): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): PReLU(num_parameters=1)
  )
  (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (bottleneck_block1): BottleneckBlock(
    (conv1): Conv2d(64, 32, kernel_size=(1, 1), stride=(1, 1), bias=False)
    (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): PReLU(num_parameters=1)
    (depthwise_conv): DepthwiseSeparableConv2d(
      (depthwise_conv): Conv2d(32, 32, kernel_size=(3, 3), stri

In [17]:
# Size of the vector produced by the network's original `fc` layer, nn.Linear(512, 512).
# NOTE(review): this constant is not referenced by the other cells visible in this notebook.
embedding_size = 512

In [18]:
# Classification loss, plus SGD with momentum over all current model parameters.
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

In [19]:
new_num_classes = 10572
# Replace the 512-d output layer with a classification head that produces
# one logit per class, and put the new layer on the training device.
model.fc = nn.Linear(in_features=512, out_features=new_num_classes).to(device)

# Build the loss and the optimizer only after swapping the head, so the
# optimizer tracks the parameters of the new `fc` layer.
criterion = nn.CrossEntropyLoss()
optimizer = SGD(model.parameters(), lr=0.01, momentum=0.9)

num_train_batches = len(train_dataloader)

# Training loop: one optimisation pass and one validation pass per epoch.
for epoch in range(num_epochs):
    model.train()
    total_loss = 0

    for batch_idx, (inputs, labels) in enumerate(train_dataloader):

        # Move inputs and labels to the specified device
        inputs, labels = inputs.to(device), labels.to(device)

        # Standard step: clear old gradients, forward, loss, backward, update.
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        # Progress line every 100 batches.
        if (batch_idx + 1) % 100 == 0:
            print(f'Epoch [{epoch + 1}/{num_epochs}], Batch [{batch_idx + 1}/{num_train_batches}], Loss: {loss.item():.4f}')

    # Mean of the per-batch losses; the max() guard avoids a
    # ZeroDivisionError when the training loader is empty.
    average_loss = total_loss / max(num_train_batches, 1)
    print(f'Epoch [{epoch + 1}/{num_epochs}], Average Loss: {average_loss:.4f}')

    # Validation loop: top-1 accuracy, no gradient tracking.
    model.eval()
    total_correct = 0
    total_samples = 0

    with torch.no_grad():
        for val_inputs, val_labels in val_dataloader:
            val_inputs, val_labels = val_inputs.to(device), val_labels.to(device)

            val_outputs = model(val_inputs)
            # The predicted class is the index of the largest logit.
            _, predicted = torch.max(val_outputs, 1)

            total_correct += (predicted == val_labels).sum().item()
            total_samples += val_labels.size(0)

    # Report NaN instead of raising when the validation set is empty.
    accuracy = total_correct / total_samples if total_samples else float('nan')
    print(f'Epoch [{epoch + 1}/{num_epochs}], Validation Accuracy: {accuracy:.4f}')

# Persist the result once, after the final epoch: first the whole module
# object, then only its state dict (parameters and buffers).
torch.save(model, 'full_mobiFace_like_v1.pth')
torch.save(model.state_dict(), 'dict_mobiFace_live_v1.pth')