<a href="https://colab.research.google.com/github/MinaAzizii/Master-Thesis/blob/Predictive-Models/QY.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# -y answers pip's confirmation prompt up front so "Run all" does not stall waiting for input.
!pip uninstall -y torch torchvision torchaudio

In [None]:
!pip install torch==2.1.0 torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121  # GPU version
!pip install torch-scatter -f https://data.pyg.org/whl/torch-2.1.0+cu121.html
!pip install torch-sparse -f https://data.pyg.org/whl/torch-2.1.0+cu121.html
!pip install torch-geometric

In [None]:
import torch
# Report whether this runtime's PyTorch build can see a CUDA device.
cuda_status = (
    "✅ CUDA is available!"
    if torch.cuda.is_available()
    else "❌ CUDA is not available. Check your PyTorch installation."
)
print(cuda_status)

In [None]:
!pip install --upgrade numpy==1.24.4 # Ensure numpy is upgraded as well

In [None]:
!pip install pandas openpyxl seaborn matplotlib tqdm scikit-learn

In [None]:
!pip install rdkit-pypi scikit-learn matplotlib

In [None]:
!pip install networkx==2.8.8



In [None]:
!pip install mordred

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset
from torch_geometric.loader import DataLoader
from torch_geometric.data import Data, Batch
from torch_geometric.nn import NNConv, global_mean_pool
from rdkit import Chem
from rdkit.Chem import Descriptors
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import seaborn as sns
import matplotlib.pyplot as plt
import random
from rdkit import RDLogger

In [None]:
# Turn off RDKit log output for every channel matching 'rdApp.*' so that
# SMILES parsing in the cells below does not flood the notebook output.
RDLogger.DisableLog('rdApp.*')

In [None]:
# Seed every random number generator used in this notebook with the same value.
seed = 42
random.seed(seed)                 # Python standard-library RNG
np.random.seed(seed)              # NumPy global RNG
torch.manual_seed(seed)           # PyTorch default generator
torch.cuda.manual_seed(seed)      # current CUDA device
torch.cuda.manual_seed_all(seed)  # every CUDA device

In [None]:
def compute_descriptors(smiles, func_list):
    """Evaluate a list of descriptor functions on one SMILES string.

    Args:
        smiles: SMILES string, parsed with ``Chem.MolFromSmiles``.
        func_list: Sequence of ``(name, fn)`` pairs. Only ``fn`` is used here;
            it is called with the parsed molecule.

    Returns:
        1-D float ``np.ndarray`` with one entry per pair in ``func_list``.
        An entry is ``0.0`` when the descriptor raises an exception or returns
        a non-finite value (NaN or infinity). If the SMILES cannot be parsed,
        every entry is ``0.0``.
    """
    m = Chem.MolFromSmiles(smiles)
    if m is None:
        # Nothing to evaluate; same all-zero result the per-descriptor
        # fallback below would produce, without calling each function on None.
        return np.zeros(len(func_list), dtype=float)

    vals = []
    for _, fn in func_list:
        try:
            v = fn(m)
            vals.append(v if np.isfinite(v) else 0.0)
        except Exception:
            # Best-effort fallback for a failing descriptor. ``Exception``
            # (not a bare ``except``) lets KeyboardInterrupt and SystemExit
            # propagate, so a long descriptor pass can still be interrupted.
            vals.append(0.0)
    return np.array(vals, dtype=float)

In [None]:
# 1) LOAD & CLEAN
df = pd.read_excel("DB2.xlsx")
# Row count before any filtering, so the "Dropped ..." report below is meaningful.
n_loaded = len(df)

# assume QY values are in column 'QY'
# Rows missing any field used downstream are discarded first.
df = df.dropna(subset=['smiles', 'solvent', 'QY', 'solvent_name'])

# Keep only rows whose molecule AND solvent SMILES can be parsed by RDKit.
mask_valid = (
    df['smiles'].apply(lambda s: Chem.MolFromSmiles(s) is not None) &
    df['solvent'].apply(lambda s: Chem.MolFromSmiles(s) is not None)
)
df = df[mask_valid].reset_index(drop=True)

