In [1]:
'''Libraries'''

from typing import Any, List, Tuple, Callable, Dict
import numpy as np
import pandas as pd
import collections

import plotly.express as px
import plotly.graph_objects as go
import plotly.subplots as sp

from sklearn import metrics
import os


import torch
import torchvision
from torchvision import datasets, transforms
import torchvision.transforms as tt
from torchvision.datasets.utils import download_url
from torch.utils.data import random_split

import warnings
warnings.filterwarnings('ignore')



In [2]:
'''Device Info'''

# Run on the GPU when PyTorch reports CUDA support, otherwise on the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
device

'cuda'

In [3]:
# Number of CUDA devices PyTorch can see in this session.
torch.cuda.device_count()

2

In [4]:
# Shell escape: print the driver / GPU status table.
!nvidia-smi

Sun Jan 28 09:02:22 2024       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 535.129.03             Driver Version: 535.129.03   CUDA Version: 12.2     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  Tesla T4                       Off | 00000000:00:04.0 Off |                    0 |
| N/A   36C    P8               9W /  70W |      3MiB / 15360MiB |      0%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
|   1  Tesla T4                       Off | 00000000:00:05.0 Off |  

In [8]:
'''Directory Paths'''

# Dataset root. Defaults to the Kaggle mount point; set the REAL_VS_FAKE_DIR
# environment variable to point the notebook at a copy stored elsewhere.
main_path = os.environ.get(
    'REAL_VS_FAKE_DIR',
    '/kaggle/input/140k-real-and-fake-faces/real_vs_fake/real-vs-fake'
)

# One sub-directory per split.
train_dir = os.path.join(main_path, 'train')
valid_dir = os.path.join(main_path, 'valid')
test_dir = os.path.join(main_path, 'test')

In [9]:
'''Data Augmentation'''

# Training pipeline: random flip, resize to 256 x 256, random rotation / affine /
# perspective distortions, then tensor conversion and per-channel normalisation.
# NOTE(review): the mean/std values look like the usual ImageNet statistics - confirm.
train_transforms = tt.Compose([
    tt.RandomHorizontalFlip(),
    tt.Resize(size = (256, 256)),
    tt.RandomRotation(degrees = 30),
    tt.RandomAffine(degrees = (0, 30), shear = 0.2, translate = (0.2, 0.2)),
    tt.RandomPerspective(distortion_scale = 0.2, p = 0.5),
    tt.ToTensor(),
    tt.Normalize(mean = (0.485, 0.456, 0.406), std = (0.229, 0.224, 0.225))
])

# Evaluation pipeline: no augmentation, but the same resize as training so that
# validation / test images always have one fixed size, whatever the size of the
# files on disk (images that are already 256 x 256 are unaffected).
valid_transforms = tt.Compose([
    tt.Resize(size = (256, 256)),
    tt.ToTensor(),
    tt.Normalize(mean = (0.485, 0.456, 0.406), std = (0.229, 0.224, 0.225))
])

In [11]:
'''Data Scanner'''

# Index the three splits. Only the training split uses the augmenting pipeline;
# validation and test share the evaluation pipeline.
train_ds = torchvision.datasets.ImageFolder(
    train_dir,
    transform = train_transforms,
)
valid_ds = torchvision.datasets.ImageFolder(
    valid_dir,
    transform = valid_transforms,
)
test_ds = torchvision.datasets.ImageFolder(
    test_dir,
    transform = valid_transforms,
)
print(f'train : {len(train_ds)}, test : {len(test_ds)}, valid : {len(valid_ds)}')

FileNotFoundError: [Errno 2] No such file or directory: '/kaggle/input/140k-real-and-fake-faces/real_vs_fake/real-vs-fake/train'

In [8]:
# Seed the global RNGs once, up front. torch and NumPy keep separate generator
# state, so each one is seeded explicitly.
random_seed = 42
torch.manual_seed(random_seed)
np.random.seed(random_seed)

