In [2]:
import csv
import os
import random

import cv2
import numpy as np
import torch
import torch.nn.functional as F
import torch.optim as optim
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torch.utils.data import random_split
from torchvision import datasets, transforms


seed_value = 37
np.random.seed(seed_value)
torch.manual_seed(seed_value)

LEARNING_RATE = 0.0001
NUM_CLASSES = 50
PATCH_SIZE = 16
IMG_SIZE = 64
IN_CHANNEL = 3
EPOCHS = 100
NUM_HEADS = 8
DROPOUT = 0.01
HIDDEN_DIM = 1024
ADAM_WEIGHT_DECAY = 0
ADAM_BETAS = (0.9, 0.999)
ACTIVATION = "gelu"
NUM_ENCODERS = 6
EMBED_DIM = (PATCH_SIZE **2) * IN_CHANNEL
NUM_PATCHES = ( IMG_SIZE // PATCH_SIZE )**2

device = "cuda"

In [3]:
class AddGaussianNoise(object):
    def __init__(self, mean=0, std=1):
        self.mean = mean
        self.std = std

    def __call__(self, tensor):
        # Adding Gaussian noise
        return tensor + torch.randn(tensor.size()) * self.std + self.mean
   
    
transform1 = transforms.Compose([
    transforms.Resize((64, 64)), 
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    AddGaussianNoise(mean=0, std=0.1),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    
])

transform2 = transforms.Compose([
    transforms.Resize((64, 64)),  
    transforms.ToTensor(),          
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  
])

root_dir = "/kaggle/input/iith-dl-contest-2024/train/train"
prediction_root_dir = "/kaggle/input/iith-dl-contest-2024/test"

dataset = datasets.ImageFolder(root=root_dir, transform=transform1)

val_size = int(0.1 * len(dataset))
train_size = len(dataset) - val_size

train_dataset, val_dataset = random_split(dataset, [train_size, val_size])
prediction_dataset = datasets.ImageFolder(root=prediction_root_dir, transform=transform2)



In [8]:
class PatchImg(nn.Module):
    def __init__(self, embed_dim, patch_size, num_patches, dropout, in_channels):
        super().__init__()
        self.patcher = nn.Sequential(
            nn.Conv2d(
                in_channels=in_channels,
                out_channels=embed_dim,
                kernel_size=patch_size,
                stride=patch_size
            ),
            nn.Flatten(2))
        self.cls_token = nn.Parameter(torch.rand(size=(1, in_channels, embed_dim)), requires_grad=True)
        self.position_embedding = nn.Parameter(torch.rand(size=(1, num_patches + 3, embed_dim)), requires_grad=True)
        self.dropout = nn.Dropout(p=dropout)
        self.num_patches = num_patches

    def forward(self, x, masking):
        cls_token = self.cls_token.expand(x.shape[0], -1, -1)

        x = self.patcher(x).permute(0, 2, 1)
        x = torch.cat([cls_token, x], dim=1)

        if masking:
            # Generate random mask during training
            mask = torch.ones_like(x)
            mask[:, :self.num_patches, :] = torch.randint(0, 2, size=(x.shape[0], self.num_patches, x.shape[-1]))
            
            # Fill removed patches with random noise
            noise = torch.randn_like(x[:, :self.num_patches, :])
            x[:, :self.num_patches, :] = x[:, :self.num_patches, :] * mask[:, :self.num_patches, :] + noise * (1 - mask[:, :self.num_patches, :])
        else:
            # During evaluation/testing, no masking is applied
            mask = torch.ones_like(x)

        x = self.position_embedding + x
        x = self.dropout(x)
        return x


class PatchImg(nn.Module):
    def __init__(self, embed_dim, patch_size, num_patches, dropout, in_channels):
        super().__init__()
        self.patcher = nn.Sequential(
            nn.Conv2d(
                in_channels = in_channels,
                out_channels = embed_dim,
                kernel_size=patch_size,
                stride=patch_size
            ),
            nn.Flatten(2))
        self.cls_token = nn.Parameter(torch.rand(size=(1, in_channels, embed_dim)), requires_grad=True)
        self.position_embedding = nn.Parameter(torch.rand(size=(1, num_patches+3, embed_dim)), requires_grad=True)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x):
        cls_token = self.cls_token.expand(x.shape[0], -1, -1)

        x=self.patcher(x).permute(0,2,1)
        x=torch.cat([cls_token, x], dim=1)
        #print("Position embedding size:", self.position_embedding.size())
        #print("x size before addition:", x.size())
        x=self.position_embedding + x
        x=self.dropout(x)
        return x
        

