In [None]:
_exp_name =  "template"

### import packages

In [None]:
import numpy as np
import pandas as pd
import torch
import os
import torch.nn as nn
import torchvision.transforms as transforms
from PIL import Image

# "ConcatDataset" and "Subset" are possibly useful when doing semi-supervised learning.
from torch.utils.data import ConcatDataset, DataLoader, Subset, Dataset
from torchvision.datasets import DatasetFolder, VisionDataset

# This is for the progress bar.
from tqdm.auto import tqdm
import random

# ploting
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure

# models 
import torchvision.models as models

### Set Random seed

In [None]:
myseed = 66666
random.seed(myseed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
np.random.seed(myseed)
torch.manual_seed(myseed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(myseed)

### Hyper-parameters

In [None]:
batch_size = 64

_dataset_dir = "./data" # directory to get the data

# training loop
early_stop_steps = 300
num_epochs = 10

# optimizer
learning_rate = 1e-3
weigth_decay = 1e-6

# transforms for data augmentation
image_size = 192

# transform for testing set
test_tfm = transforms.Compose([
    transforms.Resize((image_size, image_size)),
    transforms.ToTensor(),
])

# transform for training set
# more trasforms: https://pytorch.org/vision/stable/transforms.html

train_tfm = transforms.Compose([
    transforms.Resize((image_size, image_size)),   
    transforms.RandomApply([   
        transforms.RandomApply([transforms.CenterCrop((160,160))], p=0.4),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomApply([transforms.RandomRotation(10, expand=False, center=None, fill=256)], p=0.35),
        transforms.RandomApply([transforms.Pad(10, fill=256, padding_mode='constant')], p=0.3),
        transforms.RandomApply([transforms.ColorJitter(brightness=0.2, contrast=0, saturation=0, hue=0)], p=1),
        transforms.RandomApply([transforms.ColorJitter(brightness=0, contrast=0.4, saturation=0, hue=0)], p=0.3),
        transforms.RandomApply([transforms.ColorJitter(brightness=0, contrast=0, saturation=0.4, hue=0)], p=0.3),
        ],
    p=0.98),
    transforms.Resize((image_size, image_size)),  
    transforms.ToTensor(),
])

### Dataset
class for data loader in pytorch

intit-> load the datas to a list

getitem-> return an image and its label every time its called

In [None]:
class FaceDataset(Dataset):

    def __init__(self,path,tfm=test_tfm,files = None):
        super(FaceDataset).__init__()
        self.path = path

        ### ===== get the files ==== ###
        self.files = sorted([os.path.join(path,x) for x in os.listdir(path) if x.endswith(".jpg")])
        if files != None:
            self.files = files
        print(f"One {path} sample",self.files[0])
        ### ==== ====###
        
        self.transform = tfm
  
    def __len__(self):
        return len(self.files)
  
    def __getitem__(self,idx):
        ### ==== return an image and its label ==== #
        fname = self.files[idx]
        im = Image.open(fname)

        # data augmentation
        im = self.transform(im)
        
        # get the label for training
        try:
            label = int(fname.split("\\")[-1].split("_")[0])
        except:
            label = -1 # test has no label
        #print(fname)
        #print(fname.split("\\")[-1].split("_")[0])
        return im,label

### CNN Model
nn.Conv2d-> CNN layer

nn.BatchNorm2d -> batch normalization

nn.ReLU -> activation function

nn.MaxPool2d -> max pooling

nn.Linear -> linerar layer

forward() -> input an image and get the output from the model

In [None]:
class Classifier(nn.Module):
    def __init__(self):
        super(Classifier, self).__init__()
        # torch.nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
        # torch.nn.MaxPool2d(kernel_size, stride, padding)
        # input dimention [3, 128, 128]
        self.cnn = nn.Sequential(
            nn.Conv2d(3, 64, 3, 1, 1),  # [64, 128, 128] 192
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),      # [64, 64, 64] 96

            nn.Conv2d(64, 128, 3, 1, 1), # [128, 64, 64]
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),      # [128, 32, 32] 48

            nn.Conv2d(128, 256, 3, 1, 1), # [256, 32, 32]
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),      # [256, 16, 16] 24

            nn.Conv2d(256, 512, 3, 1, 1), # [512, 16, 16]
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),       # [512, 8, 8]
            
            nn.Conv2d(512, 512, 3, 1, 1), # [512, 8, 8] 12
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),       # [512, 4, 4] 6 (img_size / (2 ** 5))
        )
        self.fc = nn.Sequential(
            nn.Linear(512*6*6, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, 2)
        )

    def forward(self, x):
        out = self.cnn(x)
        out = out.view(out.size()[0], -1)
        return self.fc(out)