# Mini-batch size used by the data loaders.
batch_size = 64

In [9]:
'''Data Loaders'''

# One loader per split. All three share the batch size, use 4 worker processes,
# pinned host memory and shuffled iteration order.
train_loader = torch.utils.data.DataLoader(
    train_ds,
    batch_size = batch_size,
    shuffle = True,
    num_workers = 4,
    pin_memory = True,
)
valid_loader = torch.utils.data.DataLoader(
    valid_ds,
    batch_size = batch_size,
    shuffle = True,
    num_workers = 4,
    pin_memory = True,
)
test_loader = torch.utils.data.DataLoader(
    test_ds,
    batch_size = batch_size,
    shuffle = True,
    num_workers = 4,
    pin_memory = True,
)

In [10]:
'''Plotter Function'''

def plotter(img : torch.Tensor, lbl : torch.Tensor) -> None:
    '''Show up to 16 images of a normalised batch in a 4 x 4 grid, titled by class.

    Args:
        img: batch of normalised images. Each img[i] is un-normalised with the
            mean / std below and permuted with (1, 2, 0) before being drawn.
        lbl: labels aligned with img; a label equal to 1 is titled 'real',
            any other value 'fake'.

    Batches with fewer than 16 images are drawn as far as they go; the
    remaining grid cells stay empty.
    '''
    rows, cols = 4, 4
    count = min(rows * cols, len(img), len(lbl))

    titles = ['real' if x == 1 else 'fake' for x in lbl[:count]]
    fig = sp.make_subplots(rows = rows, cols = cols, subplot_titles = titles)

    # Normalize computes (x - mean) / std, so feeding it (-m / s, 1 / s) yields
    # x * s + m, i.e. it undoes a normalisation that used (m, s).
    mean = [0.485, 0.456, 0.406]
    std = [0.229, 0.224, 0.225]
    inverser = tt.Normalize(mean = [-m / s for m, s in zip(mean, std)], std = [1. / s for s in std])

    for i in range(count):
        row = i // cols + 1
        col = i % cols + 1
        original_img = inverser(img[i])
        # .cpu() is a no-op for CPU tensors and keeps .numpy() valid for GPU ones.
        view = original_img.permute((1, 2, 0)).cpu().numpy()
        fig.add_trace(px.imshow(view).data[0], row = row, col = col)

    # Layout tweaks apply to the whole figure, so they are issued once.
    fig.update_xaxes(showticklabels = False)
    fig.update_yaxes(showticklabels = False)
    fig.update_layout(height = 800, width = 1000)
    fig.show()

In [11]:
# Take the first batch a fresh iterator over the training loader yields and plot it.
img, lbl = next(iter(train_loader))
plotter(img, lbl)

In [12]:
'''ResNet'''

def conv_block(in_channels : int, out_channels : int, pool : bool = False) -> torch.nn.Sequential:
    '''Build a 3x3 convolution -> batch norm -> ReLU stage.

    Args:
        in_channels: channels of the incoming feature map.
        out_channels: channels produced by the convolution.
        pool: when True, a 2x2 max-pooling layer is appended after the ReLU.

    Returns:
        The layers wrapped in a torch.nn.Sequential, in the order listed above.
    '''
    # padding = 1 with a 3x3 kernel keeps the spatial size unchanged.
    convolution = torch.nn.Conv2d(in_channels, out_channels, kernel_size = 3, padding = 1)
    normalisation = torch.nn.BatchNorm2d(out_channels)
    activation = torch.nn.ReLU(inplace = True)

    stages = (convolution, normalisation, activation)
    if pool:
        stages = stages + (torch.nn.MaxPool2d(2),)
    return torch.nn.Sequential(*stages)

