In [11]:
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Re-running this cell would otherwise attach another pair of handlers to the
# same logger object, and every record would then be emitted several times.
# Drop (and close) whatever a previous run attached before adding fresh ones.
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)
    stale_handler.close()

# The file receives everything from DEBUG upwards; the console only errors.
file_handler = logging.FileHandler('Logs.log')
console_handler = logging.StreamHandler()
file_handler.setLevel(logging.DEBUG)
console_handler.setLevel(logging.ERROR)

# Both handlers share one format: timestamp, message and originating line.
formatter = logging.Formatter('%(asctime)s - %(message)s - Line: %(lineno)d', datefmt='%Y-%m-%d %H:%M:%S')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)

logger.addHandler(file_handler)
logger.addHandler(console_handler)



# Transformer Network

In [97]:
# MULTIHEAD ATTENTION 
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    The inputs are projected with learnable query/key/value matrices, split
    into ``num_heads`` heads of width ``d_model // num_heads``, attended
    independently per head, re-assembled and passed through a final linear
    layer.

    Args:
        d_model: Width of the input and output embeddings.
        num_heads: Number of attention heads; must divide ``d_model``.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        # An invalid configuration used to be logged and then ignored, which
        # only surfaced later as a reshape failure (or silently wrong shapes).
        if d_model % num_heads != 0:
            message = "dimension of the embedding model is not divisable by number of heads"
            logger.error(message)
            raise ValueError(f"{message} (d_model={d_model}, num_heads={num_heads})")

        self.d_models = d_model
        self.num_heads = num_heads
        self.depth = d_model // num_heads

        # The query, key, value learnable matrices
        self.Wq = nn.Linear(d_model, d_model)
        self.Wk = nn.Linear(d_model, d_model)
        self.Wv = nn.Linear(d_model, d_model)

        # Output projection applied after the heads are concatenated again
        self.FCLayer = nn.Linear(d_model, d_model)

    def split_embedding_perHead(self, x):
        """Reshape ``(batch, seq_len, d_model)`` to ``(batch, num_heads, seq_len, depth)``."""
        batch_size = x.shape[0]
        x = x.view(batch_size, -1, self.num_heads, self.depth)
        logger.info(f"Multi-head; x reshaped: {x.shape} ")
        # Move the head axis in front of the sequence axis
        return x.permute(0, 2, 1, 3)

    def cal_attention(self, q, k, v, mask):
        """Compute scaled dot-product attention.

        Args:
            q, k, v: Per-head tensors whose last two axes are (seq_len, depth).
            mask: ``None`` or a tensor broadcastable to the score matrix in
                which non-zero / ``True`` entries mark positions to hide.

        Returns:
            Tuple ``(output, attention_weights)``.
        """
        # Scale by sqrt(depth) with a plain Python float: no tensor is
        # allocated per call and nothing depends on the device.
        scores = torch.matmul(q, k.transpose(-2, -1)) / (k.shape[-1] ** 0.5)

        if mask is not None:
            # The mask may be boolean and/or live on another device (e.g. it
            # was built on the CPU while the model runs on the GPU).
            additive_mask = mask.to(device=scores.device, dtype=scores.dtype)
            # A large negative offset drives the masked weights to ~0 after softmax
            scores = scores + additive_mask * -1e9

        # Normalise over the key positions (last axis)
        attention_weights = F.softmax(scores, dim=-1)
        output = torch.matmul(attention_weights, v)
        return output, attention_weights

    def forward(self, v, k, q, mask):
        """Apply attention; note the argument order is (value, key, query, mask).

        Returns:
            Tensor of shape ``(batch, query_seq_len, d_model)``.
        """
        batch_size = q.shape[0]
        q = self.split_embedding_perHead(self.Wq(q))
        k = self.split_embedding_perHead(self.Wk(k))
        v = self.split_embedding_perHead(self.Wv(v))

        attention, _ = self.cal_attention(q, k, v, mask)
        # Back to (batch, seq_len, num_heads, depth), then merge the heads
        attention = attention.permute(0, 2, 1, 3).contiguous()
        attention = attention.reshape(batch_size, -1, self.d_models)

        return self.FCLayer(attention)


