In [None]:
import torch
import torch.nn as nn
import torchvision
import einops
import torchvision
import torchvision.transforms as transforms
import cv2
import numpy as np
from torchvision import datasets
from torch.utils.data.sampler import SubsetRandomSampler
import os
from pathlib import Path
import torch
from PIL import Image
from tqdm import tqdm
from torchmetrics import F1Score

In [None]:
class Project(nn.Module):
    """Turn an image batch into a sequence of patch embeddings plus a class token.

    Each image is cut into non-overlapping ``N x N`` patches. Every flattened
    patch is projected to ``embed_size`` features, one learnable class token is
    appended as the LAST element of the sequence, and a learnable position
    embedding is added to every token.

    Args:
        N: Side length of a square patch, in pixels.
        in_channels: Number of image channels (1 for grayscale, 3 for RGB).
        embed_size: Size of each projected patch embedding.
        batch_size: Kept for backward compatibility and stored on the module;
            no parameter shape depends on it any more.
        num_patches: Number of patches per image. Defaults to ``N**2``, the
            sizing the position table has always used (exact when the image
            side equals ``N**2``, e.g. 144x144 images with ``N=12``).
    """

    def __init__(self, N: int, in_channels, embed_size, batch_size, num_patches=None):
        super().__init__()
        self.N = N  # patch size
        self.batch_size = batch_size
        self.embed_size = embed_size  # size of a linearly projected patch
        self.in_channels = in_channels  # 1 for grayscale, 3 for colour images
        self.num_patches = self.N ** 2 if num_patches is None else num_patches
        # Linear projection of a flattened (channels * N * N) patch.
        self.linear1 = nn.Linear(self.in_channels * self.N ** 2, self.embed_size)
        # One shared class token. It is broadcast over the batch in forward(), so
        # the module accepts any batch size and every sample sees the same token.
        self.cls = nn.Parameter(torch.randn(1, 1, embed_size))
        # One position vector per patch plus one for the class token.
        self.position_em = nn.Parameter(torch.randn(self.num_patches + 1, self.embed_size))

    def forward(self, x: torch.Tensor):
        """Embed ``x`` of shape ``(batch, channels, height, width)``.

        Returns:
            Tensor of shape ``(batch, num_patches + 1, embed_size)`` whose last
            token is the class token.

        Raises:
            ValueError: If height/width are not multiples of ``N`` or the image
                does not produce ``num_patches`` patches.
        """
        batch, channels, height, width = x.shape
        patch = self.N
        if height % patch != 0 or width % patch != 0:
            raise ValueError(
                f"Image size {height}x{width} is not divisible by patch size {patch}"
            )
        rows, cols = height // patch, width // patch
        if rows * cols != self.num_patches:
            raise ValueError(
                f"Expected {self.num_patches} patches per image, got {rows * cols}"
            )

        # Same layout as the einops pattern
        # "b c (h px) (w py) -> b (h w) (c px py)".
        out = x.reshape(batch, channels, rows, patch, cols, patch)
        out = out.permute(0, 2, 4, 1, 3, 5)
        out = out.reshape(batch, rows * cols, channels * patch * patch)
        out = self.linear1(out)

        cls_tokens = self.cls.expand(batch, -1, -1)
        out = torch.cat([out, cls_tokens], dim=1)
        out = out + self.position_em
        return out