# Previously computed as len(df) - len(df), which always printed 0.
print(f"Dropped {n_loaded - len(df)} rows with invalid SMILES or missing 'QY'.")
# report how many samples remain
print(f"Number of samples with valid QY: {len(df)}")
# or equivalently
print(f"Dataset shape: {df.shape}")  # (rows, columns)

In [None]:
from rdkit import Chem
from rdkit.Chem import SanitizeMol, SanitizeFlags

# … after df = df[mask_valid].reset_index(drop=True) …

def _fails_property_check(smi):
    """Return True when RDKit's SANITIZE_PROPERTIES step reports a failure for ``smi``."""
    unsanitized = Chem.MolFromSmiles(smi, sanitize=False)
    status = Chem.SanitizeMol(
        unsanitized,
        sanitizeOps=SanitizeFlags.SANITIZE_PROPERTIES,
        catchErrors=True,
    )
    return status != Chem.SanitizeFlags.SANITIZE_NONE


# list of (column, row_index, smi) for every entry that fails the check
bad = [
    (col, i, smi)
    for col in ('smiles', 'solvent')
    for i, smi in enumerate(df[col])
    if _fails_property_check(smi)
]

if not bad:
    print("No unusual‐charge warnings detected.")
else:
    print("⚠️ Found unusual‐charge warnings in these rows:")
    for col, i, smi in bad:
        print(f" • {col} @ row {i}: {smi}")
    # optionally drop them (a row flagged in both columns is dropped once):
    drop_idxs = {i for _, i, _ in bad}
    df = df.drop(drop_idxs).reset_index(drop=True)
    print(f"Dropped {len(drop_idxs)} rows.")

print(f"Remaining samples: {len(df)}")


In [None]:
# --- validate SMILES for both molecule and solvent ---
def _parses(smi):
    """True when RDKit can build a molecule from ``smi``."""
    return Chem.MolFromSmiles(smi) is not None


is_valid_mol = df['smiles'].apply(_parses)
is_valid_sol = df['solvent'].apply(_parses)
mask_valid = is_valid_mol & is_valid_sol

# count the rows that fail either check, then keep only the valid ones
n_invalid = mask_valid.size - mask_valid.sum()
df = df.loc[mask_valid].reset_index(drop=True)
print(f"Removed {n_invalid} rows with invalid SMILES.")

# report how many samples remain
print(f"Remaining samples (with valid 'QY'): {len(df)}")

In [None]:
# 2) DESCRIPTORS
# Each list holds (output column name, RDKit descriptor function) pairs.
# The list order fixes the column order of mol_arr / solv_arr below.
mol_funcs = [
    ("Mol_MolWt", Descriptors.MolWt),
    ("Mol_TPSA", Descriptors.TPSA),
    ("Mol_NumRotatableBonds", Descriptors.NumRotatableBonds),
    ("Mol_LogP", Descriptors.MolLogP),
    # NOTE(review): labelled "Aromaticity" but the value is the aromatic ring count.
    ("Mol_Aromaticity", Descriptors.NumAromaticRings),
    ("Mol_NumHDonors", Descriptors.NumHDonors),
    ("Mol_NumHAcceptors", Descriptors.NumHAcceptors),
    ("Mol_FractionCSP3", Descriptors.FractionCSP3),
    # NOTE(review): labelled "HeteroatomCount" but computed with HeavyAtomCount.
    # Confirm which was intended (rename the column, or use a heteroatom descriptor).
    ("Mol_HeteroatomCount", Descriptors.HeavyAtomCount),
]
solvent_funcs = [
    ("Solv_MolWt", Descriptors.MolWt),
    ("Solv_TPSA", Descriptors.TPSA),
    ("Solv_MolLogP", Descriptors.MolLogP),
    ("Solv_NumHDonors", Descriptors.NumHDonors),
]

# precompute raw arrays
# Descriptors are computed once per unique SMILES and looked up per row afterwards.
mol_raw = {sm: compute_descriptors(sm, mol_funcs) for sm in df['smiles'].unique()}
solv_raw = {sm: compute_descriptors(sm, solvent_funcs) for sm in df['solvent'].unique()}

# One row per row of df, one column per entry of the corresponding *_funcs list.
mol_arr = np.vstack([mol_raw[sm] for sm in df['smiles']])
solv_arr = np.vstack([solv_raw[sm] for sm in df['solvent']])

