In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import torchvision.transforms as transforms
from PIL import Image
from helper_methods import get_train_val_mltclass, get_test, create_dataset_mltclass, show_images, load_and_transform_image
import torch
import torchvision
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
from torchvision.models import ResNet34_Weights
from torchvision.models import resnet18, ResNet18_Weights

from torch.utils.data import TensorDataset, DataLoader

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import tarfile

# Unzipping the annotations file
annotations_tar_path = '/content/drive/MyDrive/annotations.tar.gz'
annotations_extract_path = '/content/annotations'
with tarfile.open(annotations_tar_path, 'r:gz') as tar:
    tar.extractall(path=annotations_extract_path)

# Unzipping the images file
images_tar_path = '/content/drive/MyDrive/images.tar.gz'
images_extract_path = '/content/images'
with tarfile.open(images_tar_path, 'r:gz') as tar:
    tar.extractall(path=images_extract_path)

In [None]:
df_train, df_val = get_train_val_mltclass(filepath='/content/annotations/annotations/trainval.txt', val_size=0.2)
X_train, Y_train = create_dataset_mltclass(df_train, base_path = "/content/images/images/", augment=True)
X_val, Y_val = create_dataset_mltclass(df_val, base_path="/content/images/images/", augment = False)

# Convert Y_train and Y_val to Long right after their creation
Y_train = Y_train.long()
Y_val = Y_val.long()


#test data
df_test = get_test(filepath='/content/annotations/annotations/test.txt')
X_test, Y_test = create_dataset_mltclass(df_test, base_path="/content/images/images/")

# Convert Y_test to Long
Y_test = Y_test.long()

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

In [None]:
def accuracy(outputs, labels):
    _, preds = torch.max(outputs, dim=1)
    return torch.tensor(torch.sum(preds == labels).item() / len(preds))

class ImageClassificationBase(nn.Module):

    def training_step(self, batch):
        #print("TRAINING DEVIDE: ", device)
        images, labels = batch
        images, labels = images.to(device), labels.to(device)  # Move data to GPU
        out = self(images)
        loss = F.cross_entropy(out, labels)
        return loss

    # def validation_step(self, batch):
    #     images, labels = batch
    #     images, labels = images.to(device), labels.to(device)  # Move data to GPU
    #     out = self(images)
    #     with torch.no_grad():
    #         loss = F.cross_entropy(out, labels)
    #         acc = accuracy(out, labels)

    #     return {'val_loss': loss.detach(), 'val_acc': acc}

    def validation_step(self, batch):
      images, labels = batch
      #print("VALIDATION DEVICE: ", device)
      images, labels = images.to(device), labels.to(device)  # Move data to GPU
      out = self(images)

      # Compute loss
      loss = F.cross_entropy(out, labels)

      # Compute number of correct predictions
      _, predicted = torch.max(out, 1)
      num_correct = (predicted == labels).sum().item()

      return {'val_loss': loss.detach(), 'num_correct': num_correct}

    def predict(self, images):
      #images, _ = batch
      images = images.to(device)
      out = self(images)
      _, predicted = torch.max(out, 1)
      return predicted


In [None]:
import torch.nn as nn

def set_bn_eval(module):
    """Set BatchNorm layers in evaluation mode."""
    if isinstance(module, nn.BatchNorm2d):
        module.eval()


In [None]:
class DogsCatsCnnModelResNet34(ImageClassificationBase):
    def __init__(self):

        super().__init__()
        # Use a pretrained model
        self.network = models.resnet34(weights=ResNet34_Weights.DEFAULT)
        # Replace last layer
        num_ftrs = self.network.fc.in_features
        #self.network.fc = nn.Linear(num_ftrs, 37)

        self.network.fc = nn.Sequential(
                          nn.Dropout(0.5),
                          nn.Linear(num_ftrs, 37)
                      )

        # Freeze the early layers
        for name, param in self.network.named_parameters():
            if 'layer1' in name or 'layer2' in name or 'layer3' in name:
                param.requires_grad = False

                # Apply the batch normalization adjustment
                if isinstance(param, nn.BatchNorm2d):
                    set_bn_eval(param)

    def forward(self, xb):
        return torch.sigmoid(self.network(xb))

    def predict(self, images):
      return super().predict(images)

In [None]:
from torch.optim.lr_scheduler import ReduceLROnPlateau

