<a href="https://colab.research.google.com/github/Gedeon-m-gedus/Image_Processing/blob/master/CV_22%20Faces%20Classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os
import numpy as np
from PIL import Image
from torch.utils import data
import torch
import torchvision   
import torch.nn as nn
import torch.nn.functional as F
import natsort
from torch.utils.data import Dataset, DataLoader
import time

In [None]:
!unzip "/content/drive/MyDrive/idl-fall21-hw2p2s1-face-classification.zip"

In [None]:
from torchvision import transforms
train_dataset = torchvision.datasets.ImageFolder(root='/content/train_data', 
                                                 transform=transforms.ToTensor())
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=256, 
                                               shuffle=True, num_workers=6,drop_last=True)


In [None]:

val_dataset = torchvision.datasets.ImageFolder(root='/content/val_data', 
                                               transform=transforms.ToTensor())

val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=256, 
                                             shuffle=True, num_workers=6)

In [None]:
testDataLoaderArguments = dict(shuffle = False, batch_size = 1, num_workers = 6)
testDataLoader = data.DataLoader(dataset= os.listdir('/content/test_data/'), **testDataLoaderArguments)


# Models

In [None]:
import torch
import torch.nn as nn


class Block(nn.Module):
    def __init__(self, num_layers, in_channels, out_channels, identity_downsample=None, stride=1):
        assert num_layers in [18, 34, 50, 101, 152], "should be a a valid architecture"
        super(Block, self).__init__()
        self.num_layers = num_layers
        if self.num_layers > 34:
            self.expansion = 4
        else:
            self.expansion = 1
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0)
        self.bn1 = nn.BatchNorm2d(out_channels)
        if self.num_layers > 34:
            self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        else:
            self.conv2 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.conv3 = nn.Conv2d(out_channels, out_channels * self.expansion, kernel_size=1, stride=1, padding=0)
        self.bn3 = nn.BatchNorm2d(out_channels * self.expansion)
        self.relu = nn.ReLU()
        self.identity_downsample = identity_downsample

    def forward(self, x):
        identity = x
        if self.num_layers > 34:
            x = self.conv1(x)
            x = self.bn1(x)
            x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.conv3(x)
        x = self.bn3(x)

        if self.identity_downsample is not None:
            identity = self.identity_downsample(identity)

        x += identity
        x = self.relu(x)
        return x


class ResNet(nn.Module):
    def __init__(self, num_layers, block, image_channels, num_classes):
        assert num_layers in [18, 34, 50, 101, 152], f'ResNet{num_layers}: Unknown architecture! Number of layers has ' \
                                                     f'to be 18, 34, 50, 101, or 152 '
        super(ResNet, self).__init__()
        if num_layers < 50:
            self.expansion = 1
        else:
            self.expansion = 4
        if num_layers == 18:
            layers = [2, 2, 2, 2]
        elif num_layers == 34 or num_layers == 50:
            layers = [3, 4, 6, 3]
        elif num_layers == 101:
            layers = [3, 4, 23, 3]
        else:
            layers = [3, 8, 36, 3]
        self.in_channels = 64
        self.conv1 = nn.Conv2d(image_channels, 64, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # ResNetLayers
        self.layer1 = self.make_layers(num_layers, block, layers[0], intermediate_channels=64, stride=1)
        self.layer2 = self.make_layers(num_layers, block, layers[1], intermediate_channels=128, stride=2)
        self.layer3 = self.make_layers(num_layers, block, layers[2], intermediate_channels=256, stride=2)
        self.layer4 = self.make_layers(num_layers, block, layers[3], intermediate_channels=512, stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * self.expansion, num_classes)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = x.reshape(x.shape[0], -1)
        x = self.fc(x)
        return x

    def make_layers(self, num_layers, block, num_residual_blocks, intermediate_channels, stride):
        layers = []

        identity_downsample = nn.Sequential(nn.Conv2d(self.in_channels, intermediate_channels*self.expansion, kernel_size=1, stride=stride),
                                            nn.BatchNorm2d(intermediate_channels*self.expansion))
        layers.append(block(num_layers, self.in_channels, intermediate_channels, identity_downsample, stride))
        self.in_channels = intermediate_channels * self.expansion # 256
        for i in range(num_residual_blocks - 1):
            layers.append(block(num_layers, self.in_channels, intermediate_channels)) # 256 -> 64, 64*4 (256) again
        return nn.Sequential(*layers)
    

def createModel(name='R18'):
    img_channels = 3
    num_classes = 4000
    if name == 'R18': return ResNet(18, Block, img_channels, num_classes)
    elif name == 'R34': return ResNet(34, Block, img_channels, num_classes)
    elif name == 'R50': return ResNet(50, Block, img_channels, num_classes)
    elif name == 'R101': return ResNet(101, Block, img_channels, num_classes)
    elif name == 'R152': return ResNet(152, Block, img_channels, num_classes)
    else: return None
    
def testModel():
    models = ['R18', 'R34', 'R50', 'R101', 'R154']
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    for model_name in models:
        print('Testing', model_name)
        model = createModel(name='R18')
        y = model(torch.randn(4, 3, 64, 64)).to(device)
        print(y.size())

testModel()

In [None]:
model = createModel(name='R18')

# Optimzers and training loop

In [None]:
import torch.optim as optim

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

n_epochs = 3
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = model.to(device)

In [None]:
for epoch in range(n_epochs):  # loop over the dataset multiple times

    running_loss = 0.0
    for i, data in enumerate(train_dataloader, 0):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data
        inputs = inputs.to(device)
        labels = labels.to(device)
        # zero the parameter gradients
        optimizer.zero_grad()
        #print(inputs.shape)

        # forward + backward + optimize
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()
        if i % 10 == 0:
            print('[%d, %5d] loss: %.3f' %
                  (epoch + 1, i + 1, running_loss / 10))
            running_loss = 0.0

print('Finished Training')
PATH = './model_chckpoint.pth'
torch.save(model.state_dict(), PATH)

# Testing and Inference

In [None]:
import pandas as pd
def inference(model, dataloader_test):    
    model.eval()
    results = []
    transform = transforms.ToTensor()
    print('Inferencing ...')
    for ims in dataloader_test:
        img = Image.open('/content/test_data/'+ims[0])
        input = transform(img)
        input = input.view(-1, 3,64,64).to(device)
        model.eval()
        output = model(input)
        _, preds = output.topk(1, 1, True, True)
        preds = preds.cpu().detach().numpy()
        results.append({'id':ims[0], 'label':preds[0][0]})
    df = pd.DataFrame(results, columns=['id', 'label'])
    df.to_csv('sub.csv', index=False)

In [None]:
inference(model, testDataLoader)

In [None]:
pd.read_csv('sub.csv')

In [None]:
model.load_state_dict(best_model_wts)

In [None]:
# mdl = createModel(name='R18')
# mdl.load_state_dict('model_chckpoint.pth')