# Conditional Variational Autoencoder

## Overview

This notebook implements the CVAE architecture we used for our generative model simulation. We used PyTorch for the model and the basic Tensor operations.

In [2]:
! pip install torch

Collecting torch
  Obtaining dependency information for torch from https://files.pythonhosted.org/packages/25/5d/22b4d5d2183e03f197a5e6b2721bb8f7a3e92b4947b16b70316c1e3771f3/torch-2.1.1-cp310-none-macosx_11_0_arm64.whl.metadata
  Downloading torch-2.1.1-cp310-none-macosx_11_0_arm64.whl.metadata (25 kB)
Collecting filelock (from torch)
  Obtaining dependency information for filelock from https://files.pythonhosted.org/packages/81/54/84d42a0bee35edba99dee7b59a8d4970eccdd44b99fe728ed912106fc781/filelock-3.13.1-py3-none-any.whl.metadata
  Downloading filelock-3.13.1-py3-none-any.whl.metadata (2.8 kB)
Collecting sympy (from torch)
  Downloading sympy-1.12-py3-none-any.whl (5.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.7/5.7 MB[0m [31m8.3 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0mm
[?25hCollecting networkx (from torch)
  Obtaining dependency information for networkx from https://files.pythonhosted.org/packages/d5/f0/8fbc882ca80cf077f1b246c0e3c3465f7f415439

## CVAE Implementation

The class `CVAE3` is our implementation of the conditional variational autoencoder. It takes as input a set of data and a set of conditions and returns a reconstructed set of data in the same space as the original input data. As in the vanilla VAE architecture, the encoder and decoder allow mappings into and from the latent space, respectively.

In [108]:
# CVAE Model

import torch
from torch import nn, optim
import torch.nn.functional as F

class CVAE3(nn.Module):
    '''
    The CVAE3 class, which implements our model.
    '''
    def __init__(self, cond_dim=100, input_dim=1000, hidden_dim=100, latent_space_dim=2):
        '''
        Constructs a CVAE3 object.
    
        Parameters
        ----------------
        cond_dim : int
            The dimension of the conditions space (flattened).
        input_dim : int
            The dimension of the input or data space (flattened).
        hidden_dim : int
            The dimension of the hidden layers in the encoder and decoder layers
        latent_space_dim : int
            The dimension of the latent space (without accounting for the added conditions). Default value is 2 for a variational autoencoder.
        '''
        super(CVAE3, self).__init__()
        
        # initialize parameters
        self.cond_dim = cond_dim
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.latent_space_dim = latent_space_dim
        
        # Encoder layers
        self.e1  = nn.Linear(input_dim + cond_dim, hidden_dim)
        self.e2 = nn.Linear(hidden_dim, latent_space_dim) # mu
        self.e3 = nn.Linear(hidden_dim, latent_space_dim) # sigma

        # Decoder layers
        self.d1 = nn.Linear(latent_space_dim + cond_dim, hidden_dim)
        self.d2 = nn.Linear(hidden_dim, input_dim)

        # Acivation function
        self.activation = nn.Sigmoid()
        
    def encode(self, x, c):
        '''
        Encodes the data and conditions, returning the mean and std deviation for the distribution.
    
        Parameters
        ----------------
        x : PyTorch Tensor
            The data input to the model.
        c : PyTorch Tensor
            The conditions input to the model.

        Returns
        ----------------
        mu, sigma : int    
        '''
        inputs = torch.cat([x, c])
        h1 = self.activation(self.e1(inputs))
        mu = self.e2(h1)
        sigma = self.e3(h1)
        return mu, sigma

    def reparameterize(self, mu, logvar):
        '''
        Implements the reparametrization trick commonly used in VAE implementations.
    
        Parameters
        ----------------
        mu : int
            The mean of the distribution
        logvar : int
            The log of the variance.

        Returns
        ----------------
        reparametrized value : int   
        '''
        std = torch.exp(0.5*logvar)
        eps = torch.randn_like(std)
        return mu + eps*std

    def decode(self, z, c):
        '''
        Decodes the information contained in the latent space, returning the reconstructed data.
    
        Parameters
        ----------------
        z : PyTorch Tensor
            The latent space tensor.
        c : PyTorch Tensor
            The conditions input to the model.

        Returns
        ----------------
        decoded data : PyTorch Tensor    
        '''
        inputs = torch.cat([z, c])
        h3 = self.activation(self.d1(inputs))
        return self.activation(self.d2(h3))

    def forward(self, x, c):
        '''
        Apply the model to the data.
    
        Parameters
        ----------------
        x : PyTorch Tensor
            The data input to the model.
        c : PyTorch Tensor
            The conditions input to the model.

        Returns
        ----------------
        decoded data, mu, sigma : PyTorch Tensor, int, int    
        '''
        mu, logvar = self.encode(x, c)
        z = self.reparameterize(mu, logvar)
        return self.decode(z, c), mu, logvar

## Loading the Data Into a Data Frame

In [25]:
# load the dataframe from 'input_data_frame.csv'

import pandas as pd

df = pd.read_csv('pmt800_dataFrame_5_files.csv', index_col=[0])

# set NaN in dataframe to zero
df = df.fillna(0)

# print the resulting dataframe to check for mistakes
df

Unnamed: 0,run,evt,channel,hit,startTime,nSamples,count,channelPhotonSum,sample000,sample001,...,sample190,sample191,sample192,sample193,sample194,sample195,sample196,sample197,sample198,sample199
77,0,1,800,0.0,1.515198e+07,94,1.0,2.0,7376,7372,...,7389,7389,7389,7389,7389,7389,7389,7389,7389,7389
192,0,2,800,0.0,2.712405e+08,104,1.0,21.0,7372,7373,...,7389,7389,7389,7389,7389,7389,7389,7389,7389,7389
310,0,3,800,0.0,4.966643e+08,114,1.0,16.0,7371,7373,...,7389,7389,7389,7389,7389,7389,7389,7389,7389,7389
424,0,4,800,0.0,7.697597e+08,96,1.0,6.0,7367,7373,...,7389,7389,7389,7389,7389,7389,7389,7389,7389,7389
638,0,6,800,0.0,1.094312e+09,96,1.0,4.0,7374,7374,...,7389,7389,7389,7389,7389,7389,7389,7389,7389,7389
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
29961,4,89,800,0.0,9.959707e+09,95,1.0,9.0,7368,7367,...,7389,7389,7389,7389,7389,7389,7389,7389,7389,7389
30083,4,91,800,0.0,1.035465e+10,90,1.0,3.0,7373,7372,...,7389,7389,7389,7389,7389,7389,7389,7389,7389,7389
30215,4,92,800,0.0,1.037607e+10,87,1.0,4.0,7377,7376,...,7389,7389,7389,7389,7389,7389,7389,7389,7389,7389
30306,4,93,800,0.0,1.044898e+10,100,1.0,14.0,7373,7378,...,7389,7389,7389,7389,7389,7389,7389,7389,7389,7389


In [109]:
# Create the Pytorch tensors containing the data in the dataframe

import numpy as np

# The conditions are the channelPhotonSum
df_cond = df.loc[:,'channelPhotonSum']
print(df_cond)

# The sample data are the DER response which we are trying to simulate
df_input = df.iloc[:,8:]
print(df_input)

c_tens = torch.from_numpy(df_cond.to_numpy())
print(c_tens.shape)

# For the input data, include a division by the baseline for the detector, which is a value of 7389.
# This will set the input data to a range 0 to 1 which can then be rescaled to 0 to 7389.
x_tens = torch.from_numpy(np.divide(df_input.to_numpy(),7389))
x_tens

77        2.0
192      21.0
310      16.0
424       6.0
638       4.0
         ... 
29961     9.0
30083     3.0
30215     4.0
30306    14.0
30424     6.0
Name: channelPhotonSum, Length: 242, dtype: float64
       sample000  sample001  sample002  sample003  sample004  sample005  \
77          7376       7372       7372       7376       7370       7374   
192         7372       7373       7375       7371       7378       7371   
310         7371       7373       7374       7375       7380       7367   
424         7367       7373       7374       7369       7372       7373   
638         7374       7374       7375       7366       7367       7366   
...          ...        ...        ...        ...        ...        ...   
29961       7368       7367       7371       7369       7372       7371   
30083       7373       7372       7377       7374       7370       7370   
30215       7377       7376       7376       7371       7365       7376   
30306       7373       7378       7368      

tensor([[0.9982, 0.9977, 0.9977,  ..., 1.0000, 1.0000, 1.0000],
        [0.9977, 0.9978, 0.9981,  ..., 1.0000, 1.0000, 1.0000],
        [0.9976, 0.9978, 0.9980,  ..., 1.0000, 1.0000, 1.0000],
        ...,
        [0.9984, 0.9982, 0.9982,  ..., 1.0000, 1.0000, 1.0000],
        [0.9978, 0.9985, 0.9972,  ..., 1.0000, 1.0000, 1.0000],
        [0.9974, 0.9976, 0.9969,  ..., 1.0000, 1.0000, 1.0000]],
       dtype=torch.float64)

## Creating and Training the Model

In [110]:
# create the model (rows in the dataframe are individual events)

model = CVAE3(cond_dim=1, input_dim=x_tens.shape[0],hidden_dim=100, latent_space_dim=2)
print(model)

CVAE3(
  (e1): Linear(in_features=243, out_features=100, bias=True)
  (e2): Linear(in_features=100, out_features=2, bias=True)
  (e3): Linear(in_features=100, out_features=2, bias=True)
  (d1): Linear(in_features=3, out_features=100, bias=True)
  (d2): Linear(in_features=100, out_features=242, bias=True)
  (activation): Sigmoid()
)


In [118]:
# Define the device for PyTorch for future use (if needed)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [114]:
# loss function
def loss_function(recon_x, x, mu, logvar):
    '''
    The loss function for the training.

    Parameters
    ----------------
    recon_x : PyTorch Tensor
        The reconstructed data from the model.
    x : PyTorch Tensor
        The input data to the model.
    mu : int
        The mean from the VAE.
    logvar : int
        The logarithm of the variance from the VAE.

    Returns
    ----------------
    loss : int
        The loss for the VAE, which is a sum of the Kulback-Leibler Divergence loss (common to any VAE),
        and the mean squared error loss between the reconstructed data and the original input data.
    '''
    Recon_loss = F.mse_loss(recon_x, x)
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    print("Recon_loss = " + str(Recon_loss) + " and KLD_loss = " + str(KLD))
    return Recon_loss + KLD

In [115]:
# test training
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
def train(epochs, x_data, c_data, optimizer):
    model.train()    
    model.float()
    
    loss_file = open('loss_file_', 'w')
    for e in range(epochs):
        for i in range(x_data.shape[1]):
            x = x_data[:,i].float()
            c = torch.reshape(c_data[i],(1,)).float()
            
            x_prediction, mu, logvar = model(x, c)
            
            loss = loss_function(x_prediction, x, mu, logvar)
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            loss_file.write("%s\n" % loss.item())
            
    loss_file.close()
    # save the model
    torch.save(model.state_dict(), 'model_weights.pth')

In [117]:
train(50, x_tens, c_tens, optimizer)

Recon_loss = tensor(0.0024, grad_fn=<MseLossBackward0>) and KLD_loss = tensor(1.6004e-05, grad_fn=<MulBackward0>)
Recon_loss = tensor(0.0015, grad_fn=<MseLossBackward0>) and KLD_loss = tensor(6.8247e-06, grad_fn=<MulBackward0>)
Recon_loss = tensor(0.0015, grad_fn=<MseLossBackward0>) and KLD_loss = tensor(1.5080e-05, grad_fn=<MulBackward0>)
Recon_loss = tensor(0.0015, grad_fn=<MseLossBackward0>) and KLD_loss = tensor(5.0664e-07, grad_fn=<MulBackward0>)
Recon_loss = tensor(0.0017, grad_fn=<MseLossBackward0>) and KLD_loss = tensor(3.4869e-06, grad_fn=<MulBackward0>)
Recon_loss = tensor(0.0014, grad_fn=<MseLossBackward0>) and KLD_loss = tensor(3.2187e-06, grad_fn=<MulBackward0>)
Recon_loss = tensor(0.0016, grad_fn=<MseLossBackward0>) and KLD_loss = tensor(3.7849e-06, grad_fn=<MulBackward0>)
Recon_loss = tensor(0.0015, grad_fn=<MseLossBackward0>) and KLD_loss = tensor(1.1027e-06, grad_fn=<MulBackward0>)
Recon_loss = tensor(0.0013, grad_fn=<MseLossBackward0>) and KLD_loss = tensor(5.9605e-06