In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as data
import torch.optim as optim
import math

## Torchvision
import torchvision
from torchvision.datasets import CIFAR10
from torchvision import transforms,datasets
from dataset_wrapper import get_pet_datasets
## Imports for plotting
import matplotlib.pyplot as plt

In [5]:
# --- Data configuration ---
batch_size = 4
img_h = 128  # passed as both width and height to get_pet_datasets below

transform = transforms.Compose([transforms.ToTensor()])

train_dataset, val_dataset, test_dataset = get_pet_datasets(
    img_width=img_h,
    img_height=img_h,
    root_path='./data',
)

# Data loaders: only the training split is reshuffled each epoch.
# Built in the order train -> test -> val.
train_loader, test_loader, val_loader = (
    torch.utils.data.DataLoader(dataset=split, batch_size=batch_size, shuffle=reshuffle)
    for split, reshuffle in (
        (train_dataset, True),
        (test_dataset, False),
        (val_dataset, False),
    )
)

# Select the compute device and report which GPU (if any) this kernel sees.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(
    "GPU name:",
    torch.cuda.get_device_name(0) if torch.cuda.is_available() else "None",
)

# Pull a single mini-batch as a sanity check on the tensor layout.
data_iter = iter(train_loader)
images_onebatch, labels = next(data_iter)
print(images_onebatch.shape)

GPU name: NVIDIA GeForce RTX 3070 Laptop GPU
torch.Size([4, 3, 128, 128])


In [6]:
class VisionTransformer(nn.Module):
    """
    Vision Transformer classifier: images are cut into patches, each patch is
    linearly embedded, a learnable CLS token is prepended, the sequence is run
    through a stack of AttentionBlocks, and the CLS output is classified.
    """

    def __init__(self, embed_dim, hidden_dim, num_channels, num_heads, num_layers, num_classes, para_embedding, patch_size, num_patches, dropout=0.0):
        """
        Inputs:
            embed_dim - Dimensionality of the input feature vectors to the Transformer
            hidden_dim - Dimensionality of the hidden layer in the feed-forward networks
                         within the Transformer
            num_channels - Number of channels of the input (3 for RGB)
            num_heads - Number of heads to use in the Multi-Head Attention block
            num_layers - Number of layers to use in the Transformer
            num_classes - Number of classes to predict
            para_embedding - Positional embedding mode:
                             "Fixed"     -> sinusoidal table stored as a buffer (not trained)
                             "Learnable" -> randomly initialised trainable parameter
                             anything else (e.g. None) -> no positional embedding
            patch_size - Number of pixels that the patches have per dimension
            num_patches - Maximum number of patches an image can have
            dropout - Amount of dropout to apply in the feed-forward network and
                      on the input encoding
        """
        super().__init__()

        self.patch_size = patch_size
        self.para_embedding = para_embedding  # kept for introspection/logging

        # Layers/Networks
        self.input_layer = nn.Linear(num_channels*(patch_size**2), embed_dim)
        self.transformer = nn.Sequential(*[AttentionBlock(embed_dim, hidden_dim, num_heads, dropout=dropout) for _ in range(num_layers)])
        self.mlp_head = nn.Sequential(
            nn.LayerNorm(embed_dim),
            nn.Linear(embed_dim, num_classes)
        )
        self.dropout = nn.Dropout(dropout)

        # Parameters/Embeddings
        self.cls_token = nn.Parameter(torch.randn(1,1,embed_dim))
        if para_embedding == "Fixed":
            # Buffer: moves with .to(device) and is saved in state_dict, but is not optimised.
            self.register_buffer('pos_embedding', get_sinusoidal_positional_embedding(1 + num_patches, embed_dim))
        elif para_embedding == "Learnable":
            self.pos_embedding = nn.Parameter(torch.randn(1,1+num_patches,embed_dim))
        else:
            # No positional embedding. The attribute is still defined so that
            # forward() can test it directly instead of re-deriving the mode
            # from the (possibly unrecognised) para_embedding value.
            self.pos_embedding = None


    def forward(self, x):
        """
        Inputs:
            x - Batch of images of shape [B, C, H, W]
        Returns:
            Tensor of shape [B, num_classes] with the class scores.
        """
        # Preprocess input: [B, C, H, W] -> [B, T, C, p, p] -> [B, T, C*p*p]
        x = img_to_patch(x, self.patch_size)
        x = x.flatten(2,4)
        B, T, _ = x.shape
        x = self.input_layer(x)

        # Prepend the CLS token; its final representation summarises the sequence.
        # expand() broadcasts without copying; torch.cat materialises the result.
        cls_token = self.cls_token.expand(B, -1, -1)
        x = torch.cat([cls_token, x], dim=1)
        if self.pos_embedding is not None:
            x = x + self.pos_embedding[:,:T+1]  # +1: the CLS token gets a position too

        # Apply Transformer. The attention blocks use nn.MultiheadAttention without
        # batch_first, so the sequence dimension has to come first.
        x = self.dropout(x)
        x = x.transpose(0, 1)
        x = self.transformer(x)

        # Perform classification prediction from the CLS token (sequence index 0)
        cls = x[0]
        out = self.mlp_head(cls)
        return out

