<a href="https://colab.research.google.com/github/SauryanPandey/KCDH-Projects/blob/main/tsne_logdet_code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os

# A bare assignment only creates a Python variable; it never reaches the
# process environment. Export the flag as well so that subprocesses started
# from this kernel (e.g. a later `!pip install`) can see it.
# NOTE(review): presumably this flag exists for the submodlib install in the
# next cell, which may pull in the deprecated `sklearn` shim package - confirm.
SKLEARN_ALLOW_DEPRECATED_SKLEARN_PACKAGE_INSTALL = True
os.environ["SKLEARN_ALLOW_DEPRECATED_SKLEARN_PACKAGE_INSTALL"] = "True"

In [None]:
# Install submodlib using TestPyPI as the primary index, with the main PyPI
# index registered as an additional package source.
!pip install -i https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple/ submodlib

In [None]:
import torch
from torch import nn
import torchvision.models as models
import torchvision.transforms as transforms
from PIL import Image
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import math
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split

In [None]:
# Pick the compute backend by preference: CUDA first, then Apple MPS, and the
# CPU when neither accelerator is reported as available.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")

In [None]:
# Locations of the image and label arrays. Replace both paths to run the
# notebook on any other dataset stored as .npy files.
image_path = "/kaggle/input/pneumoniamnistdata/train_images (PneumonialMNIST).npy"
label_path = "/kaggle/input/pneumoniamnistdata/train_labels (PneumonialMNIST).npy"

# Read the labels from disk and place them on the selected device.
y = torch.from_numpy(np.load(label_path)).to(device)
print(y.shape)

In [None]:
# Read the images and reorder their axes with the permutation (0, 3, 1, 2)
# before moving them to the device.
# NOTE(review): presumably this converts channels-last (N, H, W, C) storage to
# the channels-first layout the torchvision backbone expects - confirm against
# the stored array.
x = torch.from_numpy(
    np.transpose(np.load(image_path), (0, 3, 1, 2))
).to(device)
print(x.shape)

In [None]:
# Hold out 30% of the samples for testing; random_state fixes the split.
x_train, x_test, y_train, y_test = train_test_split(
    x,
    y,
    test_size=0.3,
    random_state=1,
)

batch_size = 8

# Pair images with labels so the loaders yield (inputs, labels) batches.
train_dataset = TensorDataset(x_train, y_train)
test_dataset = TensorDataset(x_test, y_test)

# Only the training loader reshuffles its samples on every pass.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=batch_size)

In [None]:
# Sanity check: show the first batch produced by the training loader
# (nothing is printed when the loader is empty).
first_batch = next(iter(train_loader), None)
if first_batch is not None:
    print(first_batch)

In [None]:
from torchvision.models.resnet import ResNet18_Weights

# ResNet-18 with the default pretrained weights, with its last child module
# (the final fully connected layer) dropped so the network outputs features
# instead of class scores.
resnet = models.resnet18(weights=ResNet18_Weights.DEFAULT)
resnet = torch.nn.Sequential(*list(resnet.children())[:-1])

# A freshly built module is in training mode. ResNet-18 contains BatchNorm
# layers, which in training mode normalise with per-batch statistics and keep
# updating their running averages. Features would then depend on which samples
# share a batch. eval() switches to the stored statistics so every image maps
# to one deterministic feature vector.
resnet = resnet.to(device).eval()

In [None]:
def extract_features(images):
    """Run every batch of a loader through the global ``resnet`` backbone.

    Args:
        images: Iterable of batches. Only element 0 of each batch (the image
            tensor) is used; any further elements are ignored.

    Returns:
        A 2-D tensor holding one flattened feature row per input image, in
        the order the batches were produced.
    """
    collected = []
    # Inference only: no autograd graph is needed for any of the batches.
    with torch.no_grad():
        for batch in images:
            inputs = batch[0].to(device)

            # Raw uint8 pixels are cast to float before the forward pass.
            if inputs.dtype == torch.uint8:
                inputs = inputs.float()

            outputs = resnet(inputs)
            # Collapse everything after the batch dimension into one axis.
            collected.append(outputs.view(outputs.size(0), -1))

    return torch.cat(collected, dim=0)