In [98]:
# THE ENCODER LAYER
class EncoderLayer(nn.Module):
    """Feed-forward encoder: Linear(d_model -> dff), ReLU, Linear(dff -> dff)."""

    def __init__(self, d_model, dff):
        super().__init__()
        # Layers are created in the same order and stored under the same
        # attribute name, so parameter names in the state dict are unchanged.
        expand = nn.Linear(d_model, dff)
        activation = nn.ReLU()
        project = nn.Linear(dff, dff)
        self.FeedForwardNN = nn.Sequential(expand, activation, project)

    def forward(self, x):
        """Run ``x`` through the feed-forward stack; the last axis becomes ``dff`` wide."""
        encoded = self.FeedForwardNN(x)
        logger.info(f"encoder output dimensions {encoded.shape}")
        return encoded

In [99]:
# THE DECODER LAYER
class DecoderLayer(nn.Module):
    """One decoder block: self-attention, attention over the encoder output,
    and a feed-forward network, each followed by a residual add + LayerNorm."""

    def __init__(self, d_model, num_heads, dff):
        super().__init__()
        # Sub-modules keep their original names and creation order so that
        # state-dict keys (and seeded initialisation) are unaffected.
        self.MultiHAttention1 = MultiHeadAttention(d_model, num_heads)
        self.MultiHAttention2 = MultiHeadAttention(d_model, num_heads)
        self.FeedForwardNN = nn.Sequential(
            nn.Linear(d_model, dff), nn.ReLU(), nn.Linear(dff, d_model)
        )
        norm_options = {"eps": 1e-6}
        self.layerNorm1 = nn.LayerNorm(d_model, **norm_options)
        self.layerNorm2 = nn.LayerNorm(d_model, **norm_options)
        self.layerNorm3 = nn.LayerNorm(d_model, **norm_options)

    def forward(self, x, enc_output, look_ahead_mask, padding_mask):
        """Return the block output, same shape as ``x``.

        ``look_ahead_mask`` is given to the first (self) attention and
        ``padding_mask`` to the second attention, whose keys and values are
        ``enc_output`` and whose queries are the first sub-layer's output.
        """
        # Sub-layer 1: self-attention over the decoder input
        self_attended = self.MultiHAttention1(x, x, x, look_ahead_mask)
        self_attended = self.layerNorm1(x + self_attended)

        # Sub-layer 2: attend to the encoder output (argument order is v, k, q)
        cross_attended = self.MultiHAttention2(
            enc_output, enc_output, self_attended, padding_mask
        )
        cross_attended = self.layerNorm2(cross_attended + self_attended)

        # Sub-layer 3: position-wise feed-forward network
        transformed = self.FeedForwardNN(cross_attended)
        return self.layerNorm3(cross_attended + transformed)

In [100]:
class Decoder(nn.Module):
    """Token embedding + sinusoidal positional encoding + a stack of DecoderLayers.

    Args:
        num_layers: Number of stacked ``DecoderLayer`` blocks.
        d_model: Embedding width.
        num_heads: Attention heads per decoder layer.
        dff: Hidden width of each layer's feed-forward network.
        target_vocab_size: Number of rows in the embedding table.
        maximum_position_encoding: Number of positions pre-computed for the
            positional encoding; input sequences must not be longer.
    """

    def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size, maximum_position_encoding):
        super(Decoder, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = nn.Embedding(target_vocab_size, d_model) # d_model is the size of embedding vector

        # Registered as a buffer so it follows the module through .to()/.cuda();
        # as a plain attribute it stayed on the CPU and the addition in
        # forward() failed on GPU. persistent=False keeps it out of the state
        # dict, so checkpoints saved before this change still load.
        self.register_buffer(
            "pos_encoding",
            self.positional_encoding(maximum_position_encoding, d_model),
            persistent=False,
        )

        self.dec_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, dff) for _ in range(num_layers)])

    def positional_encoding(self, position, d_model):
        """Return a float32 tensor of shape ``(1, position, d_model)`` with
        sine values in the even columns and cosine values in the odd columns."""
        angle_rads = self.get_angles(np.arange(position)[:, np.newaxis], np.arange(d_model)[np.newaxis, :], d_model)
        angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
        angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
        # Leading axis of size 1 so the table broadcasts over the batch
        pos_encoding = angle_rads[np.newaxis, ...]
        return torch.tensor(pos_encoding, dtype=torch.float32)

    def get_angles(self, pos, i, d_model):
        """Return ``pos / 10000 ** (2 * (i // 2) / d_model)`` (broadcast over pos and i)."""
        angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
        return pos * angle_rates

    def forward(self, x, enc_output, look_ahead_mask, padding_mask):
        """Embed the token ids ``x`` (batch, seq_len) and run the decoder stack.

        Returns:
            Tensor of shape ``(batch, seq_len, d_model)``.
        """
        logger.info(f"decoder input shape to the embedding: {x.shape}")
        seq_len = x.size(1)
        x = self.embedding(x)
        logger.info(f"decoder input shape after embedding: {x.shape}")
        # Scale the embeddings by sqrt(d_model) before adding the positions.
        # Out-of-place ops avoid mutating the embedding output in place.
        x = x * (self.d_model ** 0.5)
        x = x + self.pos_encoding[:, :seq_len, :]

        for layer in self.dec_layers:
            x = layer(x, enc_output, look_ahead_mask, padding_mask)

        logger.info(f"final decoder output shape {x.shape}")
        return x


