In [None]:
!pip install rdkit

In [None]:
# Download SA Score script (by Ertl)
!wget https://raw.githubusercontent.com/rdkit/rdkit/master/Contrib/SA_Score/sascorer.py
!wget https://github.com/rdkit/rdkit/raw/master/Contrib/SA_Score/fpscores.pkl.gz

In [None]:
import random, os, numpy as np, pandas as pd

def seed_everything(seed=42):
  random.seed(seed)
  os.environ['PYTHONHASHSEED'] = str(seed)
  np.random.seed(seed)
  torch.manual_seed(seed)
  torch.backends.cudnn.deterministic = True
  torch.backends.cudnn.benchmark = False

In [None]:
from rdkit import RDLogger
RDLogger.DisableLog('rdApp.*')

In [None]:
df = pd.read_csv('/content/drive/MyDrive/Molecular design/ReactionML/Low fidelity/USPTO/USPTO_50K.csv')
df

In [None]:
from rdkit import Chem

def canonicalize_smiles(smi):
    """Canonicalizes multi-fragment SMILES (e.g. 'CC.O') with dot handling."""
    parts = smi.split('.')
    mols = [Chem.MolFromSmiles(part) for part in parts]
    if any(m is None for m in mols):
        return smi  # fallback if any fragment fails
    canonical_parts = [Chem.MolToSmiles(m, canonical=True) for m in mols]
    return '.'.join(sorted(canonical_parts))  # sort ensures consistent order

# Split product/reactants from reaction SMILES
df['product'] = df['reactions'].apply(lambda x: x.split('>')[-1])
df['reactants'] = df['reactions'].apply(lambda x: x.split('>')[0])

# Canonicalize both sides
df['product'] = df['product'].apply(canonicalize_smiles)
df['reactants'] = df['reactants'].apply(canonicalize_smiles)

In [None]:
from sklearn.model_selection import train_test_split

train_df, test_df = train_test_split(df, test_size=0.1, random_state=42)
train_df, val_df = train_test_split(train_df, test_size=0.1, random_state=42)  # 81/9/10

In [None]:
from rdkit import Chem
from rdkit.Chem import AllChem
import numpy as np
from tqdm import tqdm

def get_ecfp4_fp(smiles, n_bits=2048):
    mol = Chem.MolFromSmiles(smiles)
    if mol:
        fp = AllChem.GetMorganFingerprintAsBitVect(mol, radius=2, nBits=n_bits)
        arr = np.zeros((n_bits,), dtype=int)
        Chem.DataStructs.ConvertToNumpyArray(fp, arr)
        return arr
    return np.zeros((n_bits,), dtype=int)

# Compute and store ECFP4 fingerprints with progress bars
train_fps = np.array([get_ecfp4_fp(smi) for smi in tqdm(train_df['product'], desc="Processing train set")])
test_fps = np.array([get_ecfp4_fp(smi) for smi in tqdm(test_df['product'], desc="Processing test set")])

In [None]:
from rdkit import DataStructs

def max_tanimoto(test_fp, train_fps_rdkit):
    sims = DataStructs.BulkTanimotoSimilarity(test_fp, train_fps_rdkit)
    return max(sims) if sims else 0.0

# Convert all train_fps to RDKit ExplicitBitVect
train_fps_rdkit = [
    AllChem.GetMorganFingerprintAsBitVect(Chem.MolFromSmiles(smi), 2, nBits=2048)
    for smi in train_df['product']
]

# Compute max similarity for each test sample
test_df['product_fp'] = test_df['product'].apply(
    lambda smi: AllChem.GetMorganFingerprintAsBitVect(Chem.MolFromSmiles(smi), 2, nBits=2048)
)
test_df['max_sim_to_train'] = test_df['product_fp'].apply(lambda fp: max_tanimoto(fp, train_fps_rdkit))

In [None]:
from transformers import AutoTokenizer, EncoderDecoderModel
from datasets import Dataset

