In [1]:
import os
import json
import numpy as np
import pandas as pd
import polars as pl
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
from torch import optim
from torch.optim import Adam
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from torchvision.utils import save_image, make_grid
from mpl_toolkits.axes_grid1 import ImageGrid
from sklearn.preprocessing import MultiLabelBinarizer, StandardScaler
from sklearn.neural_network import MLPClassifier
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report, f1_score, multilabel_confusion_matrix
from scipy.io import arff

# Set the device for PyTorch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Configure Polars
pl.Config.set_tbl_rows(-1)


polars.config.Config

# VAE implementation

**Source**:
- https://github.com/clementchadebec/benchmark_VAE
- https://github.com/Victarry/Image-Generation-models/blob/main/src/models/vae.py    
- https://github.com/yakhyo/pytorch-tutorials/blob/23d4086486482eb13fae05a87ef93a1e0df8b0ff/tutorials/03-intermediate/05-var-auto-encode/main.py

Others:

- https://github.com/kleinzcy/Variational-AutoEncoder/blob/master/VAE.ipynb
- https://github.com/siddharth17196/Variational-Autoencoders/blob/master/autoencoder.ipynb


In [12]:
# read from directory
tra, trameta = arff.loadarff('../../data/GCM_Training.arff')
tst, tstmeta = arff.loadarff('../../data/GCM_Test.arff')

In [13]:
train = pl.from_numpy(tra).to_numpy()
test = pl.from_numpy(tst).to_numpy()

## Filter lung

In [14]:
lung_train = train[train[:, -1] == b'Lung']
lung_test = test[test[:, -1] == b'Lung']

In [16]:
print(lung_train.shape), print(lung_test.shape)


(8, 16064)
(3, 16064)


(None, None)

In [17]:
train = lung_train
test = lung_test

In [18]:
categories = [binary_str.decode('utf-8') for binary_str in set(train[:,-1])]
# Create a mapping from category to integer
target_mapping = {category.encode(): index for index, category in enumerate(categories)}
target_mapping

{b'Lung': 0}

In [19]:
# Map binary targets to numerical values
numerical_targets_train = np.array([target_mapping.get(x[-1], x) for x in train])
numerical_targets_test = np.array([target_mapping.get(x[-1], x) for x in test])
train[:, -1] = numerical_targets_train
test[:, -1] = numerical_targets_test

In [20]:
train = train[:,:-1]
test = test[:,:-1]
train[0], test[0]


(array([-47.0, -171.0, -284.0, ..., 89.0, -1324.0, -70.0], dtype=object),
 array([-94.0, -256.0, -358.0, ..., 34.0, -1233.0, -47.0], dtype=object))

In [21]:
# Convert the array to float32
train = train.astype(np.float32)
test = test.astype(np.float32)

# Convert to a PyTorch tensor
train = torch.from_numpy(train)
test =  torch.from_numpy(test)

In [22]:
def normalize_data(data):
    min_val = torch.min(data)
    max_val = torch.max(data)
    normalized_data = (data - min_val) / (max_val - min_val)
    return normalized_data

In [23]:
train_norm = normalize_data(train)
test_norm = normalize_data(test)

In [24]:
train_norm[0]
train.shape

torch.Size([8, 16063])

In [37]:
# create train and test dataloaders
batch_size = 4
train_loader = DataLoader(dataset=train_norm, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_norm, batch_size=batch_size, shuffle=False)


# Creating Variational Autoencoders

In [38]:
class VAE(nn.Module):

    def __init__(self, input_dim=16063, hidden_dim=400, latent_dim=200, device=device):
        super(VAE, self).__init__()

        # encoder
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.LeakyReLU(0.2),
            nn.Linear(hidden_dim, latent_dim),
            nn.LeakyReLU(0.2)
            )

        # latent mean and variance
        self.mean_layer = nn.Linear(latent_dim, 2)
        self.logvar_layer = nn.Linear(latent_dim, 2)

        # decoder
        self.decoder = nn.Sequential(
            nn.Linear(2, latent_dim),
            nn.LeakyReLU(0.2),
            nn.Linear(latent_dim, hidden_dim),
            nn.LeakyReLU(0.2),
            nn.Linear(hidden_dim, input_dim),
            nn.Sigmoid()
            )

    def encode(self, x):
        x = self.encoder(x)
        mean, logvar = self.mean_layer(x), self.logvar_layer(x)
        return mean, logvar

    def reparameterization(self, mean, var):
        epsilon = torch.randn_like(var).to(device)
        z = mean + var*epsilon
        return z

    def decode(self, x):
        return self.decoder(x)

    def forward(self, x):
        mean, logvar = self.encode(x)
        z = self.reparameterization(mean, logvar)
        x_hat = self.decode(z)
        return x_hat, mean, log_var

    def forward(self, x):
        mean, log_var = self.encode(x)
        z = self.reparameterization(mean, torch.exp(0.5 * log_var))
        x_hat = self.decode(z)
        return x_hat, mean, log_var

