<h1><center>Sam Armstrong CS548 Project</center></h1>
<h1><center>Variational Autoencoder with Classifier</center></h1>

## Import Libraries

In [1]:
import torch
from torch.autograd import Variable
import numpy as np
import torch.nn.functional as F
import torch.optim as optim
import pandas as pd
from torch.utils.data import Dataset, DataLoader

## Load RNA-Seq Data

In [2]:
seq_data = pd.read_csv("tumor_data/data.csv")
seq_data.rename(columns={'Unnamed: 0':'sample'}, inplace=True)

## Load Tumor Type/Class Data

In [3]:
labels = pd.read_csv("tumor_data/labels.csv")
labels.rename(columns={'Unnamed: 0':'sample'}, inplace=True)

## Join RNA-Seq and Tumor Type/Class Data

In [4]:
tumor_data = seq_data.set_index('sample').join(labels.set_index('sample')).to_numpy()

## Custom Pytorch Dataset for Cancer Data

In [5]:
class TumorDataset(Dataset):
    
    def __init__(self, data, transform=None):
        'Initialization'
        self.labels = data[:, -1]
        self.lookup_table, self.labels = np.unique(data[:, -1], return_inverse=True)
        self.genes = data[:, :-1]
        self.transform = transform

    def __len__(self):
        'Denotes the total number of samples'
        return len(self.labels)

    def __getitem__(self, index):
        'Generates one sample of data'
        # Load data and get label
        X = torch.from_numpy(self.genes[index, : ].astype(np.float)).float()
        y = torch.from_numpy(np.unique(self.labels[index]).astype(int))

        return X, y

## Encoder Class

In [6]:
class Encoder(torch.nn.Module):
    def __init__(self, D_in, H, D_out):
        super(Encoder, self).__init__()
        self.linear1 = torch.nn.Linear(D_in, H)
        self.linear2 = torch.nn.Linear(H, D_out)

    def forward(self, x):
        x = F.relu(self.linear1(x))
        return F.relu(self.linear2(x))

## Decoder Class

In [7]:
class Decoder(torch.nn.Module):
    def __init__(self, D_in, H, D_out):
        super(Decoder, self).__init__()
        self.linear1 = torch.nn.Linear(D_in, H)
        self.linear2 = torch.nn.Linear(H, D_out)

    def forward(self, x):
        x = F.relu(self.linear1(x))
        return F.relu(self.linear2(x))

## Classifier Class

In [8]:
class Classifier(torch.nn.Module):
    def __init__(self, D_in, H, D_out):
        super(Classifier, self).__init__()
        
        self.linears = torch.nn.ModuleList([torch.nn.Linear(D_in, H[0])])
        
        for i in range(1,len(H)):
            self.linears.append(torch.nn.Linear(H[i-1], H[i]))
            
        self.linears.append(torch.nn.Linear(H[-1], D_out))
  
    def forward(self, x):
        for h in self.linears:
            x = h(x)
       
        return F.softmax(x, dim=1)

## Variational Autoencoder Class with Classifier