In [101]:
# TRANSFORMER

class Transformer(nn.Module):
    """Property-conditioned sequence model: a feed-forward encoder over the
    property vector, a Transformer decoder over the target tokens and a final
    projection onto the target vocabulary.

    Args:
        num_layers: Number of decoder layers.
        enc_d_model: Width of the encoder input (one value per property).
        dec_d_model: Embedding width used by the decoder.
        enc_num_heads: Accepted for interface compatibility; not used here.
        dec_num_heads: Attention heads per decoder layer.
        enc_dff: Width of the encoder output; must equal ``dec_d_model``.
        dec_dff: Hidden width of the decoder feed-forward networks.
        target_vocab_size: Size of the output vocabulary.
        pe_target: Maximum sequence length for the positional encoding.

    Raises:
        ValueError: If ``enc_dff`` differs from ``dec_d_model``.
    """

    def __init__(self,num_layers, enc_d_model, dec_d_model,
                enc_num_heads, dec_num_heads, enc_dff, 
                dec_dff, target_vocab_size, pe_target):
        super(Transformer, self).__init__()

        # The encoder emits enc_dff features which the decoder's second
        # attention projects with (dec_d_model x dec_d_model) matrices; a
        # mismatch would otherwise only show up as a matmul shape error
        # during the first forward pass.
        if enc_dff != dec_d_model:
            raise ValueError(
                f"enc_dff ({enc_dff}) must equal dec_d_model ({dec_d_model}): "
                "the encoder output is consumed by the decoder attention"
            )

        self.encoder = EncoderLayer(enc_d_model, enc_dff)
        self.decoder = Decoder(num_layers, dec_d_model, dec_num_heads, dec_dff, target_vocab_size, pe_target)
        self.final_layer = nn.Linear(dec_d_model, target_vocab_size)

    def forward(self, properties, target, look_ahead_mask, dec_padding_mask, training):
        """Run the model.

        Args:
            properties: 2-D tensor (batch, enc_d_model) of property values.
            target: Token ids of shape (batch, seq_len).
            look_ahead_mask: Mask for the decoder self-attention (or None).
            dec_padding_mask: Mask for the attention over the encoder output (or None).
            training: If truthy return the raw logits, otherwise the arg-max token ids.

        Returns:
            Logits of shape (batch, seq_len, target_vocab_size) when
            ``training`` is truthy, else token ids of shape (batch, seq_len).
        """
        logger.info("ENCODER STARTED")
        enc_output = self.encoder(properties)
        logger.info("ENCODER COMPLETED")
        # The encoder output is (batch, enc_dff), i.e. it has no sequence axis.
        # Insert one and repeat the vector for every target position so it can
        # serve as keys/values in the decoder's second attention.
        enc_output_reshaped = enc_output.unsqueeze(1).repeat(1, target.shape[1], 1)
        logger.info(f"encoder output dimensions:{enc_output.shape}")
        logger.info(f"encoder output reshaped: {enc_output_reshaped.shape}")
        logger.info("DECODER STARTED")

        dec_output = self.decoder(target, enc_output_reshaped, look_ahead_mask, dec_padding_mask)
        ffl_output = self.final_layer(dec_output)

        # During training the caller needs the logits for the loss
        if training:
            return ffl_output

        # During inference pick the most likely token per position. Softmax is
        # monotonic, so the arg-max of the logits is the arg-max of the
        # probabilities and the softmax itself can be skipped.
        return torch.argmax(ffl_output, dim=-1)

# Data preprocessing

In [None]:
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

class MoleculeDataset(Dataset):
    """Pairs each property vector with its token-index sequence.

    Indexing returns ``(properties, smiles)`` as a float32 tensor and an
    int64 tensor respectively.
    """

    def __init__(self, properties, smiles):
        self.properties, self.smiles = properties, smiles

    def __len__(self):
        # The number of samples is taken from the property collection
        sample_count = len(self.properties)
        return sample_count

    def __getitem__(self, idx):
        property_vector = torch.tensor(self.properties[idx], dtype=torch.float32)
        token_ids = torch.tensor(self.smiles[idx], dtype=torch.long)
        return property_vector, token_ids


