<a href="https://colab.research.google.com/github/Nwanna-Joseph/Variational_Autoencoder/blob/main/SMILE_Data_VAE.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **A demonstration of creating Variational Autoencoders using the SMILE dataset**

Objective: Generating new SMILE data that wasn't used for training

Imports!

In [142]:
import numpy as np
# from utils import softmax 
import torch
import numpy as np
import argparse
import torch.nn as nn
import torchvision
#from sklearn.preprocessing import OneHotEncoder
from torch.utils.data import Dataset, DataLoader

Utils

In [143]:
class SMILEText:
  def __init__(self):
    with open('/content/smiles.txt','r') as f:
      self.text = f.read()
      self.unique_chars = set(self.text)
      self.from_array= self.text.split('\n')

smile_text = SMILEText()

In [297]:
class SMILESEncoder:
    def __init__(self) -> None:
        pass

        # Allowed tokens (adapted from default dictionary)
        self._tokens = np.sort([k for k in smile_text.unique_chars])
        self.image_template = np.zeros((200,len(self._tokens)+1)) #+1 for unknown. 100 * number of unique chars

        self.c2i = {}
        self.i2c = {}

        for i,c in enumerate(self._tokens):
          self.c2i[c] = i
          self.i2c[i] = c

    
    def encode( self, data ):

      assert len(data)<100

      char_image_template = self.image_template[:,:]

      for i,c in enumerate(data):
        char_image_template[i][self.c2i[c]] = 1

      return np.array(char_image_template)



    def decode(self, one_hot):
      text = ""
      for row in one_hot:
        for i,elem in enumerate(row):
          if(elem == 1 and self.i2c.get(i)):
            text += self.i2c.get(i)
      
      return text

In [298]:
def softmax(z):
    """Softmax function """
    # raise NotImplementedError
    return torch.nn.functional.softmax(z,dim = 0)
smiles_encoder = SMILESEncoder()
def idx_to_sequence(idx):
    return smiles_encoder.decode(idx)

def sequence_to_idx(sequence):
    return smiles_encoder.encode(sequence)

In [299]:
#Unit tests
test_sequence = smile_text.from_array[10]
print("SMILE data : ",test_sequence)
to_idx = sequence_to_idx(test_sequence)
print(f"One hot encoding for {to_idx.shape} , {test_sequence} : \n",to_idx)
to_char = idx_to_sequence(to_idx)
print(f"To SMILE data : ", to_char )

