# Imports

In [1]:
import torch
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
import pandas as pd
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader, Subset
from torch.utils.data.dataset import random_split
import torchvision.transforms as transforms
import time

# Data Collecting & vizualisation

In [2]:
# Define a custom dataset class for PyTorch
class SupplierDataset(Dataset):
    def __init__(self, dataframe, transform=None):
        self.dataframe = dataframe
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        sample = self.dataframe.iloc[idx]

        # Extract features
        labels = ['id', 'status']
        features = sample[labels].values # ignore name and address for now

        # Apply transformations (e.g., convert strings/categories to numerical values)
        if self.transform:
            features = self.transform(features, labels).astype(np.float32)
        # Convert to PyTorch tensor
        features = torch.tensor(features, dtype=torch.float32)

        return features, 0 # 0 is a dummy label

In [3]:
def transform_categorical(data, labels):
    # Convert categorical variables to numerical values (you can use more advanced encoding methods)
    status_mapping = {'draft': 0, 'val': 1, 'other': 2}
    status_index = labels.index('status')
    data[status_index] = status_mapping[data[status_index]]

    # You can implement similar transformations for other categorical variables

    return data

In [4]:
# Load the csv dataset from the csv file
dataset_path = 'datasets/fake_supplier_3.csv'
# Creating a DataFrame from the CSV data (replace this with your actual CSV file path)
df = pd.read_csv(dataset_path)
# Create an instance of the SupplierDataset with the specified transformations
supplier_dataset = SupplierDataset(dataframe=df, transform=transform_categorical)

for i in range(len(supplier_dataset)):
    sample = supplier_dataset[i]
    print(f"supplier Sample {i + 1}:", sample)

supplier Sample 1: (tensor([1., 0.]), 0)
supplier Sample 2: (tensor([2., 1.]), 0)


# Split the dataset into train and test sets

In [5]:
train_percentage = 0.2
batch_size = 1

# Split the dataset into training and validation sets
train_size = int(0.8 * len(supplier_dataset))
val_size = len(supplier_dataset) - train_size
train_dataset, val_dataset = random_split(supplier_dataset, [train_size, val_size])

# Create DataLoader instances for training and validation
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Classes and functions needed

In [6]:
class Encoder(nn.Module):
    
    def __init__(self, input_dim=784, hidden_dim=512, latent_dim=256):
        super(Encoder, self).__init__()

        self.linear1 = nn.Linear(input_dim, hidden_dim)
        self.linear2 = nn.Linear(hidden_dim, hidden_dim)
        self.mean = nn.Linear(hidden_dim, latent_dim)
        self.var = nn.Linear (hidden_dim, latent_dim)
        self.LeakyReLU = nn.LeakyReLU(0.2)
        self.training = True
        
    def forward(self, x):
        x = self.LeakyReLU(self.linear1(x))
        x = self.LeakyReLU(self.linear2(x))

        mean = self.mean(x)
        log_var = self.var(x)                     
        return mean, log_var

In [7]:
class Decoder(nn.Module):
    
    def __init__(self, output_dim=784, hidden_dim=512, latent_dim=256):
        super(Decoder, self).__init__()

        self.linear2 = nn.Linear(latent_dim, hidden_dim)
        self.linear1 = nn.Linear(hidden_dim, hidden_dim)
        self.output = nn.Linear(hidden_dim, output_dim)
        self.LeakyReLU = nn.LeakyReLU(0.2)
        
    def forward(self, x):
        x = self.LeakyReLU(self.linear2(x))
        x = self.LeakyReLU(self.linear1(x))
        
        x_hat = torch.sigmoid(self.output(x))
        return x_hat

In [8]:
class VAE(nn.Module):
    def __init__(self, input_dim, latent_dim):
        super(VAE, self).__init__()

        # Encoder layers
        self.fc1 = nn.Linear(input_dim, 512)
        self.fc2_mean = nn.Linear(512, latent_dim)
        self.fc2_logvar = nn.Linear(512, latent_dim)

        # Decoder layers
        self.fc3 = nn.Linear(latent_dim, 512)
        self.fc4 = nn.Linear(512, input_dim)

    def encode(self, x):
        # Encoder forward pass
        x = F.relu(self.fc1(x))
        mean = self.fc2_mean(x)
        logvar = self.fc2_logvar(x)
        return mean, logvar

    def reparameterize(self, mean, logvar):
        # Reparameterization trick for sampling from a normal distribution
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mean + eps * std

    def decode(self, z):
        # Decoder forward pass
        z = F.relu(self.fc3(z))
        recon_x = torch.sigmoid(self.fc4(z))  # Assuming input features are normalized between 0 and 1
        return recon_x

    def forward(self, x):
        # Full forward pass of the VAE
        mean, logvar = self.encode(x)
        z = self.reparameterize(mean, logvar)
        recon_x = self.decode(z)
        return recon_x, mean, logvar

In [9]:
# # Define the VAE architecture
# class VAE(nn.Module):
#     def __init__(self, input_dim, hidden_dim = 64, latent_dim = 16):
#         super(VAE, self).__init__()
#         self.input_dim = input_dim
#         self.hidden_dim = hidden_dim
#         # self.latent_dim = latent_dim