def preprocess_data(csv_file):
    """Load the molecule table and turn it into model-ready inputs.

    Reads ``csv_file``, standardises the five property columns and encodes
    every ``isosmiles`` string character by character, wrapped in start/end
    tokens and right-padded to a common length.

    Returns:
        Tuple ``(properties, smiles_indices, char_to_idx, idx_to_char, scaler)``.
    """
    frame = pd.read_csv(csv_file)
    property_columns = ['polararea', 'complexity', 'heavycnt', 'hbonddonor', 'hbondacc']
    raw_properties = frame[property_columns].values
    smiles = frame['isosmiles'].values
    print("length of smiles: ", smiles.shape)

    # Normalize properties
    scaler = StandardScaler()
    properties = scaler.fit_transform(raw_properties)

    # Vocabulary: every distinct character in sorted order, numbered from 3;
    # ids 0-2 are reserved for the special tokens added afterwards.
    alphabet = sorted(set(''.join(smiles)))
    char_to_idx = {}
    for position, char in enumerate(alphabet, start=3):
        char_to_idx[char] = position
    char_to_idx.update({'<pad>': 0, '<start>': 1, '<end>': 2})

    print(char_to_idx)
    # Inverse lookup: index -> character
    idx_to_char = dict((idx, char) for char, idx in char_to_idx.items())

    # Longest string plus the start and end tokens
    max_smiles_len = max(map(len, smiles)) + 2
    print("max smiles length: ", max_smiles_len)

    start_id = char_to_idx['<start>']
    end_id = char_to_idx['<end>']
    pad_id = char_to_idx['<pad>']

    smiles_indices = []
    for smi in smiles:
        encoded = [start_id]
        encoded.extend(char_to_idx[char] for char in smi)
        encoded.append(end_id)
        # Right-pad up to the common length
        encoded.extend([pad_id] * (max_smiles_len - len(encoded)))
        smiles_indices.append(encoded)

    # Show the first encoded sequence as a sanity check
    print("smiles length:",len(smiles_indices))
    if smiles_indices:
        first_example = smiles_indices[0]
        print("smiles example: ", first_example)
        print("smiles example length: ", len(first_example))

    return properties, smiles_indices, char_to_idx, idx_to_char, scaler

# Load the CSV and build the scaled properties, the padded token sequences,
# both vocabularies and the fitted scaler.
(
    properties,
    smiles_indices,
    char_to_idx,
    idx_to_char,
    scaler,
) = preprocess_data('Pubchem.csv')

# Hold out 10% of the molecules for testing; the fixed seed makes the split repeatable.
train_props, test_props, train_smiles, test_smiles = train_test_split(
    properties,
    smiles_indices,
    test_size=0.1,
    random_state=42,
)

train_dataset, test_dataset = (
    MoleculeDataset(train_props, train_smiles),
    MoleculeDataset(test_props, test_smiles),
)

# Training uses shuffled mini-batches; the test loader yields one molecule at a time.
train_loader = DataLoader(dataset=train_dataset, batch_size=128, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=1, shuffle=True)


In [103]:
def create_padding_mask(seq):
    """Return a boolean mask that is True wherever ``seq`` holds the padding id 0.

    Args:
        seq: Token ids, either a tensor or anything ``torch.as_tensor`` accepts
            (e.g. nested lists); typically shaped (batch, seq_len).

    Returns:
        Boolean tensor with two singleton axes inserted after the first one,
        i.e. (batch, 1, 1, seq_len) for a (batch, seq_len) input, so that it
        broadcasts over heads and query positions.
    """
    # as_tensor reuses an existing tensor instead of copy-constructing it,
    # which avoids both the extra copy and the UserWarning that
    # torch.tensor(tensor) emits on every call.
    seq = torch.as_tensor(seq)
    seq_masked = seq == 0 # True if value is 0 otherwise false
    return seq_masked.unsqueeze(1).unsqueeze(2)

def create_look_ahead_mask(size, device=None):
    """Return a (1, 1, size, size) mask with 1s strictly above the diagonal.

    Entry [.., i, j] is 1 when j > i, i.e. when position j lies in the future
    of position i and must be hidden from it.

    Args:
        size: Sequence length.
        device: Optional device to create the mask on; by default the mask is
            created on PyTorch's default device, as before.
    """
    # creating an upper triangle of 1s
    mask = torch.triu(torch.ones((size, size), device=device), diagonal=1)
    return mask.unsqueeze(0).unsqueeze(1)