In [None]:
# Features must come out in dataset order. Row k of x_train_features is later
# treated as sample k of x_train / y_train, both when colouring the t-SNE plot
# with y_train and when the selected row indices are used to pick training
# samples. train_loader is built with shuffle=True and would permute the rows,
# so a dedicated unshuffled loader over the same dataset is used here.
ordered_train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
x_train_features = extract_features(ordered_train_loader)
print(x_train_features.shape)

In [None]:
# Selection budget: what is left of the training set after removing
# int(70% of its size), i.e. roughly 30% of the rows.
num = x_train_features.shape[0] - int(0.7 * x_train_features.shape[0])

# Draw `num` distinct row indices uniformly at random in a single call. This
# replaces a rejection loop whose list-membership test made it quadratic.
# The generator is seeded (same value as the train/test split's random_state)
# so the random baseline is identical on every Restart & Run All.
rng = np.random.default_rng(1)
random_points = rng.choice(x_train_features.shape[0], size=num, replace=False).tolist()
print(random_points)

In [None]:
def points_selector(points):
    """Return nested prefixes of ``points`` for the 1%, 5%, 10%, ..., 30% budgets.

    Budget sizes are derived from the global ``x_train_features``: one unit is
    ``int(0.01 * number_of_rows)``, and every later level is a multiple of
    five such units.

    Args:
        points: Ordered sequence of indices; earlier entries have priority.

    Returns:
        A list of seven slices of ``points``: the first unit-sized prefix,
        followed by the prefixes of 1x, 2x, ..., 6x five units.
    """
    one_percent = int(0.01 * x_train_features.shape[0])
    five_percent = 5 * one_percent

    prefixes = [points[:one_percent]]
    # Multiples 1..6 of the five-unit step give the 5% through 30% levels.
    prefixes.extend(points[:multiple * five_percent] for multiple in range(1, 7))
    return prefixes

In [None]:
# Nested prefixes of the random ordering, one entry per budget level;
# the trailing expression displays how many levels were produced.
random_points_selector = points_selector(random_points)
len(random_points_selector)

In [None]:
from submodlib import LogDeterminantFunction

lambda_val = 1.0
num_desired_datapoints = num

# Log-determinant objective built on the backbone features (copied to host
# memory as a NumPy array), using a dense kernel with the euclidean metric.
objLogDet = LogDeterminantFunction(
    n=x_train_features.shape[0],
    data=x_train_features.cpu().numpy(),
    mode="dense",
    metric="euclidean",
    lambdaVal=lambda_val,
)

# Greedy maximisation up to the budget; neither stopping rule is enabled.
greedyList = objLogDet.maximize(
    budget=num_desired_datapoints,
    optimizer='NaiveGreedy',
    stopIfZeroGain=False,
    stopIfNegativeGain=False,
    verbose=False,
)

# Keep element 0 of every greedy result entry.
# NOTE(review): presumably entries are (index, gain) pairs - confirm against
# the submodlib documentation.
selected_points = [entry[0] for entry in greedyList]

print(selected_points)

In [None]:
# Nested budget-level prefixes of the LogDet ordering computed on the backbone features.
selected_points_selector = points_selector(selected_points)

In [None]:
# Two-dimensional t-SNE embedding of the backbone features.
# NOTE(review): recent scikit-learn releases rename TSNE's `n_iter` argument
# to `max_iter`; confirm against the installed version before upgrading.
tsne = TSNE(n_components=2, random_state=12, n_iter=2500)
train_features_np = x_train_features.cpu().numpy()
x_train_tsne = tsne.fit_transform(train_features_np)
print(x_train_tsne.shape)