# Table of identifiers, target and unscaled descriptors, written out for inspection.
desc_df = pd.DataFrame({
    'smiles': df['smiles'],
    'solvent': df['solvent'],
    'QY': df['QY'],
    'solvent_name': df['solvent_name']
})
for i, (col, _) in enumerate(solvent_funcs):
    desc_df[col] = solv_arr[:, i]
for i, (col, _) in enumerate(mol_funcs):
    desc_df[col] = mol_arr[:, i]
desc_df.to_excel("all_raw_descriptors.xlsx", index=False)
print("✅ Saved all_raw_descriptors.xlsx")

In [None]:
# 3) SPLIT & SCALE
train_df, val_df = train_test_split(df, test_size=0.2, random_state=42)
# NOTE(review): the split is by row, so the same molecule may appear in both
# train and validation (e.g. measured in different solvents) — confirm this is intended.

# Fit the scalers on the TRAINING rows only. Fitting on the full dataset would
# let validation statistics (mean/std) leak into the features the model is
# trained on and make the validation metrics optimistic.
solv_train_arr = np.vstack([solv_raw[sm] for sm in train_df['solvent']])
mol_train_arr  = np.vstack([mol_raw[sm] for sm in train_df['smiles']])

solv_scaler   = StandardScaler().fit(solv_train_arr)
mol_scaler    = StandardScaler().fit(mol_train_arr)

# Scaled descriptor tensor per unique SMILES (train and validation alike),
# transformed with the training-set statistics; looked up by the dataset.
solv_scaled = {sm: torch.tensor(solv_scaler.transform(raw.reshape(1, -1))[0], dtype=torch.float)
               for sm, raw in solv_raw.items()}
mol_scaled  = {sm: torch.tensor(mol_scaler.transform(raw.reshape(1, -1))[0], dtype=torch.float)
               for sm, raw in mol_raw.items()}

In [None]:
# graph conversion
def mol_to_graph(smiles):
    """Convert a SMILES string into a PyTorch Geometric ``Data`` graph.

    Nodes are the atoms with atomic number > 1 (all atoms if there are none).
    Each node carries ``[atomic number, formal charge, explicit H count]``.
    Every bond between two kept atoms yields two directed edges, each with a
    4-way one-hot of the bond type (single, double, triple, aromatic); any
    other bond type gives an all-zero vector.

    Args:
        smiles: SMILES string, parsed with ``Chem.MolFromSmiles``.

    Returns:
        ``Data`` with ``x`` of shape (num_nodes, 3), ``edge_index`` of shape
        (2, num_edges) and ``edge_attr`` of shape (num_edges, 4). The shapes
        hold even when there are no nodes or no edges.

    Raises:
        ValueError: if ``smiles`` cannot be parsed.
    """
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        raise ValueError(f"Could not parse SMILES: {smiles!r}")

    atoms = list(mol.GetAtoms())
    heavy = [i for i, a in enumerate(atoms) if a.GetAtomicNum() > 1] or list(range(len(atoms)))
    # original atom index -> node index in the graph
    idx_map = {old: new for new, old in enumerate(heavy)}

    node_rows = [[atoms[i].GetAtomicNum(), atoms[i].GetFormalCharge(), atoms[i].GetNumExplicitHs()]
                 for i in heavy]
    if node_rows:
        x = torch.tensor(node_rows, dtype=torch.float)
    else:
        # Keep the feature dimension even for an atom-less molecule.
        x = torch.zeros((0, 3), dtype=torch.float)

    bond_types = (Chem.rdchem.BondType.SINGLE, Chem.rdchem.BondType.DOUBLE,
                  Chem.rdchem.BondType.TRIPLE, Chem.rdchem.BondType.AROMATIC)
    edges, attrs = [], []
    for b in mol.GetBonds():
        i, j = b.GetBeginAtomIdx(), b.GetEndAtomIdx()
        # Dict membership is O(1); the list lookup it replaces was O(num_atoms) per bond.
        if i in idx_map and j in idx_map:
            ei, ej = idx_map[i], idx_map[j]
            onehot = [int(b.GetBondType() == t) for t in bond_types]
            # Store both directions so message passing is symmetric.
            edges += [[ei, ej], [ej, ei]]
            attrs += [onehot, onehot]

    if not edges:
        edge_index = torch.zeros((2, 0), dtype=torch.long)
        edge_attr  = torch.zeros((0, 4), dtype=torch.float)
    else:
        edge_index = torch.tensor(edges, dtype=torch.long).t().contiguous()
        edge_attr  = torch.tensor(attrs, dtype=torch.float)
    return Data(x=x, edge_index=edge_index, edge_attr=edge_attr)


