<a href="https://colab.research.google.com/github/AK18k/ex3/blob/main/Ex3_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!git clone https://github.com/AK18k/ex3

import os

#from google.colab import drive

drive.mount('/content/drive')
DATA_PATH = '/content/drive/MyDrive/Ex3/data'
PATH = '/content/drive/MyDrive/Ex3'
os.chdir('/content/drive/MyDrive/Ex3')

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.autograd import Variable
from torch.nn.modules.activation import Softplus
from torch.utils.data import Dataset, DataLoader


# ---------------------------------------------------------------
# Hyperparameters shared by the VAE and the SVM stages
# ---------------------------------------------------------------

# Model dimensions
input_size = 784            # flattened 28x28-pixel input image
latent_size = 50            # length of the latent vector
num_hidden_units = 600      # width of the hidden layer

# Batch sizes
VAE_batch_size = 64
SVM_batch_size = 64

# Training schedule
VAE_epochs = 10
SVM_epochs = 10
learning_rate = 0.001

# Size of the labeled subset
num_of_labeled_samples = 100


# Run on the GPU when one is available, otherwise on the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Fixed seed for the torch random number generator
torch.manual_seed(42)

In [None]:
# Load the FashionMNIST train and test splits (downloaded into DATA_PATH when missing).
# Both splits use the same image-to-tensor conversion.
transform = transforms.ToTensor()
train_dataset = datasets.FashionMNIST(
    root=DATA_PATH,
    train=True,
    transform=transform,
    download=True,
)
test_dataset = datasets.FashionMNIST(
    root=DATA_PATH,
    train=False,
    transform=transform,
    download=True,
)

In [None]:
##############################################################
# Split a dataset into labeled and unlabeled datasets
# Inputs:
#   - the dataset
#   - number of labeled samples required from the dataset
# Output:
#   - labeled and unlabeled datasets (not dataloaders)
##############################################################

# Define custom dataset for labeled and unlabeled data
class SplitDataset(Dataset):
    """View of ``dataset`` restricted to the positions in ``labeled_indices``.

    Item ``k`` of this dataset is item ``labeled_indices[k]`` of the wrapped
    dataset, returned as an ``(image, target)`` tuple.
    """

    def __init__(self, dataset, labeled_indices):
        # Keep references only; nothing is copied from the wrapped dataset.
        self.labeled_indices = labeled_indices
        self.dataset = dataset

    def __len__(self):
        """Return the number of selected samples."""
        return len(self.labeled_indices)

    def __getitem__(self, index):
        """Return the ``(image, target)`` pair for the ``index``-th selected sample."""
        source_position = self.labeled_indices[index]
        sample, label = self.dataset[source_position]
        return sample, label


def split_to_labled(train_dataset, num_of_labeled_samples):
    """Split a dataset into a class-balanced labeled part and an unlabeled rest.

    The dataset is walked in order. For every class, the first
    ``num_of_labeled_samples // num_classes`` samples encountered become
    labeled; every other sample goes to the unlabeled part.

    Args:
        train_dataset: Dataset yielding ``(image, label)`` pairs and exposing
            a ``classes`` attribute. ``label`` is used as a list index, so it
            must be an integer in ``range(len(train_dataset.classes))``.
        num_of_labeled_samples: Total number of labeled samples requested.
            The per-class quota uses integer division, so a remainder that
            does not divide evenly across the classes is dropped.

    Returns:
        Tuple ``(labeled_dataset, unlabeled_dataset)``: a ``SplitDataset`` and
        a ``torch.utils.data.Subset`` (datasets, not dataloaders).

    Example:
        labeled_loader = DataLoader(labeled_dataset, batch_size=64, shuffle=True)
        unlabeled_loader = DataLoader(unlabeled_dataset, batch_size=128, shuffle=True)
    """
    # Determine the number of labeled samples per class
    num_classes = len(train_dataset.classes)
    labeled_samples_per_class = num_of_labeled_samples // num_classes

    # Split the dataset indices into labeled and unlabeled
    labeled_indices = []
    unlabeled_indices = []
    class_counts = [0] * num_classes

    for i, (_, label) in enumerate(train_dataset):
        if class_counts[label] < labeled_samples_per_class:
            labeled_indices.append(i)
            class_counts[label] += 1
        else:
            unlabeled_indices.append(i)

    # Create labeled and unlabeled datasets
    labeled_dataset = SplitDataset(train_dataset, labeled_indices)
    unlabeled_dataset = torch.utils.data.Subset(train_dataset, unlabeled_indices)

    # Print the number of labeled and unlabeled samples
    print('Dataset split:')
    print('--------------')
    print(f"Number of labeled samples: {len(labeled_dataset)}")
    print(f"Number of unlabeled samples: {len(unlabeled_dataset)}")

    return labeled_dataset, unlabeled_dataset