class ResNet9(torch.nn.Module):
    '''Convolutional classifier: three (conv, pooled conv, residual pair) groups and a linear head.

    Every group ends with a two-block branch whose output is added to the
    branch input. The head max-pools the last feature map down to one value
    per channel, applies dropout and maps the 512 channels to num_classes scores.
    '''
    def __init__(self, in_channels : int, num_classes : int) -> None:
        super().__init__()

        def residual_pair(channels : int) -> torch.nn.Sequential:
            # Two channel-preserving blocks, so the result can be summed with its input.
            return torch.nn.Sequential(conv_block(channels, channels), conv_block(channels, channels))

        # Group 1: in_channels -> 32 -> 64, pooled once.
        self.conv1 = conv_block(in_channels, 32)
        self.conv2 = conv_block(32, 64, pool = True)
        self.res1 = residual_pair(64)

        # Group 2: 64 -> 128 -> 256, pooled once.
        self.conv3 = conv_block(64, 128)
        self.conv4 = conv_block(128, 256, pool = True)
        self.res2 = residual_pair(256)

        # Group 3: 256 -> 512 -> 512, pooled once.
        self.conv5 = conv_block(256, 512)
        self.conv6 = conv_block(512, 512, pool = True)
        self.res3 = residual_pair(512)

        head_layers = [
            torch.nn.AdaptiveMaxPool2d(1),
            torch.nn.Flatten(),
            torch.nn.Dropout(0.2),
            torch.nn.Linear(512, num_classes),
        ]
        self.classifier = torch.nn.Sequential(*head_layers)

    def forward(self, xb : torch.Tensor) -> torch.Tensor:
        '''Run the three groups in order and return the classifier scores.'''
        groups = (
            (self.conv1, self.conv2, self.res1),
            (self.conv3, self.conv4, self.res2),
            (self.conv5, self.conv6, self.res3),
        )
        features = xb
        for first, second, residual in groups:
            features = second(first(features))
            # Skip connection: branch output plus the branch input.
            features = residual(features) + features
        return self.classifier(features)
        

In [13]:
'''Instantiating model'''

# Three input channels, two output classes; wrapped in DataParallel and moved
# to the selected device. The last expression displays the module tree.
model = torch.nn.DataParallel(ResNet9(in_channels = 3, num_classes = 2))
model.to(device)

