<h1> ECE4179 - Semi-Supervised Learning Project</h1>
<h2>Data</h2>

We will be using the STL10 dataset, which can be obtained directly from the torchvision package. It has 10 classes, and we will be training a CNN (and, later, a Vision Transformer) for the image classification task. STL10 provides labelled training and test sets and a large unlabelled set; the validation set is split off from the labelled training data below.

We will simulate a low-data scenario by using only a small fraction (10%) of the labelled training data for training. The remaining labelled examples are used as the validation set.

To get the labelled data, change the dataset_dir to something suitable for your machine, and execute the following (you will then probably want to wrap the dataset objects in a PyTorch DataLoader):

In [44]:
import torch
import torch.nn as nn
from torchvision.datasets import STL10
import torchvision.transforms as transforms
from torch.utils.data import random_split
from torch.utils.data import DataLoader
import torchvision
from sklearn.metrics import f1_score, classification_report

####### CHANGE TO APPROPRIATE DIRECTORY TO STORE DATASET
dataset_dir = r"\\ad.monash.edu\home\User030\rbea0007\Documents\ECE6179\VS Code\Course Project"
#For MonARCH
# dataset_dir = "/mnt/lustre/projects/ds19/SHARED"

#All images are 3x96x96
image_size = 96
#Example batch size
batch_size = 32

# Define the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
print(torch.cuda.is_available())  # Should return True

Using device: cuda
True


<h3>Create the appropriate transforms</h3>

In [45]:
#Perform random crops and mirroring for data augmentation
transform_train = transforms.Compose(
    [transforms.RandomCrop(image_size, padding=4),
     transforms.RandomHorizontalFlip(p=0.5),
     transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

transform_unlabelled = transforms.Compose(
    [transforms.RandomHorizontalFlip(p=0.5),
     transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

# No random augmentation for the test set (CenterCrop(96) leaves a 96x96 image unchanged)
transform_test = transforms.Compose(
    [transforms.CenterCrop(image_size),
     transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
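Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)) computes (x - 0.5) / 0.5 per channel, so the [0, 1] values produced by ToTensor end up in [-1, 1]. This matters later: augmentations that assume [0, 1] inputs (brightness and contrast adjustment clamp to that range) must undo the normalisation first. A minimal check on a hand-made tensor; the helper names normalise and denormalise are hypothetical:

```python
import torch

mean, std = 0.5, 0.5

def normalise(x):
    # Same arithmetic as transforms.Normalize with mean = std = 0.5 on every channel
    return (x - mean) / std

def denormalise(x):
    # Inverse mapping from [-1, 1] back to [0, 1]
    return x * std + mean

pixels = torch.tensor([0.0, 0.25, 0.5, 1.0])
normed = normalise(pixels)
print(normed.tolist())               # [-1.0, -0.5, 0.0, 1.0]
print(denormalise(normed).tolist())  # [0.0, 0.25, 0.5, 1.0]
```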


<h3>Create training and validation split</h3>

In [46]:
#Load train and validation sets
trainval_set = STL10(dataset_dir, split='train', transform=transform_train, download=True)

#Use 10% of data for training - simulating low data scenario
num_train = int(len(trainval_set)*0.1)

#Split data into train/val sets
torch.manual_seed(0) #Set torch's random seed so that random split of data is reproducible
train_set, val_set = random_split(trainval_set, [num_train, len(trainval_set)-num_train]) #500 train, 4500 val

#Load test set
test_set = STL10(dataset_dir, split='test', transform=transform_test, download=True) #8000 test

Files already downloaded and verified
Files already downloaded and verified
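The split arithmetic can be checked without downloading anything. This sketch uses a stand-in TensorDataset of 5000 items (the size of the STL10 labelled training split) and passes an explicit torch.Generator to random_split, which makes the split reproducible without relying on the global seed set by torch.manual_seed. The helper name split is hypothetical:

```python
import torch
from torch.utils.data import TensorDataset, random_split

# Stand-in for the 5000-image STL10 training split (only the indices matter here)
dataset = TensorDataset(torch.arange(5000))
num_train = int(len(dataset) * 0.1)  # 10% -> 500 training examples

def split(seed):
    # A dedicated generator makes the split depend only on `seed`
    gen = torch.Generator().manual_seed(seed)
    return random_split(dataset, [num_train, len(dataset) - num_train], generator=gen)

train_a, val_a = split(0)
train_b, val_b = split(0)
print(len(train_a), len(val_a))            # 500 4500
print(train_a.indices == train_b.indices)  # True: same seed, same split
```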


<h3>Get the unlabelled data</h3>

In [47]:
unlabelled_set = STL10(dataset_dir, split='unlabeled', transform=transform_unlabelled, download=True) #100,000 unlabelled

Files already downloaded and verified


You may later want to change how the unlabelled data is loaded, for example so that each sample returns two differently augmented views for contrastive pretraining. This might require subclassing the STL10 class used above or creating your own Dataset class similar to the PyTorch implementation:
https://pytorch.org/docs/stable/_modules/torchvision/datasets/stl10.html#STL10
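A minimal sketch of such a change, assuming a hypothetical wrapper class TwoViewDataset that works with any dataset returning (image, label) pairs. Here a toy tensor dataset and a noise augmentation stand in for STL10 and a real view transform, so the example runs without the data:

```python
import torch
from torch.utils.data import Dataset, DataLoader, TensorDataset

class TwoViewDataset(Dataset):
    """Hypothetical wrapper: returns two augmented views of the same image."""

    def __init__(self, base, view_transform):
        self.base = base                      # dataset yielding (image, label)
        self.view_transform = view_transform  # random augmentation applied per view

    def __len__(self):
        return len(self.base)

    def __getitem__(self, idx):
        image, label = self.base[idx]
        # Each call draws fresh random augmentation, so the two views differ
        return self.view_transform(image), self.view_transform(image), label

def add_noise(img):
    # Toy stand-in for a random image augmentation
    return img + 0.1 * torch.randn_like(img)

# Toy stand-in: 8 "images" of shape 3x4x4
toy = TensorDataset(torch.zeros(8, 3, 4, 4), torch.zeros(8, dtype=torch.long))
loader = DataLoader(TwoViewDataset(toy, add_noise), batch_size=4)
view_q, view_k, _ = next(iter(loader))
print(view_q.shape, view_k.shape)  # torch.Size([4, 3, 4, 4]) torch.Size([4, 3, 4, 4])
```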

<h3>Create the four dataloaders</h3>

In [48]:
train_loader = DataLoader(train_set, shuffle=True, batch_size=batch_size)
unlabelled_loader = DataLoader(unlabelled_set, shuffle=True, batch_size=batch_size)

valid_loader = DataLoader(val_set, batch_size=batch_size)
test_loader  = DataLoader(test_set, batch_size=batch_size)
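With batch_size = 32 each loader yields ceil(N / 32) batches per epoch. This also matters for the MoCo queue: 100,000 unlabelled images divide evenly by 32, so every pretraining batch is full, and with K = 8192 = 256 × 32 a queue write never straddles the end of the queue. A quick check using stand-in tensors with the dataset sizes noted in the comments above:

```python
import math
import torch
from torch.utils.data import DataLoader, TensorDataset

batch_size = 32
sizes = {"train": 500, "val": 4500, "test": 8000, "unlabelled": 100_000}

for name, n in sizes.items():
    # Stand-in dataset with n one-number samples; only its length matters here
    loader = DataLoader(TensorDataset(torch.zeros(n, 1)), batch_size=batch_size)
    assert len(loader) == math.ceil(n / batch_size)
    last = n - (len(loader) - 1) * batch_size  # size of the final (possibly partial) batch
    print(f"{name}: {len(loader)} batches, last batch has {last} samples")

# Queue size used by the MoCo class below is a whole number of batches
assert 8192 % batch_size == 0
```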

## Network

Define the Momentum Contrast (MoCo) model
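MoCo trains the query encoder with the InfoNCE loss: for each query q the positive is the key k computed from another view of the same image, the negatives are the K keys held in the queue, and the loss is cross-entropy over the 1 + K similarity logits divided by the temperature T, with the positive at index 0. A minimal standalone sketch (the function name info_nce_loss is hypothetical), checked on vectors where the answer can be worked out by hand:

```python
import math
import torch
import torch.nn.functional as F

def info_nce_loss(q, k, queue, T=0.07):
    """InfoNCE as used by MoCo. q, k: (N, dim) L2-normalised; queue: (dim, K) L2-normalised."""
    l_pos = (q * k).sum(dim=1, keepdim=True)            # (N, 1) similarity with the positive key
    l_neg = q @ queue                                   # (N, K) similarity with each queued negative
    logits = torch.cat([l_pos, l_neg], dim=1) / T       # (N, 1 + K)
    labels = torch.zeros(q.shape[0], dtype=torch.long)  # the positive is column 0
    return F.cross_entropy(logits, labels)

# Worked example: queries equal their keys and are orthogonal to both negatives
q = torch.eye(4)[:2]          # two unit vectors in 4-d
k = q.clone()
queue = torch.eye(4)[:, 2:]   # two negatives (columns), orthogonal to q

loss = info_nce_loss(q, k, queue)
# Each row of logits is [1/T, 0, 0], so the loss is log(1 + 2 * exp(-1/T))
expected = math.log(1 + 2 * math.exp(-1 / 0.07))
print(loss.item(), expected)
```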

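The queue is a fixed-size ring buffer: each enqueue overwrites the oldest keys and advances a pointer modulo K. Writing with a plain slice such as queue[:, ptr:ptr + batch_size] only works when K is a multiple of the batch size; computing the write positions modulo K also handles a batch that wraps past the end. A toy sketch with K = 5 and one-dimensional keys (the helper name enqueue is hypothetical):

```python
import torch

def enqueue(queue, ptr, keys):
    """Hypothetical helper: write keys (N, dim) into queue (dim, K) at ptr, wrapping modulo K."""
    K = queue.shape[1]
    idx = (ptr + torch.arange(keys.shape[0])) % K  # write positions, wrapping at the end
    queue[:, idx] = keys.T
    return (ptr + keys.shape[0]) % K  # next write position (the oldest entry once the queue is full)

queue = torch.zeros(1, 5)  # dim = 1, K = 5
ptr = 0
for batch in ([1.0, 2.0], [3.0, 4.0], [5.0, 6.0]):
    keys = torch.tensor(batch).unsqueeze(1)  # shape (2, 1)
    ptr = enqueue(queue, ptr, keys)

print(queue.tolist(), ptr)  # [[6.0, 2.0, 3.0, 4.0, 5.0]] 1
```

The third batch wraps: key 5.0 lands in the last slot and key 6.0 overwrites the oldest key, 1.0.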
In [49]:
import copy

class MoCo(nn.Module):
    def __init__(self, base_encoder, model_type, dim=128, K=8192, m=0.999, T=0.07):
        super(MoCo, self).__init__()
        self.encoder_q = base_encoder

        # Replace the final classification layer with a projection to `dim` features
        if model_type == 'resnet':
            self.encoder_q.fc = nn.Linear(self.encoder_q.fc.in_features, dim)
        elif model_type == 'efficientnet':
            in_features = self.encoder_q.classifier[1].in_features  # Last layer of the classifier
            self.encoder_q.classifier[1] = nn.Linear(in_features, dim)
        elif model_type == 'vit':
            in_features = self.encoder_q.heads.head.in_features  # in_features of the ViT head
            self.encoder_q.heads.head = nn.Linear(in_features, dim)
        else:
            raise ValueError("Unsupported model: choose 'resnet', 'efficientnet', or 'vit'")

        # The key encoder must be a separate copy. Assigning base_encoder to both attributes
        # makes them the same object, so freezing encoder_k would also freeze encoder_q
        # and the momentum update would do nothing.
        self.encoder_k = copy.deepcopy(self.encoder_q)
        for param in self.encoder_k.parameters():
            param.requires_grad = False  # Updated only by momentum, never by the optimiser

        self.K = K  # queue size
        self.m = m  # momentum
        self.T = T  # temperature

        # Queue of negative keys, stored column-wise and L2-normalised
        self.register_buffer("queue", nn.functional.normalize(torch.randn(dim, K), dim=0))
        self.register_buffer("queue_ptr", torch.zeros(1, dtype=torch.long))  # Pointer into the queue

    def forward(self, x):
        # L2-normalised query embeddings
        q = self.encoder_q(x)
        return nn.functional.normalize(q, dim=1)

    @torch.no_grad()
    def update_key_encoder(self):
        # Momentum (exponential moving average) update: k <- m * k + (1 - m) * q
        for param_q, param_k in zip(self.encoder_q.parameters(), self.encoder_k.parameters()):
            param_k.data.mul_(self.m).add_(param_q.data, alpha=1. - self.m)

    @torch.no_grad()
    def enqueue_and_dequeue(self, keys):
        keys = nn.functional.normalize(keys, dim=1)
        batch_size = keys.shape[0]
        ptr = int(self.queue_ptr.item())

        # Overwrite the oldest keys, wrapping around the end of the queue if needed
        idx = (ptr + torch.arange(batch_size, device=keys.device)) % self.K
        self.queue[:, idx] = keys.T
        self.queue_ptr[0] = (ptr + batch_size) % self.K

    def contrastive_loss(self, query, key):
        # InfoNCE: each query has one positive (its own key) and K negatives (the queue)
        key = nn.functional.normalize(key, dim=1)
        l_pos = (query * key).sum(dim=1, keepdim=True)        # (N, 1)
        l_neg = torch.mm(query, self.queue.clone().detach())  # (N, K)
        logits = torch.cat([l_pos, l_neg], dim=1) / self.T    # (N, 1 + K)

        # The positive is always at index 0
        labels = torch.zeros(query.shape[0], dtype=torch.long, device=query.device)
        return nn.functional.cross_entropy(logits, labels)

In [50]:
# Pretrain on unlabelled data with the MoCo objective
def pretrain_model(model, dataloader, num_epochs):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")
    model.to(device)  # Move the model (and its queue buffer) to the device

    model.train()
    # Only the query encoder is trained; the key encoder follows it by momentum
    optimiser = torch.optim.Adam(model.encoder_q.parameters(), lr=1e-4)

    for epoch in range(num_epochs):
        total_loss = 0.0
        for images, _ in dataloader:
            images = images.to(device)
            optimiser.zero_grad()

            # Query view: images as loaded (already randomly flipped by transform_unlabelled)
            images_q = images
            # Key view: undo Normalize(0.5, 0.5) first, because brightness and contrast
            # adjustment clamp to [0, 1] and would otherwise destroy all negative values
            images_k = images * 0.5 + 0.5
            images_k = transforms.functional.hflip(images_k)  # Horizontal flip
            images_k = transforms.functional.adjust_brightness(images_k, 1.2)  # Brightness adjustment
            images_k = transforms.functional.adjust_contrast(images_k, 1.2)  # Contrast adjustment
            images_k = transforms.functional.rotate(images_k, angle=15)  # Rotate
            images_k = (images_k - 0.5) / 0.5  # Re-apply the normalisation

            # Forward pass for the queries
            query = model(images_q)

            # Momentum-update the key encoder, then compute keys without gradients
            with torch.no_grad():
                model.update_key_encoder()
                key = model.encoder_k(images_k)

            # Contrastive loss against the positive key and the queue of negatives
            loss = model.contrastive_loss(query, key)

            # Backward pass
            loss.backward()
            optimiser.step()

            # Add the new keys to the queue, replacing the oldest ones
            model.enqueue_and_dequeue(key)
            total_loss += loss.item()

        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {total_loss / len(dataloader):.4f}')
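The momentum update only makes sense if the key encoder is a separate copy of the query encoder. Assigning the same module to two attributes creates two names for one object, so updating one from the other changes nothing and freezing one freezes both. A small sketch with nn.Linear stand-ins for the encoders, using copy.deepcopy for an independent key encoder and m = 0.9 (rather than 0.999) so that one update is visible:

```python
import copy
import torch
import torch.nn as nn

torch.manual_seed(0)
encoder_q = nn.Linear(3, 2)  # stand-in for the query encoder

# Aliasing: both names refer to the same module and therefore the same parameters
alias_k = encoder_q
print(alias_k is encoder_q)  # True

# Independent key encoder: same initial weights, separate storage
encoder_k = copy.deepcopy(encoder_q)
for p in encoder_k.parameters():
    p.requires_grad = False

# Pretend an optimiser step moved the query encoder
with torch.no_grad():
    encoder_q.weight.add_(1.0)

m = 0.9
before = encoder_k.weight.clone()
with torch.no_grad():
    for p_q, p_k in zip(encoder_q.parameters(), encoder_k.parameters()):
        p_k.mul_(m).add_(p_q, alpha=1 - m)  # k <- m * k + (1 - m) * q

# The key weights moved 10% of the way towards the query weights
print(torch.allclose(encoder_k.weight, before + 0.1, atol=1e-6))  # True
print(encoder_q.weight.requires_grad)  # True: freezing the copy left the query encoder trainable
```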

In [51]:
def evaluate_model(model, test_loader):  # Accuracy and macro F1
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()

    all_labels = []
    all_preds = []
    with torch.no_grad():  # Disable gradient calculation
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)  # Index of the highest score = predicted class

            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(predicted.cpu().numpy())

    # Both metrics from a single pass over the test set
    correct = sum(int(p == l) for p, l in zip(all_preds, all_labels))
    print(f'Accuracy: {100 * correct / len(all_labels):.2f}%')

    f1 = f1_score(all_labels, all_preds, average='macro')
    print(f"Macro F1-score: {f1:.4f}")
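Accuracy and macro F1 can disagree sharply when predictions concentrate on a few classes, because macro F1 gives every class's F1 equal weight. A toy example with hand-made labels (not results from this notebook) where the model always predicts class 0:

```python
from sklearn.metrics import accuracy_score, f1_score

y_true = [0, 0, 0, 0, 1, 2]
y_pred = [0, 0, 0, 0, 0, 0]  # always predicts class 0

acc = accuracy_score(y_true, y_pred)
# zero_division=0 scores never-predicted classes as 0 instead of emitting a warning
macro_f1 = f1_score(y_true, y_pred, average='macro', zero_division=0)

# Class 0: precision 4/6, recall 1 -> F1 = 0.8; classes 1 and 2: F1 = 0
print(f"accuracy={acc:.3f}, macro F1={macro_f1:.3f}")  # accuracy=0.667, macro F1=0.267
```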

In [52]:
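def finetune_model(model, train_loader, val_loader, num_epochs):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(num_epochs):
        model.train()
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)  # Forward pass
            loss = criterion(outputs, labels)  # Compute loss
            loss.backward()  # Backward pass
            optimizer.step()  # Update weights

        # Validation accuracy after each epoch
        model.eval()
        correct, total = 0, 0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                correct += (model(images).argmax(dim=1) == labels).sum().item()
                total += labels.size(0)
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}, '
              f'Val accuracy: {100 * correct / total:.2f}%')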
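MoCo.forward L2-normalises the encoder output. That suits the contrastive loss but not classification: if the normalised vector is used as class logits, every logit lies in [-1, 1], so even a confident, correct prediction keeps a large cross-entropy loss. A toy comparison with 10 classes (hand-picked logits, not model outputs):

```python
import math
import torch
import torch.nn.functional as F

target = torch.tensor([0])
raw_logits = torch.zeros(1, 10)
raw_logits[0, 0] = 10.0                       # confident and correct
unit_logits = F.normalize(raw_logits, dim=1)  # what an L2-normalising forward returns: [1, 0, ..., 0]

raw_loss = F.cross_entropy(raw_logits, target).item()
unit_loss = F.cross_entropy(unit_logits, target).item()
# For [1, 0, ..., 0] the loss is log(e + 9) - 1, about 1.46, however large the raw scores were
expected = math.log(math.e + 9) - 1
print(f"raw: {raw_loss:.4f}, normalised: {unit_loss:.4f}, formula: {expected:.4f}")
```

This is why passing the underlying encoder (rather than the normalising wrapper) to the classification training loop is the safer choice.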

## Using ResNet18

In [None]:
# Initialise model using resnet18
model_resnet = torch.hub.load('pytorch/vision', 'resnet18', weights="ResNet18_Weights.IMAGENET1K_V1")
base_encoder = model_resnet
model_moco1 = MoCo(base_encoder, model_type='resnet')

for name, param in model_resnet.named_parameters():
    print(f"Name: {name}, Shape: {param.shape}")

In [None]:
# Pretrain the model
pretrain_model(model_moco1, unlabelled_loader, num_epochs=10)

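# Evaluate the model
# Note: before fine-tuning, the model outputs 128-d embeddings rather than class scores,
# so this accuracy and macro F1 are not meaningful classification metrics
evaluate_model(model_moco1, test_loader)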

In [None]:
# Finetune the model
num_classes = 10  # 10 classes in STL10
# Replace the 128-d projection head with a 10-class classifier
model_moco1.encoder_q.fc = nn.Linear(model_moco1.encoder_q.fc.in_features, num_classes)
# Fine-tune encoder_q directly: MoCo.forward L2-normalises its output, which limits the logits to [-1, 1]
finetune_model(model_moco1.encoder_q, train_loader, valid_loader, num_epochs=10)

# Evaluate the finetuned model
evaluate_model(model_moco1.encoder_q, test_loader)

## Using EfficientNetB0

In [None]:
# Initialise model using efficientnetb0
# May have to clear all outputs and run data/user-defined functions again
model_efficient = torch.hub.load('pytorch/vision', 'efficientnet_b0', weights="EfficientNet_B0_Weights.IMAGENET1K_V1")

base_encoder = model_efficient
model_moco2 = MoCo(base_encoder, model_type='efficientnet')

for name, param in model_efficient.named_parameters():
    print(f"Name: {name}, Shape: {param.shape}")


In [None]:
# Pretrain the model
pretrain_model(model_moco2, unlabelled_loader, num_epochs=10)

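# Evaluate the model
# Note: before fine-tuning, the model outputs 128-d embeddings rather than class scores,
# so this accuracy and macro F1 are not meaningful classification metrics
evaluate_model(model_moco2, test_loader)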

In [None]:
# Finetune the model
num_classes = 10  # 10 classes in STL10
# Replace the 128-d projection head with a 10-class classifier
in_features = model_moco2.encoder_q.classifier[1].in_features
model_moco2.encoder_q.classifier[1] = nn.Linear(in_features, num_classes)

# Fine-tune encoder_q directly: MoCo.forward L2-normalises its output, which limits the logits to [-1, 1]
finetune_model(model_moco2.encoder_q, train_loader, valid_loader, num_epochs=10)

# Evaluate the finetuned model
evaluate_model(model_moco2.encoder_q, test_loader)

## Using Vision Transformer

In [53]:
# Set image size to 224x224 to match the input size of ViT
transform_train_vit = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize images to 224x224
    transforms.RandomCrop(224, padding=4),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

transform_unlabelled_vit = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize images to 224x224
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

transform_test_vit = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize images to 224x224
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# Load train and validation sets without redownloading data
trainval_set = STL10(dataset_dir, split='train', transform=transform_train_vit, download=False)

# Use 10% of the data for training (simulating a low data scenario)
num_train = int(len(trainval_set) * 0.1)

# Split data into train/validation sets with a fixed random seed
torch.manual_seed(0)  # Ensure reproducibility
train_set, val_set = random_split(trainval_set, [num_train, len(trainval_set) - num_train])

# Load test set without redownloading data
test_set = STL10(dataset_dir, split='test', transform=transform_test_vit, download=False)

unlabelled_set = STL10(dataset_dir, split='unlabeled', transform=transform_unlabelled_vit, download=False)

train_loader = DataLoader(train_set, shuffle=True, batch_size=batch_size)
unlabelled_loader = DataLoader(unlabelled_set, shuffle=True, batch_size=batch_size)

valid_loader = DataLoader(val_set, batch_size=batch_size)
test_loader  = DataLoader(test_set, batch_size=batch_size)




Name: class_token, Shape: torch.Size([1, 1, 768])
Name: conv_proj.weight, Shape: torch.Size([768, 3, 16, 16])
Name: conv_proj.bias, Shape: torch.Size([768])
Name: encoder.pos_embedding, Shape: torch.Size([1, 197, 768])
Name: encoder.layers.encoder_layer_0.ln_1.weight, Shape: torch.Size([768])
Name: encoder.layers.encoder_layer_0.ln_1.bias, Shape: torch.Size([768])
Name: encoder.layers.encoder_layer_0.self_attention.in_proj_weight, Shape: torch.Size([2304, 768])
Name: encoder.layers.encoder_layer_0.self_attention.in_proj_bias, Shape: torch.Size([2304])
Name: encoder.layers.encoder_layer_0.self_attention.out_proj.weight, Shape: torch.Size([768, 768])
Name: encoder.layers.encoder_layer_0.self_attention.out_proj.bias, Shape: torch.Size([768])
Name: encoder.layers.encoder_layer_0.ln_2.weight, Shape: torch.Size([768])
Name: encoder.layers.encoder_layer_0.ln_2.bias, Shape: torch.Size([768])
Name: encoder.layers.encoder_layer_0.mlp.0.weight, Shape: torch.Size([3072, 768])
Name: encoder.layers.
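vit_b_16 splits the image into 16x16 patches with a strided convolution (conv_proj, weight shape [768, 3, 16, 16] in the listing above) and adds a learned position embedding per token. Its pos_embedding has 197 positions: (224 / 16)^2 = 196 patches plus one class token. A native 96x96 STL10 image would give only (96 / 16)^2 = 36 patches, which does not match, hence the resize to 224x224. A sketch using a freshly initialised stand-in convolution of the same shape:

```python
import torch
import torch.nn as nn

# Stand-in with the same shape as vit_b_16's conv_proj: 3 -> 768 channels, 16x16 kernel, stride 16
conv_proj = nn.Conv2d(3, 768, kernel_size=16, stride=16)

tokens = {}
for size in (224, 96):
    x = torch.zeros(1, 3, size, size)
    with torch.no_grad():
        patches = conv_proj(x)  # (1, 768, size // 16, size // 16)
    tokens[size] = patches.flatten(2).shape[2] + 1  # one token per patch, plus the class token
    print(f"{size}x{size}: {tuple(patches.shape)} -> {tokens[size]} tokens")
# 224x224: (1, 768, 14, 14) -> 197 tokens
# 96x96: (1, 768, 6, 6) -> 37 tokens
```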

In [55]:
# Load a pretrained Vision Transformer (ViT-B/16) from torchvision models
from torchvision import models

model_vit = models.vit_b_16(weights=models.ViT_B_16_Weights.IMAGENET1K_V1)

# Print the parameter names and shapes
for name, param in model_vit.named_parameters():
    print(f"Name: {name}, Shape: {param.shape}")

base_encoder = model_vit
model_moco3 = MoCo(base_encoder, model_type='vit')

In [56]:
# Pretrain the model
pretrain_model(model_moco3, unlabelled_loader, num_epochs=10)

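# Evaluate the model
# Note: before fine-tuning, the model outputs 128-d embeddings rather than class scores,
# so this accuracy and macro F1 are not meaningful classification metrics
evaluate_model(model_moco3, test_loader)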

Using device: cuda


  attn_output = scaled_dot_product_attention(q, k, v, attn_mask, dropout_p, is_causal)


KeyboardInterrupt: 

In [None]:
# Finetune the model
num_classes = 10  # 10 classes in STL10
# Replace the 128-d projection head with a 10-class classifier
in_features = model_moco3.encoder_q.heads.head.in_features
model_moco3.encoder_q.heads.head = nn.Linear(in_features, num_classes)

# Fine-tune encoder_q directly: MoCo.forward L2-normalises its output, which limits the logits to [-1, 1]
finetune_model(model_moco3.encoder_q, train_loader, valid_loader, num_epochs=10)

# Evaluate the finetuned model
evaluate_model(model_moco3.encoder_q, test_loader)