chemberta_tokenizer = AutoTokenizer.from_pretrained("seyonec/ChemBERTa-zinc-base-v1")
gpt2_tokenizer = AutoTokenizer.from_pretrained("distilgpt2")

# Add pad token if missing
if gpt2_tokenizer.pad_token is None:
    gpt2_tokenizer.add_special_tokens({'pad_token': '[PAD]'})

def tokenize_function(example):
    # Encoder input = product SMILES, tokenized with ChemBERTa
    input_ = chemberta_tokenizer(
        "retro: " + example['product'],
        padding="max_length",
        truncation=True,
        max_length=128
    )

    # Decoder target = reactants SMILES, tokenized with GPT2 tokenizer
    target = gpt2_tokenizer(
        example['reactants'],
        padding="max_length",
        truncation=True,
        max_length=128
    )

    input_['labels'] = target['input_ids']
    return input_

# train_ds = Dataset.from_pandas(train_df[['product', 'reactants']])
val_ds = Dataset.from_pandas(val_df[['product', 'reactants']])
test_ds = Dataset.from_pandas(test_df[['product', 'reactants']])

# train_tokenized = train_ds.map(tokenize_function, batched=False)
val_tokenized = val_ds.map(tokenize_function, batched=False)
test_tokenized = test_ds.map(tokenize_function, batched=False)

In [None]:
from transformers import EncoderDecoderModel, AutoTokenizer
import os

output_dir = "/content/drive/MyDrive/Molecular design/ReactionML/CurriculumLearning/retrosyn-chemberta-distilgpt2"

# List subdirectories that are checkpoints
checkpoints = [d for d in os.listdir(output_dir) if d.startswith("checkpoint-")]
checkpoints = sorted(checkpoints, key=lambda x: int(x.split("-")[-1]))

# Load the latest one
latest_ckpt = os.path.join(output_dir, checkpoints[-1])
print("Loading checkpoint:", latest_ckpt)

model = EncoderDecoderModel.from_pretrained(latest_ckpt)

# Load encoder and decoder tokenizers (must match what you trained with)
chemberta_tokenizer = AutoTokenizer.from_pretrained("seyonec/ChemBERTa-zinc-base-v1")
gpt2_tokenizer = AutoTokenizer.from_pretrained("distilgpt2")

# Re-add pad token if you added one before
if gpt2_tokenizer.pad_token is None:
    gpt2_tokenizer.add_special_tokens({'pad_token': '[PAD]'})
    model.decoder.resize_token_embeddings(len(gpt2_tokenizer))
    model.config.pad_token_id = gpt2_tokenizer.pad_token_id

# Make sure model is on the right device
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

In [None]:
from transformers.models.gpt2.modeling_gpt2 import GPT2LMHeadModel

def _reorder_cache(past, beam_idx):
    return tuple(
        tuple(past_state.index_select(0, beam_idx) for past_state in layer_past)
        for layer_past in past
    )

# Patch it once — ideally at the start of inference
GPT2LMHeadModel._reorder_cache = staticmethod(_reorder_cache)

In [None]:
import torch
from tqdm import tqdm
from rdkit import Chem
import pandas as pd

