<a href="https://colab.research.google.com/github/MZiaAfzal71/Average_Weighted_Path_Vector/blob/main/Data%20Files/Models/ChemBERTaModel.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!git clone https://github.com/MZiaAfzal71/Average_Weighted_Path_Vector.git
%cd Average_Weighted_Path_Vector/Data\ Files

In [None]:
from __future__ import annotations

import os
import random
from dataclasses import dataclass
from typing import List, Optional, Union, Dict, Any

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModel, get_linear_schedule_with_warmup
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score, root_mean_squared_error
from tqdm.auto import tqdm

In [None]:
# ----------------------------
# Config
# ----------------------------
@dataclass
class Config:
    """Hyper-parameters and output locations for one ChemBERTa fine-tuning run."""

    # --- pretrained checkpoint and where artefacts are written ---
    model_name: str = "seyonec/ChemBERTa-zinc-base-v1"
    output_dir: str = "./chemberta_model_output"
    save_path: str = "best_model.pt"  # file name; train_for_prop joins it onto output_dir

    # --- tokenisation / batching ---
    max_length: int = 128
    batch_size: int = 16

    # --- optimisation ---
    epochs: int = 5
    lr: float = 1e-5
    # NOTE(review): train_for_prop does not forward this value to
    # setup_optimizer_scheduler, which uses its own 0.01.
    weight_decay: float = 0.01
    seed: int = 42
    # Evaluated once, when the class body runs (not per instance).
    device: str = "cpu" if not torch.cuda.is_available() else "cuda"

    # --- model head / training-loop knobs ---
    dropout: float = 0.1
    # Intended meaning: unfreeze the last N transformer blocks (0 = all frozen).
    # NOTE(review): ChemBERTaModel's freezing code is commented out, so the
    # value is passed along but has no effect at present.
    train_layers: int = 2
    warmup_ratio: float = 0.1
    grad_clip: float = 0.0  # falsy value makes train_model skip gradient clipping
    return_numpy: bool = True

# ----------------------------
# Utils
# ----------------------------
def set_seed(seed: int):
    """Seed Python's ``random``, NumPy and PyTorch (plus every CUDA device, if any)."""
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

def rmse(y_true, y_pred):
    """Return the root of ``mean_squared_error(y_true, y_pred)`` as a plain float."""
    mse = mean_squared_error(y_true, y_pred)
    return float(np.sqrt(mse))

def ensure_dir(path: str):
    """Create directory ``path`` (including parents); do nothing if it already exists."""
    os.makedirs(name=path, exist_ok=True)

# ----------------------------
# Model
# ----------------------------

class ChemBERTaModel(nn.Module):
    """
    Pretrained encoder (loaded with ``AutoModel``) followed by a small MLP
    regression head that emits one scalar per input sequence.
    """
    def __init__(self, model_name: str, dropout: float = 0.1, train_layers: int = 0):
        super().__init__()
        self.bert = AutoModel.from_pretrained(model_name)
        hidden = self.bert.config.hidden_size
        half = hidden // 2

        # NOTE: `train_layers` is accepted but not used. The logic that froze
        # the backbone and re-enabled gradients on the last `train_layers`
        # entries of `self.bert.encoder.layer` is disabled, so every encoder
        # parameter keeps whatever `requires_grad` the loaded checkpoint has.

        # Regression head: hidden -> hidden/2 -> 1
        head_layers = [
            nn.Linear(hidden, half),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(half, 1),
        ]
        self.main = nn.Sequential(*head_layers)

    def forward(self, input_ids, attention_mask, targets=None):
        """
        Returns a ``(pred, loss)`` pair:
          pred: [B] one prediction per sequence
          loss: MSE between pred and ``targets`` cast to float, or None when
                no targets are given
        """
        encoded = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        first_token = encoded.last_hidden_state[:, 0, :]  # [B,H], position 0 of each sequence

        y_pred = self.main(first_token).squeeze(-1)

        if targets is None:
            return y_pred, None
        return y_pred, F.mse_loss(y_pred, targets.float())