#         # Encoder
#         self.encoder = nn.Sequential(
#             nn.Linear(input_dim, hidden_dim),
#             nn.ReLU(),
#             nn.Linear(hidden_dim, latent_dim * 2)  # The last layer outputs mean and log-variance
#         )

#         # Decoder
#         self.decoder = nn.Sequential(
#             nn.Linear(latent_dim, hidden_dim),
#             nn.ReLU(),
#             nn.Linear(hidden_dim, input_dim),
#             nn.Sigmoid()  # Sigmoid activation for reconstruction
#         )

#     def reparameterize(self, mu, log_var):
#         std = torch.exp(0.5 * log_var)
#         eps = torch.randn_like(std)
#         return mu + eps * std

#     def forward(self, x):
#         # Encode
#         encoded = self.encoder(x)
#         mu, log_var = torch.chunk(encoded, 2, dim=-1)
#         z = self.reparameterize(mu, log_var)

#         # Decode
#         reconstructed = self.decoder(z)

#         return reconstructed, mu, log_var

In [10]:
def loss_function(recon_x, x, mu, log_var):
    BCE = nn.functional.binary_cross_entropy(recon_x, x, reduction='sum')
    KLD = -0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp())
    return BCE + KLD

In [16]:
def train(model, optimizer, train_dataset, epochs, device, x_dim=-1):
    model.train()
    startTotal = time.time()
    for epoch in range(epochs):
        overall_loss = 0
        start = time.time()
        for batch_idx, (x, _) in enumerate(train_dataset):
            print("\t\tBatch", batch_idx + 1, "/", len(train_dataset))
            x = x.view(batch_size, x_dim).to(device)

            optimizer.zero_grad()

            x_hat, mean, log_var = model(x)
            x_hat = torch.sigmoid(x_hat)
            loss = loss_function(x, x_hat, mean, log_var)
            
            overall_loss += loss.item()
            
            loss.backward()
            optimizer.step()
        end = time.time()
        print("\tEpoch", epoch + 1, "\tAverage Loss: ", overall_loss/(batch_idx*batch_size), "\tStep Time: ", end - start, "s", "\tTotal Time: ", end - startTotal, "s")
    return overall_loss

In [12]:
# def train_vae(model, data_loader, num_epochs=100, learning_rate=1e-3):
#     criterion = nn.MSELoss()
#     optimizer = Adam(model.parameters(), lr=learning_rate)

#     for epoch in range(num_epochs):
#         for data in data_loader:
#             optimizer.zero_grad()
#             recon_batch, mu, log_var = model(data)
#             loss = loss_function(recon_batch, data, mu, log_var)
#             loss.backward()
#             optimizer.step()

#         print(f'Epoch {epoch+1}/{num_epochs}, Loss: {loss.item()}')

# Create model and train it

In [13]:
# create model and optimizer
input_dim = 1 # corresponds to the number of features in the dataset
latent_dim = 1 # corresponds to the number of latent variables
model = VAE(input_dim, latent_dim).to(device)
optimizer = Adam(model.parameters(), lr=1e-3)

In [14]:
# vae_model = VAE(input_dim)
# train_vae(vae_model, train_loader)

In [17]:
# train model
train(model, optimizer, train_loader, epochs=5, device=device)

		Batch 1 / 1


RuntimeError: mat1 and mat2 shapes cannot be multiplied (1x2 and 1x512)

In [None]:
# Generate a new sample
with torch.no_grad():
    sample = torch.randn(1, model.latent_dim)
    generated_supplier = model.decoder(sample).numpy()

# Print or use the generated supplier data as needed
print("Generated Supplier Data:")
print(generated_supplier)

# Plot Result Latent Space

In [None]:
def plot_latent_space(model, scale=5.0, n=25, digit_size=28, figsize=15):
    # display a n*n 2D manifold of digits
    figure = np.zeros((digit_size * n, digit_size * n))

    # construct a grid 
    grid_x = np.linspace(-scale, scale, n)
    grid_y = np.linspace(-scale, scale, n)[::-1]

    for i, yi in enumerate(grid_y):
        for j, xi in enumerate(grid_x):
            z_sample = torch.tensor([[xi, yi]], dtype=torch.float).to(device)
            x_decoded = model.decode(z_sample)
            digit = x_decoded[0].detach().cpu().reshape(digit_size, digit_size)
            figure[i * digit_size : (i + 1) * digit_size, j * digit_size : (j + 1) * digit_size,] = digit

    plt.figure(figsize=(figsize, figsize))
    plt.title('VAE Latent Space Visualization')
    start_range = digit_size // 2
    end_range = n * digit_size + start_range
    pixel_range = np.arange(start_range, end_range, digit_size)
    sample_range_x = np.round(grid_x, 1)
    sample_range_y = np.round(grid_y, 1)
    plt.xticks(pixel_range, sample_range_x)
    plt.yticks(pixel_range, sample_range_y)
    plt.xlabel("mean, z [0]")
    plt.ylabel("var, z [1]")
    plt.imshow(figure, cmap="Greys_r")
    plt.show()

In [None]:
plot_latent_space(model, scale=5.0)