In [27]:
model = VAE(input_dim=16063).to(device)
optimizer = Adam(model.parameters(), lr=1e-3)

In [39]:
def loss_function(x, x_hat, mean, log_var):
    reproduction_loss = nn.functional.binary_cross_entropy(x_hat, x, reduction='sum')
    KLD = - 0.5 * torch.sum(1+ log_var - mean.pow(2) - log_var.exp())
    return reproduction_loss + KLD

In [40]:
def trainVAE(model, optimizer, epochs, device, x_dim=16063):
    model.train()
    for epoch in range(epochs):
        overall_loss = 0
        for batch_idx, x in enumerate(train_loader):
            
            x = x.view(x.size(0), x_dim).to(device)

            optimizer.zero_grad()

            x_hat, mean, log_var = model(x)
            loss = loss_function(x, x_hat, mean, log_var)
            
            overall_loss += loss.item()
            
            loss.backward()
            optimizer.step()

        print("\tEpoch", epoch + 1, "\tAverage Loss: ", overall_loss/(batch_idx*batch_size))
    return overall_loss

In [41]:
# Get the first batch of data
first_batch = next(iter(train_loader))

# Print the type and length/shape of the first batch
print(type(first_batch))
if isinstance(first_batch, (list, tuple)):
    print([type(item) for item in first_batch])
    print([item.shape if hasattr(item, 'shape') else len(item) for item in first_batch])
else:
    print(first_batch.shape if hasattr(first_batch, 'shape') else len(first_batch))


<class 'torch.Tensor'>
torch.Size([4, 16063])


In [42]:
trainVAE(model, optimizer, epochs=50, device=device)

	Epoch 1 	Average Loss:  16614.05078125
	Epoch 2 	Average Loss:  16620.873046875
	Epoch 3 	Average Loss:  16615.5419921875
	Epoch 4 	Average Loss:  16613.958984375
	Epoch 5 	Average Loss:  16616.109375
	Epoch 6 	Average Loss:  16616.91796875
	Epoch 7 	Average Loss:  16615.4169921875
	Epoch 8 	Average Loss:  16610.0166015625
	Epoch 9 	Average Loss:  16606.78515625
	Epoch 10 	Average Loss:  16608.5146484375
	Epoch 11 	Average Loss:  16608.4140625
	Epoch 12 	Average Loss:  16610.7138671875
	Epoch 13 	Average Loss:  16613.06640625
	Epoch 14 	Average Loss:  16609.8251953125
	Epoch 15 	Average Loss:  16607.193359375
	Epoch 16 	Average Loss:  16606.7578125
	Epoch 17 	Average Loss:  16608.5908203125
	Epoch 18 	Average Loss:  16606.73046875
	Epoch 19 	Average Loss:  16608.6904296875
	Epoch 20 	Average Loss:  16605.1796875
	Epoch 21 	Average Loss:  16607.5048828125
	Epoch 22 	Average Loss:  16604.99609375
	Epoch 23 	Average Loss:  16606.3203125
	Epoch 24 	Average Loss:  16606.5341796875
	Epoch 2

66408.28125

In [None]:
def generate_synthetic_data(model, num_samples, device):
    model.eval()
    with torch.no_grad():
        # Sample from a standard normal distribution with shape [num_samples, 2]
        z = torch.randn(num_samples, 2).to(device)  # Adjusted to match the decoder's input dimension
        # Generate synthetic data
        synthetic_data = model.decode(z)
    return synthetic_data.cpu()

# Example usage
num_samples = 10  # Number of synthetic data points you want to generate
synthetic_data = generate_synthetic_data(model, num_samples, device)



In [None]:
synthetic_data.size()

torch.Size([10, 16063])

In [None]:
synthetic_data[0]

tensor([0.4541, 0.4503, 0.4479,  ..., 0.4558, 0.4298, 0.4513])

In [None]:
data = synthetic_data.numpy()

In [None]:
num_rows = data.shape[0]
labels = np.full((num_rows, 1), b'Lung', dtype=object)

# Combine the original array with the label column
combined_data = np.column_stack((data, labels))

combined_data[0]



array([0.45411989092826843, 0.4502677917480469, 0.4478558897972107, ...,
       0.4297598898410797, 0.4513128995895386, b'Lung'], dtype=object)

In [None]:
np.save('syn_lung.npy', combined_data)
# lung = np.load('syn_lung.npy')

In [None]:
# torch.save(model, 'vae_lung_model.pth') # save all model and parameters
torch.save(model.state_dict(), 'vae_lung_model_state_dict.pth')


In [None]:
# Load model 
model = VAE(input_dim=16063, hidden_dim=400, latent_dim=200, device=device)
model.load_state_dict(torch.load('vae_model_state_dict.pth'))
model.to(device)
