# Import Libraries

In [1]:
# import libraries
import os
import pickle
import numpy as np
import pandas as pd
from time import time
from sklearn.model_selection import train_test_split

import gzip
from urllib import request
from tqdm import tqdm

import matplotlib.pyplot as plt
%matplotlib inline

# PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.utils.data import TensorDataset, DataLoader, Dataset

import torchvision.transforms as transforms

os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

# Load Data

In [2]:
def load_mnist(pickle_path):
    with open(pickle_path, 'rb') as f:
        x_train, x_test, y_train, y_test = pickle.load(f).values()
    return x_train, x_test, y_train, y_test


def load_mnist_semisupervised(pickle_path):
    x_train, x_test, y_train, y_test = load_mnist(pickle_path)

    labels = list(set(y_train))
    print(f"Labels: {labels}")

    data_dict = {}

    data_dict["x_train_labeled"], data_dict["x_train_unlabeled"], data_dict["y_train_labeled"], data_dict["y_train_unlabeled"] = train_test_split(x_train, y_train, train_size=100, random_state=42, stratify=y_train)

    # to verify number of samples for each class
    classes, counts = np.unique(data_dict["y_train_labeled"], return_counts=True)

    data_dict["x_train"] = x_train
    data_dict["y_train"] = y_train
    data_dict["x_test"] = x_test
    data_dict["y_test"] = y_test

    print(f"Labeled Train Shape   : {data_dict['x_train_labeled'].shape}")
    print(f"UnLabeled Train Shape : {data_dict['x_train_unlabeled'].shape}")

    return data_dict

In [3]:
DATASET_DIR = "./data/"
DOWNLOAD_DIR = "./data/tmp/"

BASE_URL = "http://yann.lecun.com/exdb/mnist/"

TARGET_LIST = [
    ("train_images", "train-images-idx3-ubyte.gz"),
    ("test_images", "t10k-images-idx3-ubyte.gz"),
    ("train_labels", "train-labels-idx1-ubyte.gz"),
    ("test_labels", "t10k-labels-idx1-ubyte.gz")
]


def download():
    if not os.path.exists(DOWNLOAD_DIR):
        os.makedirs(DOWNLOAD_DIR)

    for _, file_name in TARGET_LIST:
        print(f"Downloading {DOWNLOAD_DIR + file_name}...")
        request.urlretrieve(BASE_URL + file_name, DOWNLOAD_DIR + file_name)
    print("Download Completed!")


def save_as_picke():
    mnist = {}

    for file_tag, file_name in TARGET_LIST[:2]:
        with gzip.open(DOWNLOAD_DIR + file_name, 'rb') as f:
            mnist[file_tag] = np.frombuffer(f.read(), np.uint8, offset=16).reshape(-1,28*28)/255

    for file_tag, file_name in TARGET_LIST[-2:]:
        with gzip.open(DOWNLOAD_DIR + file_name, 'rb') as f:
            mnist[file_tag] = np.frombuffer(f.read(), np.uint8, offset=8)

    if not os.path.exists(DATASET_DIR):
        os.makedirs(DATASET_DIR)

    with open(DATASET_DIR + "mnist.pkl", 'wb') as f:
        pickle.dump(mnist,f)
    print(f"Saving as pickle complete: {DATASET_DIR + 'mnist.pkl'}")

def clean():
    if os.path.exists(DOWNLOAD_DIR):
        for _, file_name in TARGET_LIST:
            if os.path.exists(DOWNLOAD_DIR + file_name):
                os.remove(DOWNLOAD_DIR + file_name)
        os.rmdir(DOWNLOAD_DIR)
        print("Download directory (./tmp) is cleaned and removed.")

download()
save_as_picke()
clean()

Downloading ./data/tmp/train-images-idx3-ubyte.gz...
Downloading ./data/tmp/t10k-images-idx3-ubyte.gz...
Downloading ./data/tmp/train-labels-idx1-ubyte.gz...
Downloading ./data/tmp/t10k-labels-idx1-ubyte.gz...
Download Completed!
Saving as pickle complete: ./data/mnist.pkl
Download directory (./tmp) is cleaned and removed.


In [4]:
mnist_path = "./data/mnist.pkl"

data_dict = load_mnist_semisupervised(mnist_path)