# Scatter the two embedding axes, coloured by the training labels.
tsne_dim1 = x_train_tsne[:, 0]
tsne_dim2 = x_train_tsne[:, 1]
train_labels_np = y_train.cpu().numpy()
plt.scatter(tsne_dim1, tsne_dim2, c=train_labels_np)
plt.title("t-SNE Visualization (Pneumonia MNIST Dataset)")
plt.xlabel("Dimension 1")
plt.ylabel("Dimension 2")
plt.show()

In [None]:
# Dump the raw 2-D t-SNE coordinates for inspection.
print(x_train_tsne)

In [None]:
# Same log-determinant objective as before, but built on the 2-D t-SNE
# coordinates instead of the backbone features.
objLogDet_tsne = LogDeterminantFunction(
    n=x_train_features.shape[0],
    data=x_train_tsne,
    mode="dense",
    metric="euclidean",
    lambdaVal=lambda_val,
)

# Greedy maximisation up to the budget; neither stopping rule is enabled.
greedyList_tsne = objLogDet_tsne.maximize(
    budget=num_desired_datapoints,
    optimizer='NaiveGreedy',
    stopIfZeroGain=False,
    stopIfNegativeGain=False,
    verbose=False,
)

# Keep element 0 of every greedy result entry.
# NOTE(review): presumably entries are (index, gain) pairs - confirm against
# the submodlib documentation.
tsne_selected_points = [entry[0] for entry in greedyList_tsne]

print(tsne_selected_points)

In [None]:
# Nested budget-level prefixes of the LogDet ordering computed on the t-SNE coordinates.
tsne_selected_points_selector = points_selector(tsne_selected_points)

In [None]:
def create_train_loader_with_imbalance(selected_data_points, x_train, y_train, batch_size=8):
    """Build a shuffled DataLoader over a subset of the training set and print its class balance.

    Args:
        selected_data_points: Iterable of integer row indices into
            ``x_train`` / ``y_train``. Duplicates and indices outside
            ``[0, len(x_train))`` are ignored. Samples are taken in ascending
            index order, independent of the order given here.
        x_train: Training samples, indexed along dimension 0.
        y_train: Labels aligned with ``x_train``, one scalar label per sample
            (shape ``(N,)`` or ``(N, 1)``).
        batch_size: Batch size of the returned loader.

    Returns:
        A ``DataLoader`` (``shuffle=True``) over a ``TensorDataset`` of the
        selected samples and their labels. Labels are 1-D ``int64``.

    Raises:
        ValueError: If no selected index falls inside the training set.
    """
    num_samples = len(x_train)

    # A set removes duplicates and avoids scanning the selection list once per
    # training row; sorting keeps the ascending-index sample order.
    chosen = sorted({int(idx) for idx in selected_data_points if 0 <= int(idx) < num_samples})
    if not chosen:
        raise ValueError("selected_data_points contains no valid training indices")

    x_train = torch.as_tensor(x_train)
    y_train = torch.as_tensor(y_train)

    selected_x_train = x_train[torch.as_tensor(chosen, dtype=torch.long, device=x_train.device)]
    # reshape(len(chosen)) flattens (K, 1) labels to (K,) and fails loudly if a
    # sample carries more than one label value. long() gives the integer class
    # index dtype that CrossEntropyLoss accepts.
    selected_y_train = (
        y_train[torch.as_tensor(chosen, dtype=torch.long, device=y_train.device)]
        .reshape(len(chosen))
        .long()
    )
    print(selected_x_train.shape)

    # Label 0 counts as class 0; every other label value counts as class 1.
    total = len(chosen)
    cnt_0 = int((selected_y_train == 0).sum().item())
    cnt_1 = total - cnt_0
    print(f"Class 0: {round((cnt_0 / total) * 100, 2)}%\nClass 1: {round((cnt_1 / total) * 100, 2)}%")

    selected_train_dataset = TensorDataset(selected_x_train, selected_y_train)
    selected_train_loader = DataLoader(selected_train_dataset, batch_size=batch_size, shuffle=True)

    return selected_train_loader

