# Uncondtional Generative Pretrained Transformer (GPT)

## Setup the notebook and import necessary libraries

In [1]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
import math
import numpy as np
import requests
from io import StringIO
import pandas as pd
from rdkit import Chem
from rdkit.Chem import Descriptors, QED, rdMolDescriptors, AllChem, DataStructs
from rdkit import RDLogger
from scipy.stats import entropy

from warnings import filterwarnings
filterwarnings("ignore")

lg = RDLogger.logger()
lg.setLevel(RDLogger.ERROR)
RDLogger.DisableLog('rdApp.*')

## Load Dataset, Tokenize SMILES and make PyTorch Dataloaders

Function to load the ESOL dataset and make it a pandas dataframe

In [2]:
def load_esol_dataset():
    url = "https://raw.githubusercontent.com/aspuru-guzik-group/chemical_vae/master/models/zinc_properties/250k_rndm_zinc_drugs_clean_3.csv"
    response = requests.get(url)
    response.raise_for_status()

    df = pd.read_csv(StringIO(response.text))
    print(f"Loaded dataset with {len(df)} compounds")

    df = df[['smiles', 'logP', 'qed', 'SAS']]
    df.columns = ['SMILES', 'LogP', 'QED', 'SAS']

    # Remove invalid SMILES
    valid_smiles = []
    for smiles in df['SMILES']:
        mol = Chem.MolFromSmiles(smiles)
        valid_smiles.append(mol is not None)

    df = df[valid_smiles].reset_index(drop=True)
    return df

Tokenizer to convert characters in SMILES strings to tokens

In [3]:
class CharTokenizer:
    def __init__(self, smiles_list):
        chars = sorted(set(''.join(smiles_list)))
        self.itos = ['<pad>', '<bos>', '<eos>'] + chars
        self.stoi = {ch: i for i, ch in enumerate(self.itos)}

    def encode(self, smiles):
        return [self.stoi[c] for c in smiles]

    def decode(self, token_ids):
        return ''.join(self.itos[i] for i in token_ids if i > 2)

Class to define te input data format

In [4]:
class SmilesDataset(Dataset):
    def __init__(self, smiles_list, tokenizer, block_size):
        self.block_size = block_size
        self.tokenizer = tokenizer
        self.data = []
        for smi in smiles_list:
            tokens = tokenizer.encode(smi)
            tokens = [tokenizer.stoi["<bos>"]] + tokens + [tokenizer.stoi["<eos>"]]
            if len(tokens) > block_size:
                continue
            tokens += [tokenizer.stoi["<pad>"]] * (block_size - len(tokens))
            self.data.append(tokens)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        x = torch.tensor(self.data[idx][:-1])
        y = torch.tensor(self.data[idx][1:])
        return x, y

In [5]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

df = load_esol_dataset()
smiles_list = df['SMILES'].tolist()

block_size = 128

tokenizer = CharTokenizer(smiles_list)
dataset = SmilesDataset(smiles_list, tokenizer, block_size=block_size)
loader = DataLoader(dataset, batch_size=64, shuffle=True, drop_last=True)

Loaded dataset with 249455 compounds


## Defining the model configuration and Instantiating the model

Defining the configuration class and the model components

*Causal Self Attention* which is a form of Masked Self Attention is used where only the tokens from the past until the present are used to predict the next token. Each block is a unit of the Conditional GPT model which is repeated several times in the architecture.

In [6]:
class GPTConfig:
    def __init__(self, vocab_size, block_size, **kwargs):
        self.vocab_size = vocab_size
        self.block_size = block_size
        self.embd_pdrop = 0.1
        self.resid_pdrop = 0.1
        self.attn_pdrop = 0.1
        self.n_layer = 8
        self.n_head = 8
        self.n_embd = 256
        for k, v in kwargs.items():
            setattr(self, k, v)

class CausalSelfAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        assert config.n_embd % config.n_head == 0
        self.n_head = config.n_head
        self.key = nn.Linear(config.n_embd, config.n_embd)
        self.query = nn.Linear(config.n_embd, config.n_embd)
        self.value = nn.Linear(config.n_embd, config.n_embd)
        self.proj = nn.Linear(config.n_embd, config.n_embd)
        self.attn_dropout = nn.Dropout(config.attn_pdrop)
        self.resid_dropout = nn.Dropout(config.resid_pdrop)
        self.register_buffer("mask", torch.tril(torch.ones(config.block_size, config.block_size))
                                      .view(1, 1, config.block_size, config.block_size))

    def forward(self, x):
        B, T, C = x.size()
        q = self.query(x).view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        k = self.key(x).view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        v = self.value(x).view(B, T, self.n_head, C // self.n_head).transpose(1, 2)

        att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
        mask = self.mask[:, :, :T, :T].to(att.device)
        att = att.masked_fill(mask == 0, float('-inf'))
        att = F.softmax(att, dim=-1)
        att = self.attn_dropout(att)
        y = att @ v
        y = y.transpose(1, 2).contiguous().view(B, T, C)
        y = self.proj(y)
        y = self.resid_dropout(y)
        return y

class Block(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.ln1 = nn.LayerNorm(config.n_embd)
        self.attn = CausalSelfAttention(config)
        self.ln2 = nn.LayerNorm(config.n_embd)
        self.mlp = nn.Sequential(
            nn.Linear(config.n_embd, 4 * config.n_embd),
            nn.GELU(),
            nn.Linear(4 * config.n_embd, config.n_embd),
            nn.Dropout(config.resid_pdrop),
        )

    def forward(self, x):
        x = x + self.attn(self.ln1(x))
        x = x + self.mlp(self.ln2(x))
        return x

GPT model

In [7]:
class GPT(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        self.tok_emb = nn.Embedding(config.vocab_size, config.n_embd)
        self.pos_emb = nn.Parameter(torch.zeros(1, config.block_size, config.n_embd))
        self.drop = nn.Dropout(config.embd_pdrop)
        self.blocks = nn.Sequential(*[Block(config) for _ in range(config.n_layer)])
        self.ln_f = nn.LayerNorm(config.n_embd)
        self.head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, (nn.Linear, nn.Embedding)):
            module.weight.data.normal_(mean=0.0, std=0.02)
        if isinstance(module, nn.Linear) and module.bias is not None:
            module.bias.data.zero_()

    def forward(self, idx, targets=None):
        device = idx.device
        b, t = idx.size()
        assert t <= self.config.block_size, "Sequence too long for block size"

        tok_emb = self.tok_emb(idx)
        pos_emb = self.pos_emb[:, :t, :]
        x = self.drop(tok_emb + pos_emb)
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.head(x)

        loss = None
        if targets is not None:
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=0)
        return logits, loss

    def sample(self, tokenizer, max_len=120, device='cpu', temperature=1.0, top_k=None, num_samples=1):
        self.eval()
        out_smiles = []
        bos = tokenizer.stoi["<bos>"]
        eos = tokenizer.stoi["<eos>"]
        pad = tokenizer.stoi["<pad>"]

        for _ in range(num_samples):
            idx = torch.tensor([[bos]], dtype=torch.long, device=device)
            generated = []
            with torch.no_grad():
                for _ in range(max_len):
                    logits, _ = self.forward(idx)
                    logits = logits[:, -1, :] / max(temperature, 1e-8)
                    if top_k is not None:
                        # Ensure top_k does not exceed the vocabulary size
                        k_val = min(top_k, logits.size(-1))
                        values, _ = torch.topk(logits, k_val)
                        min_values = values[:, -1].unsqueeze(-1)
                        logits = torch.where(logits < min_values, torch.tensor(-1e10, device=logits.device), logits)
                    probs = F.softmax(logits, dim=-1)
                    next_token = torch.multinomial(probs, num_samples=1)
                    token_id = next_token.item()
                    if token_id == eos:
                        break
                    if token_id == pad:
                        continue
                    generated.append(token_id)
                    idx = torch.cat([idx, next_token], dim=1)
            out_smiles.append(tokenizer.decode(generated))
        return out_smiles

In [8]:
config = GPTConfig(
        vocab_size=len(tokenizer.itos),
        block_size=block_size,
        n_layer=4,
        n_head=8,
        n_embd=128
    )
model = GPT(config)

## Training the model

In [9]:
def train_gpt(model, tokenizer, train_loader, num_epochs=10, device='cuda', lr=3e-4):
    """Train the already initialized model"""
    model = model.to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr)

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0.0
        for x, y in train_loader:
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            logits, loss = model(x, targets=y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        # Sample at epoch end
        model.eval()
        with torch.no_grad():
            sampled = model.sample(tokenizer=tokenizer, max_len=120, device=device, temperature=1.0, num_samples=3)
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(train_loader):.4f}")
        print(f"  Samples: {sampled[:2]}")

    return model

