In [1]:
from pyturk.datasets import MSCTD
import cv2
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from PIL import Image
from facenet_pytorch import MTCNN
from tqdm.notebook import tqdm
from torchvision.io import read_image
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision.datasets import ImageFolder
from torchvision.transforms import ToTensor, Compose, Resize, ToPILImage
from prime_augmentations.utils.rand_filter import RandomFilter
from prime_augmentations.utils.color_jitter import RandomSmoothColor
from prime_augmentations.utils.diffeomorphism import Diffeo
from prime_augmentations.config import imagenet100_cfg
from random import choice
from torchvision import models
from torch.utils.data import Dataset

INFO:numexpr.utils:Note: NumExpr detected 12 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
INFO:numexpr.utils:NumExpr defaulting to 8 threads.


In [2]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)

cuda


### 1-1-1

In [3]:
# load train, dev and test datasets

train_dataset_orig = MSCTD(
    root='data',
    mode='train',
    download=False,
    cnn_mode=True,
    image_transform=Compose([
        Resize((64, 128)),
        ToTensor(),
    ])
)

test_dataset_orig = MSCTD(
    root='data',
    mode='test',
    download=False,
    cnn_mode=True,
    image_transform=Compose([
        Resize((64, 128)),
        ToTensor(),
    ])
)

INFO:root:opening and reading files...
INFO:root:opening and reading files...


In [4]:
transform_to_tensor = ToTensor()

def input_transform(img):
    img = img.resize((128, 128))
    return transform_to_tensor(img)

In [5]:
# extract faces from training set
# images_dir_path_train = train_dataset_orig.extract_faces()
images_dir_path_train = './data/MSCTD/faces/train'
train_dataset = ImageFolder(root=images_dir_path_train, transform=input_transform)

In [6]:
# extract faces from test set
# images_dir_path_test = test_dataset_orig.extract_faces()
images_dir_path_test = './data/MSCTD/faces/test'
test_dataset = ImageFolder(root=images_dir_path_test, transform=input_transform)

### 1-1-2

