<a href="https://colab.research.google.com/github/Varshith271105/MacFormer_DD/blob/main/MACFormer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pandas numpy scikit-learn tensorflow torch

Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-curand-cu12==10.3.5.147 (from torch)
  Downloading nvidia_curand_cu12-10.3.5

In [2]:
from  google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [81]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
from keras.preprocessing.sequence import pad_sequences
import re
import torch
from collections import Counter

def load_data(filepath):
    df = pd.read_csv(filepath)
    return df['src'].tolist(), df['tgt'].tolist()

def tokenize(text):
    return re.findall(r'\w+|[^\w\s]', text, re.UNICODE)

def build_vocab(sequences):
    counter = Counter()
    for seq in sequences:
        counter.update(seq)
    vocab = {token: idx for idx, (token, _) in enumerate(counter.items(), start=4)}
    vocab['<PAD>'] = 0
    vocab['<SOS>'] = 1
    vocab['<EOS>'] = 2
    vocab['<UNK>'] = 3
    return vocab

def encode_sequences(sequences, vocab):
    # Prepend <SOS> and append <EOS> to each sequence
    return [[vocab['<SOS>']] + [vocab.get(token, vocab['<UNK>']) for token in seq] + [vocab['<EOS>']]
            for seq in sequences]

def pad_sequences_to_fixed_length(sequences, max_length):
    # Pad sequences to max_length, ensuring <SOS> and <EOS> are included
    return pad_sequences(sequences, maxlen=max_length, padding='post', value=0)  # Use 0 for <PAD>

def preprocess_data(filepath, max_length):
    # Load and tokenize data
    src, tgt = load_data(filepath)
    src_tokenized = [tokenize(s) for s in src]
    tgt_tokenized = [tokenize(t) for t in tgt]

    # Build vocabularies
    src_vocab = build_vocab(src_tokenized)
    tgt_vocab = build_vocab(tgt_tokenized)

    # Encode sequences with <SOS> and <EOS>
    src_encoded = encode_sequences(src_tokenized, src_vocab)
    tgt_encoded = encode_sequences(tgt_tokenized, tgt_vocab)

    # Pad sequences to fixed length
    src_padded = pad_sequences_to_fixed_length(src_encoded, max_length)
    tgt_padded = pad_sequences_to_fixed_length(tgt_encoded, max_length)

    return src_padded, tgt_padded, src_vocab, tgt_vocab

In [82]:
import torch
from torch.utils.data import Dataset
import pandas as pd

class CustomDataset(Dataset):
    def __init__(self, src_data, tgt_data, src_vocab, tgt_vocab):
        self.src_data = src_data
        self.tgt_data = tgt_data
        self.src_vocab = src_vocab
        self.tgt_vocab = tgt_vocab
        self.start_idx = self.tgt_vocab['<SOS>']
        self.tgt_idx = self.tgt_vocab['<EOS>']
        self.tgt_vocab = tgt_vocab
        self.pad_idx = self.tgt_vocab['<PAD>']

    def __len__(self):
        return len(self.src_data)

    def __getitem__(self, idx):
        src = self.src_data[idx]
        tgt = self.tgt_data[idx]
        # Convert to Long (int64) tensor type
        return torch.tensor(src, dtype=torch.long), torch.tensor(tgt, dtype=torch.long)

In [83]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(torch.log(torch.tensor(10000.0)) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1)]
        return self.dropout(x)

class EncoderLayer(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead)
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout1 = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        self.dropout2 = nn.Dropout(dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, src):
        src = src.transpose(0, 1)  # [batch_size, seq_len, d_model] -> [seq_len, batch_size, d_model]
        src2 = self.norm1(src)
        src2, _ = self.self_attn(src2, src2, src2)
        src = src + self.dropout1(src2)
        src2 = self.norm2(src)
        src2 = self.linear2(self.dropout2(F.relu(self.linear1(src2))))
        src = src + src2
        return src.transpose(0, 1)  # Back to [batch_size, seq_len, d_model]