In [None]:
class HighlightTSNETrainer:
    """Train and evaluate a small MLP head on top of a frozen ResNet-18 backbone.

    The backbone (ResNet-18 without its final fully connected layer) is kept
    frozen and in eval mode. Only the classifier head is optimised. ``run``
    rebuilds the head and its optimiser, so every call is an independent
    training run.
    """

    def __init__(self, num_epochs=300, batch_size=8, lr=0.001, weight_decay=0.1):
        """Store hyper-parameters and build backbone, classifier head, loss and optimiser.

        Args:
            num_epochs: Number of passes over the training loader in ``train``.
            batch_size: Stored on the instance; the loaders passed to
                ``train`` / ``evaluate`` decide the actual batch size.
            lr: Learning rate for Adam.
            weight_decay: Weight decay for Adam.
        """
        self.num_epochs = num_epochs
        self.batch_size = batch_size
        self.lr = lr
        self.weight_decay = weight_decay
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.resnet = models.resnet18(weights=ResNet18_Weights.DEFAULT)
        self.num_input_features = self.resnet.fc.in_features
        self.fc_removed_resnet = self._remove_final_fc_layer()
        self.pytorch_classifier = self._build_classifier()
        self.loss_fn = nn.CrossEntropyLoss()
        self.optimizer = torch.optim.Adam(self.pytorch_classifier.parameters(), lr=self.lr, weight_decay=self.weight_decay)

    def _remove_final_fc_layer(self):
        """Return the ResNet without its last child module, on ``self.device`` and in eval mode."""
        fc_removed_resnet = nn.Sequential(*list(self.resnet.children())[:-1])
        # The backbone is only used as a fixed feature extractor. eval() makes
        # its BatchNorm layers use their stored statistics instead of
        # per-batch statistics, so features do not depend on batch
        # composition and the running averages are not modified.
        return fc_removed_resnet.to(self.device).eval()

    def _build_classifier(self):
        """Freeze the backbone parameters and return a fresh 4-layer MLP head with 2 outputs."""
        for param in self.fc_removed_resnet.parameters():
            param.requires_grad = False
        classifier = nn.Sequential(
            nn.Linear(self.num_input_features, 512),
            nn.ReLU(inplace=True),
            nn.Linear(512, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, 128),
            nn.ReLU(inplace=True),
            nn.Linear(128, 2)
        )
        return classifier.to(self.device)

    def _prepare_batch(self, inputs, labels):
        """Move a batch to ``self.device`` and normalise dtypes and label shape."""
        inputs = inputs.to(self.device)
        # Convert inputs to float32 if they are stored as uint8.
        if inputs.dtype == torch.uint8:
            inputs = inputs.float()
        # Labels are flattened to 1-D integer class indices. A (B, 1) label
        # column is rejected by CrossEntropyLoss and would broadcast against
        # the (B,) predictions into a (B, B) comparison when counting hits.
        labels = labels.to(self.device).reshape(-1).long()
        return inputs, labels

    def _backbone_features(self, inputs):
        """Return flattened backbone features for ``inputs`` without tracking gradients."""
        with torch.no_grad():
            features = self.fc_removed_resnet(inputs)
        return features.view(features.size(0), -1)

    def train(self, dataloader):
        """Optimise the classifier head for ``self.num_epochs`` passes over ``dataloader``.

        Args:
            dataloader: Iterable yielding ``(inputs, labels)`` batches.
        """
        self.pytorch_classifier.train()
        # train() above only affects the head; keep the backbone in eval mode.
        self.fc_removed_resnet.eval()
        for epoch in range(self.num_epochs):
            for inputs, labels in dataloader:
                inputs, labels = self._prepare_batch(inputs, labels)
                self.optimizer.zero_grad()

                outputs = self.pytorch_classifier(self._backbone_features(inputs))
                loss = self.loss_fn(outputs, labels)
                loss.backward()
                self.optimizer.step()

    def evaluate(self, test_loader):
        """Return the classification accuracy on ``test_loader`` in percent.

        Args:
            test_loader: Iterable yielding ``(inputs, labels)`` batches.

        Returns:
            ``100 * correct / total`` as a float.

        Raises:
            ValueError: If ``test_loader`` yields no samples.
        """
        self.pytorch_classifier.eval()
        self.fc_removed_resnet.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = self._prepare_batch(inputs, labels)

                outputs = self.pytorch_classifier(self._backbone_features(inputs))
                predicted = outputs.argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        if total == 0:
            raise ValueError("test_loader yielded no samples; accuracy is undefined")
        return 100 * correct / total

    def plot_tsne_with_highlight(self, tsne_data, highlight_indices):
        """Scatter all 2-D points in blue and overlay the highlighted subset in red.

        Args:
            tsne_data: Array whose first two columns are plotted.
            highlight_indices: Row indices of ``tsne_data`` to draw in red.
        """
        plt.figure(figsize=(8, 6))
        sns.scatterplot(x=tsne_data[:, 0], y=tsne_data[:, 1], color='blue', s=50)

        highlight_data = tsne_data[highlight_indices]
        sns.scatterplot(x=highlight_data[:, 0], y=highlight_data[:, 1], color='red', s=50)

        # NOTE(review): the title names the Aptos dataset while the rest of
        # the notebook loads Pneumonia MNIST - confirm which label is intended.
        plt.title('Plot with Selected Data Points (Aptos Dataset)')
        plt.xlabel('t-SNE Dimension 1')
        plt.ylabel('t-SNE Dimension 2')
        plt.show()

    def run(self, highlighted_indices, train_loader, test_loader, x_train_tsne):
        """Train a fresh classifier head on ``train_loader`` and report test accuracy.

        Args:
            highlighted_indices: Not used by this method; kept for the
                existing call signature.
            train_loader: Loader used for training.
            test_loader: Loader used for evaluation.
            x_train_tsne: Not used by this method; kept for the existing
                call signature.

        Returns:
            Test accuracy in percent.
        """
        # Start from a new head and optimiser so repeated runs are independent.
        self.pytorch_classifier = self._build_classifier()
        self.optimizer = torch.optim.Adam(self.pytorch_classifier.parameters(), lr=self.lr, weight_decay=self.weight_decay)
        self.train(train_loader)

        test_accuracy = self.evaluate(test_loader)
        print(f"Test Accuracy with Highlighted Indices: {test_accuracy:.2f}%")
        return test_accuracy