### Get device

In [None]:
# "cuda" only when GPUs are available.
device = "cuda:0" if torch.cuda.is_available() else "cpu"
print(f"using {device}")

### Load Data

In [None]:
# Construct datasets.
# The argument "loader" tells how torchvision reads the data.
train_set = FaceDataset(os.path.join(_dataset_dir,"training"), tfm=train_tfm)
valid_set = FaceDataset(os.path.join(_dataset_dir,"validation"), tfm=test_tfm)

random.shuffle(valid_set.files)

train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True)
valid_loader = DataLoader(valid_set, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True)

### Training Loop

In [None]:
# Initialize trackers
best_acc = 0
best_loss = 0.0
early_stop_cnt = 0
train_losses = []
val_losses = []
train_acc_record = []
val_acc_record = []


# Initialize a model, and put it on the device specified.
model = Classifier().to(device)

# Use pytorch models 
# More pre-trained models: https://pytorch.org/vision/stable/models.html
# model = models.vgg19_bn(pretrained=False, progress=False).to(device)
# model = models.efficientnet_b0(pretrained=False, progress=False).to(device)

# cross-entropy
criterion = nn.CrossEntropyLoss()

# Optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=weigth_decay)

# ================================ #
# Haven't implememt early stop yet #
# ================================ #