In [7]:
class AttentionBlock(nn.Module):
    """
    Pre-LayerNorm Transformer encoder block: multi-head self-attention followed
    by a two-layer GELU feed-forward network, each wrapped in a residual connection.
    """

    def __init__(self, embed_dim, hidden_dim, num_heads, dropout=0.0):
        """
        Inputs:
            embed_dim - Size of the token vectors entering and leaving the block
            hidden_dim - Width of the feed-forward network's hidden layer
            num_heads - Number of attention heads
            dropout - Dropout probability used in the attention module and
                      after each feed-forward linear layer
        """
        super().__init__()

        # Attention sub-layer
        self.layer_norm_1 = nn.LayerNorm(embed_dim)
        self.attn = nn.MultiheadAttention(embed_dim, num_heads, dropout=dropout)

        # Feed-forward sub-layer: expand -> GELU -> project back
        self.layer_norm_2 = nn.LayerNorm(embed_dim)
        feed_forward = [
            nn.Linear(embed_dim, hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, embed_dim),
            nn.Dropout(dropout),
        ]
        self.linear = nn.Sequential(*feed_forward)


    def forward(self, x):
        """Apply attention and feed-forward residual updates; output shape equals input shape."""
        normed = self.layer_norm_1(x)
        attended, _ = self.attn(normed, normed, normed)  # second item: attention weights (unused)
        x = x + attended
        transformed = self.linear(self.layer_norm_2(x))
        return x + transformed