In [None]:
class EmiDataset(Dataset):
    """Row-wise dataset yielding graphs, scaled descriptors and the QY target.

    Relies on the module-level ``mol_to_graph`` function and on the
    ``solv_scaled`` / ``mol_scaled`` lookup tables keyed by SMILES string.
    """

    def __init__(self, df):
        # Work on a copy with a fresh 0..n-1 index.
        self.df = df.reset_index(drop=True)

    def __len__(self):
        return len(self.df)

    def __getitem__(self, i):
        """Return ``(mol_graph, solvent_graph, solvent_desc, mol_desc, y, solvent_name)`` for row ``i``."""
        record = self.df.iloc[i]
        smiles, solvent = record['smiles'], record['solvent']
        return (
            mol_to_graph(smiles),
            mol_to_graph(solvent),
            solv_scaled[solvent],
            mol_scaled[smiles],
            torch.tensor(record['QY'], dtype=torch.float),
            record['solvent_name'],
        )


In [None]:
def collate_fn(batch):
    """Merge a list of dataset samples into one mini-batch.

    Each sample is ``(mol_graph, solvent_graph, solvent_desc, mol_desc, y,
    solvent_name)``. Graphs are merged with ``Batch.from_data_list``, the
    tensors are stacked along a new leading dimension, and the names are
    returned as a plain list.
    """
    mol_graphs, solv_graphs, solv_descs, mol_descs, targets, names = zip(*batch)
    batched_mols = Batch.from_data_list(mol_graphs)
    batched_solvs = Batch.from_data_list(solv_graphs)
    return (
        batched_mols,
        batched_solvs,
        torch.stack(solv_descs),
        torch.stack(mol_descs),
        torch.stack(targets),
        list(names),
    )

# Use the plain PyTorch DataLoader here: torch_geometric.loader.DataLoader
# discards a user-supplied collate_fn and always installs its own Collater,
# so the custom collate_fn above would never run with it.
train_loader = torch.utils.data.DataLoader(EmiDataset(train_df), batch_size=32, shuffle=True,  collate_fn=collate_fn)
val_loader   = torch.utils.data.DataLoader(EmiDataset(val_df),   batch_size=64, shuffle=False, collate_fn=collate_fn)


In [None]:
class GCNEncoder(nn.Module):
    """Three edge-conditioned ``NNConv`` layers followed by global mean pooling.

    Each layer has its own edge network that maps the 4-dimensional edge
    attribute to a flattened ``in_channels * out_channels`` vector, as
    ``NNConv`` requires.

    Args:
        in_dim: Node feature size of the input graph.
        hid: Width of the first two conv layers.
        out_dim: Size of the pooled per-graph embedding.
    """

    def __init__(self, in_dim, hid, out_dim):
        super().__init__()

        def edge_net(width):
            # 4 = number of edge features (bond-type one-hot)
            return nn.Sequential(nn.Linear(4, width), nn.ReLU(), nn.Linear(width, width))

        # Edge networks first, then the conv layers, in this exact order:
        # module creation order determines seeded initialisation and
        # state_dict layout.
        self.e1 = edge_net(hid * in_dim)
        self.e2 = edge_net(hid * hid)
        self.e3 = edge_net(out_dim * hid)

        self.c1 = NNConv(in_dim, hid, self.e1, aggr='mean')
        self.c2 = NNConv(hid, hid, self.e2, aggr='mean')
        self.c3 = NNConv(hid, out_dim, self.e3, aggr='mean')

    def forward(self, x, ei, batch, ea):
        """Encode node features ``x`` (edges ``ei``, edge attributes ``ea``) into one vector per graph in ``batch``."""
        for conv in (self.c1, self.c2, self.c3):
            x = conv(x, ei, ea).relu()
        return global_mean_pool(x, batch)