# ----------------------------
# Dataset / Collate
# ----------------------------
class SmileDataset(Dataset):
    """Dataset of SMILES strings that tokenizes each string when it is indexed.

    ``targets`` may be None, in which case items carry no ``"labels"`` entry.
    """
    def __init__(self, smiles: List[str], targets: Optional[np.ndarray],
                 tokenizer: AutoTokenizer, max_length: int):
        self.smiles = list(smiles)
        if targets is None:
            self.targets = None
        else:
            self.targets = np.asarray(targets, dtype=np.float32)
        self.tok = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.smiles)

    def __getitem__(self, i):
        # Every sample is truncated/padded to exactly `max_length`.
        encoding = self.tok(
            self.smiles[i],
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_tensors="pt",
        )
        # The tokenizer returns a leading batch axis of size 1; drop it.
        sample = {}
        for name, tensor in encoding.items():
            sample[name] = tensor.squeeze(0)
        if self.targets is not None:
            sample["labels"] = torch.tensor(self.targets[i], dtype=torch.float32)
        return sample

def collate_stack(batch):
    """Stack per-sample tensor dicts into one batch dict.

    The keys are taken from the first sample; ``"labels"`` (when present) is
    stacked last so it always ends up as the final key.
    """
    first = batch[0]
    stacked = {}
    for key in first:
        if key == "labels":
            continue
        stacked[key] = torch.stack([sample[key] for sample in batch])
    if "labels" in first:
        stacked["labels"] = torch.stack([sample["labels"] for sample in batch])
    return stacked


def make_loaders(df: pd.DataFrame, target_col: str, tokenizer: AutoTokenizer,
                 cfg: Config) -> tuple[DataLoader, DataLoader]:
    """Build the training and test DataLoaders from one property sheet.

    Rows are assigned by the ``"Training/Test"`` column, compared after
    stripping whitespace and lower-casing: ``"training"`` rows feed the
    (shuffled) training loader, ``"test"`` rows the (ordered) test loader.
    SMILES strings come from the ``"SMILES"`` column and targets from
    ``target_col``.

    Args:
      df: sheet contents with "Training/Test", "SMILES" and ``target_col`` columns
      target_col: name of the column holding the regression target
      tokenizer: tokenizer handed to every SmileDataset
      cfg: supplies ``max_length`` and ``batch_size``

    Returns:
      (train_loader, test_loader)

    Raises:
      ValueError: if no row is marked as training data.
    """
    # Normalise the split labels once instead of once per subset.
    split = df["Training/Test"].str.strip().str.lower()

    def _subset(label: str) -> SmileDataset:
        # Rows whose normalised split label equals `label`, as a dataset.
        part = df[split == label].reset_index(drop=True)
        return SmileDataset(part["SMILES"].tolist(),
                            part[target_col].to_numpy(dtype=np.float32),
                            tokenizer, cfg.max_length)

    train_ds = _subset("training")
    test_ds = _subset("test")

    # A shuffling DataLoader rejects an empty dataset with an opaque sampler
    # error; fail with a message that points at the data instead.
    if len(train_ds) == 0:
        raise ValueError('No rows are marked "Training" in the "Training/Test" column.')

    train_loader = DataLoader(train_ds, batch_size=cfg.batch_size, shuffle=True,
                              collate_fn=collate_stack)
    test_loader = DataLoader(test_ds, batch_size=cfg.batch_size, shuffle=False,
                             collate_fn=collate_stack)
    return train_loader, test_loader