"""model = PatchImg(EMBED_DIM, PATCH_SIZE, NUM_PATCHES, DROPOUT, IN_CHANNEL).to(device)
x=torch.randn(32, 1, 64, 64).to(device)
print(model(x).shape)"""

In [5]:
class ViT(nn.Module):
    def __init__(self, num_patches, img_size, num_classes, patch_size, embed_dim, num_encoders, num_heads, hidden_dim,  dropout, activation, in_channels):
        super().__init__()
        self.embeddings_block = PatchImg(embed_dim, patch_size, num_patches, dropout, in_channels)

        encoder_layer = nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads, dropout=dropout, activation=activation, batch_first=True, norm_first=True)
        self.encoder_blocks = nn.TransformerEncoder(encoder_layer, num_layers=num_encoders)

        self.mlp_head = nn.Sequential(
            nn.LayerNorm(normalized_shape=embed_dim),
            nn.Linear(in_features=embed_dim, out_features=num_classes)
        )

    def forward(self, x, masking):
        x = self.embeddings_block(x, masking)
        x = self.encoder_blocks(x)
        x = F.softmax(self.mlp_head(x[:, 0, :]))
        return x

class ViT(nn.Module):
    def __init__(self, num_patches, img_size, num_classes, patch_size, embed_dim, num_encoders, num_heads, hidden_dim,  dropout, activation, in_channels):
        super().__init__()
        self.embeddings_block = PatchImg(embed_dim, patch_size, num_patches, dropout, in_channels)

        encoder_layer = nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads, dropout=dropout, activation=activation, batch_first=True, norm_first=True)
        self.encoder_blocks = nn.TransformerEncoder(encoder_layer, num_layers=num_encoders)

        self.mlp_head = nn.Sequential(
            #nn.LayerNorm(normalized_shape=embed_dim),
            nn.Linear( 14592, 4096),
            nn.ReLU(),
            nn.BatchNorm1d(4096),
            
            nn.Linear( 4096, 2048),
            nn.ReLU(),
            nn.BatchNorm1d(2048),
            
            nn.Linear(2048, 50),
            nn.Softmax(dim=1)
            
        )

    def forward(self, x):
        x = self.embeddings_block(x)
        x = self.encoder_blocks(x)
        x = x.contiguous().view(50, -1)
        x = self.mlp_head(x)
        return x



model = ViT(NUM_PATCHES, IMG_SIZE, NUM_CLASSES, PATCH_SIZE, EMBED_DIM, NUM_ENCODERS, NUM_HEADS, HIDDEN_DIM, DROPOUT, ACTIVATION, IN_CHANNEL).to(device)

x = torch.randn(50, 3,64, 64).to(device)
print(model(x).shape)


device = "cuda"

print(sum(p.numel() for p in model.parameters() if p.requires_grad))


In [6]:
train_dataloader = DataLoader(train_dataset, batch_size=50, shuffle=True, num_workers=2)
val_dataloader = DataLoader(val_dataset, batch_size=50, shuffle=False, num_workers=2)
predict_dataloader = DataLoader(prediction_dataset, batch_size=50, shuffle=False, num_workers=2)

for step, (inputs, labels) in enumerate(val_dataloader):
    if(inputs.shape[1]!= 3 or inputs.shape[2]!=64 or inputs.shape[3]!=64):
        print ("hello")
print("allOK")



    

model = ViT(NUM_PATCHES, IMG_SIZE, NUM_CLASSES, PATCH_SIZE, EMBED_DIM, NUM_ENCODERS, NUM_HEADS, HIDDEN_DIM, DROPOUT, ACTIVATION, IN_CHANNEL).to(device)
#model = torch.nn.DataParallel(model)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), betas=ADAM_BETAS, lr=LEARNING_RATE, weight_decay=ADAM_WEIGHT_DECAY)



for epoch in range(1):

    model.eval()
    val_labels = []
    val_preds = []
    val_running_loss = 0
    with torch.no_grad():
        for step, (inputs, labels) in enumerate(val_dataloader):
            x = inputs.float().to(device) 
            y = labels.type(torch.uint8).to(device)

            y_preds = model(x)
            y_pred_labels = torch.argmax(y_preds, dim=1)

            val_labels.extend(labels.cpu().detach())
            val_preds.extend(y_pred_labels.cpu().detach())

            loss = criterion(y_preds, y)

            val_running_loss += loss.item()
           

    val_loss = val_running_loss / (step+1)

print("AllOK")