In [None]:
class SolvationPredictor(nn.Module):
    """Predict one scalar per (molecule, solvent) pair from graphs and descriptors.

    Two ``GCNEncoder`` instances embed the molecule and solvent graphs, two
    small MLPs embed the scaled descriptor vectors (4 solvent, 9 molecule),
    and a fusion MLP maps the combined features to a single output value.
    """

    def __init__(self):
        super().__init__()
        # Graph encoders: (node feature dim, hidden dim, embedding dim).
        self.gcn_mol = GCNEncoder(3, 64, 64)
        self.gcn_sol = GCNEncoder(3, 32, 64)

        # Descriptor encoders, each producing a 64-dimensional vector.
        self.mlp_sol = nn.Sequential(nn.Linear(4, 64), nn.ReLU(), nn.Linear(64, 64))
        self.mlp_mol = nn.Sequential(nn.Linear(9, 64), nn.ReLU(), nn.Linear(64, 64))

        # Joint processing of the two concatenated graph embeddings (64 + 64 inputs).
        self.mlp_gcn = nn.Sequential(
            nn.Linear(64 + 64, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 64),
            nn.ReLU()
        )

        # Fusion head: graph features + solvent descriptors + molecule descriptors.
        self.fuse = nn.Sequential(
            nn.Linear(64 + 64 + 64, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 1),  # single regression output (the notebook trains it on QY)
        )

    def forward(self, mg, sg, sdesc, mdesc):
        """Return a 1-D tensor with one prediction per sample in the batch.

        Args:
            mg: Batched molecule graph exposing ``x``, ``edge_index``, ``batch``, ``edge_attr``.
            sg: Batched solvent graph with the same attributes.
            sdesc: Solvent descriptor tensor fed to ``mlp_sol``.
            mdesc: Molecule descriptor tensor fed to ``mlp_mol``.
        """
        mol_emb = self.gcn_mol(mg.x, mg.edge_index, mg.batch, mg.edge_attr)
        solv_emb = self.gcn_sol(sg.x, sg.edge_index, sg.batch, sg.edge_attr)

        solv_feat = self.mlp_sol(sdesc)
        mol_feat = self.mlp_mol(mdesc)

        graph_feat = self.mlp_gcn(torch.cat((mol_emb, solv_emb), dim=-1))
        fused = torch.cat((graph_feat, solv_feat, mol_feat), dim=-1)

        # Drop the trailing size-1 dimension so the output matches the target shape.
        return self.fuse(fused).squeeze(-1)

In [None]:
# Run on the GPU when one is visible, otherwise on the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

# Build the network, then move its parameters to the chosen device.
model = SolvationPredictor()
model = model.to(device)

# Adam without weight decay; Smooth L1 (Huber-style) loss with beta = 0.2.
opt = optim.Adam(params=model.parameters(), lr=1e-4, weight_decay=0)
loss_fn = nn.SmoothL1Loss(beta=0.2)

In [None]:
# 7) TRAINING
def _run_epoch(loader, training):
    """Run one pass over ``loader`` and return the sample-weighted mean loss.

    With ``training`` true the model is in train mode and the optimiser steps
    after every batch; otherwise the model is in eval mode and gradient
    tracking is disabled.
    """
    model.train(training)
    total, count = 0., 0
    with torch.set_grad_enabled(training):
        for mg, sg, sd, md, y, _ in loader:
            mg, sg, sd, md, y = (t.to(device) for t in (mg, sg, sd, md, y))
            loss = loss_fn(model(mg, sg, sd, md), y)
            if training:
                opt.zero_grad()
                loss.backward()
                opt.step()
            # Weight each batch loss by its size so the epoch mean is per-sample.
            n = y.size(0)
            total += loss.item() * n
            count += n
    return total / count


train_losses, val_losses = [], []
for ep in range(1, 101):
    train_losses.append(_run_epoch(train_loader, training=True))
    val_losses.append(_run_epoch(val_loader, training=False))
    print(f"Epoch {ep:02d} → Train Loss: {train_losses[-1]:.4f}, Val Loss: {val_losses[-1]:.4f}")

