In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset

from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
import pandas as pd

## Prepare Dataset

In [None]:
# Load the feature table; the first spreadsheet column is used as the row index.
data = pd.read_excel("data/synthetic_dataset.xlsx", index_col=0)
data_tensor = torch.tensor(data.values, dtype=torch.float32)

# Hold out 20% of the rows for validation (fixed seed so the split is stable).
train_tensor, val_tensor = train_test_split(
    data_tensor,
    test_size=0.2,
    random_state=42,
)

# Each dataset wraps a single tensor, so every batch is a one-element list.
train_dataset, val_dataset = TensorDataset(train_tensor), TensorDataset(val_tensor)

# Only the training batches are reshuffled each epoch.
train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=128)
val_dataloader = DataLoader(val_dataset, shuffle=False, batch_size=128)

## AutoEncoder

In [94]:
class EnhancedLR(nn.Module):
    """Logistic regression as a PyTorch module: one linear unit followed by a sigmoid.

    Args:
        input_dim: Number of input features per sample.
    """

    def __init__(self, input_dim):
        super().__init__()
        self.linear = nn.Linear(input_dim, 1)

    def forward(self, x):
        """Return the positive-class probability for each row of ``x``.

        The linear layer has a single output unit, so the result has one
        column per sample.
        """
        return torch.sigmoid(self.linear(x))

    def predict_proba(self, x):
        """Return class probabilities as a NumPy array, scikit-learn style.

        Column 0 holds ``1 - p`` and column 1 holds ``p``, where ``p`` is the
        output of :meth:`forward`. Gradients are not tracked.
        """
        with torch.no_grad():
            positive = self.forward(x)
            both_classes = torch.cat((1 - positive, positive), dim=1)
        # Tensor.numpy() only works on CPU tensors; copy back first so this
        # also works when the module and its input live on an accelerator.
        return both_classes.cpu().numpy()


