In [None]:
!pip install einops
!pip install wandb
!pip install codecarbon
!pip install transformers

In [None]:
from google.colab import drive
# Mount Google Drive in the Colab runtime under /content/drive.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
import sys
# Work from the project folder on the mounted Drive so that the local `hvit`
# package and the relative data/checkpoint paths used below resolve.
PROJECT_DIR = '/content/drive/MyDrive/HPVIT'
os.chdir(PROJECT_DIR)

In [None]:
import torch
import os
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torchvision.datasets.cifar import CIFAR10, CIFAR100
from torch.optim import Adam
import random

from tqdm import tqdm, trange

import wandb
from codecarbon import track_emissions
# Probabilistic Vision Transformer with feature selection
# from hvit.heuristic_vision_transformer_block import HViT
# Vision Transformer with feature selection
from hvit.heuristic_vision_transformer_block import HViT
# VISION TRANSFORMER 
from hvit.vision_transformer_block import ViT

# Contrastive Learning Loss

In [None]:
class ContrastiveLoss(nn.Module):
    """Batch-wise contrastive loss driven by integer class labels.

    Every sample of the batch acts as an anchor. Samples sharing the anchor's
    label are positives and are pulled together (squared distance). Among the
    samples with a different label only the hardest negative -- the closest
    one -- is pushed away, until its squared distance exceeds ``margin``.

    Args:
        margin: threshold on the squared distance below which the hardest
            negative of an anchor is penalised.
    """

    def __init__(self, margin):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, embeddings, labels):
        """Compute the loss for one batch.

        Args:
            embeddings: 2-D tensor with one embedding per row.
            labels: tensor with one class label per row of ``embeddings``.

        Returns:
            Scalar tensor: (sum of squared positive-pair distances, counted
            once per anchor, + sum of per-anchor hardest-negative hinge terms)
            divided by the batch size.
        """
        batch_size = embeddings.size(0)
        if batch_size == 0:
            # Nothing to compare; keep the result attached to the graph.
            return embeddings.sum() * 0.0

        labels = labels.reshape(-1).to(embeddings.device)
        squared_distances = torch.cdist(embeddings, embeddings).pow(2)

        # Pair masks: entry (i, j) tells whether samples i and j share a label.
        # Indexing the distance matrix with `labels == 1` / `labels == 0`
        # would instead select whole rows of the samples of class 1 / class 0.
        same_label = labels.unsqueeze(0) == labels.unsqueeze(1)
        not_self = ~torch.eye(batch_size, dtype=torch.bool, device=embeddings.device)
        positive_mask = same_label & not_self
        negative_mask = ~same_label

        positive_term = squared_distances[positive_mask].sum()

        # Hardest negative = closest sample with a different label. Anchors
        # without any negative get +inf here, which the clamp turns into 0.
        hardest_negative = squared_distances.masked_fill(
            ~negative_mask, float('inf')
        ).min(dim=1).values
        negative_term = torch.clamp(self.margin - hardest_negative, min=0).sum()

        return (positive_term + negative_term) / batch_size

# Setting Device


In [None]:
# Pick the compute device once. `map_location` is the companion value meant for
# `torch.load(..., map_location=...)`.
if not torch.cuda.is_available():
    device = torch.device('cpu')
    map_location = 'cpu'
    print('GPU is not available, using CPU')
else:
    device = torch.device('cuda')
    map_location = lambda storage, loc: storage.cuda()
    print('Using GPU')

Using GPU


# CIFAR10 Dataset Loader

