In [None]:
!pip install rdkit-pypi
!pip install pubchempy
!pip install openai

In [None]:
import pandas as pd
import numpy as np
import pubchempy as pcp
from rdkit import Chem
from rdkit.Chem import Draw
import re
import ast
import requests
from IPython.display import Image
import json
import time
import requests
import json
import seaborn as sns
import matplotlib.pyplot as plt

### Convert BRENDA and GPT-extracted reactions to InChI 

In [None]:
# Load the two reaction tables compared in this notebook: the BRENDA table
# and the GPT-extracted table (file names suggest run "22", standard prompt).
brenda_df, gpt_df = map(
    pd.read_csv,
    ("brenda_df.csv", "gpt_extracted_22_standard.csv"),
)

In [None]:
# Names dropped from every reaction before comparison: BRENDA placeholders
# ('more', '?') and a fixed list of cofactors/protons.
remove = ['more', '?', 'NAD+', 'NADH', 'NADPH', 'NADP+', 'H+', 'NAD']

# Rows with a missing substrate or product string cannot be split (the list
# filters below would raise TypeError on NaN), so drop them first. `.copy()`
# detaches the result so the column assignments below never write to a slice.
brenda_df = brenda_df.dropna(subset=['substrates', 'products']).copy()

# Reactants are separated by " + ". The pattern is a regex, so it is written
# as a raw string: "\+" in a normal string literal is an invalid escape.
brenda_df['substrates_split'] = brenda_df['substrates'].str.split(r" \+ ")
brenda_df['substrates_split'] = brenda_df['substrates_split'].apply(lambda x: [item for item in x if item not in remove])
brenda_df = brenda_df[brenda_df['substrates_split'].apply(len) > 0].copy()

brenda_df['products_split'] = brenda_df['products'].str.split(r" \+ ")
brenda_df['products_split'] = brenda_df['products_split'].apply(lambda x: [item for item in x if item not in remove])
brenda_df = brenda_df[brenda_df['products_split'].apply(len) > 0].copy()

# Lists are unhashable, so temporary tuple columns are used to de-duplicate
# reactions within each paper (pmcId).
brenda_df['substrates_tuple'] = brenda_df['substrates_split'].apply(tuple)
brenda_df['products_tuple'] = brenda_df['products_split'].apply(tuple)

brenda_df = brenda_df.drop_duplicates(subset=['pmcId', 'substrates_tuple', 'products_tuple'])
brenda_df = brenda_df.drop(columns=['substrates_tuple', 'products_tuple'])

In [None]:
# Names dropped from every reaction before comparison: placeholders
# ('more', '?') and a fixed list of cofactors/protons.
remove = ['more', '?', 'NAD+', 'NADH', 'NADPH', 'NADP+', 'H+', 'NAD']

# Rows with a missing substrate or product cell cannot be parsed; drop them.
# `.copy()` detaches the result so the assignments below never hit a slice.
gpt_df = gpt_df.dropna(subset=['substrates', 'products']).copy()

# The CSV stores each list as its Python literal. Only strings are parsed so
# that re-running this cell (when the cells already hold lists) does not raise.
gpt_df['substrates'] = gpt_df['substrates'].apply(lambda cell: ast.literal_eval(cell) if isinstance(cell, str) else cell)
gpt_df['products'] = gpt_df['products'].apply(lambda cell: ast.literal_eval(cell) if isinstance(cell, str) else cell)

gpt_df['substrates'] = gpt_df['substrates'].apply(lambda x: [item for item in x if item not in remove])
gpt_df = gpt_df[gpt_df['substrates'].apply(len) > 0].copy()

gpt_df['products'] = gpt_df['products'].apply(lambda x: [item for item in x if item not in remove])
gpt_df = gpt_df[gpt_df['products'].apply(len) > 0].copy()

# Lists are unhashable, so temporary tuple columns are used to de-duplicate
# reactions within each paper (pmcId).
gpt_df['substrates_tuple'] = gpt_df['substrates'].apply(tuple)
gpt_df['products_tuple'] = gpt_df['products'].apply(tuple)

gpt_df = gpt_df.drop_duplicates(subset=['pmcId', 'substrates_tuple', 'products_tuple'])
gpt_df = gpt_df.drop(columns=['substrates_tuple', 'products_tuple'])

In [None]:
# Shared bookkeeping for the name -> SMILES lookups below.
# *_matches hold (name, smiles) pairs; the remaining sets hold bare names.
queried = set()
pubchem_matches, cactus_matches = set(), set()
pubchem_no_match, cactus_no_match = set(), set()
no_matches = set()