In [7]:
class CNNNetwork(nn.Module):
    
    def __init__(self):
        super().__init__()
        
        self.cnn_layers = nn.Sequential(
            nn.Conv2d(3, 128, kernel_size=3, stride=1, padding='same'),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(128, 64, kernel_size=3, stride=1, padding='same'),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(64, 16, kernel_size=3, stride=1, padding='same'),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(16, 4, kernel_size=3, stride=1, padding='same'),
            nn.BatchNorm2d(4),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2)
        )

        self.mlp_layers = nn.Sequential(
            nn.Flatten(),
            nn.Linear(49*4, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Linear(16, 3),
            nn.Sigmoid()
        )
    
    def forward(self, x):
        x = self.cnn_layers(x)
        x = self.mlp_layers(x)
        return x

In [8]:
model = CNNNetwork().to(device)

In [9]:
# setting hyperparameters
learning_rate = 1e-4
batch_size = 32
epochs = 5

In [10]:
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [11]:
def train_loop(dataloader, model, loss_fn, optimizer):
    model.train()
    size = len(dataloader.dataset)
    num_correct = 0
    for batch_num, (X, y) in enumerate(dataloader):
        # forward prop
        X = X.to(torch.float32).to(device)
        y = y.to(device)
        y_pred = model(X)
        loss = loss_fn(y_pred, y)
        num_correct += (y_pred.argmax(dim=1) == y).sum().item()
        # back prop
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    accuracy = num_correct / size
    print(f'train_loss: {loss.item()}')
    print(f'Train Accuracy: {np.round(accuracy*100,2)}%')
    
            
def test_loop(dataloader, model, loss_fn):
    model.eval()
    size = len(dataloader.dataset)
    num_correct = 0
    with torch.no_grad():
        for batch_num, (X, y) in enumerate(dataloader):
            X = X.to(torch.float32).to(device)
            y = y.to(device)
            y_pred = model(X)
            loss = loss_fn(y_pred, y)
            num_correct += (y_pred.argmax(dim=1) == y).sum().item()
                        
    accuracy = num_correct / size
    print(f'Test Accuracy: {np.round(accuracy*100, 2)}%')
    

In [12]:
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

for i in range(1, epochs+1):
    print(f'epoch {i}', '='*50)
    train_loop(train_dataloader, model, loss_fn, optimizer)
    test_loop(test_dataloader, model, loss_fn)
    print()
    

train_loss: 1.0411368608474731
Train Accuracy: 37.28%
Test Accuracy: 40.02%

train_loss: 1.041334629058838
Train Accuracy: 37.94%
Test Accuracy: 38.35%

train_loss: 1.0953128337860107
Train Accuracy: 38.87%
Test Accuracy: 37.72%

train_loss: 1.0574090480804443
Train Accuracy: 39.21%
Test Accuracy: 37.39%

train_loss: 1.1481467485427856
Train Accuracy: 39.61%
Test Accuracy: 36.99%



In [13]:
torch.save(model, "./Models/first_model.pt")

### 1-1-3

In [14]:
# initialize MTCNN face detector
mtcnn = MTCNN(
    device=device,
    select_largest=False,
    post_process=False,
    margin=50,
    keep_all=True
)

In [15]:
num_correct = 0
to_pil_image = ToPILImage()

for img, label in tqdm(test_dataset_orig):
    img = to_pil_image(img)
    faces = mtcnn(img)
    true_label = torch.tensor(label).to(device)
    if faces is None:
        # if there is no face in the image, label it as negative (the class with most data)
        num_correct += (1 == true_label).sum().item()
        continue
    estimated_labels = []
    for face in faces:
        face = face.permute(1, 2, 0)  
        face = torch.tensor(cv2.resize(face.numpy(), (128, 128)))
        face = face.to(device) / 255
        face = face.permute(2, 1, 0)
        face = face.reshape(1, *face.shape)
        y_pred = model(face).flatten()
        estimated_labels.append(y_pred.argmax().cpu().item())
    most_voted_label = np.bincount(estimated_labels).argmax()
    num_correct += (most_voted_label == true_label).sum().item()
    
print(f'accuracy: {np.round(num_correct * 100 / len(test_dataset_orig), 3) } %')

  0%|          | 0/5067 [00:00<?, ?it/s]

accuracy: 35.879 %


### 1-2-1

In [16]:
config = imagenet100_cfg.get_config()

diffeo = Diffeo(
    sT=config.diffeo.sT, rT=config.diffeo.rT,
    scut=config.diffeo.scut, rcut=config.diffeo.rcut,
    cutmin=config.diffeo.cutmin, cutmax=config.diffeo.cutmax,
    alpha=config.diffeo.alpha, stochastic=True
)

color = RandomSmoothColor(
    cut=config.color_jit.cut, T=config.color_jit.T,
    freq_bandwidth=config.color_jit.max_freqs, stochastic=True
)

filt = RandomFilter(
    kernel_size=3,
    sigma=1,
    stochastic=True
)

identical = lambda x: x

In [17]:
def custom_transform_aug_only(x):
    transforms = [diffeo, color, filt]
    x = input_transform(x)
    random_transform = choice(transforms)
    x = random_transform(x)
    return x

images_dir_path_test = './data/MSCTD/faces/test'
test_dataset_aug_only = ImageFolder(
    root=images_dir_path_test,
    transform = custom_transform_aug_only
)

### 1-2-2

In [18]:
loss_fn = nn.CrossEntropyLoss()
test_dataloader = DataLoader(test_dataset_aug_only, batch_size=batch_size, shuffle=False)
test_loop(test_dataloader, model, loss_fn)

  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]


Test Accuracy: 38.24%


### 1-2-3

In [19]:
def custom_transform_aug_with_identical(x):
    transforms = [identical, diffeo, color, filt]
    x = input_transform(x)
    random_transform = choice(transforms)
    x = random_transform(x)
    return x

In [20]:
# extract faces from training set
images_dir_path_train = './data/MSCTD/faces/train'
train_dataset_aug_with_identical = ImageFolder(
    root=images_dir_path_train,
    transform = custom_transform_aug_with_identical
)