#@torch.no_grad()
def evaluate(model, X_val, Y_val, batch_size=256):
    with torch.no_grad():
      model.eval()

    loss = 0
    correct = 0
    total_samples = 0
    # Try loadign data into batches
    val_loader = DataLoader(TensorDataset(X_val, Y_val), batch_size=batch_size, shuffle=False)
    with torch.no_grad():
      for batch in val_loader:
        X_batch, Y_batch = batch
        #X_batch, Y_batch = X_batch.to(device), Y_batch.to(device)  # Move batch data to the GPU
        outputs = model.validation_step([X_batch, Y_batch])
        loss += outputs['val_loss'].item() * len(batch[0])
        correct += outputs['num_correct']
        total_samples += len(batch[0])

    return {'val_loss': loss / total_samples, 'accuracy': correct / total_samples}
    #batch = [X_val.to(device), Y_val.to(device)]  # Move batch data to the GPU
    #outputs = model.validation_step(batch)
    #return outputs

def fit(model, epochs, lr, X_train, Y_train, X_val, Y_val, batch_size=32, opt_func=torch.optim.SGD):
    history = []
    # optimizer = opt_func(model.parameters(), lr)

    # Filter out the parameters to update to ensure that "Only Unfrozen Parameters Are Passed to the Optimizer"
    params_to_update = [p for p in model.parameters() if p.requires_grad]
    optimizer = opt_func(params_to_update, lr)

    scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.1, patience=3, verbose=True)


    # Create TensorDataset and DataLoader for training data
    train_dataset = TensorDataset(X_train, Y_train)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) #, num_workers=4

    # Move the model to the GPU
    model = model.to(device)
    #print("MODEL TO DEVICE: ", device)

    for epoch in range(epochs):
        print(f"Epoch: {epoch}")

        # Training Phase
        model.train()
        train_losses = []
        for i, batch in enumerate(train_loader):
            X_batch, Y_batch = batch
            #X_batch, Y_batch = X_batch.to(device), Y_batch.to(device)  # Move batch data to the GPU

            loss = model.training_step([X_batch, Y_batch])
            train_losses.append(loss)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

        # Validation phase
        result = evaluate(model, X_val, Y_val, batch_size)
        print(f"Epoch {epoch}, result: {result}")
        history.append(result)

        torch.cuda.empty_cache()


    return history

In [None]:
#model_resnet_34 = DogsCatsCnnModelResNet34().to(device)
model_resnet_34 = DogsCatsCnnModelResNet34()
#print(evaluate(model_resnet_34, X_val, Y_val,batch_size=256))
import torch
torch.cuda.empty_cache()


num_epochs = 10
opt_func = torch.optim.Adam
batch_size = 256
lr = 0.001

history0 = fit(model_resnet_34, num_epochs, lr, X_train, Y_train, X_val, Y_val, batch_size, opt_func)

import torch
torch.cuda.empty_cache()
print("Evaluation of model trained on the entire dataset", evaluate(model_resnet_34, X_test, Y_test))