In [95]:
class Autoencoder(nn.Module):
    """Dense autoencoder with a logistic-regression head on the latent code.

    The encoder maps ``input_dim`` features to an ``embedding_dim`` code, the
    decoder maps the code back to ``input_dim`` values squashed by a sigmoid,
    and ``classifier`` predicts a probability from the code.

    Args:
        input_dim: Number of input features.
        embedding_dim: Size of the latent code.
        reconstruction_loss: Callable ``(outputs, inputs) -> loss`` used for
            the reconstruction objective.
        classification_loss: Callable ``(predictions, labels) -> loss`` used
            for the classifier objective in :meth:`fit`.
        dropout_rate: Dropout probability applied inside the encoder.
    """

    def __init__(self, input_dim, embedding_dim, reconstruction_loss, classification_loss, dropout_rate=0.15):
        super().__init__()

        self.input_dim = input_dim
        self.reconstruction_loss = reconstruction_loss
        self.classification_loss = classification_loss
        # Preferred device only: the module itself is NOT moved here. Call
        # ``model.to(model.device)`` to train on it. The training loops send
        # batches to wherever the parameters actually are.
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 1024),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(1024, embedding_dim),
        )

        self.decoder = nn.Sequential(
            nn.Linear(embedding_dim, 1024),
            nn.ReLU(),
            nn.Linear(1024, input_dim),
            nn.Sigmoid(),
        )

        self.classifier = EnhancedLR(embedding_dim)

    def _param_device(self):
        """Return the device the module's parameters currently live on."""
        return next(self.parameters()).device

    def forward(self, x):
        """Encode then decode ``x`` and return the reconstruction."""
        return self.decoder(self.encoder(x))

    def predict(self, x):
        """Return the classifier output (a tensor) for the encoding of ``x``."""
        return self.classifier(self.encoder(x))

    def predict_proba(self, x):
        """Return the classifier's ``predict_proba`` for the encoding of ``x``.

        Gradients are not tracked. The module's train/eval mode is left as-is,
        so call ``eval()`` first to get predictions without encoder dropout.
        """
        # The result is converted to NumPy anyway, so there is no reason to
        # build an autograd graph for the encoder pass.
        with torch.no_grad():
            return self.classifier.predict_proba(self.encoder(x))

    def unsupervised_train(self, num_epochs, train_dataloader, val_dataloader, optimizer):
        """Train encoder and decoder on the reconstruction loss only.

        Each batch is expected to be a sequence whose first element is the
        input tensor. After every epoch the mean per-batch training and
        validation losses are printed.
        """
        # Use the parameters' real device: sending batches to ``self.device``
        # fails with a device mismatch whenever CUDA is available but the
        # module was never moved.
        device = self._param_device()

        for epoch in range(num_epochs):
            # Training phase
            self.train()
            total_train_loss = 0
            for batch in train_dataloader:
                optimizer.zero_grad()
                inputs = batch[0].to(device)
                loss = self.reconstruction_loss(self(inputs), inputs)
                loss.backward()
                optimizer.step()
                total_train_loss += loss.item()

            # Validation phase
            self.eval()
            total_val_loss = 0
            with torch.no_grad():
                for batch in val_dataloader:
                    inputs = batch[0].to(device)
                    total_val_loss += self.reconstruction_loss(self(inputs), inputs).item()

            # An empty loader yields NaN instead of raising ZeroDivisionError.
            n_train, n_val = len(train_dataloader), len(val_dataloader)
            avg_train_loss = total_train_loss / n_train if n_train else float('nan')
            avg_val_loss = total_val_loss / n_val if n_val else float('nan')

            print(f'Epoch: {epoch+1}, Training Loss: {avg_train_loss}, Validation Loss: {avg_val_loss}')

    def fit(self, num_epochs, train_dataloader, optimizer):
        """Jointly optimise reconstruction and classification.

        Each batch must unpack into ``(inputs, labels)``. The optimised loss
        is the sum of the reconstruction loss and the classification loss, and
        its mean over batches is printed after every epoch.
        """
        device = self._param_device()

        for epoch in range(num_epochs):
            self.train()
            total_train_loss = 0

            # Training phase
            for inputs, labels in train_dataloader:
                inputs = inputs.to(device)
                optimizer.zero_grad()

                # Fine-tune the embeddings
                reconstruction_loss = self.reconstruction_loss(self(inputs), inputs)

                # Train the classifier
                predictions = self.predict(inputs)
                # Predictions have one column per sample; match the labels to
                # that shape and dtype so flat or integer labels are accepted.
                labels = labels.to(device=device, dtype=predictions.dtype).reshape(predictions.shape)
                classification_loss = self.classification_loss(predictions, labels)

                # Optimize both
                loss = reconstruction_loss + classification_loss

                loss.backward()
                optimizer.step()
                total_train_loss += loss.item()

            n_train = len(train_dataloader)
            avg_train_loss = total_train_loss / n_train if n_train else float('nan')

            print(f'Epoch: {epoch+1}, Training Loss: {avg_train_loss}')

In [97]:
# Seed PyTorch's global RNG before the model is built so that weight
# initialisation, dropout and DataLoader shuffling are repeatable across
# Restart & Run All.
torch.manual_seed(42)

input_dim = data.shape[1]
embedding_dim = 64
# BCELoss expects targets in [0, 1]; the reconstruction target is the input
# itself, so this assumes the features are scaled to that range - TODO confirm.
reconstruction_loss = nn.BCELoss()
classification_loss = nn.BCELoss()

model = Autoencoder(input_dim, embedding_dim, reconstruction_loss, classification_loss)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [98]:
# Reconstruction-only pre-training for 10 epochs.
model.unsupervised_train(
    num_epochs=10,
    train_dataloader=train_dataloader,
    val_dataloader=val_dataloader,
    optimizer=optimizer,
)

Epoch: 1, Training Loss: 0.07264308820695856, Validation Loss: 0.03219642869512969
Epoch: 2, Training Loss: 0.019506159070750168, Validation Loss: 0.010981921640592158
Epoch: 3, Training Loss: 0.007918536684564024, Validation Loss: 0.005975587797616053
Epoch: 4, Training Loss: 0.004391673621915143, Validation Loss: 0.00415471322843036
Epoch: 5, Training Loss: 0.002817769317648978, Validation Loss: 0.0032134205327206933
Epoch: 6, Training Loss: 0.0019246669204940044, Validation Loss: 0.002817931416265051
Epoch: 7, Training Loss: 0.0014152439625978934, Validation Loss: 0.0025384919803350344
Epoch: 8, Training Loss: 0.0010680316708923917, Validation Loss: 0.0023601396583926375
Epoch: 9, Training Loss: 0.0008591706707926282, Validation Loss: 0.002249100070378376
Epoch: 10, Training Loss: 0.0007006350939520194, Validation Loss: 0.0023757651610590447