In [None]:
#####################################################################
# Create and train the VAE model
# Input:
#   - data_loader - a dataloader with images and labels
# Output:
#   - the VAE model
#####################################################################


# Define the VAE architecture
class VAE(nn.Module):
    """Fully connected variational autoencoder with one hidden layer per side.

    Args:
        input_size: Length of the flattened input vector.
        latent_size: Length of the latent vector.
        hidden_size: Width of the hidden layer in the encoder and the decoder.
            When ``None`` (the default) the module-level ``num_hidden_units``
            is used, which keeps the original two-argument call working.
    """

    def __init__(self, input_size, latent_size, hidden_size=None):
        super().__init__()

        # Fall back to the notebook-wide setting when no width is given.
        if hidden_size is None:
            hidden_size = num_hidden_units

        # Encoder: emits 2 * latent_size values, later split into mu and logvar
        self.encoder = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.Softplus(),
            nn.Linear(hidden_size, latent_size * 2),
        )

        # Decoder: maps a latent vector back to input_size values in [0, 1]
        self.decoder = nn.Sequential(
            nn.Linear(latent_size, hidden_size),
            nn.Softplus(),
            nn.Linear(hidden_size, input_size),
            nn.Sigmoid(),
        )

    def reparameterize(self, mu, logvar):
        """Sample ``z = mu + eps * std`` with ``eps`` drawn from a standard normal.

        ``logvar`` is the log-variance, so ``std = exp(0.5 * logvar)``.
        """
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x):
        """Encode ``x``, sample a latent vector and decode it.

        Args:
            x: Batch of flattened inputs; the split below is along ``dim=1``.

        Returns:
            Tuple ``(reconstructed, mu, logvar)``. The sampled latent vector
            itself is not returned.
        """
        # Encoder: first half of the output is mu, second half is logvar
        encoded = self.encoder(x)
        mu, logvar = torch.chunk(encoded, 2, dim=1)
        z = self.reparameterize(mu, logvar)

        # Decoder
        reconstructed = self.decoder(z)
        return reconstructed, mu, logvar


def train_VAE(data_loader):
    """Create a VAE and train it on the images served by ``data_loader``.

    The loss is the summed binary cross-entropy between reconstruction and
    input plus the KL term computed from ``mu`` and ``logvar``. Labels in
    the batches are ignored.

    Args:
        data_loader: Iterable of ``(images, labels)`` batches that also
            supports ``len()``. Images are flattened to
            ``(batch, features)`` before being fed to the model.

    Returns:
        The trained VAE model (on ``device``).
    """
    # Create VAE model
    VAE_model = VAE(input_size, latent_size).to(device)

    # Summed (not averaged) binary cross-entropy as reconstruction loss
    criterion = nn.BCELoss(reduction='sum')

    # Define optimizer
    optimizer = optim.Adam(VAE_model.parameters(), lr=learning_rate)

    num_steps = len(data_loader)

    # Training loop
    for epoch in range(VAE_epochs):
        for i, (images, _) in enumerate(data_loader):
            # Flatten input images
            images = images.view(images.size(0), -1).to(device)

            # Forward pass
            reconstructed, mu, logvar = VAE_model(images)

            # Compute reconstruction loss and KL divergence
            reconstruction_loss = criterion(reconstructed, images)
            kl_divergence = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())

            # Total loss
            loss = reconstruction_loss + kl_divergence

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if (i+1) % 100 == 0:
                # Report against the loader actually being iterated.
                print(f"VAE train Epoch [{epoch+1}/{VAE_epochs}], Step [{i+1}/{num_steps}], Loss: {loss.item():.4f}")

    return VAE_model



In [None]:
###############################################################
# Passes a dataset of images through a pretrained VAE model
# Inputs:
#   - VAE_model
#   - data - the dataset (not a dataloader)
# Output:
#   - output_vectors - latent vectors
###############################################################