SMILE data :  c1ccc(cc1)[C@@H](C(=O)[O-])O
One hot encoding for (200, 35) , c1ccc(cc1)[C@@H](C(=O)[O-])O : 
 [[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]
To SMILE data :  c1ccc(cc1)[C@@H](C(=O)[O-])O


In [300]:
def temperature_sampling(temperature):

  """
  Temperature sampling wrapper function
  This wrapper function will allow us use the temperature sampling strategy to decode our predicted sequences
  """
  
  def decode(preds):

    """ 
    Decoder using temperature 
    """

    preds = np.asarray(preds).astype("float64")
    preds = np.log(preds) / temperature
    reweighted_preds = softmax(torch.tensor(preds))
    probs = np.random.multinomial(3, reweighted_preds, 3)

    return np.argmax(probs)

  return decode

In [301]:
#Temperature Sampling Unit test
predictions = torch.tensor([0.0021, 0.0158, 0.1171, 0.8650])
predictions_b = torch.tensor([0.0021, 0.8650, 0.0158, 0.1171])
temp_sample_a = temperature_sampling(1)(predictions)
temp_sample_b = temperature_sampling(0.3)(predictions_b) #returns the index of the highest prediction
print(temp_sample_a)
print(temp_sample_b)

3
1


In [125]:
#Did not use this
class View(nn.Module):
    def __init__(self, shape):
        super(View, self).__init__()
        self.shape = shape

    def forward(self, x):
        return x.view(*self.shape)

#Did not use this
class ConvVAE(nn.Module):
    '''Simple VAE network, based on the example VAE found in 
    the pytorch examples.
    '''

    def __init__(self, image_w=200, image_h=200, latent_size=30):
        super(ConvVAE, self).__init__()

        self._image_w = image_w
        self._image_h = image_h
        self._latent_size = latent_size

        # Encoder layer into latent space.
        self._encoder = nn.Sequential(nn.Conv2d(1, 16, 20, stride=20),
                                      nn.Sigmoid(), View([-1, 100]))

        # Latent space to prob space
        self._mean = nn.Sequential(nn.Linear(100, self._latent_size),
                                   nn.Sigmoid())

        self._std = nn.Sequential(nn.Linear(100, self._latent_size),
                                  nn.Sigmoid())

        # Latent space to decoded space.
        self._decoder = nn.Sequential(
            nn.Linear(self._latent_size, 400), nn.ReLU(),
            nn.Linear(400, 1 * self._image_w * self._image_h), nn.Sigmoid())

    def encode(self, x):
        h1 = self._encoder(x)
        return self._mean(h1), self._std(h1)

    def reparameterize(self, mean, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mean + eps * std

    def decode(self, z):
        return self._decoder(z)

    def forward(self, x):
        mean, logvar = self.encode(x)
        z = self.reparameterize(mean, logvar)
        return self.decode(z), mean, logvar, z

In [36]:
#Did not use this
class Encoder(nn.Module):
    def __init__(self) -> None:
        super(Encoder,self).__init__()
        self.input_dimension = 1
        self.latent_dim = 20
        self.size_of_batch = 32
        # Define all layers here
        self.layer1 = nn.Sequential(
            nn.Sequential(
                    nn.Conv2d(self.input_dimension, 16,
                              kernel_size= 3, stride= 2, padding  = 1),
                    nn.BatchNorm2d(16),
                    nn.LeakyReLU()),
            nn.Sequential(
                    nn.Conv2d(16, 32,
                              kernel_size= 2, stride= 2, padding  = 1),
                    nn.BatchNorm2d(32),
                    nn.LeakyReLU()),
        )
    
    def forward(self, input_x):
        """
        Define the sequence from input to output here
        """
        jump_1 = self.layer1(input_x)
        flatten = nn.Flatten()(jump_1)
        features_len = flatten.shape[1]
        mu = nn.Linear(features_len, self.latent_dim)(flatten) # implement mu here
        sigma = nn.Linear(features_len, self.latent_dim)(flatten) # implememt sigma here
        return mu, sigma

#Did not use this
class Decoder(nn.Module):
    def __init__(self) -> None:
        super(Decoder,self).__init__()
        self.latent_dim = 20

        # Initialize all layers here

    def forward(self, input):
        # flatten = nn.Flatten()(input)
        latent = nn.Linear(38,20)(input.permute(1,0))
        #Implement the decoder forward pass here
        output = latent
        return output
#Did not use this
class Reparameterization(nn.Module):
    def __init__(self) -> None:
        super().__init__()


    def forward(self, mu, sigma):
        output = ...
        return output

#Did not use this
class VAE(nn.Module):

    # def reparameterize(self, mu, logvar):
    #     std = logvar.mul(0.5).exp_()
    #     esp = to_var(torch.randn(*mu.size()))
    #     z = mu + std * esp
    #     return z

    def __init__(self) -> None:
        super().__init__()
        self.encoder = Encoder()
        # self.encoder = nn.Sequential(
        #     nn.Linear(image_size, h_dim),
        #     nn.LeakyReLU(0.2),
        #     nn.Linear(h_dim, z_dim*2)
        # )
        # # self.decoder = ...
        # self.decoder = nn.Sequential(
        #     nn.Linear(z_dim, h_dim),
        #     nn.ReLU(),
        #     nn.Linear(h_dim, image_size),
        #     nn.Sigmoid()
        # )
        # self.reparameterization =...

    def forward(self, input):
      return encoder(input)
      pass
        # output = ...
        # return output 
        # h = self.encoder(x)
        # mu, logvar = torch.chunk(h, 2, dim=1)
        # z = self.reparameterize(mu, logvar)
        # return self.decoder(z), mu, logvar

In [40]:
#Unit tests
device = "cpu"
test_tensor = torch.tensor([a for a in sequence_to_idx( smile_text.from_array[0] ) ], dtype= torch.float32) .reshape(38,1,1,34)
test_tensor.to(device)
encoder = Encoder()
encoder.to(device)
encoded_data = encoder(test_tensor)

In [302]:
from torch.autograd import Variable
class VAE(nn.Module):

    def __init__(self, image_size=35 , h_dim=400, z_dim=20):
        super(VAE, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(image_size, h_dim),
            nn.LeakyReLU(0.2),
            nn.Linear(h_dim, z_dim*2)
        )
        
        self.decoder = nn.Sequential(
            nn.Linear(z_dim, h_dim),
            nn.ReLU(),
            nn.Linear(h_dim, image_size),
            nn.Sigmoid()
        )

    def reparameterize(self, mu, logvar):
        std = logvar.mul(0.5).exp_()
        esp = to_var(torch.randn(*mu.size()))
        z = mu + std * esp
        return z
    
    def forward(self, x):
        h = self.encoder(x)
        mu, logvar = torch.chunk(h, 2, dim=1)
        z = self.reparameterize(mu, logvar)
        return self.decoder(z), mu, logvar

In [303]:
class SmileDataset(torch.utils.data.Dataset):
  #Adapter pattern (Software design pattern)
  def __init__(self, file_path):
    with open("/content/smiles.txt",'r') as f:
      text = f.read()

    #get unique characters in the data
    unique_chars = set(text) 

    #create a map of SMILE character to index
    hashmap_char_index = {}
    #create a map of SMILE index to character
    hashmap_index_char = {}

    for idx,ch in enumerate(unique_chars):
      hashmap_char_index[ch] = idx
      hashmap_index_char[idx] = ch
    
    #text is a concatenation of all the chemical structure. split them via the next line aka \n

    self.data = text.split('\n')
    #self.data[0] is now C[C@@]1(C(=O)C=C(O1)C(=O)[O-])c2ccccc2

    pass
  
  def __len__(self):
    return len(self.data)
  
  def __getitem__(self,idx):
    return self.data[idx]

In [304]:
trainloader = torch.utils.data.DataLoader(SmileDataset("/content/smiles.txt"), batch_size = 32)
model = VAE()
# optimizer = torch.optim.SGD(model.parameters(),lr = 0.0001)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
criterion = torch.nn.MSELoss()

In [305]:
def loss_fn(recon_x, x, mu, logvar):
    BCE = torch.nn.functional.binary_cross_entropy(recon_x, x, size_average=False)

    # see Appendix B from VAE paper:
    # Kingma and Welling. Auto-Encoding Variational Bayes. ICLR, 2014
    # 0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2)
    KLD = -0.5 * torch.sum(1 + logvar - mu**2 -  logvar.exp())
    return BCE + KLD, BCE, KLD

In [306]:
def flatten(x):
    return to_var(x.view(x.size(0), -1))
    
def save_image(x, path='real_image.png'):
    torchvision.utils.save_image(x, path)

def to_var(x):
    if torch.cuda.is_available():
        x = x.cuda()
    return Variable(x)

In [307]:
test = iter(trainloader).next()[0]
test = flatten(torch.tensor( sequence_to_idx(test), dtype= torch.float32 ))

mu_learnt = None
std_learnt = None
loss_array = []
bce_loss_array = []
kld_loss_array = []
for epoch in range(2):
    for idx, image in enumerate(trainloader):
        # images = [print(sequence_to_idx( a ).shape) for a in image ]
        images = flatten(torch.tensor([sequence_to_idx( a ) for a in image ], dtype= torch.float32).reshape(-1,1,1,35))

        recon_images, mu, logvar = model(images)
        mu_learnt = mu
        std = logvar.mul(0.5).exp_()
        std_learnt = std
        
        loss, bce_loss, kld_loss = loss_fn(recon_images, images, mu, logvar)
        loss_array.append(loss)
        bce_loss_array.append(bce_loss)
        kld_loss_array.append(kld_loss)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        if idx%100 == 0:
            print("Epoch[{}/{}] Loss: {:.3f}".format(epoch+1, 2, loss.item()/32))
    
            # recon_x, _, _ = model(test)
            # save_image(recon_x.view(1, 1, 200, 35).data.cpu(), f'/content/reconstructed/recon_image_{epoch}_{idx}.png')

torch.Size([200, 35])
Epoch[1/2] Loss: 4849.317




Epoch[1/2] Loss: 855.314
Epoch[1/2] Loss: 809.248
Epoch[1/2] Loss: 700.765
Epoch[1/2] Loss: 659.113
Epoch[1/2] Loss: 635.517
Epoch[1/2] Loss: 630.030
Epoch[1/2] Loss: 620.514
Epoch[1/2] Loss: 612.607
Epoch[1/2] Loss: 618.477
Epoch[1/2] Loss: 645.797
Epoch[1/2] Loss: 584.799
Epoch[1/2] Loss: 580.062
Epoch[1/2] Loss: 581.203


KeyboardInterrupt: ignored

In [308]:
# from decoder import temperature_sampling
# from utils import idx_to_sequence
# parser = argparse.ArgumentParser()

# parser.add_argument('epochs', type=int, help='number of full forward passes through data')

# parser.add_argument('') # You may add more arguments here! .....

# args = parser.parse_args()


#mini overrides
epochs = 1

def train(trainloader, 
          model,
          epochs,
          optimizer,
          criterion)->None:
    model.train() 
    for epoch in range(epochs):
        for data in trainloader:
            test_tensor = torch.tensor([a for a in sequence_to_idx( data ) ], dtype= torch.float32) .reshape(38,1,1,34)
            print(test_tensor.shape)
            prediction = model(data)
            # loss = criterion(prediction, label)
            # optimizer.zero_grad()
            # loss.backward()
            # optimizer.step()

def generate(model, mu, std):
    model.eval()
    z = np.random.normal(mu.detach().numpy(),std.detach().numpy()) #from s = np.random.normal(mu, sigma, 1000), generating some random texts from the distribution
    out_probs = model.decoder(torch.tensor(z,dtype=torch.float32)) #z is expected to be numpy. Here, it's being converted to "Tensors" and fed to the decoding model
    
    output = ""
    for row in out_probs.detach().numpy():
      dx = temperature_sampling(1)(row)
      ch = smiles_encoder.i2c.get(dx)
      if(ch):
        output += ch

    # print(output)

    generation = output#idx_to_sequence([[0,] for  temperature_sampling(1)(out_probs.detach().numpy())]) #output probs is a tensor thet's converted to numpy for use in numpy. It's being passed to temperature_sampling which needs to call .decode()
    return generation


In [309]:
generate( model, mu_learnt, std_learnt )

  from ipykernel import kernelapp as app


'C#C#)(/F-(#)#(S/[H+15[)(2#+)-/(F##C-=)r#()@+3BC#3N4-+#)(CC))C3)3)O)OC33)3O33CC33OOC)3333O)OCC3C3)3)O3)3O33)3)33)CC333C3OC3)OC)333O)33C3C33333O))3333O3))3CCO333)C1=@(PF4/+[4#-(#1##/#1(I))5#1/++/+34+c+3#3#1))(16))l#N=+])]3]]O)))C)OO3))3)3))3O)33OC))3O33)))3)333)33F)3))33))O333CCO33O33C333)3)3O33CO33)O)3))33)33O33CO33C33CC@=H(34@-CH#)@\\)#2@F--)4H(+(5Ho(-5-(1-/C#5]@/(21+)(#)#-COO=]C)C)C)333O))C3OC3)O)))33)C)))33))3333)O33333C3C33)O3333C33)3O)33)O3)OO3333O3C33)O3O3))33333O333)33C333)[C/+1n/4-42P/(F#]l@5B-[5-()\\6(s)6/(\\-N4O)#2#)s45[Ol((=)4=])3l)C33)O3OCOC)3COO))3OO33333)333333)C))3)O)O)3O)C333)3O)C33)C33)3)3)33)C)3O)CO33C)3)3)333OC)333)C3O)O)3)OO3)CcO+H+C()#(-H]-(-r1P(B4#@/I/B])@42@(3@-#)B-+/21H]4##+#5OOO--O)[)C3)O3)O)3C3333C3333))3)O)O3)33C3)333))3C3O)3OC)O)3333)))))33))3C3)O333)33)333)3)COO3)33333C3333C33CCOC(-+141)//OFC+)rB()+2/(#/)H)HB)(=5/5(#n(/C/+4#S=C3445)c=34C--)O)O)3C33)3O333)3O3)333333O33CO)33333)O)OC33)C3))33333C333C3C3)O33)O))33)C3)333)33333C)33)C333C3O33)3O))CC+=-##4B))()-F=

# Task 1 : 
Do a character-level representation for the SMILES molecules.


In [310]:
test_sequence = smile_text.from_array[10]
print("SMILE data : ",test_sequence)
to_idx = sequence_to_idx(test_sequence)
print(f"One hot encoding for {to_idx.shape} , {test_sequence} : \n",to_idx)

SMILE data :  c1ccc(cc1)[C@@H](C(=O)[O-])O
One hot encoding for (200, 35) , c1ccc(cc1)[C@@H](C(=O)[O-])O : 
 [[0. 0. 0. ... 0. 0. 0.]
 [0. 1. 1. ... 0. 0. 0.]
 [0. 1. 1. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]


# Task 2 
Train a variational autoencoder that jointly learns to encode SMILES molecules to a latent distribution and reconstructs from the latent distribution.

In [None]:
test = iter(trainloader).next()[0]
test = flatten(torch.tensor( sequence_to_idx(test), dtype= torch.float32 ))

mu_learnt = None
std_learnt = None
loss_array = []
bce_loss_array = []
kld_loss_array = []
for epoch in range(1):
    for idx, image in enumerate(trainloader):
        # images = [print(sequence_to_idx( a ).shape) for a in image ]
        images = flatten(torch.tensor([sequence_to_idx( a ) for a in image ], dtype= torch.float32).reshape(-1,1,1,35))

        recon_images, mu, logvar = model(images)
        mu_learnt = mu
        std = logvar.mul(0.5).exp_()
        std_learnt = std
        
        loss, bce_loss, kld_loss = loss_fn(recon_images, images, mu, logvar)
        loss_array.append(loss)
        bce_loss_array.append(bce_loss)
        kld_loss_array.append(kld_loss)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        if idx%100 == 0:
            print("Epoch[{}/{}] Loss: {:.3f}".format(epoch+1, 2, loss.item()/32))
    
            # recon_x, _, _ = model(test)
            # save_image(recon_x.view(1, 1, 200, 35).data.cpu(), f'/content/reconstructed/recon_image_{epoch}_{idx}.png')

# Task 3 
Sample a latent vector from the standard normal distribution and use the trained VAE models’ decoder to reconstruct the SMILES molecules from the latent vectors

In [313]:
generate( model, mu_learnt, std_learnt )

  from ipykernel import kernelapp as app


'c12[C-+#3//-\\-HC+3)43/H-B#+#(-++(()315c))1O#N=35)2-c(=]3-C))-O3)4C333O3C)333)3C3))CO3)3O)3C)O)3))))333)O3O)O)O333O)O33333)333)33)3OO)O3CO333))3)C33)33O))O33C3C(Nn+[\\/#+122B=2#4-N+=513)))\\1-4-1+#@B#/)2@((C))#[)-(5#)=C--O(3)O)O3333CCO)33)C))33O))3O3)3)333)3333O3))3)3C3333O3C3333))333)3)33)3))C3CC3)3))))3)333333CO)O3)CC)O3))CHO2)-1C+2-4-(4+51r/)(1B)2BHFF22I)3((I32+1))/(4#=4C@+C()(#)FOFCC3O)OO))3))333O)3)O-3OO3O)3333OO3)O)333)33333333))3COC3OCO3O3)3)O3))33333)3COO))O)))3OC3333C))OO)33O33)3[[N=+Ps(1)/2/1#(1=/+O/-I#5O)/+-//3O2O+5C/#C/)1C[2)55)(OO)CO3C)3)3O33)3333))3))333O33333)O)O)C)O333O3O3COO3C33O33333)33)3O33O)3O)33O3)33CO33)C33C3C333333))3OOCC)c1H#114=B/+@#45=/FC3(#/+4=F##/=5(O#)()2(=--C2((1-)#4+F#[4CC3))O)33)33O33)3OO33CO333))3)3O)33)OC)33)3333)33C))33)3OO3)33)33OO33)))C3)))O)333O3OOOO))3333)C333/(n@B3#31)+(]###O)O=4#B/2]1-CF+#3-O)=//#5342[#)2#)))OO333C)C33O333C3C)33)3)))33C3C)333))))3333C)OO)3)3)OC)O3)C)O)O)O)33)O)))33C))3)3C33))OC)3C)33)3)33)))3OC11-)(##1(2--#S)#+B-+#3H)c454#-23-=

# Task 4
Implement a simple decoder to decode the generated samples and parse them to character representation.

In [312]:
to_char = idx_to_sequence(to_idx)
print(f"To SMILE data : ", to_char )

To SMILE data :  C[c#(/1=CHNOS[cn#(/1=CNOPS[\]cn(+/12=@CHNOPS[\cnos()+-/12=@CHNOPS[\]cnos#()+-/12=@CHNOPS[\]cnos#()+-/123=@CFHNOPS[\]cnos#()+-/123=@CHNOPS[\]cnos#()+-/1234=@CHNOPS[\]clnos#()+-/1234=@BCFHNOS[\]clnos#()+-/1234=@BCFHINOPS[\]clnors#()+-/1234=@BCFHINOPS[\]clnors#()+-/1234=@BCFHINOPS[\]clnors#()+-/12345=@BCFHINOPS[\]clnors#()+-/1234=@BCFHINOPS[\]clnors#()+-/1234=@BCFHINOPS[\]clnors#()+-/12345=@BCFHINOPS[\]clnors#()+-/1234=@BCFHINOPS[\]clnors#()+-/12345=@BCFHINOPS[\]clnors#()+-/12345=@BCFHINOPS[\]clnors#()+-/12345=@BCFHINOS[\]clnors#()+-/12345=@BCFHINOS[\]clnors#()+-/12345=@BCFHINOPS[\]clnors#()+-/12345=@BCFHINOS[\]clnors#()+-/123456=@BCFHINOPS[\]clnors#()+-/12345=@BCFHINOPS[\]clnors#()+-/12345=@BCFHINOS[\]clnors#()+-/12345=@BCFHINOS[\]clnors#()+-/123456=@BCFHINOPS[\]clnors#()+-/12345=@BCFHINOPS[\]clnors#()+-/123456=@BCFHINOS[\]clnors#()+-/12345=@BCFHINOS[\]clnors#()+-/12345=@BCFHINOS[\]clnors#()+-/12345=@BCFHINOS[\]clnors#()+-/123456=@BCFHNOS[\]clnors#()+-/123456=@BCFHINOS[\