In [None]:
# Final evaluation on raw QY
model.eval()
all_names = []
batch_preds, batch_targets = [], []
with torch.no_grad():
    for mg, sg, sd, md, y, _ in val_loader:
        out = model(mg.to(device), sg.to(device), sd.to(device), md.to(device))
        batch_preds.append(out.cpu().numpy())
        batch_targets.append(y.numpy())

# Flatten the per-batch arrays into one prediction / target vector each.
ps = np.concatenate(batch_preds)
ts = np.concatenate(batch_targets)

# Compute metrics
mse  = mean_squared_error(ts, ps)
rmse = np.sqrt(mse)
r2   = r2_score(ts, ps)

for label, value in (("MSE (QY)   ", mse), ("RMSE (QY)  ", rmse), ("R² (QY)    ", r2)):
    print(f"{label}= {value:.3f}")

# Scatter plot of predictions against targets, with the y = x reference line
fig, ax = plt.subplots(figsize=(5, 5))
ax.scatter(ts, ps, alpha=0.6)
mn, mx = ts.min(), ts.max()
ax.plot([mn, mx], [mn, mx], 'k--')
ax.set_xlabel("Actual QY")
ax.set_ylabel("Predicted QY")
ax.set_title("QY: Pred vs. Actual")
fig.tight_layout()
plt.show()



In [None]:
# Final evaluation on raw QY, also collecting the solvent name of every validation sample
model.eval()
ps, ts = [], []
all_names = []
with torch.no_grad():
    for mg, sg, sd, md, y, names in val_loader: # Capture solvent names
        mg, sg, sd, md = mg.to(device), sg.to(device), sd.to(device), md.to(device)
        p = model(mg, sg, sd, md).cpu().numpy()
        ps.append(p)
        ts.append(y.numpy())
        all_names.extend(names) # Append solvent names

# One flat array each for predictions and targets (same order as all_names).
ps = np.concatenate(ps)
ts = np.concatenate(ts)

# Compute metrics
mse  = mean_squared_error(ts, ps)
rmse = np.sqrt(mse)
r2   = r2_score(ts, ps)

print(f"MSE (QY)   = {mse:.3f}")
print(f"RMSE (QY)  = {rmse:.3f}")
print(f"R² (QY)    = {r2:.3f}")

# Scatter plot with the y = x reference line
plt.figure(figsize=(5,5))
plt.scatter(ts, ps, alpha=0.6)
mn, mx = ts.min(), ts.max()
plt.plot([mn, mx], [mn, mx], 'k--')
plt.xlabel("Actual QY")
plt.ylabel("Predicted QY")
plt.title("QY: Pred vs. Actual")
plt.tight_layout()
plt.show()

# Per-sample results table
res_df = pd.DataFrame({
    'solvent_name':        all_names, # all_names is now populated
    'actual_QY':           ts,
    'predicted_QY':        ps
})
# Save to Excel
res_df.to_excel("QY_predictions.xlsx", index=False)
print("✅ Saved QY_predictions.xlsx")

# Plot train vs. validation loss.
# The x-axis is derived from the recorded histories, so the plot stays valid
# when the number of epochs changes. A second, identical plot that hard-coded
# range(1, 101) was removed: it duplicated this figure and would raise a shape
# mismatch for any other epoch count.
plt.figure(figsize=(6,4))
plt.plot(range(1, len(train_losses)+1), train_losses, label='Train Loss')
plt.plot(range(1, len(val_losses)+1),   val_losses,   label='Val Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training vs. Validation Loss')
plt.legend()
plt.tight_layout()
plt.show()

In [None]:
# Write the model's state_dict (parameters and buffers, not the module object)
# to "model_QY.pt" in the current working directory.
torch.save(model.state_dict(), "model_QY.pt")


In [None]:
# Offer the checkpoint as a browser download when running on Google Colab.
# The google.colab package only exists there, so guard the import to keep
# "Run all" working in other Jupyter environments.
try:
    from google.colab import files
except ImportError:
    print("Not running on Google Colab; model_QY.pt is in the current working directory.")
else:
    files.download("model_QY.pt")