In [9]:
device = "cuda"
#           num_patches, img_size, num_classes, patch_size, embed_dim, num_encoders, num_heads, hidden_dim,  dropout, activation, in_channels
model = ViT(NUM_PATCHES, IMG_SIZE, NUM_CLASSES, PATCH_SIZE, EMBED_DIM, NUM_ENCODERS, NUM_HEADS, HIDDEN_DIM, DROPOUT, ACTIVATION, IN_CHANNEL).to(device)
#model = torch.nn.DataParallel(model)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), betas=ADAM_BETAS, lr=LEARNING_RATE, weight_decay=ADAM_WEIGHT_DECAY)

loss_log = [["training loss", "validation loss", "training accuracy", "validation accuracy"]]

for epoch in range(EPOCHS):
    model.train()
    train_labels = []
    train_preds = []
    train_running_loss = 0
    for step, (inputs, labels) in enumerate(train_dataloader):
        x = inputs.float().to(device)
        y = labels.type(torch.uint8).to(device)
        y_preds = model(x, masking=True)
        y_pred_labels = torch.argmax(y_preds, dim=1)

        train_labels.extend(labels.cpu().detach())
        train_preds.extend(y_pred_labels.cpu().detach())

        loss = criterion(y_preds, y)
        train_running_loss += loss.item()
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        

    train_loss = train_running_loss / (step+1)

    model.eval()
    val_labels = []
    val_preds = []
    val_running_loss = 0
    with torch.no_grad():
        for step, (inputs, labels) in enumerate(val_dataloader):
            x = inputs.float().to(device) 
            y = labels.type(torch.uint8).to(device)

            y_preds = model(x, masking=False)
            y_pred_labels = torch.argmax(y_preds, dim=1)

            val_labels.extend(labels.cpu().detach())
            val_preds.extend(y_pred_labels.cpu().detach())

            loss = criterion(y_preds, y)

            val_running_loss += loss.item()

    val_loss = val_running_loss / (step+1)

    print("-"*40)
    print(f"Train Loss for epoch {epoch+1}: {train_loss:.4f}")
        
    print(f"Validation Loss for epoch {epoch+1}: {val_loss:.4f}")
        
    train_acc = sum(1 for x,y in zip(train_preds, train_labels) if x==y) / len(train_labels)
    print(f"Train Accuracy for epoch {epoch+1}: {train_acc}")
        
    val_acc = sum(1 for x,y in zip(val_preds, val_labels) if x==y )/ len(val_labels)
    print(f"Val Accuracy for epoch {epoch+1}: {val_acc}")
      
    loss_log.append([float(f"{train_loss:.4f}"), float(f"{val_loss:.4f}"), float(f"{train_acc:.4f}"), float(f"{val_acc:.4f}")])
        
    if((epoch+1) % 5 == 0):
        
            
        predictions = []
        model.eval()
        with torch.no_grad():
            for inputs, _ in predict_dataloader:
                x = inputs.float().to(device) 
                y = model(x, masking=False)
                _, predicted = torch.max(y, 1)
                predictions.extend(predicted.cpu().tolist())
        class_names = [dataset.classes[idx] for idx in predictions]
            
            
        data = [["ID", "Category"]]

        for i in range(len(class_names)):
            data.append([f"{i}.JPEG", class_names[i]])
        file_path = f"ViT64_ep{epoch+1}-lr-001.csv"
        with open(file_path, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerows(data)
            
torch.save(model.state_dict(), f"model_ViT_ep{epoch+1}.pth")
with open("log_loss-vit-lr-001.csv", 'w', newline='') as log:
    writer = csv.writer(log)
    writer.writerows(loss_log)

  x = F.softmax(self.mlp_head(x[:, 0, :]))


----------------------------------------
Train Loss for epoch 1: 3.8776
Validation Loss for epoch 1: 3.8707
Train Accuracy for epoch 1: 0.06557264957264958
Val Accuracy for epoch 1: 0.072
----------------------------------------
Train Loss for epoch 2: 3.8513
Validation Loss for epoch 2: 3.8616
Train Accuracy for epoch 2: 0.09268376068376068
Val Accuracy for epoch 2: 0.07953846153846154
----------------------------------------
Train Loss for epoch 3: 3.8391
Validation Loss for epoch 3: 3.8473
Train Accuracy for epoch 3: 0.10553846153846154
Val Accuracy for epoch 3: 0.09461538461538462
----------------------------------------
Train Loss for epoch 4: 3.8237
Validation Loss for epoch 4: 3.8333
Train Accuracy for epoch 4: 0.12170940170940171
Val Accuracy for epoch 4: 0.10892307692307693
----------------------------------------
Train Loss for epoch 5: 3.8146
Validation Loss for epoch 5: 3.8278
Train Accuracy for epoch 5: 0.13017094017094016
Val Accuracy for epoch 5: 0.11723076923076924
----