In [1]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import cv2 

from tqdm import tqdm

In [3]:
import torch
from torch import nn,optim
from torchvision import transforms
from torch.utils.data import DataLoader, random_split

In [4]:
import torch
import torch.nn as nn 
from torch import Tensor

In [5]:
class DoubleConv(nn.Module):
    
    """
    A double convolutional Layer with 3x3, ReLU, and optional MaxPooling
    
    includes a foward method to pass argument forward
    
    Attributes:
    
        in_channels (int): The dimensin of the img 
        out_channels (int): The number of classification
        use_pooling (bool): Whether or not to use pooling
    """
    
    def __init__(self, in_channels:int, out_channels:int, use_pooling:bool=True):
        super().__init__()
        self.conv_op=nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1), 
            nn.ReLU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1), 
            nn.ReLU(),
        )
        if use_pooling:
            self.pool = nn.MaxPool2d(kernel_size=2,stride=2)
            
    def forward(self,x:Tensor)->Tensor:

        conv = self.conv_op(x)
        pooling = self.pool(conv)
        
        return pooling

class VGG13(nn.Module):
    
    def __init__(self, in_channels:int,num_channels:int):
        super().__init__()

        # 10 layers of Convolution
        self.block1 = DoubleConv(in_channels,64)
        self.block2 = DoubleConv(64,128)
        self.block3 = DoubleConv(128,256)
        self.block4 = DoubleConv(256,512)
        self.block5 = DoubleConv(512,512)

        # Fully connected layers
        self.fc=nn.Sequential(
            nn.Linear(512*7*7,128),
            nn.ReLU(),
            nn.Linear(128,num_channels)
        ) 

    def extract_features(self,x)->Tensor:
        
        down1 = self.block1(x)
        down2 = self.block2(down1)
        down3 = self.block3(down2)
        down4 = self.block4(down3)
        down5 = self.block5(down4)
        
        flatten_features = down5.flatten(start_dim=1)

        # Returns a tensor (7*512*512)
        return flatten_features
        
    def forward(self,x):
        
        flatten_x:torch.tensor = self.extract_features(x)
        output = self.fc(flatten_x)
        
        return output

In [9]:
class FaceDataset():
    
    def __init__(self, root_path:str, transform=None):
        self.root_path=root_path
        
        self.transform = transform

        # create a dictionary labeling female_faces,male_faces,object to a number
        self.img_labels:dict[str:int]={dir_name:i for i,dir_name in enumerate(sorted(os.listdir(root_path)))}

        self.samples:list[tuple(str,int)] = []

        # Retrieving each image and saving it as a tuple to samples
        for folder_name in os.listdir(self.root_path):
            folder_path:str = os.path.join(self.root_path,folder_name)         
            for img in os.listdir(folder_path):
                try:
                    self.samples.append(( os.path.join(folder_path,img), self.img_labels[folder_name]))
                except Exception as e:
                    print(f'{e}')

    def __len__(self):
        return len(self.samples)
        
    def __getitem__(self,index):
        
        img_path, label = self.samples[index]
        img = cv2.imread(img_path)

        if self.transform:
            img = self.transform(img)

        return (img, label)
        

In [10]:
def get_mean_std(loader):
    # VAR[X] = E[X**2] - E[X]**2
    channels_sum, channels_squared_sum, num_batches=0,0,0
    
    for data,_ in loader:
        channels_sum = torch.mean(data,dim=[0,2,3])
        channels_squared_sum = torch.mean(data**2,dim=[0,2,3])
        num_batches+=1

    mean = channels_sum/num_batches
    std = (channels_squared_sum/num_batches - mean**2)**0.5

    return mean, std

In [11]:
# Calculate the Mean & Std for normalizing
ROOT_PATH='../data/raw/Images'

dataset = FaceDataset(ROOT_PATH)

mean, std = get_mean_std(DataLoader(dataset,shuffle=True))

print('Mean             :',mean)
print('Standard Deviatin:',std)

RuntimeError: mean(): could not infer output dtype. Input dtype must be either a floating point or complex dtype. Got: Byte

In [None]:
# Training Variable

ROOT_PATH='../data/raw/Images'
IN_CHANNELS=3
OUT_CHANNELS=3
BATCH_SIZE = 8
LEARNING_RATE=0.1
EPOCHS=100

# Check is nvidia GPU is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((224,224),antialias=False),
    transforms.Normalize(mean=(9.1111e-05, 1.0778e-04, 1.2251e-04),std=(0.0073, 0.0084, 0.0093))
])

# Initializing Datasets
dataset = FaceDataset(ROOT_PATH,transform)
generator = torch.Generator().manual_seed(42)

train_data,attack_data, test_data = random_split(dataset,[0.7,0.1,0.2],generator=generator)
print('Total number of dataset:',len(dataset))
print('Train                  :',len(train_data))
print('Attack                 :', len(attack_data))
print('Test                   :', len(test_data))


train_loader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=True)

# Setting up 
model = VGG13(IN_CHANNELS,OUT_CHANNELS).to(device)

optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE)
criterion = nn.CrossEntropyLoss()

In [None]:
data, targets = next(iter(train_loader))

for epoch in range(EPOCHS):
    
    model.train()
    correct=0
    total=0
    
    # for idx, (data, targets) in enumerate(train_loader):
        
    data = data.to(device)
    targets = targets.to(device)
    # Forward
    optimizer.zero_grad()
    prediction = model(data)
    
    loss = criterion(prediction,targets)
    
    # Backward
    
    loss.backward()
    # for name, param in model.named_parameters():
    #     print(name, param.grad.norm())
    # print(loss)
    # gradient descent
    before = model.fc[0].weight.clone().detach()
    optimizer.step()
    after = model.fc[0].weight.clone().detach()
    # print((before - after).abs().sum())
    
    # Calculate numer of correct classification
    correct+=(prediction.argmax(dim=1) == targets).sum().item()
    total+=targets.size(0)

    print(f'Epoch [{epoch+1}/{EPOCHS}] | {correct}/{total} | Accuracy: {correct/total:.2f}')
    

In [None]:
data, targets = next(iter(train_loader))

channels_sum, channels_squared_sum, num_batches=0,0,0
    

In [None]:
## Testing the input image

# i=4
# img=data[i].cpu()
# label = targets[i]

# print(label)
# plt.imshow(img.per)
# plt.show()

In [None]:
## Testing the input image

# images, labels = next(iter(train_loader))
# print(images.min(), images.max(), images.shape)
# print(labels)