In [None]:
class DotProductAttention(nn.Module):
    """Single-head scaled dot-product self-attention.

    Queries, keys and values are separate linear projections of the same input.
    The output is ``softmax(Q K^T / sqrt(embed_size)) V``.

    Args:
        embed_size: Feature size of the input tokens and of Q, K and V.
    """

    def __init__(self, embed_size):
        super().__init__()
        self.embed_size = embed_size
        self.query = nn.Linear(self.embed_size, self.embed_size)
        self.key = nn.Linear(self.embed_size, self.embed_size)
        self.value = nn.Linear(self.embed_size, self.embed_size)
        # Normalise over the key axis (last dim of the score matrix) so each
        # query's attention weights sum to 1.
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, embed: torch.Tensor):
        """Attend over ``embed`` of shape ``(batch, tokens, embed_size)``.

        Returns:
            Tensor with the same shape as ``embed``.
        """
        query = self.query(embed)
        key = self.key(embed)
        value = self.value(embed)
        # Scale by sqrt of the feature size, not the sequence length.
        scores = torch.matmul(query, key.transpose(-2, -1)) / self.embed_size ** 0.5
        weights = self.softmax(scores)
        return torch.matmul(weights, value)

In [None]:
class MultiHeadAttention(nn.Module):
    """Multi-head self-attention built from independent ``DotProductAttention`` heads.

    The feature axis of the input is split into ``num_heads`` equal slices,
    each slice is passed through its own attention head, and the head outputs
    are concatenated back along the feature axis.

    Args:
        embed_size: Feature size of the input tokens.
        num_heads: Number of attention heads; must divide ``embed_size``.

    Raises:
        ValueError: If ``embed_size`` is not divisible by ``num_heads``.
    """

    def __init__(self, embed_size, num_heads):
        super().__init__()
        if num_heads <= 0 or embed_size % num_heads != 0:
            raise ValueError(
                f"embed_size ({embed_size}) must be divisible by num_heads ({num_heads})"
            )
        self.num_heads = num_heads
        self.embed_size = embed_size
        self.embed_part = self.embed_size // self.num_heads
        # One attention module per head so each head learns its own Q/K/V
        # projections. Parameters are registered under ``heads.<index>``.
        self.heads = nn.ModuleList(
            [DotProductAttention(self.embed_part) for _ in range(self.num_heads)]
        )

    def forward(self, embed):
        """Apply every head to its feature slice of ``embed`` (batch, tokens, embed_size).

        Returns:
            Tensor with the same shape as ``embed``.
        """
        slices = torch.tensor_split(embed, self.num_heads, dim=2)
        head_outputs = [head(part) for head, part in zip(self.heads, slices)]
        return torch.cat(head_outputs, dim=2)
        


In [None]:
class EncoderBlock(nn.Module):
    """One encoder block: multi-head attention with a residual, LayerNorm, then a linear layer.

    Args:
        embed_size: Feature size of the tokens.
        num_heads: Number of attention heads.
        N: Patch size. Stored for reference only.
        in_channels: Image channel count. Stored for reference only.
        batch_size: Batch size. Stored for reference only.
    """

    def __init__(self, embed_size, num_heads, N, in_channels, batch_size):
        super().__init__()
        self.num_heads = num_heads
        self.embed_size = embed_size
        self.N = N
        self.in_channels = in_channels
        self.batch_size = batch_size
        self.mha = MultiHeadAttention(embed_size, num_heads)
        self.Linear = nn.Linear(embed_size, embed_size)
        self.layernorm = nn.LayerNorm(self.embed_size)
        # No patch-projection submodule is created here: forward() never used
        # one, so it only added dead parameters to every encoder block.

    def forward(self, embed):
        """Transform ``embed`` of shape ``(batch, tokens, embed_size)``; the shape is preserved."""
        attended = self.mha(embed)
        residual = attended + embed  # skip connection around the attention
        normed = self.layernorm(residual)
        return self.Linear(normed)

