In [323]:
import torch.nn as nn

class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()
        # encoder
        self.input = nn.Linear(784,512)
        self.relu = nn.ReLU()
        self.tanh = nn.Tanh()
        
        self.fc1 = nn.Linear(512,128)
        self.fc2 = nn.Linear(128,32)
        self.fc3 = nn.Linear(32,10)
        
        # decoder
        self.up_fc1 = nn.Linear(10,32)
        self.up_fc2 = nn.Linear(32,128)
        self.up_fc3 = nn.Linear(128,512)
        self.up_fc4 = nn.Linear(512,784)
        self.sigmoid = nn.Sigmoid()

        
    def encoder(self, x):
        x = self.input(x)
        x = self.relu(x)
        x = self.tanh(x) 
        x = self.fc1(x)
        x = self.relu(x)
        x = self.tanh(x) 
        x = self.fc2(x)
        x = self.tanh(x) 
        x = self.fc3(x)
        return x
        
    def decoder(self, x):
        x = self.up_fc1(x)
        x = self.tanh(x)
        x = self.up_fc2(x)
        x = self.tanh(x)
        x = self.up_fc3(x)
        x = self.tanh(x)
        x = self.up_fc4(x)
        x = self.sigmoid(x)
        return x
        
    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [333]:
def custom_loss(outputs, inputs,pred,truth , model_parameters):
    inputs = inputs.to('cuda:0')
    outputs = outputs.to('cuda:0')
    pred = pred.to('cuda:0')
    truth = truth.to('cuda:0')
    
    # Find indices where truth is equal to pred
    matching_indices = torch.nonzero(truth == pred)
    non_matching = torch.nonzero(truth != pred)

    same_labels =  torch.sum(torch.exp(- ((outputs[matching_indices] - inputs[matching_indices])).pow(2)))   / ((truth == pred).sum().item()+1)
    dif_labels =   torch.sum((1 - torch.exp(- ((outputs[non_matching] - inputs[non_matching])).pow(2))))  / ((truth != pred).sum().item()+1)
    weight_term =   same_labels * dif_labels
    cmse_loss =  (weight_term * ((pred - truth) ** 2)).mean()
    l2_penalty = 0.00001  * sum([(p**2).sum() for p in model_parameters])
    loss = cmse_loss + l2_penalty
    return loss

In [325]:
import torch.optim as optim

autoencoder = Autoencoder()
optimizer = optim.Adam(autoencoder.parameters(), lr=0.003)

In [326]:
from torchvision import datasets
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

transform = transforms.Compose([transforms.ToTensor()])
train_dataset = datasets.MNIST(root='./data', train=True, transform=transform, download=True)
train_loader = DataLoader(train_dataset, batch_size=12, shuffle=True)

In [327]:
import torch

class KMeans:
    def __init__(self, n_clusters, max_iter=100):
        self.n_clusters = n_clusters
        self.max_iter = max_iter
        self.centroids = None

    def fit(self, X):
        initial_indices = torch.randperm(X.size(0))[:self.n_clusters]
        self.centroids = X[initial_indices]

        for _ in range(self.max_iter):
            labels = self._assign_clusters(X)
            new_centroids = self._update_centroids(X, labels)
            if torch.all(torch.eq(self.centroids, new_centroids)):
                break

            self.centroids = new_centroids

    def _assign_clusters(self, X):
        distances = torch.norm(X.unsqueeze(1) - self.centroids, dim=2)
        return torch.argmin(distances, dim=1)

    def _update_centroids(self, X, labels):
        new_centroids = torch.zeros((self.n_clusters, X.size(1)), device=X.device)
        for i in range(self.n_clusters):
            new_centroids[i] = X[labels == i].mean(dim=0)
        return new_centroids

In [328]:
mnist_data = []
targets = []
for data in train_loader:
    inputs, target = data
    inputs = inputs.view(inputs.size(0), -1)
    targets.append(target)
    mnist_data.append(inputs)

mnist_data = torch.cat(mnist_data, dim=0)
targets = torch.cat(targets, dim=0)


In [329]:
# torch.manual_seed(42)

# kmeans = KMeans(n_clusters=10)
# kmeans.fit(mnist_data)

# cluster_assignments = kmeans._assign_clusters(mnist_data)
# final_centroids = kmeans.centroids


In [330]:
# cluster_assignments.shape

In [331]:
# from sklearn.metrics import adjusted_rand_score