def get_smiles_pubchem(name):
    """Search PubChem by compound name and return the first hit's isomeric SMILES.

    Returns None when the search yields no compounds.
    """
    compounds = pcp.get_compounds(name, 'name')
    if not compounds:
        return None
    return compounds[0].isomeric_smiles

def get_smiles_cactus(structure_identifier, timeout=10):
    """Resolve a chemical identifier to SMILES via the NCI CACTUS resolver.

    Args:
        structure_identifier: Compound name (or other identifier) to resolve.
        timeout: Seconds to wait for the HTTP response before `requests`
            raises; prevents one stalled request from hanging a long batch.

    Returns:
        The SMILES text from the response with surrounding whitespace removed,
        or None when the status code is not 200 or the body is empty.
    """
    from urllib.parse import quote

    # The identifier becomes a URL path segment, so characters such as
    # '/', '#', '?' and spaces must be percent-encoded or they change the URL.
    encoded = quote(str(structure_identifier), safe='')
    url = f"https://cactus.nci.nih.gov/chemical/structure/{encoded}/smiles"

    response = requests.get(url, timeout=timeout)

    if response.status_code != 200:
        return None

    smiles = response.text.strip()
    return smiles or None
            
def get_smiles_chemspider(name, api_key=None, timeout=30):
    """Look up a compound name through the RSC ChemSpider API and return its SMILES.

    The lookup takes three requests: submit a name filter, fetch the filter's
    result ids, then fetch the SMILES of the first result.

    Args:
        name: Compound name to search for.
        api_key: RSC API key. When None, the CHEMSPIDER_API_KEY environment
            variable is used (empty string if unset), so the key never has to
            be written into the notebook.
        timeout: Seconds to wait for each HTTP response.

    Returns:
        The `smiles` field of the first matching record, or None when there
        is no result or any request/parsing step fails (the error is printed).
    """
    import os

    if api_key is None:
        api_key = os.environ.get('CHEMSPIDER_API_KEY', '')

    search_url = 'https://api.rsc.org/compounds/v1/filter/name/'
    headers = {'apikey': api_key}

    try:
        response = requests.post(
            search_url,
            headers=headers,
            data=json.dumps({'name': name}),
            timeout=timeout,
        )
        response.raise_for_status()
        query_id = response.json()['queryId']

        # Give the server time to load the query
        time.sleep(1)

        results_url = f'https://api.rsc.org/compounds/v1/filter/{query_id}/results'
        response = requests.get(results_url, headers=headers, timeout=timeout)
        response.raise_for_status()
        record_ids = response.json()['results']

        # An empty result list is a normal "not found", not an error.
        if not record_ids:
            print(f"chemspider: no match for {name}")
            return None

        details_url = f'https://api.rsc.org/compounds/v1/records/{record_ids[0]}/details?fields=SMILES'
        response = requests.get(details_url, headers=headers, timeout=timeout)
        response.raise_for_status()
        return response.json()['smiles']

    except Exception as e:
        # Best-effort lookup: report the failure and let the caller move on.
        print(f'Error: {e}')
        print(f"chemspider: no match for {name}")
        return None

def convert_smiles_to_structure(smiles):
    """Render a SMILES string as an image and report whether it parsed.

    A parsable SMILES is drawn, written to "molecule.png" in the working
    directory, and displayed inline; otherwise only a message is printed.
    """
    molecule = Chem.MolFromSmiles(smiles)

    # Guard clause: RDKit hands back None when the SMILES cannot be parsed.
    if molecule is None:
        print("Invalid SMILES notation.")
        return

    Draw.MolToImage(molecule).save("molecule.png")
    display(Image(filename="molecule.png"))
    print("SMILES notation is valid.")

def smiles_tokenizer(smiles):
    """
    Tokenize a SMILES molecule or reaction.

    Args:
        smiles: SMILES (or reaction SMILES) string.

    Returns:
        The tokens joined by single spaces. Bracket atoms ("[C@@H]"), "Br",
        "Cl" and two-digit ring closures ("%12") are each kept as one token.

    Raises:
        AssertionError: if the tokens do not reassemble to the input, i.e.
            the string contains a character the pattern does not cover.
    """
    # Raw string: the regex is full of backslash escapes that are invalid in a
    # normal Python string literal. `\\` here matches one literal backslash.
    pattern = r"(\[[^\]]+]|Br?|Cl?|N|O|S|P|F|I|b|c|n|o|s|p|\(|\)|\.|=|#|-|\+|\\|\/|:|~|@|\?|>|\*|\$|\%[0-9]{2}|[0-9])"
    tokens = re.findall(pattern, smiles)
    assert smiles == ''.join(tokens), f"SMILES contains characters the tokenizer does not recognise: {smiles!r}"
    return ' '.join(tokens)

