In [1]:
import torch
from transformers import AutoModel, AutoTokenizer,AutoModelForMaskedLM, AutoModelForCausalLM,AutoModelForSeq2SeqLM,GraphormerForGraphClassification
import pubchempy as pcp
from scipy.io import loadmat
import pandas as pd
import numpy as np 
from rdkit import Chem
import os

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# Base directory for data paths, relative to the notebook's working directory.
# NOTE(review): in the visible cells it is only referenced by the commented-out
# molfeat helper — confirm it is still needed.
base_dir = "../../../../T5 EVO"

In [None]:
def set_seeds(seed: int = 2024) -> None:
    """Seed Python's, NumPy's and PyTorch's random number generators.

    No cell above this one defines or imports ``set_seeds``, so the call below
    raised ``NameError`` on a fresh kernel; defining it here keeps the notebook
    runnable top to bottom.

    Args:
        seed: Integer seed applied to every generator.
    """
    # Local import: `random` is not among the notebook's top-level imports.
    import random

    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)


set_seeds(seed=2024)

In [None]:
def extract_representations(
    tokenizer,
    model,
    model_name: str,   
    ds: str,
    input_type: str = "smiles",   # "smiles" or "selfies"
    token: int = 0,               # which token to read (e.g., CLS/first)
    save_path: str | None = None,
) -> pd.DataFrame:
    """
    Create embeddings from UNIQUE CIDs using either SMILES or SELFIES.
    The output DF has:
        ['cid', 'isomeric_text', 'canonical_text', 'input_type', 'model', 'layer', 'e0', ..., 'e{d-1}'].

    Tokenization uses the chosen input_type; for each row it prefers the isomeric text if present,
    otherwise falls back to the canonical text. Both text variants are kept as metadata columns.
    """
    assert input_type.lower() in {"smiles", "selfies"}, "input_type must be 'smiles' or 'selfies'"
    model.eval()

    # ---- Load & pick columns ----
    df = pd.read_csv(f"datasets/{ds}/{ds}_data.csv")
    if "cid" not in df.columns:
        raise ValueError("Dataset must contain a 'cid' column.")

    if input_type.lower() == "smiles":
        iso_col = "isomericsmiles"
        can_col = "canonicalsmiles"
    else:  # selfies
        iso_col = "isomericselfies"
        can_col = "canonicalselfies"

    if iso_col is None and can_col is None:
        raise ValueError(f"No {input_type} columns found.")

    use_cols = ["cid"]
    if iso_col: use_cols.append(iso_col)
    if can_col: use_cols.append(can_col)

    work = df[use_cols].copy()
    work = work.drop_duplicates(subset=["cid"], keep="first").sort_values("cid").reset_index(drop=True)

   
    # ---- Device ----
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    rows = []
    emb_dim = None

    for cid, iso_txt, can_txt in zip(work["cid"], work[iso_col], work[can_col]):
        # --- normalize texts ---
        iso_txt = iso_txt if isinstance(iso_txt, str) and iso_txt.strip() else ""
        can_txt = can_txt if isinstance(can_txt, str) and can_txt.strip() else ""
        if not iso_txt and not can_txt:
            continue  # nothing to encode for this cid

        # --- run model for isomeric (if present) ---
        iso_hiddens = None
        if iso_txt:
            iso_inputs = tokenizer([iso_txt], padding=True, truncation=True, return_tensors="pt")
            iso_inputs = {k: v.to(device) for k, v in iso_inputs.items()}
            with torch.no_grad():
                iso_out = model(**iso_inputs, output_hidden_states=True)
            iso_hiddens = iso_out.hidden_states  # tuple of [B,T,D] tensors

        # --- run model for canonical (if present) ---
        can_hiddens = None
        if can_txt:
            can_inputs = tokenizer([can_txt], padding=True, truncation=True, return_tensors="pt")
            can_inputs = {k: v.to(device) for k, v in can_inputs.items()}
            with torch.no_grad():
                can_out = model(**can_inputs, output_hidden_states=True)
            can_hiddens = can_out.hidden_states

        # number of layers to emit = max of the two (they should match)
        n_layers = max(
            len(iso_hiddens) if iso_hiddens is not None else 0,
            len(can_hiddens) if can_hiddens is not None else 0,
        )

        for layer_idx in range(n_layers):
            # get vectors (or None) for this layer
           
            iso_vec = iso_hiddens[layer_idx][0, token, :].detach().cpu().numpy()

            
            can_vec = can_hiddens[layer_idx][0, token, :].detach().cpu().numpy()

            # set / check embedding dim
            if emb_dim is None:
                
                emb_dim = iso_vec.shape[0]
                emb_dim = can_vec.shape[0]
            # sanity: if both exist, ensure same D
            if (iso_vec is not None) and (can_vec is not None):
                assert iso_vec.shape[0] == can_vec.shape[0], "Iso/Can dims differ!"

            row = {
                "cid": cid,
                "isomeric_text": iso_txt,
                "canonical_text": can_txt,
                "input_type": input_type.lower(),   # "smiles" or "selfies"
                "model": model_name,
                "layer": layer_idx,
            }

            # add iso_* columns (fill with NaN if missing)
            if emb_dim is None:
                continue  # defensive; should not happen if any vec exists
            for i in range(emb_dim):
                row[f"iso_e{i}"] = float(iso_vec[i]) 
                row[f"can_e{i}"] = float(can_vec[i])

            rows.append(row)


        out_df = pd.DataFrame(rows)

        
    os.makedirs(save_path, exist_ok=True)

    out_df.to_csv(f"{save_path}/{ds}_{model_name.split('/')[1]}_embeddings.csv", index=False)