class DecoderLayer(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward, dropout=0.1):
        super(DecoderLayer, self).__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead)
        self.cross_attn = nn.MultiheadAttention(d_model, nhead)
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout1 = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        self.dropout2 = nn.Dropout(dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)

    def forward(self, tgt, memory, tgt_mask=None):
        tgt = tgt.transpose(0, 1)  # [batch_size, seq_len, d_model] -> [seq_len, batch_size, d_model]
        memory = memory.transpose(0, 1)
        tgt2 = self.norm1(tgt)
        tgt2, _ = self.self_attn(tgt2, tgt2, tgt2, attn_mask=tgt_mask)
        tgt = tgt + self.dropout1(tgt2)
        tgt2 = self.norm2(tgt)
        tgt2, _ = self.cross_attn(tgt2, memory, memory)
        tgt = tgt + self.dropout2(tgt2)
        tgt2 = self.norm3(tgt)
        tgt2 = self.linear2(self.dropout2(F.relu(self.linear1(tgt2))))
        tgt = tgt + tgt2
        return tgt.transpose(0, 1)  # Back to [batch_size, seq_len, d_model]

class MacFormer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, start_token=1, eos_token=2, d_model=256, nhead=4, num_encoder_layers=4, num_decoder_layers=4, dim_feedforward=512, dropout=0.1):
        super(MacFormer, self).__init__()
        self.src_embedding = nn.Embedding(src_vocab_size, d_model)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, dropout)
        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, nhead, dim_feedforward, dropout) for _ in range(num_encoder_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, nhead, dim_feedforward, dropout) for _ in range(num_decoder_layers)])
        self.fc_out = nn.Linear(d_model, tgt_vocab_size)
        self.start_token = start_token  # Added for compatibility
        self.eos_token = eos_token      # Added for compatibility

    def forward(self, src, tgt):
        src = self.positional_encoding(self.src_embedding(src))
        tgt = self.positional_encoding(self.tgt_embedding(tgt))

        for layer in self.encoder_layers:
            src = layer(src)

        tgt_mask = self.generate_square_subsequent_mask(tgt.size(1)).to(tgt.device)
        for layer in self.decoder_layers:
            tgt = layer(tgt, src, tgt_mask=tgt_mask)

        return self.fc_out(tgt)

    def generate_square_subsequent_mask(self, sz):
        mask = torch.triu(torch.ones(sz, sz), diagonal=1).bool()
        return mask

In [None]:
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

def train_model(model, dataloader, criterion, optimizer,src_vocab, tgt_vocab, epochs=10, device='cpu'):
    model.train()
    a=-1
    for epoch in range(epochs):
        total_loss = 0
        for src, tgt in dataloader:
            src, tgt = src.to(device), tgt.to(device)
            optimizer.zero_grad()
            outputs = model(src, tgt)
            # Ensure target is of type Long
            loss = criterion(outputs.view(-1, outputs.size(-1)), tgt.view(-1).long())
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        avg_loss = total_loss / len(dataloader)
        if(epoch%1==0):
            print(f'Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}')
            print("\nSample Predictions from Test Set:")
            test_filepath = r'/content/drive/MyDrive/DD/test.csv'
            test_src_data, test_tgt_data, test_src_vocab, test_tgt_vocab = preprocess_data(test_filepath, 200)
            test_dataset = CustomDataset(test_src_data, test_tgt_data, src_vocab, tgt_vocab)
            test_dataloader = DataLoader(test_dataset, batch_size=32)
            for i in [a+1]:  # Show first three examples
                a+=1
                src_smile, tgt_smile, pred_smile = predict_smile(model, test_dataset, src_vocab, tgt_vocab, i, device)
                print("<EOS>" in tgt_vocab)  # Should print True

                print(f"\nExample {i+1}:")
                print(f"Source SMILES: {src_smile}",len(src_smile))
                print(f"Target SMILES: {tgt_smile}",len(tgt_smile))
                print(f"Predic SMILES: {pred_smile}")
                print(f"Predicted length: {len(pred_smile)}")

def evaluate_model(model, dataloader, criterion, device, pad_idx):
    model.eval()
    total_loss = 0
    total_correct = 0
    total_tokens = 0

    with torch.no_grad():
        for src, tgt in dataloader:
            src, tgt = src.to(device), tgt.to(device)
            outputs = model(src, tgt)

            loss = criterion(outputs.view(-1, outputs.size(-1)), tgt.view(-1).long())
            total_loss += loss.item()

            # Calculate accuracy
            predictions = outputs.argmax(dim=-1)
            mask = (tgt != pad_idx)
            correct = (predictions == tgt) & mask
            total_correct += correct.sum().item()
            total_tokens += mask.sum().item()

    avg_loss = total_loss / len(dataloader)
    accuracy = total_correct / total_tokens if total_tokens > 0 else 0

    return avg_loss, accuracy