def generate_topk_predictions_and_matches(model, tokenizer, dataset, test_df,
                                          k=5, batch_size=64, max_length=128,
                                          num_beams=5, device="cuda",
                                          prefix="baseline"):
    """
    Populates test_df with top-k predictions and match flags for top-1 and top-k.
    The `prefix` (e.g., 'baseline' or 'curriculum') is used to name the columns.
    """
    model.eval()
    all_topk_preds = []
    top1_matches = []
    topk_matches = []

    pbar = tqdm(range(0, len(dataset), batch_size), desc=f"Generating Top-{k} Predictions ({prefix})")

    for i in pbar:
        batch = dataset[i: i + batch_size]
        input_ids = torch.tensor(batch["input_ids"]).to(device)
        attention_mask = torch.tensor(batch["attention_mask"]).to(device)
        labels = batch["labels"]

        with torch.no_grad():
            outputs = model.generate(
                input_ids=input_ids,
                attention_mask=attention_mask,
                max_length=max_length,
                num_beams=num_beams,
                num_return_sequences=k,
                early_stopping=True,
                pad_token_id=tokenizer.pad_token_id
            )

        batch_size_actual = input_ids.size(0)
        outputs = outputs.view(batch_size_actual, k, -1)

        decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)

        for pred_seqs, gold in zip(outputs, decoded_labels):
            preds = tokenizer.batch_decode(pred_seqs, skip_special_tokens=True)
            preds = [p.strip() for p in preds]
            gold = gold.strip()

            all_topk_preds.append(preds)
            top1_matches.append(int(preds[0] == gold))
            topk_matches.append(int(gold in preds))

    # Store results with prefix to avoid overwriting
    test_df = test_df.copy()
    test_df[f'top{k}_preds_{prefix}'] = all_topk_preds
    test_df[f'top1_match_{prefix}'] = top1_matches
    test_df[f'top{k}_match_{prefix}'] = topk_matches

    return test_df


In [None]:
# # For baseline
# test_df = generate_topk_predictions_and_matches(
#     model=model,
#     tokenizer=gpt2_tokenizer,
#     dataset=test_tokenized,
#     test_df=test_df,
#     prefix="baseline"
# )

In [None]:
# # For baseline
# test_df = generate_topk_predictions_and_matches(
#     model=model,
#     tokenizer=gpt2_tokenizer,
#     dataset=test_tokenized,
#     test_df=test_df,
#     prefix="baseline",
#     k=1
# )

In [None]:

# # For curriculum
# test_df = generate_topk_predictions_and_matches(
#     model=model,
#     tokenizer=gpt2_tokenizer,
#     dataset=test_tokenized,
#     test_df=test_df,
#     prefix="curriculum"
# )

In [None]:

# # For curriculum
# test_df = generate_topk_predictions_and_matches(
#     model=model,
#     tokenizer=gpt2_tokenizer,
#     dataset=test_tokenized,
#     test_df=test_df,
#     prefix="curriculum",
#     k=1
# )

In [None]:
def add_top1_predictions_to_df(model, tokenizer, dataset, df, column_name="baseline_prediction", batch_size=32, max_length=128, num_beams=5):
    """
    Generates top-1 predictions using the given model and tokenizer and adds them to the dataframe.

    Args:
        model: HuggingFace-compatible seq2seq model.
        tokenizer: Corresponding tokenizer.
        dataset: A list of dicts with keys ['input_ids', 'attention_mask', 'labels'].
        df: Original DataFrame aligned with the dataset order.
        column_name: Name of the new column to store top-1 predictions.
        batch_size: Evaluation batch size.
        max_length: Max sequence length for generation.
        num_beams: Beam size for generation.

    Returns:
        A new DataFrame with an added column for predictions.
    """
    model.eval()
    all_top1_preds = []

    for i in tqdm(range(0, len(dataset), batch_size), desc=f"Generating {column_name}"):
        batch = dataset[i: i + batch_size]

        input_ids = torch.tensor(batch["input_ids"]).to(model.device)
        attention_mask = torch.tensor(batch["attention_mask"]).to(model.device)

        with torch.no_grad():
            outputs = model.generate(
                input_ids=input_ids,
                attention_mask=attention_mask,
                max_length=max_length,
                num_beams=num_beams,
                num_return_sequences=1,
                early_stopping=True,
                pad_token_id=model.config.pad_token_id
            )

        decoded_preds = tokenizer.batch_decode(outputs, skip_special_tokens=True)
        decoded_preds = [p.strip() for p in decoded_preds]
        all_top1_preds.extend(decoded_preds)

    # Add to DataFrame
    df = df.copy()
    df[column_name] = all_top1_preds
    return df