In [None]:
import torch.optim as optim
import torch.nn as nn

def loss_function(real, pred):
    """Cross-entropy averaged over the non-padding target positions.

    Args:
        real: Target token ids of shape (batch, seq_len); id 0 marks padding.
        pred: Logits of shape (batch, seq_len, vocab_size).

    Returns:
        Scalar tensor: the summed per-token loss of the real (non-padding)
        tokens divided by their count. It is 0 if every position is padding.
    """
    # 1.0 for real tokens, 0.0 for padding
    mask = (real != 0).to(pred.dtype)

    # CrossEntropyLoss applies log-softmax itself and expects the class axis
    # second, hence the transpose to (batch, vocab_size, seq_len).
    per_token_loss = nn.CrossEntropyLoss(reduction='none')(pred.transpose(1, 2), real)
    masked_loss = per_token_loss * mask

    # Normalise by the number of real tokens rather than by every position:
    # a plain mean also counts the zeroed padding slots, so the loss shrank
    # with the amount of padding in the batch. The clamp guards against a
    # division by zero when a batch contains only padding.
    return masked_loss.sum() / mask.sum().clamp(min=1.0)

def train_model(transformer, train_loader, num_epochs, learning_rate, model_name, pretrained):
    """Train ``transformer`` with Adam and checkpoint it after every epoch.

    Args:
        transformer: Model called as
            ``transformer(properties, smiles, look_ahead_mask, dec_padding_mask, training=True)``.
        train_loader: Iterable of ``(properties, smiles)`` tensor batches.
        num_epochs: Number of passes over ``train_loader``.
        learning_rate: Adam learning rate.
        model_name: Path of the checkpoint file (read if ``pretrained``,
            overwritten at the end of every epoch).
        pretrained: If truthy, load the weights from ``model_name`` first.
    """
    # Work on whatever device the model already lives on instead of relying
    # on a notebook-level global.
    model_device = next(transformer.parameters()).device

    # loading pretrained models where available; map_location lets a
    # checkpoint written on a GPU be restored on a CPU-only machine.
    if pretrained:
        transformer.load_state_dict(torch.load(model_name, map_location=model_device))
    optimizer = optim.Adam(transformer.parameters(), lr=learning_rate)

    for epoch in range(num_epochs):
        transformer.train()
        total_loss = 0.0
        num_batches = 0

        for properties, smiles in train_loader:
            properties = properties.to(model_device)
            smiles = smiles.to(model_device)

            # The look-ahead mask is built on the default device, so move it
            # next to the model; the padding mask is derived from `smiles`.
            look_ahead_mask = create_look_ahead_mask(smiles.size(1)).to(model_device)
            dec_padding_mask = create_padding_mask(smiles)

            optimizer.zero_grad()
            predictions = transformer(properties, smiles, look_ahead_mask, dec_padding_mask, training=True)
            # The prediction at position t is scored against the token at
            # t + 1, so drop the first target and the last prediction.
            loss = loss_function(smiles[:, 1:], predictions[:, :-1])

            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            num_batches += 1
            print("batch loss", loss.item())

        # save model at the end of each epoch (this used to sit inside the
        # batch loop and rewrote the checkpoint after every single batch)
        torch.save(transformer.state_dict(), model_name)

        # An empty loader yields NaN instead of failing on an undefined counter
        average_loss = total_loss / num_batches if num_batches else float("nan")
        print(f'Epoch {epoch+1}, Loss: {average_loss}')

# Initialize the model
# --- Vocabulary ---------------------------------------------------------
target_vocab_size = len(char_to_idx)
print("target vocab size", target_vocab_size)

# --- Architecture -------------------------------------------------------
num_layers = 8
enc_d_model = 5 # number of properties
dec_d_model = 128
enc_num_heads = 1
dec_num_heads = 8
# dimension of the feed forward layer; the encoder emits this many features
# and they are fed straight into the decoder attention, so it has to match
# dec_d_model
enc_dff = 128
dec_dff = enc_dff
pe_target = 1000 # positional encoding

# --- Optimisation and checkpointing -------------------------------------
model_name = "molecularTransformer2.pth"
learning_rate = 1e-5
num_epochs = 20
pretrained = True