def setup_optimizer_scheduler(model, train_dataloader, epochs, lr=2e-5, warmup_ratio=0.1,
                              weight_decay=0.01):
    """Create an AdamW optimizer and a linear warmup/decay schedule.

    Only parameters with ``requires_grad=True`` are optimised. The schedule
    spans ``len(train_dataloader) * epochs`` steps, so it expects
    ``scheduler.step()`` to be called once per batch.

    Args:
      model: nn.Module whose trainable parameters are optimised
      train_dataloader: used only for its length (batches per epoch)
      epochs: number of training epochs
      lr: peak learning rate
      warmup_ratio: fraction of all steps spent in warmup
      weight_decay: AdamW weight decay (default matches the previously
        hard-coded 0.01)

    Returns:
      (optimizer, scheduler)
    """
    trainable = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.AdamW(trainable, lr=lr, weight_decay=weight_decay)

    total_steps = len(train_dataloader) * epochs
    warmup_steps = int(total_steps * warmup_ratio)
    scheduler = get_linear_schedule_with_warmup(optimizer, warmup_steps, total_steps)

    return optimizer, scheduler


def train_model(model, train_loader, val_loader, optimizer, scheduler,
                device, epochs=10, grad_clip=1.0, save_path="best_model.pt"):
    """
    Full trainer loop for ChemBERTaModel.

    Each epoch runs one pass over ``train_loader`` (stepping the optimizer and
    the scheduler after every batch) and one gradient-free pass over
    ``val_loader``. Losses are averaged per sample. Whenever the validation
    loss improves, the state dict is written to ``save_path``; after the last
    epoch the best checkpoint written during this call is loaded back.

    Args:
      model: nn.Module returning ``(preds, loss)`` from
        ``model(input_ids, attention_mask, targets)``
      train_loader: DataLoader yielding dicts with "input_ids",
        "attention_mask" and "labels"
      val_loader: DataLoader with the same batch layout
      optimizer: torch.optim.Optimizer
      scheduler: learning-rate scheduler, stepped once per batch
      device: torch.device or device string
      epochs: int
      grad_clip: float (gradient clipping norm); a falsy value disables clipping
      save_path: str (where to save best model)

    Returns:
      The model, carrying the best-validation weights when a checkpoint was
      written during this call, otherwise the weights it ended training with.
    """
    model.to(device)
    best_val = float("inf")
    # Track whether *this call* wrote save_path, so a stale file left by an
    # earlier run at the same path is never loaded by mistake.
    saved_checkpoint = False

    for epoch in range(1, epochs + 1):
        # ---- TRAIN ----
        model.train()
        train_loss, n_train = 0.0, 0

        pbar = tqdm(train_loader, desc=f"Epoch {epoch} [Train]")
        for batch in pbar:
            optimizer.zero_grad()

            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            targets = batch["labels"].to(device)

            preds, loss = model(input_ids, attention_mask, targets)

            loss.backward()
            if grad_clip:
                torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
            optimizer.step()
            scheduler.step()

            # Weight by batch size so a short final batch does not skew the mean.
            bs = input_ids.size(0)
            train_loss += loss.item() * bs
            n_train += bs

            pbar.set_postfix({"loss": f"{loss.item():.4f}"})

        train_loss /= n_train

        # ---- VALIDATION ----
        model.eval()
        val_loss, n_val = 0.0, 0
        with torch.no_grad():
            for batch in tqdm(val_loader, desc=f"Epoch {epoch} [Val]"):
                input_ids = batch["input_ids"].to(device)
                attention_mask = batch["attention_mask"].to(device)
                targets = batch["labels"].to(device)

                preds, loss = model(input_ids, attention_mask, targets)

                bs = input_ids.size(0)
                val_loss += loss.item() * bs
                n_val += bs

        val_loss /= n_val

        print(f"Epoch {epoch}: train_loss={train_loss:.4f}, val_loss={val_loss:.4f}, ")

        # ---- Save best ----
        if val_loss < best_val:
            best_val = val_loss
            torch.save(model.state_dict(), save_path)
            saved_checkpoint = True
            print(f"✅ Saved best model (val_loss={val_loss:.4f})")

    print("Training complete.")
    if saved_checkpoint:
        # map_location keeps the reload working when the checkpoint tensors
        # were saved from a different device than the one in use now.
        model.load_state_dict(torch.load(save_path, map_location=device))
    else:
        # Happens with epochs=0 or when the validation loss never beat +inf
        # (e.g. it was NaN every epoch).
        print("No checkpoint was saved during this run; returning the model as trained.")
    return model