def _record_lookup(mol, source, fetch, matches, misses):
    """Query one service for `mol`, file the outcome, and return the SMILES or None.

    A hit is stored in `matches` as (mol, smiles); a miss or a failed request
    stores `mol` in `misses`. `source` is only used as the log prefix.
    """
    try:
        smile = fetch(mol)
    except Exception as exc:
        # `Exception` rather than a bare `except` so Ctrl-C / kernel interrupt
        # still stops a long batch; the reason is logged instead of hidden.
        misses.add(mol)
        print(f"{source}: an error occurred for {mol}: {exc}")
        return None

    if smile:
        matches.add((mol, smile))
        return smile

    misses.add(mol)
    print(f"{source}: no match for {mol}")
    return None


def get_smiles(mols, tried=False):
    """Resolve each name in `mols` against PubChem and CACTUS, recording results.

    Nothing is returned; outcomes are written to the module-level sets
    `pubchem_matches`, `cactus_matches`, `pubchem_no_match`,
    `cactus_no_match`, `no_matches` and `queried`. Names equal to '?' and
    names already in `queried` are skipped, so each name is looked up once.

    Args:
        mols: Iterable of compound names.
        tried: Unused; kept so existing call sites keep working.
    """
    for mol in mols:
        if mol == '?' or mol in queried:
            continue

        # Both services are always queried, even when PubChem already matched.
        pubchem_smile = _record_lookup(
            mol, "pubchem", get_smiles_pubchem, pubchem_matches, pubchem_no_match
        )
        cactus_smile = _record_lookup(
            mol, "cactus", get_smiles_cactus, cactus_matches, cactus_no_match
        )

        if not pubchem_smile and not cactus_smile:
            no_matches.add(mol)

        queried.add(mol)

def remove_stereochemistry(mol_name):
    """Drop everything up to and including the first ")-" in a compound name.

    Intended for prefixes such as "(R)-", "(2S)-" or "D-(+)-". Names without
    ")-" are returned unchanged.

    The split is limited to the first ")-" so the remainder of the name is
    kept whole: "(2S)-2-(acetyloxy)-propanoate" gives
    "2-(acetyloxy)-propanoate" rather than being cut at the second ")-".

    NOTE(review): a ")-" that belongs to a substituent, as in
    "2-(4-hydroxyphenyl)-ethanol", is still treated as a prefix boundary.
    """
    if ')-' not in mol_name:
        return mol_name
    return mol_name.split(')-', 1)[1].strip()

def smiles_to_inchi(smiles):
    """Convert a SMILES string to InChI with RDKit; None if the SMILES does not parse."""
    parsed = Chem.MolFromSmiles(smiles)
    if not parsed:
        return None
    return Chem.MolToInchi(parsed)

def process_chemical_column_brenda(column_str):
    """Resolve one BRENDA reaction side to SMILES and InChI.

    Args:
        column_str: Either the raw BRENDA string ("A + B + C") or an
            already-split list of compound names. A raw string is split on
            " + " and the placeholder/cofactor names listed below are dropped;
            iterating the string directly would look up single characters.
            A list is used as given. Non-iterable cells (e.g. NaN) yield
            empty results.

    Returns:
        (smiles, inchis): two lists. `smiles` has one entry per name that
        PubChem or CACTUS resolved (PubChem preferred). `inchis` holds the
        InChI of each of those with the leading "InChI=" removed; it can be
        shorter than `smiles` when RDKit cannot parse or convert an entry.
    """
    if isinstance(column_str, str):
        # Same names the notebook's cleaning cells exclude from a reaction.
        skipped = {'more', '?', 'NAD+', 'NADH', 'NADPH', 'NADP+', 'H+', 'NAD'}
        parts = [part.strip() for part in column_str.split(" + ")]
        chemicals = [part for part in parts if part and part not in skipped]
    else:
        try:
            chemicals = list(column_str)
        except TypeError:
            chemicals = []

    smiles = []
    inchis = []

    for chem in chemicals:
        # Fills the module-level match sets; names seen before are skipped there.
        get_smiles([chem])

        smile = next((s for m, s in pubchem_matches if m == chem), None)
        if smile is None:
            smile = next((s for m, s in cactus_matches if m == chem), None)

        if smile:
            smiles.append(smile)
            try:
                mol = Chem.MolFromSmiles(smile)
                if mol:
                    inchi_full = Chem.MolToInchi(mol)
                    if inchi_full:
                        inchi_body = inchi_full.split("InChI=", 1)[1]
                        inchis.append(inchi_body)
                    else:
                        print(f"Empty InChI for {chem}")
            except Exception as e:
                print(f"Failed InChI conversion for {chem}: {str(e)}")

    return smiles, inchis