In [None]:
# Interleave the three strategies level by level, so the flat list reads
# random, logdet, tsne+logdet for the first budget, then again for the next.
selected_data_points_x = []
strategy_levels = (random_points_selector, selected_points_selector, tsne_selected_points_selector)
for level in range(len(random_points_selector)):
    for per_strategy in strategy_levels:
        selected_data_points_x.append(per_strategy[level])

In [None]:
highlight_trainer = HighlightTSNETrainer(num_epochs=150, batch_size=8, lr=0.001, weight_decay=0.1)

# Human-readable run labels, budget-major and strategy-minor, in the same
# order as the entries of selected_data_points_x.
points = [
    f"{budget}% {strategy} Selected Points"
    for budget in (1, 5, 10, 15, 20, 25, 30)
    for strategy in ("Random", "Logdet", "Tsne+Logdet")
]

# Per-strategy result series. The *_std lists receive the standard error of
# the mean, not the raw standard deviation.
random_points_mean, random_points_std = [], []
selected_points_mean, selected_points_std = [], []
tsne_selected_points_mean, tsne_selected_points_std = [], []

# Position within each budget triple -> (means, errors) of that strategy.
result_series = (
    (random_points_mean, random_points_std),
    (selected_points_mean, selected_points_std),
    (tsne_selected_points_mean, tsne_selected_points_std),
)