def predict(model, test_loader, device, return_numpy=True):
    """
    Run inference over every batch of a loader.

    Args:
      model: trained ChemBERTaModel (anything callable as
        ``model(input_ids, attention_mask)`` and returning ``(preds, loss)``)
      test_loader: iterable of batches with "input_ids" and "attention_mask";
        a "labels" entry, if present, is ignored
      device: torch.device or device string
      return_numpy: if True, returns numpy array

    Returns:
      preds: [N] predictions in loader order (torch.Tensor on CPU, or
      np.ndarray); an empty result when the loader yields no batches
    """
    model.eval()
    model.to(device)
    all_preds = []

    with torch.no_grad():
        for batch in test_loader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)

            preds, _ = model(input_ids, attention_mask)
            all_preds.append(preds.cpu())

    # torch.cat refuses an empty list, so an empty loader maps to an empty tensor.
    preds = torch.cat(all_preds, dim=0) if all_preds else torch.empty(0)

    if return_numpy:
        return preds.numpy()
    return preds

def train_for_prop(file: str, prop: str, cfg: Config) -> Dict[str, Any]:
    """Fine-tune, evaluate and export predictions for a single property.

    Reads sheet ``prop`` of the Excel workbook ``file``, trains a
    ChemBERTaModel on the rows marked as training data (using the test rows
    for checkpoint selection), predicts every row, prints MAE / RMSE / R2 on
    the test rows and writes all rows plus a ``"{prop} Prediction"`` column
    to ``{prop}_chemBERTa_preds.xlsx`` inside ``cfg.output_dir``.

    Args:
      file: path to the Excel workbook
      prop: sheet name; the target column is ``"{prop}-Measured"``
      cfg: run configuration

    Returns:
      Dict with "sheet", "target_col", "best_path", "pred_path", "MAE",
      "RMSE" and "R2".

    Raises:
      ValueError: if the sheet cannot be read from ``file``.
    """
    set_seed(cfg.seed)
    ensure_dir(cfg.output_dir)

    # ---- Load data
    target_col = f"{prop}-Measured"

    try:
        df = pd.read_excel(file, sheet_name=prop)
    except Exception as exc:
        # Covers a missing workbook as well as a missing sheet; keep the cause.
        raise ValueError(f"Could not read sheet '{prop}' from {file}.") from exc

    # ---- Tokenizer
    tokenizer = AutoTokenizer.from_pretrained(cfg.model_name, use_fast=True)

    # ---- Data loaders
    train_loader, test_loader = make_loaders(df, target_col, tokenizer, cfg)

    # ---- ChemBerta Model
    model = ChemBERTaModel(cfg.model_name, dropout=cfg.dropout,
                           train_layers=cfg.train_layers).to(cfg.device)

    # One AdamW optimizer (single learning rate) with linear warmup/decay
    optimizer, scheduler = setup_optimizer_scheduler(model, train_loader, cfg.epochs,
                                                     cfg.lr, cfg.warmup_ratio)

    save_model = os.path.join(cfg.output_dir, cfg.save_path)
    # The test loader doubles as the validation set for best-checkpoint selection.
    model = train_model(model, train_loader, test_loader, optimizer, scheduler, cfg.device,
                        cfg.epochs, cfg.grad_clip, save_model)

    # ---- Predict on all rows (Training + Test)
    # Targets are not needed for inference, so none are attached.
    all_ds = SmileDataset(df["SMILES"].tolist(), None, tokenizer, cfg.max_length)
    all_loader = DataLoader(all_ds, batch_size=cfg.batch_size, shuffle=False,
                            collate_fn=collate_stack)

    all_preds = predict(model, all_loader, cfg.device, cfg.return_numpy)

    # ---- Build results DF
    pred_col = f"{prop} Prediction"
    new_results = df.copy()
    new_results[pred_col] = all_preds

    # ---- Final metrics on Test only
    # Same normalisation (strip + lower) that make_loaders applies, so rows
    # with stray whitespace in the label are scored too.
    is_test = new_results["Training/Test"].str.strip().str.lower() == "test"
    obs_test = new_results.loc[is_test, target_col].values
    pred_test = new_results.loc[is_test, pred_col].values
    mae_v = mean_absolute_error(obs_test, pred_test)
    rmse_v = rmse(obs_test, pred_test)
    r2_v = r2_score(obs_test, pred_test)
    print(f"Final (best) Test metrics for {prop} → MAE: {mae_v:.4f} | RMSE: {rmse_v:.4f} | R²: {r2_v:.4f}")

    # ---- Save predictions as an Excel workbook
    pred_path = os.path.join(cfg.output_dir, f"{prop}_chemBERTa_preds.xlsx")
    new_results.to_excel(pred_path, index=False)
    print(f"Saved predictions → {pred_path}")

    return {
        "sheet": prop,
        "target_col": target_col,
        "best_path": save_model,
        "pred_path": pred_path,
        "MAE": mae_v, "RMSE": rmse_v, "R2": r2_v,
    }