def predict_smile(model, dataset, src_vocab, tgt_vocab, index, device):
    model.eval()
    with torch.no_grad():
        # Get source and target from dataset
        src, tgt = dataset[index]
        src = src.unsqueeze(0).to(device)  # Add batch dimension
        tgt = tgt.unsqueeze(0).to(device)  # Add batch dimension

        # Get model prediction
        outputs = model(src, tgt)  # Using actual target during prediction
        predicted = outputs.argmax(dim=-1)

        # Convert indices back to tokens (without padding)
        print(tgt_vocab)
        print(tgt[0])
        src_tokens = [list(src_vocab.keys())[list(src_vocab.values()).index(idx.item())]
                     for idx in src[0] if idx.item() not in [dataset.pad_idx]]
        tgt_tokens = [list(tgt_vocab.keys())[list(tgt_vocab.values()).index(idx.item())]
                     for idx in tgt[0] if idx.item() not in [dataset.pad_idx]]
        pred_tokens = [list(tgt_vocab.keys())[list(tgt_vocab.values()).index(idx.item())]
                     for idx in predicted[0] if idx.item() not in [dataset.pad_idx]]
        print(" ".join(tgt_tokens))
        print(predicted)
        print(" ".join(pred_tokens))

        # Join tokens without spaces for SMILES format
        return ' '.join(src_tokens), ' '.join(tgt_tokens), ' '.join(pred_tokens)




def main():
    # Load and preprocess training data
    train_filepath = r'/content/drive/MyDrive/DD/dataset.csv'
    src_data, tgt_data, src_vocab, tgt_vocab = preprocess_data(train_filepath,200)


    dataset = CustomDataset(src_data, tgt_data, src_vocab, tgt_vocab)
    dataloader = DataLoader(dataset, batch_size=32)

    # Initialize model, criterion, and optimizer
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = MacFormer(len(src_vocab), len(tgt_vocab)).to(device)
    criterion = nn.CrossEntropyLoss(ignore_index=dataset.pad_idx)
    optimizer = optim.Adam(model.parameters(), lr=0.0001)

    # Train the model
    train_model(model, dataloader, criterion, optimizer, src_vocab, tgt_vocab, epochs=1000, device=device)

    # Load and preprocess testing data
    test_filepath = r'/content/drive/MyDrive/DD/test.csv'
    test_src_data, test_tgt_data, test_src_vocab, test_tgt_vocab = preprocess_data(test_filepath,200)
    test_dataset = CustomDataset(test_src_data, test_tgt_data, test_src_vocab, test_tgt_vocab)
    test_dataloader = DataLoader(test_dataset, batch_size=32)


    # Evaluate the model
    test_loss, test_accuracy = evaluate_model(model, test_dataloader, criterion, device,test_dataset.pad_idx)
    print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')

    print("\nSample Predictions from Test Set:")
    for i in [0, 1, 2]:  # Show first three examples
        src_smile, tgt_smile, pred_smile = predict_smile(model, test_dataset, test_src_vocab, test_tgt_vocab, i, device)
        print(f"\nExample {i+1}:")
        print(f"Source SMILES: {src_smile}")
        print(f"Target SMILES: {tgt_smile}")
        print(f"Predicted SMILES: {len(pred_smile)}")

if __name__ == "__main__":
    main()


Epoch 1/1000, Loss: 0.1621

Sample Predictions from Test Set:
{'C': 4, 'O': 5, '1': 6, '=': 7, '(': 8, ')': 9, 'N': 10, 'c': 11, '2': 12, 'n': 13, '[': 14, 'H': 15, ']': 16, '3': 17, 'S': 18, 'P': 19, 'F': 20, '4': 21, '5': 22, 'l': 23, '+': 24, '-': 25, '#': 26, 'o': 27, 'I': 28, 's': 29, 'B': 30, 'r': 31, '<PAD>': 0, '<SOS>': 1, '<EOS>': 2, '<UNK>': 3}
tensor([ 1,  4,  5,  5,  5,  5,  5,  6,  4,  5,  7,  8,  9, 10,  5,  7,  5, 11,
        12, 11, 11, 11,  7,  9, 10, 11, 11, 12, 10,  4,  5,  7,  8,  9, 10,  5,
         7,  5,  5,  7,  8,  9, 10,  9, 10,  4,  5,  7,  8,  9, 10,  5,  4,  5,
         7,  8,  9, 10,  5,  7,  5,  5,  5,  4,  8,  5,  7,  4, 10,  4, 10,  4,
         5,  6,  8,  9,  2,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
   