In [9]:
class VAE(torch.nn.Module):

    def __init__(self, encoder, decoder, classifier, latent_dim):
        super(VAE, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.classifier = classifier
        self._enc_mu = torch.nn.Linear(self.encoder.linear2.out_features, latent_dim)
        self._enc_log_sigma = torch.nn.Linear(self.encoder.linear2.out_features, latent_dim)

    def _sample_latent(self, h_enc):
        """
        Return the latent normal sample z ~ N(mu, sigma^2)
        """
        mu = self._enc_mu(h_enc)
        log_sigma = self._enc_log_sigma(h_enc)
        sigma = torch.exp(log_sigma)
        std_z = torch.from_numpy(np.random.normal(0, 1, size=sigma.size())).float()

        self.z_mean = mu
        self.z_sigma = sigma

        return mu + sigma * Variable(std_z, requires_grad=False)  
    
    
    def forwardEncoder(self, state):
        h_enc = self.encoder(state)
        return self._sample_latent(h_enc)
    
    def forwardAutoEncoder(self, state):
        z = self.forwardEncoder(state)
        return self.decoder(z)
    
    def forwardClassifier(self, state):
        z = self.forwardEncoder(state)
        return self.classifier(z)
    
    def saveModel(self, fileName="vaeModel"):
        torch.save(self.state_dict(), fileName)
        print("Model Saved Successfully")
        
    def loadModel(self, fileName="vaeModel"):
        self.load_state_dict(torch.load(fileName))
        self.eval()

## Custom Loss Method (Kullback–Leibler Loss)

In [10]:
def latent_loss(z_mean, z_stddev):
    mean_sq = z_mean * z_mean
    stddev_sq = z_stddev * z_stddev
    return 0.5 * torch.mean(mean_sq + stddev_sq - torch.log(stddev_sq) - 1)

## Train AutoEncoder (Encoder + Decoder)

In [11]:
def trainAutoencoder(modelName, loadModel=False, saveModel=True, input_dim = 20531, latent_dim = 10, encoding_layer = 100, classifier_layers = [20, 20, 20], n_classes = 5, batch_size = 32, epochs = 10, verbose = True):

    encoder = Encoder(input_dim, encoding_layer, encoding_layer)
    decoder = Decoder(latent_dim, encoding_layer, input_dim)
    classifier = Classifier(latent_dim, classifier_layers, n_classes)

    vae = VAE(encoder, decoder, classifier, latent_dim)
    
    if loadModel:
        vae.loadModel(modelName)

    train_loader = torch.utils.data.DataLoader( TumorDataset(tumor_data), batch_size=batch_size, shuffle=True)

    if verbose:
        print('Number of samples: ', len(tumor_data))

    criterion = torch.nn.MSELoss()
    optimizer = optim.Adam(vae.parameters(), lr=0.0001)

    l = None
    for epoch in range(epochs):
        for i, data in enumerate(train_loader, 0):
            inputs, classes = data
            inputs, classes = Variable(inputs.resize_(batch_size, input_dim)), Variable(classes)
            optimizer.zero_grad()
            dec = vae.forwardAutoEncoder(inputs.float())
            ll = latent_loss(vae.z_mean, vae.z_sigma)
            loss = criterion(dec, inputs) + ll
            loss.backward()
            optimizer.step()
            l = loss.item()
        if verbose:
            print(epoch, l)

    if saveModel:
        vae.saveModel(modelName + "_loss-" + str(l) + "_CIvec-" + str(latent_dim) + "_class-" + str(classifier_layers).replace("[", "").replace("]", "").replace(", ", "."))
        
    return vae

In [12]:
trainAutoencoder("testModel", saveModel=False, epochs=2)

Number of samples:  801
0 1.7864315509796143
1 1.6781020164489746


VAE(
  (encoder): Encoder(
    (linear1): Linear(in_features=20531, out_features=100, bias=True)
    (linear2): Linear(in_features=100, out_features=100, bias=True)
  )
  (decoder): Decoder(
    (linear1): Linear(in_features=10, out_features=100, bias=True)
    (linear2): Linear(in_features=100, out_features=20531, bias=True)
  )
  (classifier): Classifier(
    (linears): ModuleList(
      (0): Linear(in_features=10, out_features=20, bias=True)
      (1): Linear(in_features=20, out_features=20, bias=True)
      (2): Linear(in_features=20, out_features=20, bias=True)
      (3): Linear(in_features=20, out_features=5, bias=True)
    )
  )
  (_enc_mu): Linear(in_features=100, out_features=10, bias=True)
  (_enc_log_sigma): Linear(in_features=100, out_features=10, bias=True)
)

## Train Classifier (Encoder + Classifier)

In [13]:
def trainClassifier(modelName, loadModel=False, saveModel=True, input_dim = 20531, latent_dim = 10, encoding_layer = 100, classifier_layers = [20, 20, 20], n_classes = 5, batch_size = 32, epochs = 10, verbose = True):
    
    encoder = Encoder(input_dim, encoding_layer, encoding_layer)
    decoder = Decoder(latent_dim, encoding_layer, input_dim)
    classifier = Classifier(latent_dim, classifier_layers, n_classes)

    vae = VAE(encoder, decoder, classifier, latent_dim)
    
    if loadModel:
        vae.loadModel(modelName)

    train_loader = torch.utils.data.DataLoader( TumorDataset(tumor_data), batch_size=batch_size, shuffle=True, drop_last=True)

    if verbose:
        print('Number of samples: ', len(tumor_data))

    criterion = torch.nn.CrossEntropyLoss()

    optimizer = optim.Adam(vae.parameters(), lr=0.0001)

    l = None
    for epoch in range(epochs):
        for i, data in enumerate(train_loader, 0):
            inputs, classes = data
            inputs, classes = Variable(inputs.resize_(batch_size, input_dim)), Variable(classes).flatten().long()
            predictions = vae.forwardClassifier(inputs.float())
            optimizer.zero_grad()
            ll = latent_loss(vae.z_mean, vae.z_sigma)
            loss = criterion(predictions, classes) + ll
            loss.backward()
            optimizer.step()
            l = loss.item()
        if verbose:
            print(epoch, l)
        
    if saveModel:
        vae.saveModel(modelName + "_loss-" + str(l) + "_CIvec-" + str(latent_dim) + "_class-" + str(classifier_layers).replace("[", "").replace("]", "").replace(", ", "."))
        
    return vae

In [14]:
trainClassifier("testModel", saveModel=False, epochs=2)

Number of samples:  801
0 8.114500045776367
1 2.53641414642334


VAE(
  (encoder): Encoder(
    (linear1): Linear(in_features=20531, out_features=100, bias=True)
    (linear2): Linear(in_features=100, out_features=100, bias=True)
  )
  (decoder): Decoder(
    (linear1): Linear(in_features=10, out_features=100, bias=True)
    (linear2): Linear(in_features=100, out_features=20531, bias=True)
  )
  (classifier): Classifier(
    (linears): ModuleList(
      (0): Linear(in_features=10, out_features=20, bias=True)
      (1): Linear(in_features=20, out_features=20, bias=True)
      (2): Linear(in_features=20, out_features=20, bias=True)
      (3): Linear(in_features=20, out_features=5, bias=True)
    )
  )
  (_enc_mu): Linear(in_features=100, out_features=10, bias=True)
  (_enc_log_sigma): Linear(in_features=100, out_features=10, bias=True)
)

## Get Classifier Percentage Correct on Train Data

In [15]:
def printClassifierAccuracy(model, input_dim = 20531, batch_size = 32):
    correct = 0
    total = 0
    train_loader = torch.utils.data.DataLoader( TumorDataset(tumor_data), batch_size=batch_size, shuffle=True, drop_last=True)
    
    for i, data in enumerate(train_loader, 0):
        inputs, classes = data
        inputs, classes = Variable(inputs.resize_(batch_size, input_dim)), Variable(classes).flatten().long()
        predictions = model.forwardClassifier(inputs.float())
        _, predicted = torch.max(predictions.data, 1)
        total += classes.size(0)
        correct += (predicted == classes).sum().item()

    print('Accuracy of the network on the training data: %d %%' % (
        100 * correct / total))

In [19]:
model = trainClassifier("testModel", saveModel=False, epochs=2)
printClassifierAccuracy(model)

Number of samples:  801
0 2.433783531188965
1 1.7174665927886963
Accuracy of the network on the training data: 36 %


In [20]:
latent_dimension = range(8, 33)
classifiers = [[10]*2, [10]*3, [10]*4, [10]*5, 
               [20]*2, [20]*3, [20]*4, [20]*5, 
               [30]*2, [30]*3, [30]*4, [30]*5,
               [40]*2, [40]*3, [40]*4, [40]*5,
               [50]*2, [50]*3, [50]*4, [50]*5]

In [21]:
for i in latent_dimension:
    for c in classifiers:
        trainClassifier("vaeModel1", epochs=300, latent_dim=i, classifier_layers=c, verbose=False)
        print("Model Complete")

Number of samples:  801
0 1.6529542207717896
1 1.6119574308395386
2 1.606869101524353
3 1.5993549823760986
4 1.6041462421417236
5 1.5879263877868652
6 1.600173830986023
7 1.6029385328292847
8 1.5934261083602905
9 1.6006280183792114
10 1.6160714626312256
11 1.6159781217575073
12 1.6146551370620728
13 1.5885255336761475
14 1.5997686386108398
15 1.593442440032959
16 1.5971723794937134
17 1.5994203090667725
18 1.5861436128616333
19 1.5977225303649902
20 1.5915696620941162
21 1.5949057340621948
22 1.5961037874221802
23 1.56308913230896
24 1.5670340061187744
25 1.5961098670959473
26 1.5612330436706543
27 1.594193696975708
28 1.564295768737793
29 1.5980618000030518
30 1.570786714553833
31 1.5490353107452393
32 1.5735511779785156
33 1.5881454944610596
34 1.5916306972503662
35 1.5609742403030396
36 1.549248456954956
37 1.5477168560028076
38 1.5968376398086548
39 1.5935728549957275
40 1.5871813297271729
41 1.5579156875610352
42 1.5814383029937744
43 1.5657665729522705
44 1.5917474031448364
45 1.

KeyboardInterrupt: 