In [8]:
import numpy as np
def get_sinusoidal_positional_embedding(n_positions, dim):
    """
    Build a fixed sinusoidal positional-embedding table.

    Even feature columns hold sin(pos * w_k) and odd columns hold cos(pos * w_k),
    with w_k = 10000^(-2k / dim).

    Inputs:
        n_positions - Number of sequence positions (rows) to generate
        dim - Embedding dimensionality (columns); may be even or odd
    Returns:
        Float tensor of shape (1, n_positions, dim).
    """
    position = torch.arange(n_positions).unsqueeze(1)        # (n_positions, 1)
    # exp(log(.)) form avoids computing large powers of 10000 directly.
    div_term = torch.exp(torch.arange(0, dim, 2) * (-np.log(10000.0) / dim))  # (ceil(dim/2),)
    angles = position * div_term                             # (n_positions, ceil(dim/2))

    pe = torch.zeros(n_positions, dim)
    pe[:, 0::2] = torch.sin(angles)
    # For odd `dim` there is one fewer odd column than even columns, so the
    # last frequency has no cosine slot; slice to keep the shapes aligned.
    pe[:, 1::2] = torch.cos(angles[:, : dim // 2])

    return pe.unsqueeze(0)  # shape: (1, n_positions, dim)


In [9]:
def img_to_patch(x, patch_size):
    """
    Cut a batch of images into non-overlapping square patches.

    Inputs:
        x - torch.Tensor representing the images, of shape [B, C, H, W]
        patch_size - Number of pixels per dimension of the patches (integer)
    Returns:
        Tensor of shape [B, (H//patch_size)*(W//patch_size), C, patch_size, patch_size];
        patches are ordered row by row over the patch grid.
    """
    batch, channels, height, width = x.shape
    rows, cols = height // patch_size, width // patch_size
    # Split each spatial axis into (grid index, offset inside the patch).
    grid = x.reshape(batch, channels, rows, patch_size, cols, patch_size)
    # Bring the two grid axes forward: [B, rows, cols, C, p_H, p_W]
    patches = grid.permute(0, 2, 4, 1, 3, 5)
    # Merge the grid axes into one patch axis: [B, rows*cols, C, p_H, p_W]
    return patches.flatten(1, 2)

# Sanity check: patchify the sample batch loaded earlier.
img_patches = img_to_patch(images_onebatch, patch_size=4)# the batch printed above is [4, 3, 128, 128] (not CIFAR10's 32x32)

In [10]:
def append_text_to_file(file_path, text_to_append):
    """
    Append `text_to_append` followed by a newline to the file at `file_path`.

    Best-effort: the outcome is reported on stdout, and any Exception raised
    while opening or writing is printed rather than propagated.
    """
    try:
        with open(file_path, mode='a') as handle:
            line = text_to_append + '\n'
            handle.write(line)
        print(f"Text appended to {file_path} successfully.")
    except Exception as err:
        print(f"Error: {err}")

In [15]:
import re
import os
def extract_models_from_file(filepath):
    """
    Scan a results log for already-trained model configurations.

    Every line containing
    "pos_embedding: X; num_heads: H; num_layers:L; patch_size: P"
    (matched case-insensitively) contributes the string "XHLP" to the result,
    in file order. If `filepath` does not exist, an empty file is created
    first, so the result is then an empty list.
    """
    # Touch the log so the read below also works on a first run.
    if not os.path.exists(filepath):
        open(filepath, "w").close()

    signature = re.compile(
        r"pos_embedding: (\w+); num_heads: (\d+); num_layers:(\d+); patch_size: (\d+)",
        re.IGNORECASE
    )

    with open(filepath, "r") as log:
        hits = (signature.search(entry) for entry in log)
        # The four captured groups are concatenated without a separator.
        return ["".join(hit.groups()) for hit in hits if hit]

In [16]:
# Plot training loss
def generate_charts(num_layers, num_heads, pos_embedding, patch_size, train_losses, val_accuracies):
    """
    Plot the training-loss and validation-accuracy curves of one run side by
    side, save the figure as a PNG in the working directory, and display it.

    Inputs:
        num_layers, num_heads, pos_embedding, patch_size - Run configuration;
            used only in the plot titles and the output file name
        train_losses - Sequence with one loss value per epoch
        val_accuracies - Sequence with one accuracy value (in %) per epoch
    """
    config_label = f'({num_layers} layers, {num_heads} heads, pos_embedding={pos_embedding}, patch_size={patch_size})'

    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 5))

    # Left panel: training loss
    loss_ax.plot(train_losses, label='Training Loss')
    loss_ax.set_xlabel('Epoch')
    loss_ax.set_ylabel('Loss')
    loss_ax.set_title(f'Training Loss Curve \n{config_label}')
    loss_ax.legend()

    # Right panel: validation accuracy
    acc_ax.plot(val_accuracies, label='Validation Accuracy', color='green')
    acc_ax.set_xlabel('Epoch')
    acc_ax.set_ylabel('Accuracy (%)')
    acc_ax.set_title(f'Validation Accuracy Curve \n{config_label}')
    acc_ax.legend()

    fig.tight_layout()
    fig.savefig(f'part2_layers{num_layers}_numHeads{num_heads}_embedding{pos_embedding}_patchSize{patch_size}.png')
    plt.show()
    # This function is called once per model in a long sweep; release the
    # figure explicitly so open figures do not pile up on backends where
    # show() does not close them.
    plt.close(fig)

In [17]:
def test_model(model, descript, batch_size, device, loader=None):
    """
    Evaluate `model` on a test set, print the accuracy, and append it (together
    with `descript`) to 'Part2AllAccuracies.txt'.

    Inputs:
        model - Classifier returning per-class scores of shape [B, num_classes]
        descript - One-line description of the run; written to the results file
                   ahead of the accuracy line
        batch_size - Unused; kept so existing calls keep working
        device - Device the batches are moved to before the forward pass
        loader - Iterable of (images, labels) batches; defaults to the
                 notebook-level `test_loader`
    Returns:
        Test accuracy in percent.

    Note: the model is left in eval mode afterwards.
    """
    if loader is None:
        loader = test_loader

    model.eval()  # evaluation mode (disables dropout)
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            predicted = outputs.argmax(dim=1)  # class with the highest score
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    # Report the real number of evaluated images rather than a fixed count.
    print('Test Accuracy of the model on the {} test images: {} %'.format(total, accuracy))

    new_acc_descript = f'       Test Accuracy of the model on the {total} test images: {accuracy}'
    append_text_to_file('Part2AllAccuracies.txt', descript + "\n" + new_acc_descript)
    print(descript + "\n" + new_acc_descript)
    return accuracy