In [99]:
# Persist the whole module object (not just its state_dict).
torch.save(obj=model, f="auto_encoder.pkl")

In [100]:
# Smoke-test predict_proba on one training row. Slicing with [:1] keeps the
# batch dimension. Only unsupervised_train has run so far, so the classifier
# head is still untrained and this checks the output format only.
input_tensor = train_tensor[:1]
model.predict_proba(input_tensor)

array([[0.861526  , 0.13847397]], dtype=float32)

In [101]:
# Put the module back into training mode (re-enables encoder dropout).
# train() returns the module, so the cell displays its repr.
model.train()

Autoencoder(
  (reconstruction_loss): BCELoss()
  (classification_loss): BCELoss()
  (encoder): Sequential(
    (0): Linear(in_features=343, out_features=1024, bias=True)
    (1): ReLU()
    (2): Dropout(p=0.15, inplace=False)
    (3): Linear(in_features=1024, out_features=64, bias=True)
  )
  (decoder): Sequential(
    (0): Linear(in_features=64, out_features=1024, bias=True)
    (1): ReLU()
    (2): Linear(in_features=1024, out_features=343, bias=True)
    (3): Sigmoid()
  )
  (classifier): EnhancedLR(
    (linear): Linear(in_features=64, out_features=1, bias=True)
  )
)

## Evaluate

In [None]:
# The checkpoint is a fully pickled module, not a state_dict. PyTorch >= 2.6
# defaults torch.load to weights_only=True, which refuses such files, so the
# full unpickler must be requested explicitly.
# SECURITY: unpickling can execute arbitrary code - only load files you created.
model = torch.load("auto_encoder.pkl", weights_only=False)

In [None]:
# Switch to inference mode so the encoder's dropout layer is disabled.
model.eval()

In [None]:
# Calling model(...) runs encoder AND decoder and returns the reconstruction.
# The latent code wanted here comes from the encoder alone.
with torch.no_grad():
    embeddings = (
        model.encoder(data_tensor.float().to(next(model.parameters()).device))
        .cpu()
        .numpy()
    )

In [None]:
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt

# Project the embeddings to 2-D for visual inspection (seeded for repeatability).
embeddings_2d = TSNE(n_components=2, random_state=0).fit_transform(embeddings)

# No per-point colour values are supplied, so a colormap would be ignored
# (matplotlib warns about it); points are drawn in the default colour.
fig, ax = plt.subplots(figsize=(10, 8))
ax.scatter(embeddings_2d[:, 0], embeddings_2d[:, 1], alpha=0.5)
ax.set_title('t-SNE plot of the embeddings')
ax.set_xlabel('Component 1')
ax.set_ylabel('Component 2')
plt.show()

In [None]:
from finch import FINCH
import pandas as pd
from sklearn.metrics import silhouette_score

In [None]:
# Cluster the embeddings with FINCH.
# NOTE(review): presumably `c` holds one column of cluster labels per partition
# level and `num_clust` the cluster count of each level - confirm against the
# finch package documentation. `req_c` is not used below.
c, num_clust, req_c = FINCH(embeddings)

In [None]:
# Score every partition that exists instead of assuming exactly six
# (presumably the number of FINCH partition levels depends on the data).
# The label table is built once rather than on every iteration.
partitions = pd.DataFrame(c)
for i in partitions.columns:
    labels = partitions[i]
    # silhouette_score needs at least two distinct clusters.
    if labels.nunique() < 2:
        print(f"Partition {i}: skipped (fewer than 2 clusters)")
        continue
    silhouette_avg = silhouette_score(embeddings, labels)
    print(f"Silhouette Score: {silhouette_avg}")