In [None]:
class Transformer(nn.Module):
    """Vision-Transformer style image classifier.

    Pipeline: ``Project`` (patch embedding + class token) -> ``num_encoders``
    stacked ``LayerNorm`` + ``EncoderBlock`` pairs -> LayerNorm on the class
    token -> linear classification head.

    Args:
        embed_size: Token feature size.
        num_heads: Attention heads per encoder block.
        N: Patch side length in pixels.
        in_channels: Number of image channels.
        batch_size: Forwarded to ``Project`` and ``EncoderBlock``.
        num_encoders: Number of encoder blocks.
        num_class: Number of output classes.
        device: Stored on the module for reference; forward() follows the
            device of its input and parameters instead of this value.
    """

    def __init__(self, embed_size, num_heads, N, in_channels, batch_size, num_encoders, num_class, device):
        super().__init__()
        self.device = device
        self.num_heads = num_heads
        self.embed_size = embed_size
        self.N = N
        self.in_channels = in_channels
        self.batch_size = batch_size
        self.num_encoders = num_encoders
        self.num_class = num_class
        self.proj = Project(self.N, self.in_channels, self.embed_size, self.batch_size)

        # Plain list kept as an attribute; the modules are registered through
        # ``block_seq`` below.
        self.tiny_block = [
            nn.Sequential(
                nn.LayerNorm(self.embed_size),
                EncoderBlock(self.embed_size, self.num_heads, self.N, self.in_channels, self.batch_size),
            )
            for _ in range(num_encoders)
        ]
        self.block_seq = nn.Sequential(*self.tiny_block)

        # The final LayerNorm acts on the class-token embedding, BEFORE the
        # head. Normalising the logits themselves would pin every prediction
        # to the same magnitude (with two classes: always +/- a constant).
        self.layernorm = nn.LayerNorm(self.embed_size)
        self.Linear = nn.Linear(self.embed_size, self.num_class)

        self.indices = torch.arange(self.num_class)

    def num_of_parameters(self,):
        """Return the total number of elements across all registered parameters."""
        return sum(p.numel() for p in self.parameters())

    def forward(self, img):
        """Classify ``img`` of shape ``(batch, channels, height, width)``.

        Returns:
            Logits of shape ``(batch, num_class)``, including for a batch of one.
        """
        tokens = self.proj(img)
        tokens = self.block_seq(tokens)
        # Project appends the class token as the last sequence element. Plain
        # indexing keeps the batch axis even when the batch size is 1.
        cls_token = tokens[:, -1, :]
        return self.Linear(self.layernorm(cls_token))

In [None]:
class Custom_Dataset(torch.utils.data.Dataset):
    """In-memory image-folder dataset: one sub-directory per class.

    Every image is converted to RGB, resized, scaled to ``[0, 1]`` and stored
    as a ``(3, height, width)`` float tensor. Class sub-directories are sorted
    by name and labelled ``0, 1, 2, ...`` in that order, so label assignment is
    the same on every machine.

    Args:
        directory: Folder that contains one sub-directory per class.
        mode: Stored on the instance; not otherwise used.
        image_size: ``(width, height)`` every image is resized to.

    Attributes set on the instance: ``x`` (stacked images), ``y`` (integer
    labels), ``classes`` (class names in label order).

    Raises:
        FileNotFoundError: If ``directory`` does not exist or is not a folder.
        ValueError: If no class sub-directories or no images are found.
    """

    def __init__(self, directory, mode="train", image_size=(144, 144)):
        self.path = Path(directory)
        self.mode = mode
        if not self.path.is_dir():
            raise FileNotFoundError(f"Dataset directory not found: {directory}")

        class_dirs = sorted(entry for entry in self.path.iterdir() if entry.is_dir())
        if not class_dirs:
            raise ValueError(f"No class sub-directories found in {directory}")
        self.classes = [entry.name for entry in class_dirs]
        print(self.classes)

        # Collect everything first and stack once, instead of re-concatenating
        # the whole tensor after every class.
        images, labels = [], []
        for label, class_dir in enumerate(class_dirs):
            for img_path in sorted(p for p in class_dir.iterdir() if p.is_file()):
                images.append(self._load_image(img_path, image_size))
                labels.append(label)
        if not images:
            raise ValueError(f"No images found under {directory}")

        self.x = torch.stack(images)
        self.y = torch.tensor(labels)

    @staticmethod
    def _load_image(img_path, image_size):
        """Read one image file as a ``(3, H, W)`` float tensor scaled to ``[0, 1]``."""
        # The context manager closes the file handle. convert("RGB") gives
        # grayscale / RGBA / palette images the same three-channel layout.
        with Image.open(img_path) as img:
            pixels = np.array(img.convert("RGB").resize(image_size))
        return torch.tensor(np.transpose(pixels, (2, 0, 1)), dtype=torch.float32) / 255

    def __len__(self):
        """Return the number of samples."""
        return len(self.x)

    def __getitem__(self, i):
        """Return the ``(image, label)`` pair at index ``i``."""
        return self.x[i], self.y[i]
    