x_train_labeled = data_dict["x_train_labeled"]
x_train_unlabeled = data_dict["x_train_unlabeled"]
y_train_labeled = data_dict["y_train_labeled"]
y_train_unlabeled = data_dict["y_train_unlabeled"]

x_train = data_dict["x_train"]
y_train = data_dict["y_train"]
x_test = data_dict["x_test"]
y_test = data_dict["y_test"]

Labels: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Labeled Train Shape   : (100, 784)
UnLabeled Train Shape : (59900, 784)


# Create Custom Dataset Structure for Pytorch

In [5]:
class MNIST_Dataset(Dataset):
    def __init__(self, data, target, transform=None):
        self.data = data
        self.target = target
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        x = self.data[index]
        if self.transform:
            x = self.transform(x)

        y = self.target[index]

        return x, y

# Define Datasets and Dataloaders

In [6]:
# labeled + unlabeled training = 60000
pretrain_num_epochs = 5
pretrain_batch_size = 128
all_train_dataset = TensorDataset(torch.Tensor(x_train),
                                  torch.zeros(x_train.shape[0]))
all_train_dataloader = DataLoader(all_train_dataset,
                                     batch_size=pretrain_batch_size,
                                     shuffle=True,
                                     pin_memory=True)
#-------------------------
# unlabeled training = 59900
unlabeled_train_dataset = TensorDataset(torch.Tensor(x_train_unlabeled),
                                  torch.zeros(x_train_unlabeled.shape[0]))
unlabeled_train_dataloader = DataLoader(unlabeled_train_dataset,
                                     batch_size=pretrain_batch_size,
                                     shuffle=True,
                                     pin_memory=True)
#-------------------------
# labeled training = 100
train_num_epochs = 10
train_batch_size = 100
train_dataset = TensorDataset(torch.Tensor(x_train),
                              torch.LongTensor(y_train))
train_dataloader = DataLoader(train_dataset,
                                     batch_size=train_batch_size,
                                     shuffle=False,
                                     pin_memory=True)
#-------------------------
# test = 10000
test_batch_size = 128
test_dataset = TensorDataset(torch.Tensor(x_test),
                              torch.LongTensor(y_test))
test_dataloader = DataLoader(test_dataset,
                                     batch_size=test_batch_size,
                                     shuffle=False,
                                     pin_memory=True)

# Define Autoencoder

In [7]:
class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()
        # Encoder
        self.encoder = nn.Sequential(
            nn.Linear(28*28, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 12),
            nn.ReLU(),
            nn.Linear(12, 3)  # Compressed representation
        )
        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(3, 12),
            nn.ReLU(),
            nn.Linear(12, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, 28*28),
            nn.Sigmoid()  # For normalized inputs
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

# Unsupervised Pretraining of Autoencoder with all Training Data

In [8]:
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
device

device(type='cuda')

In [9]:
autoencoder = Autoencoder().to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(autoencoder.parameters(), lr=0.001)

In [10]:
# Pretraining loop
autoencoder.train()
for epoch in range(pretrain_num_epochs):
    loss = 0.0
    t0 = time()
    for data, _ in all_train_dataloader:
        inputs = data.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward + backward + optimize
        outputs = autoencoder(inputs)
        loss = criterion(outputs, inputs)
        loss.backward()
        optimizer.step()
        loss += loss.item()

    loss = loss / len(all_train_dataloader)
    t1 = time()
    print(f"epoch: {epoch+1}/{pretrain_num_epochs}, loss={loss: .6}, {t1-t0:.2f}s")

epoch: 1/5, loss= 0.000241395, 5.58s
epoch: 2/5, loss= 0.000195024, 2.43s
epoch: 3/5, loss= 0.000169197, 2.42s
epoch: 4/5, loss= 0.000177945, 3.14s
epoch: 5/5, loss= 0.000157331, 2.59s


# Supervised Training using 100 labeled samples