images_dir_path_test = './data/MSCTD/faces/test'
test_dataset_aug_with_identical = ImageFolder(
    root=images_dir_path_test,
    transform = custom_transform_aug_with_identical
)

In [21]:
class CNNNetwork2(nn.Module):
    
    def __init__(self):
        super().__init__()
        
        self.cnn_layers = nn.Sequential(
            nn.Conv2d(3, 128, kernel_size=3, stride=1, padding='same'),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(128, 64, kernel_size=3, stride=1, padding='same'),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(64, 16, kernel_size=3, stride=1, padding='same'),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(16, 4, kernel_size=3, stride=1, padding='same'),
            nn.BatchNorm2d(4),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2)
        )

        self.mlp_layers = nn.Sequential(
            nn.Flatten(),
            nn.Linear(49*4, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Linear(16, 3),
            nn.Sigmoid()
        )
    
    def forward(self, x):
        x = self.cnn_layers(x)
        x = self.mlp_layers(x)
        return x

In [22]:
model2 = CNNNetwork2().to(device)

In [23]:
# setting hyperparameters
learning_rate = 5e-3
batch_size = 32
epochs = 5

In [24]:
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model2.parameters(), lr=learning_rate)
train_dataloader = DataLoader(train_dataset_aug_with_identical, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_dataset_aug_with_identical, batch_size=batch_size, shuffle=False)

In [25]:
for i in range(1, epochs+1):
    print(f'epoch {i}', '='*50)
    train_loop(train_dataloader, model2, loss_fn, optimizer)
    test_loop(test_dataloader, model2, loss_fn)
    print()

train_loss: 1.0971670150756836
Train Accuracy: 37.07%
Test Accuracy: 39.99%

train_loss: 1.1271135807037354
Train Accuracy: 37.19%
Test Accuracy: 40.97%

train_loss: 1.0743521451950073
Train Accuracy: 37.14%
Test Accuracy: 40.97%

train_loss: 1.0734622478485107
Train Accuracy: 37.21%
Test Accuracy: 40.97%

train_loss: 1.0424246788024902
Train Accuracy: 37.21%
Test Accuracy: 40.97%



In [26]:
torch.save(model2, "./Models/second_model.pt")

In [27]:
# test the network using original data
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
test_loop(test_dataloader, model2, loss_fn)

Test Accuracy: 40.97%


In [28]:
# test the network using augmented data
test_dataloader = DataLoader(test_dataset_aug_only, batch_size=batch_size, shuffle=False)
test_loop(test_dataloader, model2, loss_fn)

Test Accuracy: 40.97%


### 2-1-1

We are using resnet50 for this part.

### 2-1-2

In [29]:
pretrained_model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)

for param in pretrained_model.parameters():
    param.requires_grad = False

In [30]:
# structure of the network
pretrained_model

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [31]:
# change last fully connected layer
in_features = pretrained_model.fc.in_features
pretrained_model.fc = nn.Sequential(
    nn.Linear(in_features, 1024),
    nn.Dropout(0.3),
    nn.ReLU(),
    nn.Linear(1024, 256),
    nn.ReLU(),
    nn.Linear(256, 64),
    nn.ReLU(),
    nn.Linear(64, 3)
)
pretrained_model = pretrained_model.to(device)

In [32]:
# choose which parameters we will be updating during training
params_to_update = [param for param in pretrained_model.parameters() if param.requires_grad]
params_to_update