for i, run_label in enumerate(points):
    print(run_label)
    selected_data_points = selected_data_points_x[i]
    selected_train_loader = create_train_loader_with_imbalance(selected_data_points, x_train, y_train, batch_size=8)

    # `epochs` is the number of independent retrainings per configuration
    # (each run() call trains a fresh classifier head); the skyline cell
    # further down reads this value as well.
    epochs = 15
    acc_list = [
        highlight_trainer.run(selected_data_points, selected_train_loader, test_loader, x_train_tsne)
        for _ in range(epochs)
    ]

    mean_acc = np.mean(acc_list)
    std_acc = np.std(acc_list)
    std_err = std_acc / math.sqrt(epochs)
    print(f"Mean Accuracy : {mean_acc}\nStandard Error : {std_err}\n")
    highlight_trainer.plot_tsne_with_highlight(x_train_tsne, selected_data_points)

    mean_series, err_series = result_series[i % 3]
    mean_series.append(mean_acc)
    err_series.append(std_err)

    print("\n--------------------------------------------------------------------------------------------------\n")

In [None]:
# Every row index of the training feature matrix, i.e. the whole training set.
all_points = list(np.arange(0, x_train_features.shape[0], 1))

In [None]:
# Build the training loader for the full-data run: every training index is selected.
selected_data_points = all_points
selected_train_loader = create_train_loader_with_imbalance(selected_data_points, x_train, y_train, batch_size=8)

In [None]:
# Full-data ("skyline") reference: retrain `epochs` times on the complete
# training set. `epochs` is the repeat count set in the experiment loop cell.
skyline_acc_list = [
    highlight_trainer.run(selected_data_points, selected_train_loader, test_loader, x_train_tsne)
    for _ in range(epochs)
]

# Mean accuracy and standard error of the mean across the repeated runs.
skyline_mean_acc = np.mean(skyline_acc_list)
skyline_std_acc = np.std(skyline_acc_list)
skyline_std_err = skyline_std_acc / math.sqrt(epochs)
print(f"Skyline Mean Accuracy : {skyline_mean_acc}\nStandard Error : {skyline_std_err}\n")

In [None]:
percentages = [1, 5, 10, 15, 20, 25, 30]

plt.figure(figsize=(10, 6))

# One line plus a +/- one-standard-error band per selection strategy.
strategy_curves = (
    (random_points_mean, random_points_std, 'Randomly Selected Points', 'tab:blue'),
    (selected_points_mean, selected_points_std, 'Logdet Selected Points', 'tab:orange'),
    (tsne_selected_points_mean, tsne_selected_points_std, 't-SNE + LogDet Selected Points', 'tab:green'),
)
for curve_means, curve_errs, curve_label, curve_colour in strategy_curves:
    centre = np.array(curve_means)
    half_width = np.array(curve_errs)
    plt.plot(percentages, curve_means, marker='o', label=curve_label, linestyle='-', linewidth=2, color=curve_colour)
    plt.fill_between(percentages, centre - half_width, centre + half_width, color=curve_colour, alpha=0.2)

# Full-data reference drawn as a constant dashed line with its own band.
skyline_lower = np.array(skyline_mean_acc) - np.array(skyline_std_err)
skyline_upper = np.array(skyline_mean_acc) + np.array(skyline_std_err)
plt.plot(percentages, [skyline_mean_acc] * len(percentages), color='tab:purple', label='Skyline Accuracy', linestyle='--', linewidth=2)
plt.fill_between(percentages, skyline_lower, skyline_upper, color='tab:purple', alpha=0.2)

plt.title('Test Accuracy vs. Percentage of Selected Data Points (Pneumonia MNIST Dataset)')
plt.xlabel('Percentage of Selected Data Points')
plt.ylabel('Test Accuracy')
plt.xticks(percentages)
plt.legend(loc='lower right')
plt.grid(True)
plt.show()