In [11]:
class MLP(nn.Module):
    def __init__(self, pretrained_encoder):
        super(MLP, self).__init__()
        self.encoder = pretrained_encoder
        # Add a classifier on top of the encoder
        self.classifier = nn.Sequential(
            nn.Linear(3, 64),
            nn.ReLU(),
            nn.Linear(64, 10)  # Assuming 10 classes
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.classifier(x)
        return x

In [12]:
# Freeze the encoder part during supervised training
for param in autoencoder.encoder.parameters():
    param.requires_grad = False

In [13]:
supervised_model = MLP(autoencoder.encoder).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(supervised_model.parameters(), lr=0.001)

In [14]:
def train_model(model, train_loader, criterion, optimizer, device):
    model.train()  # Set the model to training mode

    total_loss = 0.0
    correct_predictions = 0
    total_samples = 0

    for data in train_loader:
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        total_samples += labels.size(0)

        # Calculate accuracy
        _, predicted = torch.max(outputs.data, 1)
        correct_predictions += (predicted == labels).sum().item()

    avg_loss = total_loss / len(train_loader)
    accuracy = (correct_predictions / total_samples) * 100

    print(f"training loss: {avg_loss: .2f}, training accuracy: {accuracy: .2f}")

In [15]:
def evaluate_model(model, data_loader, criterion, device):
    model.eval()  # Set the model to evaluation mode

    total_loss = 0.0
    correct_predictions = 0
    total_samples = 0

    with torch.no_grad():  # No need to track gradients during evaluation
        for data in data_loader:
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)

            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            total_loss += loss.item()

            # Calculate accuracy
            _, predicted = torch.max(outputs.data, 1)
            total_samples += labels.size(0)
            correct_predictions += (predicted == labels).sum().item()

    avg_loss = total_loss / len(data_loader)
    accuracy = (correct_predictions / total_samples) * 100

    print(f"evaluation loss: {avg_loss: .2f}, evaluation accuracy: {accuracy: .2f}")

In [16]:
for i in range(train_num_epochs):
    print("Epoch", i)
    train_model(supervised_model, train_dataloader, criterion, optimizer, device)

evaluate_model(supervised_model, test_dataloader, criterion, device)

Epoch 0
training loss:  1.11, training accuracy:  63.12
Epoch 1
training loss:  0.77, training accuracy:  74.27
Epoch 2
training loss:  0.73, training accuracy:  75.32
Epoch 3
training loss:  0.71, training accuracy:  75.67
Epoch 4
training loss:  0.70, training accuracy:  75.98
Epoch 5
training loss:  0.69, training accuracy:  76.16
Epoch 6
training loss:  0.69, training accuracy:  76.30
Epoch 7
training loss:  0.68, training accuracy:  76.37
Epoch 8
training loss:  0.68, training accuracy:  76.43
Epoch 9
training loss:  0.68, training accuracy:  76.49
evaluation loss:  0.68, evaluation accuracy:  76.34


# Labeling unlabeled Data

In [17]:
data_list = []
labels_list = []

supervised_model.eval()  # Set the model to evaluation mode
with torch.no_grad():  # No need to track gradients during prediction
    for data in unlabeled_train_dataloader:
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)

        # forward + prediction
        outputs = supervised_model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        data_list.append(inputs.cpu().detach())
        labels_list.append(predicted.cpu().detach())

In [18]:
x_unlabeled = torch.vstack(data_list)
y_unlabeled = torch.hstack(labels_list)

# Combining new labeled Data with train Data

In [19]:
# labeled + unlabeled training = 60000
combined_train_num_epochs = 5
combined_batch_size = 128
combined_dataset = TensorDataset(torch.vstack((torch.Tensor(x_train_labeled), x_unlabeled)),
                                  torch.tensor(torch.hstack((torch.Tensor(y_train_labeled), y_unlabeled)), dtype=torch.long))
combined_dataloader = DataLoader(combined_dataset,
                                     batch_size=combined_batch_size,
                                     shuffle=True,
                                     pin_memory=True)

  torch.tensor(torch.hstack((torch.Tensor(y_train_labeled), y_unlabeled)), dtype=torch.long))


In [20]:
for i in range(combined_train_num_epochs):
    print("Epoch", i)
    train_model(supervised_model, combined_dataloader, criterion, optimizer, device)

evaluate_model(supervised_model, test_dataloader, criterion, device)

Epoch 0
training loss:  0.23, training accuracy:  96.50
Epoch 1
training loss:  0.18, training accuracy:  96.53
Epoch 2
training loss:  0.15, training accuracy:  96.91
Epoch 3
training loss:  0.14, training accuracy:  97.02
Epoch 4
training loss:  0.12, training accuracy:  97.18
evaluation loss:  1.29, evaluation accuracy:  75.89