In [None]:
# --- Hyperparameters shared by every run ---
embed_dim = 32 #I used an old version of the assignment brief that stated embed_dim=32. As a result, I can also only use 4 heads
hidden_dim = 256
num_channels = 3
num_head = 4
num_layers = [3, 4, 5, 6]
num_classes = 4 #we have 4 classes
patch_size = [4, 8, 16]
para_embedding = [ "Fixed", "Learnable", None ]

# Results log; also used to resume an interrupted sweep.
filepath = "Part2AllAccuracies.txt"
models_ive_trained = extract_models_from_file(filepath)

learning_rate = 0.001
num_epochs = 25  #often trained for 300–500 epochs

total_step = len(train_loader)

# --- Sweep over positional embedding x depth x patch size ---
for embedding_bool in para_embedding:
    for num_layer in num_layers:
        for cur_patch_size in patch_size:
            # Skip configurations already present in the results log, so the
            # sweep can be restarted after a crash without redoing finished runs.
            this_model = f"{embedding_bool}{num_head}{num_layer}{cur_patch_size}"
            if this_model in models_ive_trained:
                print("We've already trained this model, it's getting skipped.")
                continue

            # Images are img_h x img_h, so the patch grid is square.
            num_patches = (img_h // cur_patch_size) ** 2

            model = VisionTransformer(embed_dim, hidden_dim, num_channels, num_heads=num_head, num_layers=num_layer, num_classes=num_classes, patch_size=cur_patch_size,para_embedding=embedding_bool, num_patches=num_patches).to(device)

            # Loss and optimizer
            criterion = nn.CrossEntropyLoss()
            optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

            train_losses = []
            val_accuracies = []

            # Description of this run; printed now and written to the results
            # file by test_model(). Its format must stay in sync with the regex
            # in extract_models_from_file().
            descript = f"pos_embedding: {embedding_bool}; num_heads: {num_head}; num_layers:{num_layer}; patch_size: {cur_patch_size}"
            print(descript)

            for epoch in range(num_epochs):
                # --- Train for one epoch ---
                model.train()  # validation below switches to eval mode
                epoch_loss = 0
                for i, (images, labels) in enumerate(train_loader):
                    images = images.to(device)
                    labels = labels.to(device)

                    # Forward pass
                    outputs = model(images)
                    loss = criterion(outputs, labels)

                    # Backward and optimize
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
                    epoch_loss += loss.item()

                    # Log once per epoch, on the last step, whatever the
                    # dataset size or batch size is.
                    if i + 1 == total_step:
                        print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{total_step}], Loss: {loss.item():.4f}')

                avg_loss = epoch_loss / total_step
                train_losses.append(avg_loss)

                # --- Validate ---
                model.eval()
                correct = 0
                total = 0
                with torch.no_grad():
                    for images, labels in val_loader:
                        images = images.to(device)
                        labels = labels.to(device)
                        outputs = model(images)
                        predicted = outputs.argmax(dim=1)
                        total += labels.size(0)
                        correct += (predicted == labels).sum().item()
                accuracy = 100 * correct / total
                val_accuracies.append(accuracy)

            generate_charts(num_layers=num_layer, num_heads=num_head,pos_embedding=embedding_bool, patch_size=cur_patch_size, train_losses=train_losses, val_accuracies=val_accuracies)
            test_model(model, descript, batch_size, device)


pos_embedding: Fixed; num_heads: 4; num_layers:3; patch_size: 4
Epoch [1/25], Step [1430/1430], Loss: 1.2959
Epoch [2/25], Step [1430/1430], Loss: 1.3293
Epoch [3/25], Step [1430/1430], Loss: 1.0323