In [None]:
# Build the in-memory training set from the Kaggle "good guys / bad guys" image folder.
# Source: https://www.kaggle.com/datasets/gpiosenka/good-guysbad-guys-image-data-set
dataset = Custom_Dataset(directory="../input/good-guysbad-guys-image-data-set/train")

In [None]:
def train_val_loader(dataset, batch_size, valid_per, mode, num_workers=2):
    """Build the DataLoaders for a dataset.

    Args:
        dataset: Any object with ``__len__`` and ``__getitem__``.
        batch_size: Samples per batch.
        valid_per: Fraction of the dataset (0..1) held out for validation.
            Only used when ``mode == "train"``.
        mode: ``"train"`` or ``"test"``.
        num_workers: Worker processes per loader.

    Returns:
        ``(trainloader, validloader)`` when ``mode == "train"``; a single
        loader when ``mode == "test"``. All loaders use ``drop_last=True``, so
        a trailing incomplete batch is skipped.

    Raises:
        ValueError: If ``mode`` is unknown or ``valid_per`` is outside [0, 1].
    """
    if mode == "train":
        if not 0 <= valid_per <= 1:
            raise ValueError(f"valid_per must be in [0, 1], got {valid_per}")
        shuffled = torch.randperm(len(dataset)).tolist()
        n_valid = int(valid_per * len(dataset))
        # The first `valid_per` share of the shuffled indices is validation;
        # everything else is training data.
        v_idx, t_idx = shuffled[:n_valid], shuffled[n_valid:]
        trainloader = torch.utils.data.DataLoader(
            dataset, batch_size=batch_size, num_workers=num_workers, drop_last=True,
            sampler=SubsetRandomSampler(t_idx))
        validloader = torch.utils.data.DataLoader(
            dataset, batch_size=batch_size, num_workers=num_workers, drop_last=True,
            sampler=SubsetRandomSampler(v_idx))
        return trainloader, validloader

    if mode == "test":
        return torch.utils.data.DataLoader(
            dataset, batch_size=batch_size, num_workers=num_workers, drop_last=True)

    # Fail loudly: silently returning None only surfaces later as an
    # unpacking error at the call site.
    raise ValueError(f"Invalid mode: {mode!r} (expected 'train' or 'test')")

In [None]:
# Data-loading configuration. These names were used without ever being
# assigned, which fails on a fresh kernel.
batch_size = 8   # same value the model cell passes to Transformer
valid_per = 0.2  # NOTE(review): placeholder split fraction; no value was recorded in the notebook, confirm
train_loader, val_loader = train_val_loader(dataset, batch_size, valid_per, "train")

In [None]:
# `device` was used without being assigned; pick the GPU when one is
# available and fall back to the CPU otherwise.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# NOTE(review): newer torchmetrics releases may require an explicit `task=`
# argument here; confirm against the installed version.
f1 = F1Score(num_classes=2).to(device)

In [None]:

def train(trainloader, validloader, model, optimizer, criterion, epochs, f1, batch_size,
          checkpoint_path='model.pt'):
    """Train ``model`` and checkpoint it whenever the validation loss improves.

    Args:
        trainloader: Iterable of ``(data, target)`` training batches.
        validloader: Iterable of ``(data, target)`` validation batches.
        model: The ``nn.Module`` to optimise. Batches are moved to the device
            of its parameters.
        optimizer: Optimiser over ``model``'s parameters.
        criterion: Loss callable ``criterion(output, target)``.
        epochs: Number of passes over ``trainloader``.
        f1: Metric callable ``f1(output, target)`` returning a scalar.
        batch_size: Kept for backward compatibility; averages are computed
            from the batches and samples actually seen.
        checkpoint_path: Where the best ``state_dict`` is written.

    Returns:
        A list with one dict per epoch holding ``epoch``, ``train_loss``,
        ``valid_loss``, ``train_score`` and ``val_score``.
    """

    def _mean(total, count):
        # NaN (rather than a ZeroDivisionError) when a loader yields nothing.
        return total / count if count else float("nan")

    # Follow the model instead of assuming a GPU is present.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    valid_loss_min = float("inf")
    history = []
    for i in range(epochs):
        print("Epoch - {} Started".format(i+1))

        train_loss, train_score, train_samples, train_batches = 0.0, 0.0, 0, 0
        valid_loss, val_score, valid_samples, valid_batches = 0.0, 0.0, 0, 0

        model.train()
        for data, target in tqdm(trainloader):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()*data.size(0)
            # float(...) keeps a plain number instead of accumulating tensors.
            train_score += float(f1(output.detach(), target))
            train_samples += data.size(0)
            train_batches += 1

        model.eval()
        with torch.no_grad():
            for data, target in validloader:
                data, target = data.to(device), target.to(device)
                output = model(data)
                loss = criterion(output, target)
                valid_loss += loss.item()*data.size(0)
                val_score += float(f1(output, target))
                valid_samples += data.size(0)
                valid_batches += 1

        # Loss is averaged per sample seen and the score per batch seen, so
        # loaders that drop an incomplete last batch do not skew the result.
        train_loss = _mean(train_loss, train_samples)
        valid_loss = _mean(valid_loss, valid_samples)
        train_score = _mean(train_score, train_batches)
        val_score = _mean(val_score, valid_batches)
        print(f"F1 Score for train: {train_score}, F1 Score for validation: {val_score} ")
        print('Epoch: {} \tTraining Loss: {:.6f} \tValidation Loss: {:.6f}'.format(
            i+1, train_loss, valid_loss))

        if valid_loss <= valid_loss_min:
            print('Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(
                valid_loss_min,
                valid_loss))

            torch.save(model.state_dict(), checkpoint_path)
            valid_loss_min = valid_loss

        history.append({
            "epoch": i+1,
            "train_loss": train_loss,
            "valid_loss": valid_loss,
            "train_score": train_score,
            "val_score": val_score,
        })
    return history

In [None]:
# Build the classifier with the same `batch_size` and `device` the rest of the
# notebook uses, so the model cannot disagree with the loaders or with where
# its weights are moved.
model = Transformer(
    embed_size=512,
    num_heads=8,
    N=12,
    in_channels=3,
    batch_size=batch_size,
    num_encoders=4,
    num_class=2,
    device=device,
).to(device)

In [None]:
# Classification loss applied to the model outputs and the integer class labels.
criterion = nn.CrossEntropyLoss()

In [None]:
# Optimiser hyperparameters. NOTE(review): these names were used without ever
# being assigned in the notebook. The values below are Adam's library
# defaults, chosen as a neutral starting point; tune them for this task.
learning_rate = 1e-3
beta1, beta2 = 0.9, 0.999
weight_decay = 0.0
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=learning_rate,
    betas=(beta1, beta2),
    weight_decay=weight_decay,
)

In [None]:
# Run the training loop for 100 epochs on the loaders built above.
train(
    trainloader=train_loader,
    validloader=val_loader,
    model=model,
    optimizer=optimizer,
    criterion=criterion,
    epochs=100,
    f1=f1,
    batch_size=batch_size,
)