In [None]:
# Use 100%, 50%, 10% and 1% of training data
print(X_train.shape)
# 50 %
X_train_half = X_train[:len(X_train) // 2]
Y_train_half = Y_train[:len(Y_train) // 2]
X_unlabeled_half = X_train[len(X_train) // 2:]
Y_unlabeled_half = Y_train[len(Y_train) // 2:]
print(X_train_half.shape)
print(Y_train_half.shape)
print(X_unlabeled_half.shape)
print(Y_unlabeled_half.shape)
# 10 %
X_train_ten = X_train[:len(X_train) // 10]
Y_train_ten = Y_train[:len(Y_train) // 10]
X_unlabeled_ten = X_train[len(X_train) // 10:]
Y_unlabeled_ten = Y_train[len(Y_train) // 10:]
print(X_train_ten.shape)
print(Y_train_ten.shape)
print(X_unlabeled_ten.shape)
print(Y_unlabeled_ten.shape)
# 1 %
X_train_one = X_train[:len(X_train) // 100]
Y_train_one = Y_train[:len(Y_train) // 100]
X_unlabeled_one = X_train[len(X_train) // 100:]
Y_unlabeled_one = Y_train[len(Y_train) // 100:]
print(X_train_one.shape)
print(Y_train_one.shape)
print(X_unlabeled_one.shape)
print(Y_unlabeled_one.shape)


In [None]:
class DogsCatsCnnModelResNet18(ImageClassificationBase):
    def __init__(self):

        super().__init__()
        # Use a pretrained model
        self.network = models.resnet18(weights=ResNet18_Weights.DEFAULT)
        # Replace last layer
        num_ftrs = self.network.fc.in_features
        #self.network.fc = nn.Linear(num_ftrs, 37)

        self.network.fc = nn.Sequential(
                          nn.Dropout(0.5),
                          nn.Linear(num_ftrs, 37)
                      )

        # Freeze the early layers
        for name, param in self.network.named_parameters():
            if 'layer1' in name or 'layer2' in name or 'layer3' in name:
                param.requires_grad = False

                # Apply the batch normalization adjustment
                if isinstance(param, nn.BatchNorm2d):
                    set_bn_eval(param)

    def forward(self, xb):
        return torch.sigmoid(self.network(xb))

    def predict(self, images):
      return super().predict(images)

In [None]:
# We use ResNet18 as our teacher model
#model_resnet_18_half = DogsCatsCnnModelResNet18().to(device)
model_resnet_18_half_teacher = DogsCatsCnnModelResNet18()
num_epochs = 10
opt_func = torch.optim.Adam
batch_size = 256
lr = 0.001
history_half = fit(model_resnet_18_half_teacher, num_epochs, lr, X_train_half, Y_train_half, X_val, Y_val, batch_size, opt_func)
evaluate_teacher_half = evaluate(model_resnet_18_half_teacher, X_test, Y_test)
print("Evaluation of model trained on the 50% of dataset", evaluate_teacher_half)


In [None]:
torch.cuda.empty_cache()
# Create TensorDataset and DataLoader for training data
dataset = TensorDataset(X_unlabeled_half, Y_unlabeled_half)
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=False, num_workers=4)

predictions = []
with torch.no_grad():
  for i, batch in enumerate(data_loader):
        images, _ = batch
        predictions.append(model_resnet_18_half_teacher.predict(images))
#print(predictions)
flattened_predictions = [prediction.flatten() for prediction in predictions]
Y_unlabeled_half_predictions = torch.cat(flattened_predictions)
print(len(Y_unlabeled_half_predictions))
print(len(Y_unlabeled_half))
#print(Y_unlabeled_half_predictions[1])
#print(Y_unlabeled_half[1])
Y_unlabeled_half = Y_unlabeled_half.to(device)
#print(predictions_flat[1])
#print(Y_unlabeled_half[1])
acc = (Y_unlabeled_half_predictions == Y_unlabeled_half).sum().item() / len(Y_unlabeled_half)
print("Accuracy: ", acc)


In [None]:
# Concatenate the dataset with the pseudo labels so that we can train the student model in the entire dataset


X_train_half_concatenated =  torch.cat((X_train_half, X_unlabeled_half), dim=0)
Y_train_half_concatenated = torch.cat((Y_train_half.to(device), Y_unlabeled_half_predictions), dim=0)

model_resnet_18_half_student = DogsCatsCnnModelResNet18()
num_epochs = 10
opt_func = torch.optim.Adam
batch_size = 256
lr = 0.001
device = torch.device("cuda")
print("Using device:", device)
fit(model_resnet_18_half_student, num_epochs, lr, X_train_half_concatenated, Y_train_half_concatenated, X_val, Y_val, batch_size, opt_func)
evaluate_student_half = evaluate(model_resnet_18_half_student, X_test, Y_test)
print("Evaluation of model trained on the 50% of dataset", evaluate_student_half)


In [None]:
import gc
# Delete variables
del evaluate_student_half, model_resnet_18_half_student, X_train_half_concatenated, Y_train_half_concatenated
del X_train_half, X_unlabeled_half, Y_train_half, Y_unlabeled_half, Y_unlabeled_half_predictions, flattened_predictions
del model_resnet_18_half_teacher, model_resnet_34, X_train, Y_train
# Run garbage collection
gc.collect()

torch.cuda.empty_cache()


In [None]:
model_resnet_18_ten_teacher = DogsCatsCnnModelResNet18()
num_epochs = 10
opt_func = torch.optim.Adam
batch_size = 256
lr = 0.001
fit(model_resnet_18_ten_teacher, num_epochs, lr, X_train_ten, Y_train_ten, X_val, Y_val, batch_size, opt_func)

evaluate_teacher_ten = evaluate(model_resnet_18_ten_teacher, X_test, Y_test)
print("Evaluation of model trained on the 10% of dataset", evaluate_teacher_ten)


torch.cuda.empty_cache()
# Create TensorDataset and DataLoader for training data
dataset = TensorDataset(X_unlabeled_ten, Y_unlabeled_ten)
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=False, num_workers=4) #, num_workers=4

predictions = []
with torch.no_grad():
  for i, batch in enumerate(data_loader):
        images, _ = batch
        predictions.append(model_resnet_18_ten_teacher.predict(images))
#print(predictions)

flattened_predictions = [prediction.flatten() for prediction in predictions]
Y_unlabeled_ten_predictions = torch.cat(flattened_predictions)
print(len(Y_unlabeled_ten_predictions))
print(len(Y_unlabeled_ten))

Y_unlabeled_ten = Y_unlabeled_ten.to(device)
#print(predictions_flat[1])
#print(Y_unlabeled_half[1])
acc = (Y_unlabeled_ten_predictions == Y_unlabeled_ten).sum().item() / len(Y_unlabeled_ten)
print("Accuracy: ", acc)


In [None]:
# Concatenate the dataset with the pseudo labels so that we can train the student model in the entire dataset

X_train_ten_concatenated =  torch.cat((X_train_ten, X_unlabeled_ten), dim=0)
Y_train_ten_concatenated = torch.cat((Y_train_ten.to(device), Y_unlabeled_ten_predictions), dim=0)

model_resnet_18_ten_student = DogsCatsCnnModelResNet18()

fit(model_resnet_18_ten_student, num_epochs, lr, X_train_ten_concatenated, Y_train_ten_concatenated, X_val, Y_val, batch_size, opt_func)
num_epochs = 10
opt_func = torch.optim.Adam
batch_size = 16
lr = 0.001
evaluate_student_ten= evaluate(model_resnet_18_ten_student, X_test, Y_test)
print("Evaluation of model trained on the 10% of dataset", evaluate_student_ten)

In [None]:
import gc
# Delete variables
del evaluate_student_ten, model_resnet_18_ten_student, X_train_ten_concatenated, Y_train_ten_concatenated
del X_train_ten, X_unlabeled_ten, Y_train_ten, Y_unlabeled_ten, Y_unlabeled_ten_predictions, flattened_predictions
del model_resnet_18_ten_teacher
# Run garbage collection
gc.collect()

torch.cuda.empty_cache()

In [None]:
model_resnet_18_one_teacher = DogsCatsCnnModelResNet18()
num_epochs = 10
opt_func = torch.optim.Adam
batch_size = 16
lr = 0.001
fit(model_resnet_18_one_teacher, num_epochs, lr, X_train_one, Y_train_one, X_val, Y_val, batch_size, opt_func)

evaluate_teacher_one = evaluate(model_resnet_18_one_teacher, X_test, Y_test)
print("Evaluation of model trained on the 1% of dataset", evaluate_teacher_one)


torch.cuda.empty_cache()
# Create TensorDataset and DataLoader for training data
dataset = TensorDataset(X_unlabeled_one, Y_unlabeled_one)
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=False, num_workers=4) #, num_workers=4

predictions = []
with torch.no_grad():
  for i, batch in enumerate(data_loader):
        images, _ = batch
        predictions.append(model_resnet_18_one_teacher.predict(images))
#print(predictions)

flattened_predictions = [prediction.flatten() for prediction in predictions]
Y_unlabeled_one_predictions = torch.cat(flattened_predictions)
print(len(Y_unlabeled_one_predictions))
print(len(Y_unlabeled_one))

Y_unlabeled_one = Y_unlabeled_one.to(device)
#print(predictions_flat[1])
#print(Y_unlabeled_half[1])
acc = (Y_unlabeled_one_predictions == Y_unlabeled_one).sum().item() / len(Y_unlabeled_one)
print("Accuracy: ", acc)
# Concatenate the dataset with the pseudo labels so that we can train the student model in the entire dataset

X_train_one_concatenated =  torch.cat((X_train_one, X_unlabeled_one), dim=0)
Y_train_one_concatenated = torch.cat((Y_train_one.to(device), Y_unlabeled_one_predictions), dim=0)

model_resnet_18_one_student = DogsCatsCnnModelResNet18()

fit(model_resnet_18_one_student, num_epochs, lr, X_train_one_concatenated, Y_train_one_concatenated, X_val, Y_val, batch_size, opt_func)
num_epochs = 10
opt_func = torch.optim.Adam
batch_size = 16
lr = 0.001
evaluate_student_one= evaluate(model_resnet_18_one_student, X_test, Y_test)
print("Evaluation of model trained on the 1% of dataset", evaluate_student_one)

In [None]:
import gc
# Delete variables
del evaluate_student_one, model_resnet_18_one_student, X_train_one_concatenated, Y_train_one_concatenated
del X_train_one, X_unlabeled_one, Y_train_one, Y_unlabeled_one, Y_unlabeled_one_predictions, flattened_predictions
del model_resnet_18_one_teacher
# Run garbage collection
gc.collect()

torch.cuda.empty_cache()

In [None]:
from torchvision.models import resnet101, ResNet101_Weights

class DogsCatsCnnModelResNet101(ImageClassificationBase):
    def __init__(self):

        super().__init__()
        # Use a pretrained model
        self.network = models.resnet101(weights=ResNet101_Weights.DEFAULT)
        # Replace last layer
        num_ftrs = self.network.fc.in_features
        #self.network.fc = nn.Linear(num_ftrs, 37)

        self.network.fc = nn.Sequential(
                          nn.Dropout(0.5),
                          nn.Linear(num_ftrs, 37)
                      )

        # Freeze the early layers
        for name, param in self.network.named_parameters():
            if 'layer1' in name or 'layer2' in name or 'layer3' in name:
                param.requires_grad = False

                # Apply the batch normalization adjustment
                if isinstance(param, nn.BatchNorm2d):
                    set_bn_eval(param)

    def forward(self, xb):
        return torch.sigmoid(self.network(xb))

    def predict(self, images):
      return super().predict(images)

In [None]:

def fit(model, epochs, lr, X_train, Y_train, X_val, Y_val, batch_size=32, opt_func=torch.optim.SGD):
    history = []
    # optimizer = opt_func(model.parameters(), lr)

    # Filter out the parameters to update to ensure that "Only Unfrozen Parameters Are Passed to the Optimizer"
    params_to_update = [p for p in model.parameters() if p.requires_grad]
    optimizer = opt_func(params_to_update, lr)

    scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.1, patience=3, verbose=True)


    # Create TensorDataset and DataLoader for training data
    train_dataset = TensorDataset(X_train, Y_train)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) #, num_workers=4

    # Move the model to the GPU
    model = model.to(device)
    #print("MODEL TO DEVICE: ", device)

    for epoch in range(epochs):
        print(f"Epoch: {epoch}")

        # Training Phase
        model.train()
        train_losses = []
        for i, batch in enumerate(train_loader):
            X_batch, Y_batch = batch
            #X_batch, Y_batch = X_batch.to(device), Y_batch.to(device)  # Move batch data to the GPU

            loss = model.training_step([X_batch, Y_batch])
            train_losses.append(loss)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

        stacked_tensors = torch.stack(train_losses)
        # Sum along the first dimension (the new dimension)
        total = torch.sum(stacked_tensors, dim=0)
        print(f"Epoch {epoch}, result: {total}")

        # Validation phase
        result = evaluate(model, X_val, Y_val, batch_size)
        print(f"Epoch {epoch}, result: {result}")
        history.append(result)

        torch.cuda.empty_cache()


    return history

In [None]:
model_resnet_101_one_teacher = DogsCatsCnnModelResNet101()
num_epochs = 20
opt_func = torch.optim.Adam
batch_size = 16
lr = 0.001
fit(model_resnet_101_one_teacher, num_epochs, lr, X_train_one, Y_train_one, X_val, Y_val, batch_size, opt_func)

evaluate_teacher_one = evaluate(model_resnet_101_one_teacher, X_test, Y_test)
print("Evaluation of model trained on the 1% of dataset", evaluate_teacher_one)

torch.cuda.empty_cache()


In [None]:

# Create TensorDataset and DataLoader for training data
dataset = TensorDataset(X_unlabeled_one, Y_unlabeled_one)
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=False) #, num_workers=4

predictions = []
with torch.no_grad():
  for i, batch in enumerate(data_loader):
        images, _ = batch
        predictions.append(model_resnet_101_one_teacher.predict(images))
#print(predictions)

flattened_predictions = [prediction.flatten() for prediction in predictions]
Y_unlabeled_one_predictions = torch.cat(flattened_predictions)
print(len(Y_unlabeled_one_predictions))
print(len(Y_unlabeled_one))

Y_unlabeled_one = Y_unlabeled_one.to(device)
#print(predictions_flat[1])
#print(Y_unlabeled_half[1])
acc = (Y_unlabeled_one_predictions == Y_unlabeled_one).sum().item() / len(Y_unlabeled_one)
print("Accuracy: ", acc)
# Concatenate the dataset with the pseudo labels so that we can train the student model in the entire dataset

X_train_one_concatenated =  torch.cat((X_train_one, X_unlabeled_one), dim=0)
Y_train_one_concatenated = torch.cat((Y_train_one.to(device), Y_unlabeled_one_predictions), dim=0)

model_resnet_101_one_student = DogsCatsCnnModelResNet101()
num_epochs = 10
opt_func = torch.optim.Adam
batch_size = 256
lr = 0.001
fit(model_resnet_101_one_student, num_epochs, lr, X_train_one_concatenated, Y_train_one_concatenated, X_val, Y_val, batch_size, opt_func)

evaluate_student_one= evaluate(model_resnet_101_one_student, X_test, Y_test)
print("Evaluation of model trained on the 1% of dataset", evaluate_student_one)