def process_chemical_column_gpt(chemical_list):
    """Resolve a list of compound names to SMILES and InChI.

    Returns (smiles, inchis). `smiles` has one entry per name found in the
    PubChem or CACTUS match sets (PubChem first). `inchis` holds the matching
    InChI strings without the "InChI=" prefix and may be shorter than `smiles`
    when RDKit cannot parse or convert an entry.
    """
    found_smiles, found_inchis = [], []

    for name in chemical_list:
        # Populates the module-level match sets for this name.
        get_smiles([name])

        # PubChem takes precedence; CACTUS is only the fallback.
        hit = None
        for source in (pubchem_matches, cactus_matches):
            hit = next((smi for key, smi in source if key == name), None)
            if hit is not None:
                break

        if not hit:
            continue
        found_smiles.append(hit)

        try:
            parsed = Chem.MolFromSmiles(hit)
            if not parsed:
                continue
            full_inchi = Chem.MolToInchi(parsed)
            if full_inchi:
                found_inchis.append(full_inchi.split("InChI=", 1)[1])
            else:
                print(f"Empty InChI for {name}")
        except Exception as e:
            print(f"Failed InChI conversion for {name}: {str(e)}")

    return found_smiles, found_inchis

def process_reaction_dataframes(brenda_df, gpt_df):
    """Add converted SMILES/InChI columns to copies of both reaction tables.

    For each table and each side ('substrates', 'products') two columns are
    added: '<side>_converted_smiles' and '<side>_converted_inchi', each
    holding a list per row. The inputs are not modified.

    Args:
        brenda_df: BRENDA reactions. The cleaned list columns
            'substrates_split' / 'products_split' are used when present;
            otherwise the raw 'substrates' / 'products' columns are used.
            The raw columns hold "A + B" strings, which must not be iterated
            character by character.
        gpt_df: GPT reactions with list-valued 'substrates' / 'products'.

    Returns:
        (brenda_processed, gpt_processed)
    """
    brenda_processed = brenda_df.copy()
    gpt_processed = gpt_df.copy()

    def add_converted_columns(frame, side, source_column, converter):
        # The converter returns (smiles_list, inchi_list) for each row.
        results = frame[source_column].apply(converter)
        frame[f'{side}_converted_smiles'] = results.apply(lambda pair: pair[0])
        frame[f'{side}_converted_inchi'] = results.apply(lambda pair: pair[1])

    for side in ('substrates', 'products'):
        split_column = f'{side}_split'
        source_column = split_column if split_column in brenda_processed.columns else side
        add_converted_columns(brenda_processed, side, source_column, process_chemical_column_brenda)

    for side in ('substrates', 'products'):
        add_converted_columns(gpt_processed, side, side, process_chemical_column_gpt)

    return brenda_processed, gpt_processed

In [None]:
# Resolve every compound name in both tables to SMILES/InChI. This calls the
# PubChem and CACTUS web services for each name not seen before.
brenda_processed, gpt_processed = process_reaction_dataframes(brenda_df, gpt_df)
# Save the results. Note that list-valued columns are written to CSV as their
# string representation.
brenda_processed.to_csv('brenda_processed.csv', index=False)
gpt_processed.to_csv('gpt_processed.csv', index=False)

## Reaction Extraction Evaluation Metrics

### 1. Extraction Accuracy (Precision)
Measures **how reliably GPT identifies real reactions** compared to expert-curated BRENDA data:

$$
\text{Precision} = \frac{
    \left| \text{Validated Reactions} \right|
}{
    \left| \text{GPT-Extracted Reactions} \right| 
} \times 100\%
$$

**Where:**  
- $\left| \text{Validated Reactions} \right|$ = Reactions GPT extracted that match BRENDA  
- $\left| \text{GPT-Extracted Reactions} \right|$ = Total reactions GPT identified