[Parameter containing:
 tensor([[ 0.0045,  0.0064,  0.0001,  ..., -0.0054,  0.0030, -0.0128],
         [-0.0087,  0.0171, -0.0170,  ...,  0.0041,  0.0166,  0.0183],
         [ 0.0179,  0.0165, -0.0047,  ...,  0.0098,  0.0206,  0.0048],
         ...,
         [ 0.0030, -0.0191, -0.0116,  ...,  0.0013,  0.0199, -0.0038],
         [-0.0038,  0.0114, -0.0010,  ..., -0.0176,  0.0057, -0.0130],
         [ 0.0103, -0.0024, -0.0207,  ...,  0.0196,  0.0200, -0.0021]],
        device='cuda:0', requires_grad=True),
 Parameter containing:
 tensor([ 0.0047,  0.0218,  0.0210,  ..., -0.0168, -0.0140, -0.0183],
        device='cuda:0', requires_grad=True),
 Parameter containing:
 tensor([[-0.0223, -0.0301, -0.0159,  ..., -0.0305, -0.0016,  0.0172],
         [ 0.0255, -0.0121,  0.0093,  ...,  0.0223,  0.0100,  0.0173],
         [ 0.0221,  0.0265, -0.0075,  ..., -0.0092,  0.0224, -0.0217],
         ...,
         [-0.0047, -0.0084, -0.0246,  ...,  0.0121, -0.0226,  0.0131],
         [ 0.0239, -0.0295, -0

In [33]:
# setting hyperparameters
learning_rate = 1e-5
batch_size = 32
epochs = 4

In [34]:
# initialize the loss function and choose an optimizer
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params_to_update, lr=learning_rate)
train_dataloader = DataLoader(train_dataset_orig, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_dataset_orig, batch_size=batch_size, shuffle=False)

In [35]:
for i in range(1, epochs+1):
    print(f'epoch {i}', '='*50)
    train_loop(train_dataloader, pretrained_model, loss_fn, optimizer)
    test_loop(test_dataloader, pretrained_model, loss_fn)
    print()

train_loss: 1.0940569639205933
Train Accuracy: 37.51%
Test Accuracy: 42.65%

train_loss: 1.0990331172943115
Train Accuracy: 38.78%
Test Accuracy: 42.71%

train_loss: 1.0591241121292114
Train Accuracy: 38.9%
Test Accuracy: 42.37%

train_loss: 1.1087825298309326
Train Accuracy: 39.73%
Test Accuracy: 42.25%



### 2-1-3

As we can see, the accuray is improved, since we are using a pretrained network and it's trained on a huge dataset.It extracts features from images much better than our own network, resulting in better classification.

### 3-1-1

In [36]:
# get outputs of the pretrained model
train_dataloader = DataLoader(train_dataset_orig, batch_size=1, shuffle=False)
test_dataloader = DataLoader(test_dataset_orig, batch_size=1, shuffle=False)
pretrained_model_outputs_train = []
pretrained_model_outputs_test = []
train_labels = []
test_labels = []

def test_loop2(dataloader, model, item_list, label_list):
    model.eval()
    with torch.no_grad():
        for batch_num, (X, y) in enumerate(dataloader):
            X = X.to(torch.float32).to(device)
            y = y.to(device)
            y_pred = model(X).flatten()
            item_list.append(y_pred)
            label_list.append(y.item())

test_loop2(train_dataloader, pretrained_model, pretrained_model_outputs_train, train_labels)
test_loop2(test_dataloader, pretrained_model, pretrained_model_outputs_test, test_labels)

In [37]:
# get outputs of the augmented model
train_dataloader = DataLoader(train_dataset_orig, batch_size=1, shuffle=False)
test_dataloader = DataLoader(test_dataset_orig, batch_size=1, shuffle=False)
augmented_model_outputs_train = []
augmented_model_outputs_test = []
num_faces_train = []
num_faces_test = []

def test_loop3(dataloader, model, num_faces_append_list, output_append_list):
    model.eval()
    to_pil_image = ToPILImage()
    with torch.no_grad():
        for batch_num, (X, y) in enumerate(dataloader):
            X = to_pil_image(X.squeeze())
            faces = mtcnn(X)
            true_label = torch.tensor(label).to(device)
            if faces is None:
                # if there is no face in the image, use [0, 0, 0] as output
                num_faces_append_list.append(torch.tensor([0]).to(device))
                output_append_list.append(torch.tensor([0., 0., 0.]).to(device))
                continue
            num_faces_append_list.append(torch.tensor([len(faces)]).to(device))
            estimated_logits = []
            for face in faces:
                face = face.permute(1, 2, 0)  
                face = torch.tensor(cv2.resize(face.numpy(), (128, 128)))
                face = face.to(device) / 255
                face = face.permute(2, 1, 0)
                face = face.reshape(1, *face.shape)
                y_pred = model(face).flatten()
                estimated_logits.append(y_pred)
                
            logits_avg = sum(estimated_logits) / len(estimated_logits)
            output_append_list.append(logits_avg)

test_loop3(train_dataloader, model2, num_faces_train, augmented_model_outputs_train)
test_loop3(test_dataloader, model2, num_faces_test, augmented_model_outputs_test)

In [38]:
class MyDataset(Dataset):
    def __init__(self, pretrained_outputs, aug_outputs, num_faces, labels, transform=None, target_transform=None):
        self.pretrained_outputs = pretrained_outputs
        self.aug_outputs = aug_outputs
        self.num_faces = num_faces
        self.labels = labels
        self.transform = transform
        self.target_transform = target_transform
    
    def __len__(self):
        return len(self.num_faces)
    
    def __getitem__(self, index):
        
        item = torch.concat((
            self.pretrained_outputs[index],
            self.aug_outputs[index],
            self.num_faces[index]
        ))
        label = self.labels[index]
        
        if self.transform is not None:
            item = self.transform(item)
            
        if self.target_transform is not None:
            label = self.target_transform(label)
            
        return item, label

In [39]:
train_dataset_mixed = MyDataset(
    pretrained_model_outputs_train,
    augmented_model_outputs_train,
    num_faces_train,
    train_labels
)

test_dataset_mixed = MyDataset(
    pretrained_model_outputs_test,
    augmented_model_outputs_test,
    num_faces_test,
    test_labels
)

In [40]:
class MLPNetwork3(nn.Module):
    
    def __init__(self):
        super().__init__()
        
        self.mlp_layers = nn.Sequential(
            nn.Linear(7, 32),
            nn.ReLU(),
            nn.Linear(32, 128),
            nn.ReLU(),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, 100),
            nn.ReLU(),
            nn.Linear(100, 32),
            nn.ReLU(),
            nn.Linear(32, 8),
            nn.ReLU(),
            nn.Linear(8, 3),
            nn.Sigmoid()
        )
    
    def forward(self, x):
        x = self.mlp_layers(x)
        return x
    

In [41]:
model3 = MLPNetwork3().to(device)

In [42]:
# setting hyperparameters
learning_rate = 5e-5
batch_size = 64
epochs = 20

In [43]:
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model3.parameters(), lr=learning_rate)
train_dataloader = DataLoader(train_dataset_mixed, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_dataset_mixed, batch_size=batch_size, shuffle=False)

In [44]:
for i in range(1, epochs+1):
    print(f'epoch {i}', '='*50)
    train_loop(train_dataloader, model3, loss_fn, optimizer)
    test_loop(test_dataloader, model3, loss_fn)
    print()

train_loss: 1.0801384449005127
Train Accuracy: 34.2%
Test Accuracy: 25.62%

train_loss: 1.170766830444336
Train Accuracy: 38.39%
Test Accuracy: 42.69%

train_loss: 1.102203607559204
Train Accuracy: 38.71%
Test Accuracy: 42.69%

train_loss: 1.0815935134887695
Train Accuracy: 38.71%
Test Accuracy: 42.69%

train_loss: 1.0798221826553345
Train Accuracy: 38.71%
Test Accuracy: 42.69%

train_loss: 1.1017638444900513
Train Accuracy: 38.71%
Test Accuracy: 42.69%

train_loss: 1.1116816997528076
Train Accuracy: 38.71%
Test Accuracy: 42.69%

train_loss: 1.1044787168502808
Train Accuracy: 38.71%
Test Accuracy: 42.69%

train_loss: 1.1283625364303589
Train Accuracy: 38.71%
Test Accuracy: 42.69%

train_loss: 1.0690587759017944
Train Accuracy: 38.71%
Test Accuracy: 42.69%

train_loss: 1.0542412996292114
Train Accuracy: 38.74%
Test Accuracy: 42.69%

train_loss: 1.1131874322891235
Train Accuracy: 39.03%
Test Accuracy: 42.29%

train_loss: 1.0595707893371582
Train Accuracy: 39.99%
Test Accuracy: 42.23%

tr

In [45]:
torch.save(model3, "./Models/Part3_model.pt")