# --- Build the model on the best available device -----------------------
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
transformer = Transformer(
    num_layers=num_layers,
    enc_d_model=enc_d_model,
    dec_d_model=dec_d_model,
    enc_num_heads=enc_num_heads,
    dec_num_heads=dec_num_heads,
    enc_dff=enc_dff,
    dec_dff=dec_dff,
    target_vocab_size=target_vocab_size,
    pe_target=pe_target,
).to(device)

# Train the model
train_model(
    transformer=transformer,
    train_loader=train_loader,
    num_epochs=num_epochs,
    learning_rate=learning_rate,
    model_name=model_name,
    pretrained=pretrained,
)

# Inference

In [None]:
import torch
import torch.nn.functional as F

def greedy_decode(transformer, properties, max_length, start_token_idx, end_token_idx):
    """Generate one token sequence by repeatedly taking the model's top token.

    Only a single sequence is decoded, so ``properties`` is expected to hold
    exactly one sample (the generated sequence has batch size 1).

    Args:
        transformer: Model called as
            ``transformer(properties, tokens, look_ahead_mask, None, training=False)``
            and returning token ids of shape (1, current_length).
        properties: Property tensor for the single molecule to generate.
        max_length: Maximum number of tokens to generate after the start token.
        start_token_idx: Id the sequence is seeded with.
        end_token_idx: Id that stops generation once it is predicted.

    Returns:
        List of token ids, beginning with ``start_token_idx`` and including
        the end token if it was produced.
    """
    # Keep every tensor on the same device as the conditioning input
    target_device = properties.device

    # Initialize the output sequence with the start token
    output_sequence = torch.tensor([[start_token_idx]], dtype=torch.long, device=target_device)

    for _ in range(max_length):
        # The look-ahead mask is created on the default device; move it next
        # to the model inputs so the attention can add it to the scores.
        look_ahead_mask = create_look_ahead_mask(output_sequence.size(1)).to(target_device)
        dec_padding_mask = None  # No padding required during inference

        predictions = transformer(properties, output_sequence, look_ahead_mask, dec_padding_mask, training=False)

        # Only the prediction for the last position is new
        next_token = predictions[:, -1:]

        # Concatenate the predicted token to the output sequence
        output_sequence = torch.cat([output_sequence, next_token], dim=-1)

        # Stop if the end token is predicted
        if next_token.item() == end_token_idx:
            break

    # Drop only the batch axis: a bare squeeze() would collapse a one-token
    # sequence (max_length == 0) to a scalar and return an int, not a list.
    return output_sequence.squeeze(0).tolist()

def infer_and_print(transformer, test_loader, max_length, start_token_idx, end_token_idx, idx_to_char):
    """Greedy-decode every sample in ``test_loader`` and print it next to the reference.

    Args:
        transformer: Trained model; switched to evaluation mode here.
        test_loader: Iterable of ``(properties, smiles)`` batches; only the
            first row of each ``smiles`` batch is printed as the reference.
        max_length: Maximum number of tokens to generate per sample.
        start_token_idx: Id that seeds each generated sequence.
        end_token_idx: Id that stops generation.
        idx_to_char: Mapping from token id to its text.
    """

    def _to_text(token_ids):
        # Padding (id 0) and ids missing from the vocabulary are skipped
        return ''.join([idx_to_char[idx] for idx in token_ids if idx in idx_to_char and idx != 0])

    transformer.eval()  # Set the model to evaluation mode

    with torch.no_grad():
        for properties, smiles in test_loader:
            properties = properties.to(device)

            # Perform greedy decoding
            generated_sequence = greedy_decode(transformer, properties, max_length, start_token_idx, end_token_idx)
            generated_smiles = _to_text(generated_sequence)

            # tolist() yields plain Python ints directly; the earlier detour
            # through a list of 0-dim tensors and a NumPy array was slower
            # and relied on `np` having been imported by another cell.
            actual_smiles = _to_text(smiles[0].tolist())

            print(f"Generated SMILES: {generated_smiles}")
            print("actual smiles:", actual_smiles)

# Load the trained model
# map_location remaps the stored tensors onto the active device, so a
# checkpoint written on a GPU can also be restored on a CPU-only machine.
transformer.load_state_dict(torch.load(model_name, map_location=device))

# Set up parameters for inference
max_length =  288 # Maximum sequence length for SMILES generation
start_token_idx = char_to_idx['<start>']
end_token_idx = char_to_idx['<end>']

# Perform inference on the test dataset
infer_and_print(transformer, test_loader, max_length, start_token_idx, end_token_idx, idx_to_char)