# ----------------------------
# Multi-property runner
# ----------------------------
def run_all_properties_descriptors(file: str, prop_names: Union[str, List[str]], cfg: Config):
    """Train and evaluate one model per property and collect the test metrics.

    Calls ``train_for_prop`` for each entry of ``prop_names`` and writes the
    resulting MAE / RMSE / R2 table to ``chemberta_stats.csv`` inside
    ``cfg.output_dir``.

    Args:
      file: path to the Excel workbook holding one sheet per property
      prop_names: sheet names to process; a single name may be given as a
        plain string
      cfg: run configuration shared by all properties

    Returns:
      DataFrame with columns "Property", "MAE", "RMSE", "R2".
    """
    # A bare string would otherwise be iterated character by character.
    if isinstance(prop_names, str):
        prop_names = [prop_names]

    ensure_dir(cfg.output_dir)
    # Workbook name without directory or extension, for the progress message.
    dataset_name = os.path.splitext(os.path.basename(file))[0]

    perf_rows = []
    for prop in prop_names:
        print(f"\n=== Processing the file {dataset_name} for sheet: {prop} ===")
        results = train_for_prop(file, prop, cfg)
        perf_rows.append([f"{prop}", results["MAE"], results["RMSE"], results["R2"]])

    perf_df = pd.DataFrame(perf_rows, columns=["Property", "MAE", "RMSE", "R2"])
    stats_path = os.path.join(cfg.output_dir, "chemberta_stats.csv")
    perf_df.to_csv(stats_path, index=False)
    print(f"\n📊 All-property stats saved → {stats_path}")
    return perf_df


In [None]:
# Workbook with one sheet per property, and the sheets to process.
file = "Excel Files/Zang_Data.xlsx"
prop_names = ["Log VP", "MP", "BP", "LogBCF", "LogS", "LogP"]
# prop_names = ["LogP"]

# Run configuration; anything not listed keeps its Config default.
cfg = Config(
    lr=1e-5,
    # Requests the last 3 transformer blocks; ChemBERTaModel's freezing code
    # is commented out, so this value currently has no effect.
    train_layers=3,
    max_length=128,
    batch_size=8,
    epochs=30,
    output_dir="chemberta_results",
)

perf_df = run_all_properties_descriptors(file, prop_names, cfg)

# Bare expression so the notebook renders the metrics table.
perf_df