In [None]:
test_df = add_top1_predictions_to_df(
    model=model,
    tokenizer=gpt2_tokenizer,
    dataset=test_tokenized,
    df=test_df,
    column_name="predicted_reactants_baseline",
    batch_size=64,
    max_length=128,
    num_beams=5
)

In [None]:
test_df = add_top1_predictions_to_df(
    model=model,
    tokenizer=gpt2_tokenizer,
    dataset=test_tokenized,
    df=test_df,
    column_name="predicted_reactants_curriculum",
    batch_size=64,
    max_length=128,
    num_beams=5
)

In [None]:
import numpy as np
import pandas as pd

def bootstrap_per_similarity_bin_all(
    test_df,
    bin_column='sim_bin',
    top1_baseline_col='top1_match_baseline',
    top5_baseline_col='top5_match_baseline',
    top1_curriculum_col='top1_match_curriculum',
    top5_curriculum_col='top5_match_curriculum',
    n_bootstrap=1000,
    seed=42
):
    np.random.seed(seed)
    bin_results = []

    for bin_interval in test_df[bin_column].dropna().unique():
        bin_df = test_df[test_df[bin_column] == bin_interval]
        # if len(bin_df) < 10:
        #     continue

        top1_b, top5_b = [], []
        top1_c, top5_c = [], []

        for _ in range(n_bootstrap):
            sample = bin_df.sample(frac=1.0, replace=True)

            top1_b.append(sample[top1_baseline_col].mean())
            top5_b.append(sample[top5_baseline_col].mean())
            top1_c.append(sample[top1_curriculum_col].mean())
            top5_c.append(sample[top5_curriculum_col].mean())

        bin_results.append({
            'sim_bin': bin_interval,
            'top1_baseline_mean': np.mean(top1_b) * 100,
            'top1_baseline_std':  np.std(top1_b) * 100,
            'top5_baseline_mean': np.mean(top5_b) * 100,
            'top5_baseline_std':  np.std(top5_b) * 100,
            'top1_curriculum_mean': np.mean(top1_c) * 100,
            'top1_curriculum_std':  np.std(top1_c) * 100,
            'top5_curriculum_mean': np.mean(top5_c) * 100,
            'top5_curriculum_std':  np.std(top5_c) * 100,
            'n_samples': len(bin_df)
        })

    result_df = pd.DataFrame(bin_results)
    result_df = result_df.sort_values("sim_bin").reset_index(drop=True)
    result_df['bin_mid'] = result_df['sim_bin'].apply(lambda x: x.mid)

    return result_df

In [None]:
bins = np.linspace(0, 1.0, 6)  # e.g., [0.0, 0.2, 0.4, ..., 1.0]
test_df['sim_bin'] = pd.cut(test_df['max_sim_to_train'], bins=bins)

In [None]:
grouped_df = bootstrap_per_similarity_bin_all(test_df)

In [None]:
# grouped_df

In [None]:
# Group by bins and compute mean and std for each model
grouped = test_df.groupby('sim_bin').agg({
    'top5_match_baseline': ['mean', 'std', 'count'],
    'top5_match_curriculum': ['mean', 'std', 'count']
})

# Flatten column names
grouped_df.columns = ['_'.join(col).strip() for col in grouped_df.columns.values]
grouped_df = grouped_df.reset_index()

# Get midpoints of bins for plotting
grouped_df['bin_mid'] = grouped_df['sim_bin'].apply(lambda x: x.mid)

In [None]:
import matplotlib.pyplot as plt

# Define better contrasting colors
baseline_color = "#1f77b4"     # deep blue
curriculum_color = "#d62728"   # deep red

plt.style.use("default")

fig, ax = plt.subplots(figsize=(8, 6))

# Plot with standard deviation error bars
ax.errorbar(grouped_df['bin_mid'], grouped_df['top1_baseline_mean'],
            yerr=grouped_df['top1_baseline_std'], label='Baseline',
            color=baseline_color, marker='o', linewidth=2, capsize=4)

