<a href="https://colab.research.google.com/github/bhavinbajaj/bhavin15/blob/main/SLT_10_semisupervised_notebbok.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install keras-cv tensorflow --upgrade

Collecting keras-cv
  Downloading keras_cv-0.6.4-py3-none-any.whl (803 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m803.1/803.1 kB[0m [31m6.7 MB/s[0m eta [36m0:00:00[0m
Collecting tensorflow
  Downloading tensorflow-2.13.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (524.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m524.1/524.1 MB[0m [31m1.1 MB/s[0m eta [36m0:00:00[0m
Collecting keras-core (from keras-cv)
  Downloading keras_core-0.1.6-py3-none-any.whl (944 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m944.9/944.9 kB[0m [31m41.0 MB/s[0m eta [36m0:00:00[0m
Collecting keras<2.14,>=2.13.1 (from tensorflow)
  Downloading keras-2.13.1-py3-none-any.whl (1.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.7/1.7 MB[0m [31m40.2 MB/s[0m eta [36m0:00:00[0m
Collecting tensorboard<2.14,>=2.13 (from tensorflow)
  Downloading tensorboard-2.13.0-py3-none-any.whl (5.6 MB)
[2K     

In [None]:
pip install keras_core



In [None]:
import keras
import keras_cv
import tensorflow as tf
import tensorflow_datasets as tfds
from matplotlib import pyplot as plt

tfds.disable_progress_bar()

Using TensorFlow backend


In [None]:
IMAGE_SIZE = 96
IMAGE_CHANNELS = 3
NUM_CLASSES = 10

# Algorithm hyperparameter
UNLABELED_BATCH_SIZE = 1024
LABELED_BATCH_SIZE = 128
TEST_BATCH_SIZE = 128

In [None]:
from google.colab import drive

drive.mount("/content/drive/")

Mounted at /content/drive/


In [None]:
import os

PATH = '/content/drive/MyDrive/semi_super_tutorial/' # You can change the localisation of your mini-project about semi-supervised learning
stl10_path = os.path.join(PATH, "STL10")
os.makedirs(stl10_path, exist_ok=True)

In [None]:
from torchvision.datasets import STL10
from torchvision.transforms import ToTensor

train_labeled_dataset = STL10(root=stl10_path, split="train", download=True, transform=ToTensor())
train_unlabeled_dataset = STL10(root=stl10_path, split="unlabeled", download=True, transform=ToTensor())
test_labeled_dataset = STL10(root=stl10_path, split="test", download=True, transform=ToTensor())

Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified


In [None]:
from torch.utils.data import DataLoader

BATCH_SIZE = 128
train_loader = DataLoader(
    train_labeled_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    drop_last=False,
)
unlabeled_loader = DataLoader(
    train_unlabeled_dataset,
    batch_size=UNLABELED_BATCH_SIZE,
    shuffle=True,
    drop_last=False,
)
test_loader = DataLoader(
    test_labeled_dataset,
    batch_size=BATCH_SIZE,
    drop_last=False,
)

In [None]:
import torch
import torchvision
import time
from torch import nn, optim
from torch.utils.data import DataLoader

class SupervisedModel():
    def __init__(self, encoder):
        self.model = encoder

    @staticmethod
    def count_correct(
        y_pred: torch.Tensor,
        y_true: torch.Tensor
    ) -> torch.Tensor:
        """
        Returns the number of correct predictions
        """
        preds = torch.argmax(y_pred, dim=1)
        return (preds == y_true).float().sum()


    def validate_model(
        self,
        loss_fn,
        dataloader: DataLoader
    ):
        """
        Validates given model with given loss function on given DataLoader
        """
        loss = 0
        correct = 0
        all = 0
        with torch.no_grad():
            for X_batch, y_batch in dataloader:
                device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

                X_batch = X_batch.to(device)
                y_batch = y_batch.to(device)
                y_pred = self.model(X_batch)
                all += len(y_pred)
                loss += loss_fn(y_pred, y_batch)
                correct += SupervisedModel.count_correct(y_pred, y_batch)
        return loss / all, correct / all


    def train_model(
        self,
        optimiser: optim.Optimizer,
        loss_fn,
        train_loader: DataLoader,
    ):
        """
        Trains given model with given loss function on given DataLoader
        """
        self.model.train()

        for X_batch, y_batch in train_loader:
            device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)
            y_pred = self.model(X_batch)
            loss = loss_fn(y_pred, y_batch)

            loss.backward()
            optimiser.step()
            optimiser.zero_grad()

        self.model.eval()


    def fit(
        self,
        loss_fn,
        train_loader: DataLoader,
        test_loader: DataLoader,
        epochs=2
    ) -> dict:
        """
        Trains model and validates on training and validation data
        """
        results = {"train_loss": [], "train_acc": [], "val_loss": [],
                   "val_acc": []}
        optimiser = optim.AdamW(self.model.parameters())

        for epoch in range(epochs):
            start = time.time()

            self.train_model(optimiser, loss_fn, train_loader)
            train_loss, train_acc = self.validate_model(loss_fn, train_loader)
            val_loss, val_acc = self.validate_model(loss_fn, test_loader)

            print(f"Epoch {epoch + 1}: train loss = {train_loss:.3f} "
             f"(acc: {train_acc:.3f}), validation loss = {val_loss:.3f} "
             f"(acc: {val_acc:.3f}), time {time.time() - start:.1f}s")
            results["train_loss"].append(float(train_loss))
            results["train_acc"].append(float(train_acc))
            results["val_loss"].append(float(val_loss))
            results["val_acc"].append(float(val_acc))

        return results

def evaluate_baseline():
    encoder = torchvision.models.resnet18()
    encoder.fc = nn.Linear(512, 10) # Resnet18 is created for 1000 classses so we need to change the last layer
    baseline_model = SupervisedModel(encoder)

    loss_fn = nn.CrossEntropyLoss(reduction='sum')
    return baseline_model.fit(loss_fn, train_loader, test_loader)

history_baseline = evaluate_baseline()

Epoch 1: train loss = 2.067 (acc: 0.286), validation loss = 2.115 (acc: 0.285), time 609.1s
Epoch 2: train loss = 1.442 (acc: 0.472), validation loss = 1.634 (acc: 0.425), time 596.6s


In [None]:
encoder = torchvision.models.resnet18()
encoder.fc = nn.Linear(encoder.fc.in_features, 10)
model = SupervisedModel(encoder)
loss_fn = nn.CrossEntropyLoss()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Hyperparameters
threshold = 0.7  # Confidence threshold for pseudo-labeling
epochs_per_iteration = 1  # Number of epochs to train in each iteration

# Train the model with pseudo-labeling
for iteration in range(5):  # You can adjust the number of iterations
    print(f"Iteration {iteration + 1}")

    # Train the model on the labeled dataset
    model.fit(loss_fn, train_loader, train_loader, epochs=epochs_per_iteration)

    predictions = []
    confidences = []
    with torch.no_grad():
        for X_batch, _ in unlabeled_loader:
            X_batch = X_batch.to(device)
            y_pred = model.model(X_batch)
            confidence, preds = torch.max(y_pred, dim=1)
            predictions.extend(preds.tolist())
            confidences.extend(confidence.tolist())

    # Filter predictions above the threshold
    pseudo_labeled_data = []
    for i, confidence in enumerate(confidences):
        if confidence > threshold:
            img_tensor = torch.from_numpy(train_unlabeled_dataset.data[i])  # Convert NumPy array to tensor
            pseudo_labeled_data.append((img_tensor, predictions[i]))

    if len(pseudo_labeled_data) == 0:
        print("No more predictions above the threshold. Stopping iterations.")
        break

    # Add pseudo-labeled data to the labeled dataset
    pseudo_labeled_dataset = torch.utils.data.TensorDataset(
        torch.stack([data[0] for data in pseudo_labeled_data]),
        torch.tensor([data[1] for data in pseudo_labeled_data])
    )
    train_labeled_dataset += pseudo_labeled_dataset

# Evaluate the final model on the test set
final_accuracy = model.validate_model(loss_fn, test_loader)
print(f"Final accuracy on the test set: {final_accuracy:.3f}")

Iteration 1
Epoch 1: train loss = 0.017 (acc: 0.299), validation loss = 0.017 (acc: 0.299), time 536.0s
Iteration 2
Epoch 1: train loss = 0.023 (acc: 0.221), validation loss = 0.023 (acc: 0.221), time 531.9s