DataParallel(
  (module): ResNet9(
    (conv1): Sequential(
      (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU(inplace=True)
    )
    (conv2): Sequential(
      (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU(inplace=True)
      (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    )
    (res1): Sequential(
      (0): Sequential(
        (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU(inplace=True)
      )
      (1): Sequential(
        (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=

In [14]:
'''Params info'''

# Count the elements of every parameter tensor, split by whether it is trainable.
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
non_trainable_params = sum(p.numel() for p in model.parameters() if not p.requires_grad)
total_params = trainable_params + non_trainable_params
print(
    f'trainable_params : {trainable_params}',
    f'non-trainable_params : {non_trainable_params}',
    f'total_params : {total_params}',
    sep = '\n',
)

trainable_params : 9909378
non-trainable_params : 0
total_params : 9909378


In [38]:
'''Helper class'''

class helpers:
    '''Runs the train / validation / test loops for one model, optimizer, loss and LR scheduler.

    The model is moved to `device` on construction and every loop moves each
    batch to the same device before the forward pass.
    '''
    def __init__(
        self,
        model : torch.nn.Module,
        optimizer : torch.optim.Optimizer,
        losser : torch.nn.Module,
        lr_scheduler : Any,
        device : torch.device
    ) -> None:
        '''Store the collaborators and move the model to `device`.

        Args:
            model: network to train / evaluate.
            optimizer: optimizer built over the model parameters.
            losser: callable loss taking (output, target).
            lr_scheduler: object with a `step()` method, called once per
                training batch; may be None to disable scheduling.
            device: where the model and the batches are placed.
        '''
        self.model = model.to(device)
        self.optimizer = optimizer
        self.losser = losser
        self.lr_scheduler = lr_scheduler
        self.device = device

    def loss_func(self, output : torch.Tensor, target : torch.Tensor) -> torch.Tensor:
        '''Return the configured loss for one batch of outputs and targets.'''
        return self.losser(output, target)

    def train_step(self, input_data : torch.Tensor, target : torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        '''Do one optimisation step on a batch and return (loss, output).'''
        self.model.train()
        output = self.model(input_data)
        loss = self.loss_func(output, target)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss, output

    def valid_step(self, input_data : torch.Tensor, target : torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        '''Evaluate one batch in eval mode, without autograd, and return (loss, output).'''
        self.model.eval()
        with torch.inference_mode():
            output = self.model(input_data)
            loss = self.loss_func(output, target)
        return loss, output

    def train(self, train_loader : torch.utils.data.DataLoader) -> List[float]:
        '''Train for one pass over `train_loader` and return the per-batch losses.

        The LR scheduler is stepped after every batch. This matches schedulers
        configured with a per-batch step budget, such as OneCycleLR built with
        `steps_per_epoch = len(train_loader)`.
        '''
        train_losses : List[float] = []
        total = len(train_loader)

        for i, (input_data, target) in enumerate(train_loader):
            input_data = input_data.to(self.device)
            target = target.to(self.device)

            loss, _ = self.train_step(input_data, target)
            if self.lr_scheduler is not None:
                self.lr_scheduler.step()

            loss_value = loss.item()
            print(f'training.... {i + 1} / {total}, loss : {loss_value}')
            train_losses.append(loss_value)
        return train_losses

    def validate(self, valid_loader : torch.utils.data.DataLoader) -> Tuple[List[float], Dict[str, List[float]]]:
        '''Evaluate one pass over `valid_loader`.

        Returns:
            (val_losses, tracker): the per-batch losses and a dict holding one
            list per metric ('accuracy', 'f1_score', 'precision', 'recall',
            'roc_auc'), each with one entry per batch. Metrics come from
            `sklearn.metrics`, imported as `metrics` at the top of the file.
        '''
        val_losses : List[float] = []
        tracker : Dict[str, List[float]] = {
            'accuracy' : [],
            'f1_score' : [],
            'precision' : [],
            'recall' : [],
            'roc_auc' : []
        }
        total = len(valid_loader)

        for i, (input_data, target) in enumerate(valid_loader):
            input_data = input_data.to(self.device)
            target = target.to(self.device)

            loss, output = self.valid_step(input_data, target)

            # sklearn needs host arrays of class indices, not device logits.
            y_true = target.cpu().numpy()
            y_pred = output.argmax(dim = -1).cpu().numpy()
            # Score of class index 1, used for ROC AUC.
            prob = torch.nn.functional.softmax(output, dim = 1)[:, 1].cpu().numpy()

            tracker['accuracy'].append(metrics.accuracy_score(y_true, y_pred))
            tracker['f1_score'].append(metrics.f1_score(y_true, y_pred))
            tracker['precision'].append(metrics.precision_score(y_true, y_pred))
            tracker['recall'].append(metrics.recall_score(y_true, y_pred))
            try:
                roc_auc = metrics.roc_auc_score(y_true, prob)
            except ValueError:
                # roc_auc_score rejects a batch that contains a single class.
                roc_auc = float('nan')
            tracker['roc_auc'].append(roc_auc)

            loss_value = loss.item()
            print(f'validation.....{i + 1} / {total}, loss : {loss_value}')
            val_losses.append(loss_value)
        return val_losses, tracker

    def test(self, test_loader : torch.utils.data.DataLoader) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        '''Run the model over `test_loader` and collect everything on the CPU.

        Returns:
            (outputs, targets, predictions): raw model outputs, the targets and
            the argmax of the outputs over the last dimension, concatenated
            along dimension 0. Targets and predictions keep their integer
            dtype. An empty loader gives three empty tensors.
        '''
        batch_outputs : List[torch.Tensor] = []
        batch_targets : List[torch.Tensor] = []
        batch_preds : List[torch.Tensor] = []
        total = len(test_loader)

        for i, (input_data, target) in enumerate(test_loader):
            input_data = input_data.to(self.device)
            target = target.to(self.device)
            loss, output = self.valid_step(input_data, target)

            print(f'testing.....{i + 1} / {total}, loss : {loss.item()}')

            output = output.cpu()
            batch_outputs.append(output)
            batch_targets.append(target.cpu())
            batch_preds.append(output.argmax(dim = -1))

        if not batch_outputs:
            return torch.Tensor([]), torch.Tensor([]), torch.Tensor([])

        # Concatenate once at the end rather than re-copying the running result per batch.
        outputs = torch.cat(batch_outputs, dim = 0)
        test_tgts = torch.cat(batch_targets, dim = 0)
        test_preds = torch.cat(batch_preds, dim = 0)
        return outputs, test_tgts, test_preds

    def save(self, path : str) -> None:
        '''Write the model state dict to `path`.'''
        torch.save(self.model.state_dict(), path)

In [11]:
'''Trainer Function'''

def model_trainer(train_loader : torch.utils.data.DataLoader,
                 valid_loader : torch.utils.data.DataLoader,
                 test_loader : torch.utils.data.DataLoader,
                 model : torch.nn.Module,
                 batch_size : int,
                 lr : float,
                 max_lr : float,
                 epochs : int,
                 device : torch.device,
                 start_factor : float = 0,
                 dropout : float = 0,
                 opt_func : Callable[..., torch.optim.Optimizer] = torch.optim.AdamW) -> Tuple[torch.nn.Module, collections.OrderedDict, List[Dict[str, Any]]]:
    '''Train `model`, validate after every epoch, then evaluate on the test split.

    Args:
        train_loader, valid_loader, test_loader: loaders for the three splits.
        model: network to train; it is updated in place.
        batch_size, start_factor, dropout: accepted for call compatibility but
            not used by this function.
        lr: learning rate handed to the optimizer constructor.
        max_lr: peak learning rate of the OneCycleLR schedule.
        epochs: number of passes over `train_loader`.
        device: device the model and batches are moved to.
        opt_func: optimizer constructor, called as opt_func(params, lr = lr).

    Returns:
        (model, state_dict, trackers): the trained model, its state dict, and
        a list with one validation tracker per completed epoch followed by the
        dict of test metrics.

    Side effects:
        Saves the model state dict to 'resnet<epoch>' in the working directory
        after each epoch that completes.
    '''
    import traceback  # only needed to report a failed epoch

    torch.cuda.empty_cache()

    optimizer = opt_func(model.parameters(), lr = lr)
    lr_scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer = optimizer, max_lr = max_lr, epochs = epochs, steps_per_epoch = len(train_loader))
    losser = torch.nn.CrossEntropyLoss()

    helper = helpers(model = model, optimizer = optimizer, losser = losser, lr_scheduler = lr_scheduler, device = device)
    val_trackers : List[Dict[str, Any]] = []

    for epoch in range(epochs):
        print(f'************************************************    Epoch : {epoch + 1} / {epochs}    ************************************************')
        try:
            helper.train(train_loader)
            val_losses, tracker = helper.validate(valid_loader)

            # An empty validation loader yields no losses; report nan instead of dividing by zero.
            mean_val_loss = sum(val_losses) / len(val_losses) if val_losses else float('nan')
            print(f'************************************************    Epoch : {epoch + 1} / {epochs}, val_loss : {mean_val_loss}    ************************************************')
            print(f'Results of the epoch {epoch + 1}')
            for key, values in tracker.items():
                # The tracker holds per-batch values; print their mean, ignoring nan entries.
                print(f'{key} : {np.nanmean(values)}')

            val_trackers.append(tracker)
            torch.save(model.state_dict(), f'resnet{epoch + 1}')
        except Exception:
            # Best effort: keep going with the next epoch, but show the full
            # traceback so the cause is not reduced to a one-line message.
            traceback.print_exc()

    model.eval()
    with torch.inference_mode():
        outputs, tgt, pred = helper.test(test_loader)

    y_true = tgt.numpy()
    y_pred = pred.numpy()
    # Score of class index 1, used for ROC AUC.
    prob = torch.nn.functional.softmax(outputs, dim = 1)[:, 1]

    test_tracker : Dict[str, float] = {
        'accuracy_score' : metrics.accuracy_score(y_true, y_pred),
        'f1_score' : metrics.f1_score(y_true, y_pred),
        'precision_score' : metrics.precision_score(y_true, y_pred),
        'recall_score' : metrics.recall_score(y_true, y_pred),
        'roc_auc_score' : metrics.roc_auc_score(y_true, prob.numpy())
    }

    print('\n\n\n')
    for key, value in test_tracker.items():
        print(f'{key} : {value}')

    val_trackers.append(test_tracker)
    state_dict = model.state_dict()

    return model, state_dict, val_trackers

In [None]:
# '''Training Call'''

# model, state_dict, trackers = model_trainer(train_loader = train_loader, 
#                      valid_loader = valid_loader, 
#                      test_loader = test_loader,
#                      model = model,
#                      batch_size = batch_size,
#                      lr = 0.0001,
#                      max_lr = 0.0001,
#                      epochs = 35,
#                      device = device)

In [None]:
'''Saving the model'''

# torch.save(model.state_dict(), 'resnetfinal')

In [13]:
# '''Pre Trained Model : Efficient net b4'''

# model = torch.hub.load('NVIDIA/DeepLearningExamples:torchhub', 'nvidia_efficientnet_widese_b4', pretrained=True)

# '''Params info'''

# total_params = sum(p.numel() for p in model.parameters())
# trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
# non_trainable_params = total_params - trainable_params
# print(f'trainable_params : {trainable_params}')
# print(f'non-trainable_params : {non_trainable_params}')
# print(f'total_params : {total_params}')

Downloading: "https://github.com/NVIDIA/DeepLearningExamples/zipball/torchhub" to /root/.cache/torch/hub/torchhub.zip
Downloading: "https://api.ngc.nvidia.com/v2/models/nvidia/efficientnet_widese_b4_pyt_amp/versions/20.12.0/files/nvidia_efficientnet-widese-b4_210412.pth" to /root/.cache/torch/hub/checkpoints/nvidia_efficientnet-widese-b4_210412.pth
100%|██████████| 134M/134M [00:04<00:00, 32.2MB/s] 


trainable_params : 34927246
non-trainable_params : 0
total_params : 34927246


In [None]:
# model_trainer(train_loader = train_loader,
#              valid_loader = valid_loader, 
#              test_loader = test_loader,
#              model = model,
#              batch_size = 64,
#              lr = 0.0001,
#              max_lr = 0.0001,
#              epochs = ,
#              device = device,
#              start_factor = ,
#              dr)

In [61]:
# Load VGG16 with pretrained weights and freeze every parameter, so only
# layers added afterwards will be trainable.
model = torchvision.models.vgg16(pretrained = True)
for param in model.parameters():
    param.requires_grad_(False)

In [59]:
# Input / output width of the classifier layer at index 6, i.e. the layer the
# modifier functions below replace.
print(model.classifier[6].in_features, model.classifier[6].out_features)

4096 1000


In [62]:
'''Params info'''

# Same count as before, now for the frozen VGG16.
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
non_trainable_params = sum(p.numel() for p in model.parameters() if not p.requires_grad)
total_params = trainable_params + non_trainable_params
print(
    f'trainable_params : {trainable_params}',
    f'non-trainable_params : {non_trainable_params}',
    f'total_params : {total_params}',
    sep = '\n',
)

trainable_params : 0
non-trainable_params : 138357544
total_params : 138357544


In [63]:
'''15 epochs'''

def model_modifier(model : torch.nn.Module) -> torch.nn.Module:
    '''Replace the last layer of `model.classifier` with a small two-class head.

    The new head is Linear(in_features, 512) -> ReLU -> BatchNorm1d(512) ->
    Linear(512, 2), where in_features is read from the layer being replaced.

    Args:
        model: a module with an indexable `classifier` whose last layer
            exposes `in_features` (for torchvision's VGG16 that is index 6).

    Returns:
        The same `model` object; it is modified in place.
    '''
    in_features = model.classifier[-1].in_features
    model.classifier[-1] = torch.nn.Sequential(
        torch.nn.Linear(in_features, 512),
        torch.nn.ReLU(),
        torch.nn.BatchNorm1d(512),
        torch.nn.Linear(512, 2)
    )
    return model

In [64]:
'''weird trick'''
def model_modifier1(model : torch.nn.Module) -> torch.nn.Module:
    '''Rebuild `model.classifier` with its last layer swapped for a two-class linear layer.

    All layers but the last are reused as they are. The new Linear layer takes
    its input width from the layer it replaces and is wrapped in its own
    Sequential, so its parameters sit one level deeper in the state dict
    (e.g. 'classifier.6.0.weight' for a seven-layer classifier).

    Args:
        model: a module with a `classifier` whose last child exposes `in_features`.

    Returns:
        The same `model` object; it is modified in place.
    '''
    *kept_layers, old_head = model.classifier.children()
    new_head = torch.nn.Sequential(torch.nn.Linear(old_head.in_features, 2))
    model.classifier = torch.nn.Sequential(*kept_layers, new_head)
    return model

In [65]:
def model_tester(model : torch.nn.Module,
                 test_loader : torch.utils.data.DataLoader,
                 max_lr : float,
                 optimizer : Callable[..., torch.optim.Optimizer],
                 lr : float,
                 epochs : int
                 ) -> None:
    '''Evaluate `model` on `test_loader` and print the classification metrics.

    No training happens here. The optimizer and the OneCycleLR scheduler are
    built only because the `helpers` constructor takes them.

    Args:
        model: network to evaluate.
        test_loader: loader yielding (input, target) batches.
        max_lr, epochs: forwarded to the OneCycleLR constructor.
        optimizer: optimizer constructor, called as optimizer(params, lr = lr).
        lr: learning rate handed to the optimizer constructor.

    Note:
        The model is placed on the notebook-level `device` variable, which
        must be defined before this function is called.
    '''
    optimizer = optimizer(model.parameters(), lr = lr)
    lr_scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer = optimizer, max_lr = max_lr, epochs = epochs, steps_per_epoch = len(test_loader))
    losser = torch.nn.CrossEntropyLoss()

    helper = helpers(model = model, optimizer = optimizer, losser = losser, lr_scheduler = lr_scheduler, device = device)
    outputs, tgt, pred = helper.test(test_loader)

    # Convert once; every metric below works on the same host arrays.
    y_true = tgt.numpy()
    y_pred = pred.numpy()
    # Score of class index 1, used for ROC AUC.
    prob = torch.nn.functional.softmax(outputs, dim = 1)[:, 1]

    test_tracker : Dict[str, float] = {
        'accuracy_score' : metrics.accuracy_score(y_true, y_pred),
        'f1_score' : metrics.f1_score(y_true, y_pred),
        'precision_score' : metrics.precision_score(y_true, y_pred),
        'recall_score' : metrics.recall_score(y_true, y_pred),
        'roc_auc_score' : metrics.roc_auc_score(y_true, prob.numpy())}

    print('\n\n\n')
    for key, value in test_tracker.items():
        print(f'{key} : {value}')

In [66]:
# model_modifier1 edits `model` in place and returns it, so model1 wraps the
# same VGG16 object: head swapped, moved to the device, then put in DataParallel.
model1 = torch.nn.DataParallel(model_modifier1(model).to(device))
# model2 = model_modifier(model)

In [47]:
# Evaluate the modified VGG16 on the test split before any training of the new head.
model_tester(model = model1, test_loader = test_loader, optimizer = torch.optim.AdamW, max_lr = 0.0001, epochs = 1, lr = 0.0001)

testing.....1 / 313, loss : 0.6882390975952148
testing.....2 / 313, loss : 0.7053088545799255
testing.....3 / 313, loss : 0.7062774896621704
testing.....4 / 313, loss : 0.6858657598495483
testing.....5 / 313, loss : 0.6910326480865479
testing.....6 / 313, loss : 0.7208291888237
testing.....7 / 313, loss : 0.6951457262039185
testing.....8 / 313, loss : 0.6957632899284363
testing.....9 / 313, loss : 0.7141115069389343
testing.....10 / 313, loss : 0.7395641207695007
testing.....11 / 313, loss : 0.7217293381690979
testing.....12 / 313, loss : 0.6957471966743469
testing.....13 / 313, loss : 0.7073827385902405
testing.....14 / 313, loss : 0.6847555041313171
testing.....15 / 313, loss : 0.7215121984481812
testing.....16 / 313, loss : 0.716099739074707
testing.....17 / 313, loss : 0.692469596862793
testing.....18 / 313, loss : 0.6967275142669678
testing.....19 / 313, loss : 0.7052717208862305
testing.....20 / 313, loss : 0.7201106548309326
testing.....21 / 313, loss : 0.7001123428344727
testin

In [67]:
'''Params info'''

# Same count again after the head swap: only the new two-class layer is trainable.
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
non_trainable_params = sum(p.numel() for p in model.parameters() if not p.requires_grad)
total_params = trainable_params + non_trainable_params
print(
    f'trainable_params : {trainable_params}',
    f'non-trainable_params : {non_trainable_params}',
    f'total_params : {total_params}',
    sep = '\n',
)

trainable_params : 8194
non-trainable_params : 134260544
total_params : 134268738


In [None]:
# Train the DataParallel-wrapped VGG16 for one epoch. The call rebinds `model`
# to the object model_trainer returns, which is the `model1` passed in.
# NOTE(review): lr and max_lr are both 1e-9 - presumably a smoke-test setting,
# since steps this small will barely move the weights; confirm before a real run.
model, state_dict, val_trackers = model_trainer(train_loader = train_loader,
             valid_loader = valid_loader, 
             test_loader = test_loader,
             model = model1,
             batch_size = 64,
             lr = 0.000000001,
             max_lr = 0.000000001,
             epochs = 1,
             device = device
             )

************************************************    Epoch : 1 / 1    ************************************************
training.... 1 / 1563, loss : 0.6742910146713257
training.... 2 / 1563, loss : 0.6996212005615234
training.... 3 / 1563, loss : 0.6742959022521973
training.... 4 / 1563, loss : 0.6981514096260071
training.... 5 / 1563, loss : 0.6839051842689514
training.... 6 / 1563, loss : 0.8126610517501831
training.... 7 / 1563, loss : 0.6574415564537048
training.... 8 / 1563, loss : 0.6901558041572571
training.... 9 / 1563, loss : 0.7265428900718689
training.... 10 / 1563, loss : 0.7661244869232178
training.... 11 / 1563, loss : 0.7669392228126526
training.... 12 / 1563, loss : 0.6610114574432373
training.... 13 / 1563, loss : 0.7438639998435974
training.... 14 / 1563, loss : 0.7445533871650696
training.... 15 / 1563, loss : 0.7117632627487183
training.... 16 / 1563, loss : 0.7116361856460571
training.... 17 / 1563, loss : 0.7045701742172241
training.... 18 / 1563, loss : 0.68053185