ax.errorbar(grouped_df['bin_mid'], grouped_df['top1_curriculum_mean'],
            yerr=grouped_df['top1_curriculum_std'], label='Curriculum',
            color=curriculum_color, marker='s', linewidth=2, capsize=4)

# Labels and legend
ax.set_xlabel("Tanimoto similarity to training set", fontsize=25, labelpad=10)
ax.set_ylabel("Top-1 accuracy (%)", fontsize=25, labelpad=10)

# Tick styling
ax.tick_params(axis='both', labelsize=25)
ax.set_xticks(grouped_df['bin_mid'])
ax.set_xticklabels([0.1, 0.3, 0.5, 0.7, 0.9])

# Legend
ax.legend(fontsize=25, loc='upper right', frameon=False, fancybox=False, framealpha=0.95)

plt.tight_layout()
plt.show()

In [None]:
# grouped_df

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

plt.figure(figsize=(10, 7))

sns.histplot(test_df['max_sim_to_train'], bins=30, kde=True, color="#4c72b0", edgecolor=None)

plt.xlabel("Maximum Tanimoto Similarity to Training Set", fontsize=32)
plt.ylabel("Number of Test Samples", fontsize=32)
plt.xticks(fontsize=32)
plt.yticks(fontsize=32)
plt.tight_layout()
plt.show()

In [None]:
def extract_failure_examples(test_df, similarity_threshold=0.3, num_examples=5):
    """
    Extracts examples where baseline failed but curriculum succeeded, from low similarity bin.
    """
    df = test_df.copy().reset_index(drop=True)

    # Filter: Low similarity + Baseline wrong + Curriculum correct
    filtered = df[
        (df['max_sim_to_train'] <= similarity_threshold) &
        (df['top1_baseline'] == 0) &
        (df['top1_curriculum'] == 0)
    ]

    # Sample a few for table
    sampled = filtered.sample(n=min(num_examples, len(filtered)), random_state=42)

    # Optionally truncate long SMILES
    def truncate(s, maxlen=50):
        return s if len(s) <= maxlen else s[:maxlen] + '...'

    table = pd.DataFrame({
        "Product SMILES": sampled['product'].apply(truncate),
        "Similarity to Train": sampled['max_sim_to_train'].round(3),
        "Ground Truth": sampled['reactants'].apply(truncate),
        "Baseline Prediction": sampled['predicted_reactants_baseline'].apply(truncate),
        "Curriculum Prediction": sampled['predicted_reactants_curriculum'].apply(truncate),
    })

    return table

failure_table = extract_failure_examples(test_df, similarity_threshold=0.3, num_examples=5)
display(failure_table)  # if in notebook

# Or export to LaTeX
# print(failure_table.to_latex(index=False, escape=False))

In [None]:
from rdkit.Chem import rdChemReactions
from rdkit.Chem.Draw import rdMolDraw2D
from IPython.display import Image, display

def draw_true_reaction(reactants_smi, product_smi, label="Reaction", size=(1200, 400), font_size=120):
    """
    Render a reaction with large atom labels using RDKit's low-level MolDraw2DCairo API.
    """
    try:
        rxn_smi = f"{product_smi}>>{reactants_smi}"
        rxn = rdChemReactions.ReactionFromSmarts(rxn_smi, useSmiles=True)

        drawer = rdMolDraw2D.MolDraw2DCairo(*size)
        opts = drawer.drawOptions()
        opts.scaleBondWidth = True
        opts.maxFontSize = font_size

        drawer.DrawReaction(rxn)
        drawer.FinishDrawing()

        print(f"🧪 {label}")
        display(Image(data=drawer.GetDrawingText()))
    except Exception as e:
        print(f"❌ Failed to draw reaction: {e}")


example = test_df.iloc[417]

draw_true_reaction(example['reactants'], example['product'], label="Ground Truth")
draw_true_reaction(example['predicted_reactants_baseline'], example['product'], label="Baseline Prediction")
# draw_true_reaction(example['predicted_reactants_curriculum'], example['product'], label="Curriculum Prediction")

In [None]:
example