In [None]:
# Preprocessing shared by the train and test splits.
transform = transforms.Compose([
        # Resize the image to (224, 224), the img_size the models are built with
        transforms.Resize((224, 224)),
        # Convert the image to a PyTorch tensor
        transforms.ToTensor(),
        # Normalize each channel with the ImageNet mean and standard deviation
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
# Load CIFAR10 dataset
train_set = CIFAR10(root='./data', train=True, download=True, transform=transform)
test_set = CIFAR10(root='./data', train=False, download=True, transform=transform)
# Create data loaders: the training data is reshuffled every epoch so that the
# mini-batches differ between epochs; the evaluation order stays fixed.
train_loader = DataLoader(train_set, shuffle=True, batch_size=128)
test_loader = DataLoader(test_set, shuffle=False, batch_size=128)

Files already downloaded and verified
Files already downloaded and verified


# PRETRAINING

In [None]:
# Authenticate with Weights & Biases. The API key must never be stored in the
# notebook: `wandb.login()` picks it up from the WANDB_API_KEY environment
# variable or asks for it interactively. The key that used to be pasted here
# has been exposed and should be revoked.
wandb.login()

/bin/bash: wand: command not found


In [None]:
# Experiment configuration read by the cells below.
run = 'heuristic'              # 'default' builds the plain ViT, any other value the HViT
training = 'type_1'            # suffix of the wandb run name
output_dim = 128               # passed to the backbone as n_classes
depths = 2
project_name = "RQ3"           # wandb project
dataset = "CIFAR10"            # name of the dataset loaded by the data-loader cell
classes = 10
# Stage flags; they are recorded in the wandb config and
# `contrastive_learning` gates the contrastive stage.
pre_training = True
contrastive_learning = False
fine_tuning = True

In [None]:
# Start the wandb run. The only setting that depends on the run type is the
# "FViT" flag, which is True for every run other than 'default'.
wandb.init(
    # set the wandb project where this run will be logged
    project=project_name,
    name=f'{run}_{training}',
    # track hyperparameters and run metadata
    config={
        "FViT": run != 'default',
        "pre-training": pre_training,
        "contrastive learning": contrastive_learning,
        "fine-tuning": fine_tuning,
        "task": "Image Recognition",
        "num_heads": 12,
    },
)

In [None]:
# Build the backbone: the plain ViT for the 'default' run, otherwise the HViT
# (Vision Transformer with feature selection). Both share these arguments.
backbone_kwargs = dict(
    in_channels=3,
    patch_size=16,
    emb_size=64,
    img_size=224,
    depth=2,
    num_heads=12,
    n_classes=output_dim,
)

if run != 'default':
      model = HViT(
          **backbone_kwargs,
          top_k=138,
          heuristic='variance',
          probabilistic=False,
          prob=1,
          decay_rate=0.0,
          # NOTE(review): this is the number of batches per epoch, not the
          # number of samples per batch -- confirm this is what HViT expects.
          batch_size=len(train_loader),
          verbose=False,
      ).to(device)
else:
      model = ViT(**backbone_kwargs).to(device)

In [None]:
# Fewer pretraining epochs when a contrastive stage follows.
n_epochs = 5 if contrastive_learning else 10
lr = 1e-3

# Loss and optimizer for the pretraining stage
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

In [None]:
# Pretraining loop: one optimizer step per mini-batch, mean batch loss printed per epoch
num_batches = len(train_loader)
for epoch in trange(n_epochs, desc="Training"):
  running_loss = 0.0
  for images, labels in train_loader:
    # Move the mini-batch to the device
    images = images.to(device)
    labels = labels.to(device)

    # Forward pass and loss
    loss = criterion(model(images), labels)

    # Backward pass and parameter update
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    running_loss += loss.item()

  print(f"Epoch [{epoch+1}/{n_epochs}], Loss: {running_loss / num_batches:.4f}")

Training:   0%|          | 0/10 [00:00<?, ?it/s]


Current Epoch:  0
Probability:  1


Training:  10%|█         | 1/10 [03:01<27:12, 181.34s/it]

Epoch [1/10], Loss: 1.8956

Current Epoch:  1
Probability:  1.0


Training:  20%|██        | 2/10 [06:03<24:15, 181.90s/it]

Epoch [2/10], Loss: 1.5068

Current Epoch:  2
Probability:  1.0


Training:  30%|███       | 3/10 [09:05<21:14, 182.07s/it]

Epoch [3/10], Loss: 1.3664

Current Epoch:  3
Probability:  1.0


Training:  40%|████      | 4/10 [12:08<18:14, 182.45s/it]

Epoch [4/10], Loss: 1.2701

Current Epoch:  4
Probability:  1.0


Training:  50%|█████     | 5/10 [15:11<15:11, 182.37s/it]

Epoch [5/10], Loss: 1.1994

Current Epoch:  5
Probability:  1.0


Training:  60%|██████    | 6/10 [18:14<12:10, 182.56s/it]

Epoch [6/10], Loss: 1.1422

Current Epoch:  6
Probability:  1.0


Training:  70%|███████   | 7/10 [21:15<09:06, 182.30s/it]

Epoch [7/10], Loss: 1.0936

Current Epoch:  7
Probability:  1.0


Training:  80%|████████  | 8/10 [24:17<06:04, 182.19s/it]

Epoch [8/10], Loss: 1.0623

Current Epoch:  8
Probability:  1.0


Training:  90%|█████████ | 9/10 [27:19<03:02, 182.15s/it]

Epoch [9/10], Loss: 1.0373

Current Epoch:  9
Probability:  1.0


Training: 100%|██████████| 10/10 [30:20<00:00, 182.08s/it]

Epoch [10/10], Loss: 1.0031





In [None]:
# Checkpoint the pretrained weights; the later stages reload this file.
torch.save(model.state_dict(), "model_after_pretraining.pth")

# CONTRASTIVE LEARNING

In [None]:
if contrastive_learning:
  # Restart the contrastive stage from the saved pretraining checkpoint.
  # `map_location` remaps the stored tensors onto the active device, so a
  # checkpoint written on a GPU runtime also loads on a CPU-only one.
  model.load_state_dict(torch.load("model_after_pretraining.pth", map_location=device))
  model.to(device)

In [None]:
if contrastive_learning:
  # Hyperparameters of the contrastive stage
  margin, lr = 1.0, 1e-3
  n_epochs = 1
  batch_size = 128  # not read by the training cell below, which iterates train_loader
  # Loss and optimizer of the contrastive stage
  criterion = ContrastiveLoss(margin)
  optimizer = Adam(model.parameters(), lr=lr)

In [None]:
if contrastive_learning:
  # Contrastive stage: the model outputs are used as embeddings and fed,
  # together with the class labels, to the contrastive criterion.
  for epoch in trange(n_epochs):
      for batch_idx, (images, labels) in enumerate(train_loader):
          # Single transfer to the active device (this also works on CPU);
          # the batch used to be moved twice, via .cuda() and then .to(device).
          images = images.to(device, non_blocking=True)
          labels = labels.to(device, non_blocking=True)

          # Forward pass
          features = model(images)
          loss = criterion(features, labels)

          # Backward pass and optimization
          optimizer.zero_grad()
          loss.backward()
          optimizer.step()

          # Print training progress
          if (batch_idx+1) % 10 == 0:
              print(f"Epoch [{epoch+1}/{n_epochs}], Batch [{batch_idx+1}/{len(train_loader)}], Loss: {loss.item():.4f}")

In [None]:
if contrastive_learning:
  # Checkpoint the weights after the contrastive stage; the fine-tuning stage reloads this file.
  torch.save(model.state_dict(), "model_after_contrastive_learning.pth")

# FINETUNING

In [None]:
# Re-create the backbone with the same architecture used for pretraining so
# that the saved state_dict matches it.
if run == 'default':
      pretrained_model = ViT(in_channels=3,
                patch_size=16,
                emb_size=64,
                img_size=224,
                depth=2,
                n_classes=output_dim,
                num_heads=12).to(device)
else:
      pretrained_model = HViT(in_channels=3,
                 patch_size=16,
                 emb_size=64,
                 img_size=224,
                 depth=2,
                 num_heads=12,
                 n_classes=output_dim,
                 top_k=138,
                 heuristic='variance',
                 probabilistic=False,
                 prob=1,
                 decay_rate=0.0,
                 batch_size=len(train_loader),
                 verbose=False).to(device)

# Start fine-tuning from the most recent stage that was run.
checkpoint_path = (
    "model_after_contrastive_learning.pth"
    if contrastive_learning
    else "model_after_pretraining.pth"
)
# `map_location` remaps the stored tensors onto the active device, so a
# checkpoint written on a GPU runtime also loads on a CPU-only one.
pretrained_model.load_state_dict(torch.load(checkpoint_path, map_location=device))
pretrained_model.to(device)

In [None]:
class ClassificationHead(nn.Module):
    """Linear classification head applied to the backbone output.

    ``forward`` returns raw, unnormalised logits. ``nn.CrossEntropyLoss``
    applies log-softmax itself, so feeding it softmax probabilities squashes
    the scores into [0, 1] and keeps the loss from decreasing properly. Apply
    ``self.softmax`` explicitly to the output when probabilities are needed;
    ``argmax`` predictions are the same either way.

    Args:
        input_size: number of features produced by the backbone.
        hidden_size: width of the (currently unused) hidden layers.
        output_size: number of target classes.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super(ClassificationHead, self).__init__()

        # fc1, fc2 and relu are not used by forward(); they are kept so the
        # module's attributes and state_dict keys stay unchanged.
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(input_size, output_size)
        self.relu = nn.ReLU()
        # Available for turning logits into probabilities outside of forward().
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Map a batch of feature rows to one row of class logits each."""
        return self.fc3(x)

# Sizes used to build the ClassificationHead, tied to the experiment config
# so they cannot drift from the backbone and the dataset.
input_size = output_dim   # the head consumes the backbone output (n_classes=output_dim)
hidden_size = 256
num_classes = classes     # one logit per CIFAR10 class (was hard-coded to 100)

In [None]:
# Classification head on the active device
classification_head = ClassificationHead(
    input_size=input_size,
    hidden_size=hidden_size,
    output_size=num_classes,
)
classification_head = classification_head.to(device)

# Chain the pretrained backbone and the head into the network to fine-tune
model_final = nn.Sequential(pretrained_model, classification_head)
model_final = model_final.to(device)

In [None]:
gpu_name = f"({torch.cuda.get_device_name(device)})" if torch.cuda.is_available() else ""
print("Using device: ", device, gpu_name)

# Fine-tuning hyperparameters, optimizer (backbone and head together) and loss
n_epochs = 5
lr = 1e-3
optimizer = Adam(model_final.parameters(), lr=lr)
criterion = nn.CrossEntropyLoss()

# Fine-tuning loop; train_loss accumulates the mean batch loss of the epoch
batches_per_epoch = len(train_loader)
for epoch in trange(n_epochs, desc="Training"):
  train_loss = 0.0
  for x, y in train_loader:
    x = x.to(device)
    y = y.to(device)
    loss = criterion(model_final(x), y)
    train_loss += loss.item() / batches_per_epoch
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

  print(f"Epoch {epoch + 1}/{n_epochs} loss: {train_loss:.2f}")

Using device:  cuda (Tesla T4)


Training:   0%|          | 0/5 [00:00<?, ?it/s]


Current Epoch:  0
Probability:  1


Training:  20%|██        | 1/5 [02:12<08:48, 132.22s/it]

Epoch 1/5 loss: 4.22

Current Epoch:  1
Probability:  1.0


Training:  40%|████      | 2/5 [04:23<06:35, 131.77s/it]

Epoch 2/5 loss: 4.11

Current Epoch:  2
Probability:  1.0


Training:  60%|██████    | 3/5 [06:35<04:23, 131.63s/it]

Epoch 3/5 loss: 4.08

Current Epoch:  3
Probability:  1.0


Training:  80%|████████  | 4/5 [08:46<02:11, 131.54s/it]

Epoch 4/5 loss: 4.08

Current Epoch:  4
Probability:  1.0


Training: 100%|██████████| 5/5 [10:57<00:00, 131.46s/it]

Epoch 5/5 loss: 4.07





In [None]:
# Rebind the global `model` to the fine-tuned network: inference() reads `model` from the global scope.
model = model_final

In [None]:
def inference():
  """Evaluate the global `model` on `test_loader` and log the results to wandb.

  The model is switched to evaluation mode for the duration of the pass and
  its previous mode is restored afterwards. The mean batch loss and the
  accuracy (in percent) are printed and logged under the wandb keys 'loss'
  and 'accuracy'. An empty loader yields a loss of 0.0 and an accuracy of 0.0
  instead of a division by zero.
  """
  criterion = nn.CrossEntropyLoss()
  correct, total = 0, 0
  test_loss = 0.0

  was_training = model.training
  model.eval()
  try:
      with torch.no_grad():
          for batch in tqdm(test_loader, desc="Testing"):
              x, y = batch
              x, y = x.to(device), y.to(device)
              y_hat = model(x)
              loss = criterion(y_hat, y)
              test_loss += loss.detach().cpu().item() / len(test_loader)

              correct += torch.sum(torch.argmax(y_hat, dim=1) == y).detach().cpu().item()
              total += len(x)
  finally:
      # Leave the model in the mode it was in before the evaluation.
      model.train(was_training)

  accuracy = correct / total * 100 if total else 0.0
  print(f"Test loss: {test_loss:.2f}")
  wandb.log({'loss': test_loss})
  print(f"Test accuracy: {accuracy:.2f}%")
  wandb.log({'accuracy': accuracy})

inference()


Testing:   1%|▏         | 1/79 [00:00<00:23,  3.28it/s]


Current Epoch:  5
Probability:  1.0


Testing: 100%|██████████| 79/79 [00:27<00:00,  2.85it/s]

Test loss: 4.09
Test accuracy: 53.57%





In [None]:
# Mark the wandb run started by wandb.init() as finished.
wandb.finish()

0,1
accuracy,▁
loss,▁

0,1
accuracy,53.57
loss,4.08987