In [10]:
model = train_gpt(model, tokenizer, loader, num_epochs=10, device=device, lr=3e-4)

Epoch 1/10, Loss: 0.8854
  Samples: ['CC[C@@H](/C=C2\\Sc3ncccc3C=Cc3ccccc3=[NH+]2)C1C[C@@H]1C\n', 'CNC(=O)C(=O)Nc1ccc(-c2cccc(CC(C)C)c2)cc1\n']
Epoch 2/10, Loss: 0.6901
  Samples: ['CC1(C)[C@H]2CCCN2C(=O)N1CCO[C@H]1C(F)(F)F\n', 'N3CC[NH+](CC(=O)Nc3ccccc3C2)CC2CC1)[C@@H]2CCC[NH2+]1\n']
Epoch 3/10, Loss: 0.6462
  Samples: ['O=C(C[NH+](Cc1ccco1)[C@H]1CCOC2(CC2)C1)NC1CCCCCCCC1\n', 'C[C@@H](C(N)=O)[NH+](C)Cc1ccc(Cl)cc1\n']
Epoch 4/10, Loss: 0.6220
  Samples: ['CCC(CC)NC(=O)c1ccc(NC(=O)Nc2cccc(F)c2F)cc1\n', 'CCc1nccc(C(=O)N(Cc2cc(C)no2)[C@@H]2CCS(=O)(=O)C2)c1C\n']
Epoch 5/10, Loss: 0.6062
  Samples: ['COC(=O)c1cc(C)nc2ccccn12\n', 'C[C@@H]([NH2+]CC[NH+](C)C)c1ccn(Cc2ccncc2)c1\n']
Epoch 6/10, Loss: 0.5951
  Samples: ['COc1cccc(/C=C/C(=O)Nc2ccc(S(N)(=O)=O)c([N+](=O)[O-])c2)c1\n', 'CCC[NH2+]CC(C)(C)S(=O)(=O)CC\n']
Epoch 7/10, Loss: 0.5864
  Samples: ['CC1=Cc2cc([C@]34C[C@H]5CC[C@]4(CC)CC[C@H]4Br)ccc2C(C)C1\n', 'COc1ccc(OC(=O)c2ccc(S(=O)(=O)N3[C@H]4CCOC[C@@H]4C3)s2)cc1C\n']
Epoch 8/10, Loss: 0.57

## Evaluating the model

To evaluate basic metrics like validity, novelty and uniqueness. Also, measure the internal diversity of generated samples by calculating the Tanimoto Similarity Score

In [11]:
def evaluate_basic_metrics(generated_smiles, train_smiles_set):
    valid_smiles = []
    for s in generated_smiles:
        try:
            mol = Chem.MolFromSmiles(s)
            if mol is not None:
                valid_smiles.append(s)
        except:
            continue

    validity = len(valid_smiles) / len(generated_smiles) if len(generated_smiles) > 0 else 0.0
    unique_valid = list(set(valid_smiles))
    uniqueness = len(unique_valid) / len(valid_smiles) if valid_smiles else 0.0
    novelty = len([s for s in unique_valid if s not in train_smiles_set]) / len(unique_valid) if unique_valid else 0.0

    return {
        "validity": validity,
        "uniqueness": uniqueness,
        "novelty": novelty,
        "valid_smiles": unique_valid
    }