for epoch in range(num_epochs):

        print(f"\n--> starting epoch {epoch + 1}")

        ### ==== Training ==== ###
        # set model to train mode
        model.train()

        # record information in training.
        train_loss = []
        train_accs = []

        for batch in tqdm(train_loader):

            # A batch consists of image data and corresponding labels.
            imgs, labels = batch

            # Put images through the model
            outputs = model(imgs.to(device))

            # Calculate the cross-entropy loss.
            loss = criterion(outputs, labels.to(device))

            # Gradients stored in the parameters in the previous step should be cleared out first.
            optimizer.zero_grad()

            # Compute the gradients for parameters.
            loss.backward()

            # Clip the gradient norms for stable training.
            grad_norm = nn.utils.clip_grad_norm_(model.parameters(), max_norm=10)

            # Update the parameters with computed gradients.
            optimizer.step()

            # Compute the accuracy for current batch.
            acc = (outputs.argmax(dim=-1) == labels.to(device)).float().mean()

            # Record the loss and accuracy.
            train_loss.append(loss.item())
            train_accs.append(acc)
                
        train_loss = sum(train_loss) / len(train_loss)
        train_acc = sum(train_accs) / len(train_accs)


        # Print the information.
        train_losses.append(train_loss)
        train_acc_record.append(train_acc)
        print(f"[ Train | [{epoch + 1:04d}/{num_epochs:04d}] ] loss = {train_loss:.5f}, acc = {train_acc:.5f}")

        ### ==== Validation ==== ###
        # model eval mode
        model.eval()

        # record information in validation.
        valid_loss = []
        valid_accs = []

        for batch in tqdm(valid_loader):

            imgs, labels = batch


            # torch.no_grad() accelerates the forward process.
            with torch.no_grad():
                outputs = model(imgs.to(device))

            # compute loss
            loss = criterion(outputs, labels.to(device))

            # Compute the accuracy
            acc = (outputs.argmax(dim=-1) == labels.to(device)).float().mean()

            # Record loss and accuracy.
            valid_loss.append(loss.item())
            valid_accs.append(acc)

        # The average loss and accuracy for entire validation set is the average of the recorded values.
        valid_loss = sum(valid_loss) / len(valid_loss)
        valid_acc = sum(valid_accs) / len(valid_accs)


        # Print the information.
        val_losses.append(valid_loss)
        val_acc_record.append(valid_acc)
        print(f"[ Valid | [{epoch + 1:04d}/{num_epochs:04d}] ] loss = {valid_loss:.5f}, acc = {valid_acc:.5f}")


        # we can use txt to record training process
        """
        # update logs
        if valid_acc > best_acc:
            with open(f"./{_exp_name}_log.txt","a") as log:
                log.writelines(f"[ Valid | [{epoch + 1:04d}/{num_epochs:04d}] ] loss = {valid_loss:.5f}, acc = {valid_acc:.5f} -> best\n")
        else:
            with open(f"./{_exp_name}_log.txt","a") as log:
                log.writelines(f"[ Valid | [{epoch + 1:04d}/{num_epochs:04d}] ] loss = {valid_loss:.5f}, acc = {valid_acc:.5f}\n")
        """
        # save models
        if valid_acc > best_acc:
            print(f"==== Best model found at epoch {epoch+1}, saving model ====")
            torch.save(model.state_dict(), f"{_exp_name}_best.ckpt") # only save best to prevent output memory exceed error
            best_acc = valid_acc
            best_loss = valid_loss
            last_improved_cnt = 0
            # with open('best.txt', 'w') as best:
            #     best.write(f"epoch = {epoch + 1}\n")
            #     best.write(f"train acc = {train_acc: .4f}\n")
            #     best.write(f"train loss = {train_loss: .4f}\n")
            #     best.write(f"best acc = {best_acc: .4f}\n")
            #     best.write(f"best loss = {best_loss: .4f}\n")
        else:
            early_stop_cnt += 1 
        if last_improved_cnt >= early_stop_steps:
            # early stop
            print(f'early stop at epoch {epoch}')
            break
            
print(f'\n==> best acc: {best_acc: .4f} best loss : {best_loss: .4f}\n')
# plot the loss
plt.figure(figsize=(4, 2))
plt.plot(np.squeeze(train_losses), label = "train")
plt.plot(np.squeeze(val_losses), label = "val")
plt.ylabel('loss')
plt.xlabel('iterations (per hundreds)')
plt.title("loss")
plt.show()

# plot acc
plt.figure(figsize=(4, 2))
plt.plot(np.squeeze(train_acc_record), label = "train")
plt.plot(np.squeeze(val_acc_record), label = "val")
plt.ylabel('loss')
plt.xlabel('iterations (per hundreds)')
plt.title("Accuracy")
plt.show()

### Make Predictions

In [None]:
test_set = FaceDataset(os.path.join(_dataset_dir,"test"), tfm=test_tfm)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=0, pin_memory=True)

model_best = Classifier().to(device)
# model_best = models.vgg19_bn(pretrained=False, progress=False).to(device)

model_best.load_state_dict(torch.load(f"{_exp_name}_best.ckpt"))
model_best.eval()
prediction = []
with torch.no_grad():
    for data,_ in test_loader:
        test_pred = model_best(data.to(device))
        test_label = np.argmax(test_pred.cpu().data.numpy(), axis=1)
        prediction += test_label.squeeze().tolist()

# verify corectness, save prdictions
# def pad4(i):
#     return "0"*(4-len(str(i)))+str(i)
# df = pd.DataFrame()
# df["Id"] = [pad4(i) for i in range(1,len(test_set)+1)]
# df["Category"] = prediction
# df.to_csv("submission.csv",index = False)