def images_to_latent(VAE_model, data):
    """Encode every image of ``data`` with a trained VAE.

    The model is switched to evaluation mode and the dataset is processed in
    its original order (no shuffling), so row ``k`` of the result belongs to
    sample ``k`` of ``data``.

    Args:
        VAE_model: Model whose forward pass returns
            ``(reconstructed, mu, logvar)`` for a batch of flattened images.
        data: Dataset (not a dataloader) yielding ``(image, label)`` pairs.

    Returns:
        Tensor with one latent vector per sample. The encoder mean ``mu`` is
        used as the latent vector so the result is deterministic.
    """
    data_loader = DataLoader(data, batch_size=VAE_batch_size, shuffle=False)

    # Set the model to evaluation mode
    VAE_model.eval()

    # Collect the per-batch latent vectors
    output_vectors = []

    # Pass the dataset through the VAE model
    with torch.no_grad():
        for images, _ in data_loader:
            # The encoder is fully connected, so images must be flattened first
            images = images.view(images.size(0), -1).to(device)
            # forward() returns (reconstructed, mu, logvar); keep mu
            _, mu, _ = VAE_model(images)
            output_vectors.append(mu)

    # Concatenate the output vectors into a single tensor
    output_vectors = torch.cat(output_vectors, dim=0)

    return output_vectors

In [None]:
######################################################
# Create, train and test SVM model
# train_SVM: trains the SVM model.
#           if TSVM is required - input a split dataset using split_to_labled(train_dataset, num_of_labeled_samples)
# Input:
#  - train_dataset (not dataloader)
# Output:
#  - SVM_model
#
# eval_SVM: evaluates on test dataset
# - Input: SVM_model, test_dataset (not dataloader)
# - Output: accuracy
###############################################################################




# Define the SVM model
class SVM(nn.Module):
    """Linear classifier: one fully connected layer producing 10 class scores."""

    def __init__(self, input_size):
        super().__init__()
        # One output per FashionMNIST class
        self.linear = nn.Linear(input_size, 10)

    def forward(self, x):
        """Return the raw class scores for the batch ``x``."""
        return self.linear(x)


def train_SVM(train_dataset):
    """Create the linear classifier and train it on ``train_dataset``.

    Labels are taken from the same shuffled batch as the inputs, so every
    sample is always paired with its own label. The dataset only needs to
    yield ``(input, label)`` pairs; no ``targets`` attribute is required.

    Args:
        train_dataset: Dataset (not a dataloader) of ``(input, label)``
            pairs. Inputs are flattened to ``(batch, features)``; the
            classifier is built for ``latent_size`` features.

    Returns:
        The trained model (on ``device``).
    """
    # Prepare dataloader
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=SVM_batch_size, shuffle=True)

    # Create the SVM model
    SVM_model = SVM(latent_size).to(device)
    SVM_model.train()

    # Define the loss function
    # NOTE(review): cross-entropy makes this a softmax classifier rather than
    # a margin-based SVM; confirm whether a hinge loss was intended.
    criterion = nn.CrossEntropyLoss()

    # Define the optimizer
    optimizer = optim.SGD(SVM_model.parameters(), lr=learning_rate)

    num_steps = len(train_loader)

    # Training loop
    for epoch in range(SVM_epochs):
        for i, (images, labels) in enumerate(train_loader):
            # Flatten inputs and move the batch to the training device
            images = images.view(images.size(0), -1).to(device)
            labels = labels.to(device)

            # Forward pass
            outputs = SVM_model(images)
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Print training progress
            if (i+1) % 100 == 0:
                print(f"SVM Train Epoch [{epoch+1}/{SVM_epochs}], Step [{i+1}/{num_steps}], Loss: {loss.item():.4f}")
    return SVM_model

def eval_SVM(SVM_model, test_dataset):
    """Evaluate the classifier on ``test_dataset`` and print its accuracy.

    Args:
        SVM_model: Trained model returning one score per class.
        test_dataset: Dataset (not a dataloader) of ``(input, label)`` pairs.
            Inputs are flattened to ``(batch, features)`` before scoring.

    Returns:
        Fraction of correctly classified samples, or ``0.0`` when the
        dataset is empty.
    """
    test_loader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=SVM_batch_size, shuffle=False)

    SVM_model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.view(images.size(0), -1).to(device)
            outputs = SVM_model(images)
            # The predicted class is the index of the highest score
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            # Compare on the CPU, where the loader leaves the labels
            correct += (predicted.cpu() == labels).sum().item()

    # Avoid a ZeroDivisionError on an empty dataset
    accuracy = correct / total if total else 0.0
    print(f"SVM Test Accuracy: {accuracy:.4f}")
    return accuracy