In [None]:
def extract_representations(
    model,
    model_name: str,   
    ds: str,
    input_type: str = "smiles",   # "smiles" or "selfies"
    save_path: str | None = None,
) -> pd.DataFrame:
    """
    Create embeddings from UNIQUE CIDs using either SMILES or SELFIES.
    The output DF has:
        ['cid', 'isomeric_text', 'canonical_text', 'input_type', 'model', 'layer', 'e0', ..., 'e{d-1}'].

    Tokenization uses the chosen input_type; for each row it prefers the isomeric text if present,
    otherwise falls back to the canonical text. Both text variants are kept as metadata columns.
    """
    assert input_type.lower() in {"smiles", "selfies"}, "input_type must be 'smiles' or 'selfies'"
    model.eval()

    # ---- Load & pick columns ----
    df = pd.read_csv(f"datasets/{ds}/{ds}_data.csv")
    if "cid" not in df.columns:
        raise ValueError("Dataset must contain a 'cid' column.")

    if input_type.lower() == "smiles":
        iso_col = "isomericsmiles"
        can_col = "canonicalsmiles"
    else:  # selfies
        iso_col = "isomericselfies"
        can_col = "canonicalselfies"

    if iso_col is None and can_col is None:
        raise ValueError(f"No {input_type} columns found.")

    use_cols = ["cid"]
    if iso_col: use_cols.append(iso_col)
    if can_col: use_cols.append(can_col)

    work = df[use_cols].copy()
    work = work.drop_duplicates(subset=["cid"], keep="first").sort_values("cid").reset_index(drop=True)

   
    # ---- Device ----
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    rows = []
    emb_dim = None

    for cid, iso_txt, can_txt in zip(work["cid"], work[iso_col], work[can_col]):
        # --- normalize texts ---
        iso_txt = iso_txt if isinstance(iso_txt, str) and iso_txt.strip() else ""
        can_txt = can_txt if isinstance(can_txt, str) and can_txt.strip() else ""
        if not iso_txt and not can_txt:
            continue  # nothing to encode for this cid

        # --- run model for isomeric (if present) ---
        iso_hiddens = None
        if iso_txt:
            
            iso_out = model(iso_txt)
            

        # --- run model for canonical (if present) ---
        
        if can_txt:
            
            with torch.no_grad():
                can_out = model(can_txt)
            
       
       
        if emb_dim is None:
            
            emb_dim = iso_out.shape[0]
            emb_dim = can_out.shape[0]
        # sanity: if both exist, ensure same D
        if (iso_out is not None) and (can_out is not None):
            assert iso_out.shape[0] == can_out.shape[0], "Iso/Can dims differ!"
        row = {
            "cid": cid,
            "isomeric_text": iso_txt,
            "canonical_text": can_txt,
            "input_type": input_type.lower(),   # "smiles" or "selfies"
            "model": model_name,
            
        }
        # add iso_* columns (fill with NaN if missing)
        if emb_dim is None:
            continue  # defensive; should not happen if any vec exists
        for i in range(emb_dim):
            row[f"iso_e{i}"] = float(iso_out[i]) 
            row[f"can_e{i}"] = float(can_out[i])
        rows.append(row)

    out_df = pd.DataFrame(rows)

        
    os.makedirs(save_path, exist_ok=True)

    out_df.to_csv(f"{save_path}/{ds}_{model_name.split('/')[1]}_embeddings.csv", index=False)



In [None]:
# def extract_representations_by_molfeat(model_name,transformer,input_type='smiles',token=0):
    
#     for subject_id in range(s_start,s_end+1):
#         input_molecules = pd.read_csv(f'{base_dir}/fmri/embeddings{ds}/CIDs_smiles_selfies_{subject_id}{ds}.csv')[input_type].values.tolist()
        
#         outputs = transformer(input_molecules)
#         np.save(f'{base_dir}/fmri/embeddings{ds}/embeddings_{model_name}_{subject_id}_{-1}{ds}.npy',outputs)

# Encoder-Only

## MoLFormer-XL-both-10pct

In [30]:
# Text representation expected by each encoder-only checkpoint.
Input_types = {
    'ibm/MoLFormer-XL-both-10pct':'smiles',
    'seyonec/ChemBERTa-zinc-base-v1':'smiles',
    'jonghyunlee/ChemBERT_ChEMBL_pretrained':'smiles',
    "HUBioDataLab/SELFormer":'selfies'
    }

# NOTE(review): the call below uses the tokenizer-first signature of
# `extract_representations`; a second function with the same name but a
# model-first signature is defined in a later cell and would replace it on a
# top-to-bottom run.
for model_name, input_type in Input_types.items():
    # Load every checkpoint once and reuse it for all datasets instead of
    # re-instantiating the model and tokenizer inside the dataset loop.
    # NOTE(review): trust_remote_code=True executes code shipped with the model
    # repository — only keep it for checkpoints you trust.
    model = AutoModel.from_pretrained(model_name, trust_remote_code=True)
    tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)

    for ds in ['sagar2023','keller2016','bierling2025']:
        extract_representations(
            tokenizer,
            model,
            model_name,
            save_path='embeddings',
            ds=ds,
            input_type=input_type,
        )



Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.
Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.
  df = pd.read_csv(f"datasets/{ds}/{ds}_data.csv")
Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variabl