# ari = adjusted_rand_score(cluster_assignments.numpy(), targets.numpy()) * 100

# print("Adjusted Rand Index:", f'{ari:.3f}%')

In [334]:
from torch.optim.lr_scheduler import StepLR

num_epochs = 7
autoencoder.to("cuda")

lr_scheduler = StepLR(optimizer, step_size=10, gamma=0.1)

for epoch in range(num_epochs):
    running_loss = 0.0

    for data in train_loader:
        inputs, truth = data
        inputs = inputs.view(-1, 784).to("cuda")
        optimizer.zero_grad()
        outputs = autoencoder(inputs)
        pred  = autoencoder.encoder(inputs)
        loss = custom_loss(outputs, inputs,pred.argmax(dim=1),truth, autoencoder.parameters())
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        
    lr_scheduler.step() 
    print(f"Epoch {epoch+1}, Loss:{running_loss / (len(train_loader) / 12 )}")

Epoch 1, Loss:1534700.2554440196
Epoch 2, Loss:915964.2574828997
Epoch 3, Loss:876422.5500331289
Epoch 4, Loss:1028448.3320081574
Epoch 5, Loss:907792.7722969545
Epoch 6, Loss:838096.2497204529
Epoch 7, Loss:813265.9992728679


In [None]:
# for data in train_loader:
#         inputs, truth = data
#         inputs = inputs.view(-1, 784).to("cuda")
#         optimizer.zero_grad()
#         outputs = autoencoder(inputs)
#         pred  = autoencoder.encoder(inputs)
#         break
        
# inputs = inputs.to('cuda:0')
# outputs = outputs.to('cuda:0')
# pred = pred.to('cuda:0')
# truth = truth.to('cuda:0')
# pred = pred.argmax(dim=1)

In [None]:
# loss = 0
# same_labels =  torch.sum(torch.exp(- ((outputs - inputs)).pow(2)))   / ((truth == pred).sum().item()+1)
# dif_labels =   torch.sum((1 - torch.exp(- ((outputs - inputs)).pow(2))))  / ((truth != pred).sum().item()+1)
# weight_term =   same_labels / dif_labels
# cmse_loss =  (weight_term * ((pred - truth) ** 2)).mean()
# l2_penalty = 0.00001  * sum([(p**2).sum() for p in autoencoder.parameters()])
# loss = cmse_loss + l2_penalty
# loss

In [None]:
# inputs = inputs.to('cuda:0')
# outputs = outputs.to('cuda:0')
# pred = pred.to('cuda:0')
# truth = truth.to('cuda:0')
# pred = pred.argmax(dim=1)
# outputs
# same_labels =  torch.exp(- ((outputs - inputs)).pow(2))  / (truth == pred).sum().item()
# print(torch.exp(- ((outputs - inputs)).pow(2)))
# print((truth == pred).sum().item()+1)

# dif_labels =   torch.sum(1 - torch.exp(- ((outputs - inputs)).pow(2)))   / (truth != pred).sum().item()
# weight_term = dif_labels * same_labels
# cmse_loss = torch.mean(weight_term * ((pred - truth) ** 2))
# print(cmse_loss)
# l2_penalty = 0.00001  * sum([(p**2).sum() for p in model_parameters])
# loss = cmse_loss + l2_penalty

In [None]:
mnist_data_encoded = []

for data in train_loader:
    inputs, _ = data
    
    inputs = inputs.view(-1, 784).to("cuda")
    output = autoencoder.encoder(inputs)
    output = output.view(output.size(0), -1)  
    mnist_data_encoded.append(output)

mnist_data_encoded = torch.cat(mnist_data_encoded, dim=0)
mnist_data_encoded.argmax(dim=1).shape , targets.shape

In [None]:
mnist_data_encoded.argmax(dim=1)[0], targets[0]

In [None]:
torch.manual_seed(42)

kmeans_encoded = KMeans(n_clusters=10)
kmeans_encoded.fit(mnist_data_encoded)

cluster_assignments_encoded = kmeans_encoded._assign_clusters(mnist_data_encoded)
final_centroids_encoded = kmeans_encoded.centroids


In [None]:
from sklearn.metrics import adjusted_rand_score

ari = adjusted_rand_score(cluster_assignments_encoded.cpu(), targets.numpy()) * 100

print("Adjusted Rand Index:", f'{ari:.3f}%')