---

### 2. Completeness of Extraction (Recall)
Measures **how thoroughly GPT finds known reactions** from the same source material BRENDA uses:

$$
\text{Recall} = \frac{
    \left| \text{Validated Reactions} \right|
}{
    \left| \text{BRENDA Reference Reactions} \right| 
} \times 100\%
$$

**Where:**  
- $\left| \text{BRENDA Reference Reactions} \right|$ = Expert-verified reactions from source papers

In [13]:
def calculate_overlap_metrics(brenda_df, gpt_df, verbose=False):
    """Compare GPT-extracted reactions with BRENDA reactions by InChI.

    Each row becomes a key (frozenset of substrate InChIs, frozenset of
    product InChIs) after dropping the InChI layers that start with 'p' or
    'q'. A reaction counts as common when the same key occurs in both tables.

    Rows where either side has no InChI at all are never counted as a match:
    otherwise every unresolved reaction would share the key (∅, ∅) and be
    reported as common to both tables. Such rows still count in the
    denominators.

    Args:
        brenda_df: Frame with 'substrates_converted_inchi' and
            'products_converted_inchi' columns (lists of InChI strings, or
            their string representation after a CSV round-trip).
        gpt_df: Frame with the same two columns.
        verbose: Print every reaction key and both key sets.

    Returns:
        (precision, recall) as fractions in [0, 1]:
        common / len(gpt_df) and common / len(brenda_df); 0 when the
        respective frame is empty.
    """
    import ast

    def normalize_inchi(inchi):
        """Remove protonation states and charge layers"""
        layers = [layer for layer in inchi.split('/') if not layer.startswith(('p', 'q'))]
        return '/'.join(layers)

    def as_inchi_list(cell):
        # A frame re-read from CSV holds "['1S/...']" strings; iterating such a
        # string would treat every character as an InChI.
        if isinstance(cell, str):
            cell = ast.literal_eval(cell)
        return list(cell) if isinstance(cell, (list, tuple, set, frozenset)) else []

    def create_reaction_key(row):
        subs = frozenset(normalize_inchi(s) for s in as_inchi_list(row['substrates_converted_inchi']))
        prods = frozenset(normalize_inchi(p) for p in as_inchi_list(row['products_converted_inchi']))
        return (subs, prods)

    def collect_reactions(df, heading):
        if verbose:
            print(heading)
        reactions = [create_reaction_key(row) for _, row in df.iterrows()]
        if verbose:
            for i, rxn in enumerate(reactions, 1):
                print(f"  Reaction {i}: {rxn[0]} → {rxn[1]}")
        return reactions

    def is_resolved(key):
        return bool(key[0]) and bool(key[1])

    brenda_reactions = collect_reactions(brenda_df, "Processing BRENDA reactions:")
    gpt_reactions = collect_reactions(gpt_df, "\nProcessing GPT reactions:")

    brenda_set = {key for key in brenda_reactions if is_resolved(key)}
    gpt_set = {key for key in gpt_reactions if is_resolved(key)}

    if verbose:
        print("\nBRENDA reaction set:")
        print(brenda_set)
        print("GPT reaction set:")
        print(gpt_set)

    common_reactions = brenda_set & gpt_set
    n_common = len(common_reactions)

    if verbose:
        print(f"\nCommon reactions: {n_common}")

    n_gpt = len(gpt_df)
    n_brenda = len(brenda_df)
    precision = n_common / n_gpt if n_gpt > 0 else 0
    recall = n_common / n_brenda if n_brenda > 0 else 0

    print(f"Precision: {n_common}/{n_gpt} = {precision:.2f}")
    print(f"Recall: {n_common}/{n_brenda} = {recall:.2f}")

    return precision, recall

In [None]:
# Per-paper evaluation: score GPT against BRENDA separately for every pmcId
# present in the BRENDA table; scores maps pmcId -> (precision, recall).
scores = {}
for p in brenda_processed['pmcId'].unique():
    print(f"pmcId: {p}")
    brenda_processed_paper = brenda_processed.loc[brenda_processed['pmcId'] == p]
    gpt_processed_paper = gpt_processed.loc[gpt_processed['pmcId'] == p]
    gpt_precision, brenda_recall = calculate_overlap_metrics(
        brenda_processed_paper,
        gpt_processed_paper,
        verbose=False,
    )
    scores[p] = (gpt_precision, brenda_recall)
    print("\n")

print(scores)

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=2570873b-d089-4ed7-b836-6c2df496af15' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>