def internal_diversity(smiles_list):
    mols = [Chem.MolFromSmiles(s) for s in smiles_list]
    mols = [m for m in mols if m is not None]
    fps = [AllChem.GetMorganFingerprintAsBitVect(m, 2, nBits=2048) for m in mols]
    N = len(fps)
    if N < 2:
        return 0.0
    distances = []
    for i in range(N):
        for j in range(i+1, N):
            sim = DataStructs.TanimotoSimilarity(fps[i], fps[j])
            distances.append(1 - sim)
    return float(np.mean(distances))

Compute KL-divergence of the model to ensure that the generated samples belong to the same (similar) distribution as the original dataset

In [12]:
def compute_properties(smiles_list):
    props = []
    for s in smiles_list:
        mol = Chem.MolFromSmiles(s)
        if mol:
            props.append([
                Descriptors.MolWt(mol),
                Descriptors.MolLogP(mol),
                QED.qed(mol),
                rdMolDescriptors.CalcNumRings(mol)
            ])
    if len(props) == 0:
        return np.zeros((0, 4))
    return np.array(props)

def kl_divergence(real_props, gen_props, bins=50):
    if real_props.shape[0] < 2 or gen_props.shape[0] < 2:
        return [float('nan')] * real_props.shape[1]
    kl_values = []
    for i in range(real_props.shape[1]):
        real_hist, _ = np.histogram(real_props[:, i], bins=bins, density=True)
        gen_hist, _ = np.histogram(gen_props[:, i], bins=bins, density=True)
        real_hist += 1e-9
        gen_hist += 1e-9
        kl = entropy(real_hist, gen_hist)
        kl_values.append(kl)
    return kl_values


Function to evaluate the model based on the above metrics and the functions defined

In [13]:
def evaluate_model(model, tokenizer, train_smiles, device='cpu',
                   num_samples=5000, sample_max_len=120, temperature=1.0, top_k=None):
    model.to(device)
    print("Generating samples...")
    generated = []
    batch = 100
    reps = max(1, num_samples // batch)
    for _ in range(reps):
        gen = model.sample(tokenizer=tokenizer, max_len=sample_max_len, device=device,
                            temperature=temperature, top_k=top_k, num_samples=batch)
        generated.extend(gen)
    if len(generated) < num_samples:
        extra = num_samples - len(generated)
        gen = model.sample(tokenizer=tokenizer, max_len=sample_max_len, device=device,
                            temperature=temperature, top_k=top_k, num_samples=extra)
        generated.extend(gen)

    for i, smi in enumerate(generated):
        mol = Chem.MolFromSmiles(smi)
        if mol is None:
            generated.remove(smi)

    print("Computing metrics...")
    train_set = set(train_smiles)
    basic = evaluate_basic_metrics(generated, train_set)
    valid_smiles = basic['valid_smiles']
    diversity_score = internal_diversity(valid_smiles)

    real_props = compute_properties(train_smiles[:5000])
    gen_props = compute_properties(valid_smiles[:5000])

    if real_props.size == 0 or gen_props.size == 0:
        kl = [float('nan')] * 4
    else:
        kl = kl_divergence(real_props, gen_props)

    return {
        "validity": basic["validity"],
        "uniqueness": basic["uniqueness"],
        "novelty": basic["novelty"],
        "diversity": diversity_score,
        "kl_divergence": {
            "MolWt": kl[0] if len(kl) > 0 else float('nan'),
            "LogP": kl[1] if len(kl) > 1 else float('nan'),
            "QED": kl[2] if len(kl) > 2 else float('nan'),
            "NumRings": kl[3] if len(kl) > 3 else float('nan')
        },
        "raw_generated_examples": generated[:20]
    }

## Generate and Evaluate Samples

In [14]:
# evaluate the generated samples
metrics = evaluate_model(model, tokenizer, smiles_list, device=device,
                        num_samples=1000, temperature=1.0, top_k=50)

print(f"\nValidity: {metrics['validity']:.3f}")
print(f"Uniqueness: {metrics['uniqueness']:.3f}")
print(f"Novelty: {metrics['novelty']:.3f}")
print(f"Diversity: {metrics['diversity']:.3f}")

Generating samples...
Computing metrics...

Validity: 0.996
Uniqueness: 1.000
Novelty: 0.998
Diversity: 0.876
