In [1]:
import re
from rdkit import Chem
from rdkit.Chem import AllChem, DataStructs
from rdkit.DataStructs.cDataStructs import ConvertToNumpyArray
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from rdkit.Chem.Scaffolds.MurckoScaffold import MurckoScaffoldSmiles, MurckoScaffoldSmilesFromSmiles
from rdkit.DataStructs.cDataStructs import ExplicitBitVect, TanimotoSimilarity
import chromadb
from chromadb import Client, Settings, EmbeddingFunction
from langchain_openai import ChatOpenAI
from langchain_community.chat_models import ChatOpenAI
from langchain.schema import HumanMessage 
import statistics

# Env

In [2]:
import os
from getpass import getpass

# A real API key must never be stored in the notebook. Reuse the key that the
# environment already provides, and only prompt for it (without echoing the
# input) when it is absent, so an existing key is no longer overwritten.
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass("OpenAI API key: ")

# Config

In [49]:
# Morgan fingerprint settings used by the embedding helpers below.
radius = 4
nBits = 1024

# Chat model name, and number of nearest neighbours requested per query.
llm_model = 'gpt-3.5-turbo'
context_window = 20

# Prompt with two placeholders: {context} (retrieved documents) and {query}
# (the row to predict).
# NOTE(review): "Permeabilty" is misspelled inside the prompt; the text is kept
# unchanged here because editing the prompt changes what the model receives.
template = (
    "{context}. As a language model you have an ability to understand this data "
    "based on the data provided, we do not need to build a regression model for a "
    "Language model Estimate. You can find relationships and make estimates. "
    "Make an estimate for Permeabilty based on SMILES and list of amino acids and "
    "on previous data for this row of data {query}. "
    "Report the predicted permeability value in the following format: "
    "PERMEABILITY(%value)."
)

# Utils


In [4]:
def get_morgan_fingerprint(smiles, radius=4, nBits=1024):
    """Return the Morgan bit-vector fingerprint of a SMILES string as a list.

    Args:
        smiles: SMILES string of the molecule.
        radius: Morgan radius passed to RDKit.
        nBits: Length of the fingerprint bit vector.

    Returns:
        list[int]: The fingerprint converted to a plain Python list.

    Raises:
        ValueError: If ``smiles`` is not a string or RDKit cannot parse it.
    """
    # Fail early with a readable message instead of letting None (e.g. from a
    # text without a SMILES tag) reach RDKit.
    if not isinstance(smiles, str):
        raise ValueError(f"Expected a SMILES string, got {smiles!r}")
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        raise ValueError(f"Could not parse SMILES: {smiles!r}")
    morgan_fp = AllChem.GetMorganFingerprintAsBitVect(mol, radius, nBits=nBits)
    # Placeholder array; ConvertToNumpyArray is expected to resize it to the
    # fingerprint length.
    arr = np.zeros((1,), dtype=np.int8)
    ConvertToNumpyArray(morgan_fp, arr)
    return arr.tolist()


def find_smiles(text):
    """Return the content of the first ``SMILES{...}`` tag in ``text``.

    Returns None when the text contains no such tag.
    """
    found = re.search(r'SMILES{([^}]*)}', text)
    return found.group(1) if found else None
    
def chemical_embedding(text):
    """Embed ``text`` as the Morgan fingerprint of its ``SMILES{...}`` tag.

    The notebook-level ``radius`` and ``nBits`` settings are passed through.
    """
    smiles = find_smiles(text)
    return get_morgan_fingerprint(smiles, radius, nBits)

In [5]:

class ChemicalEmbeddingFunction(EmbeddingFunction):
    """Chroma embedding function based on Morgan fingerprints.

    Each document is expected to carry its structure in a ``SMILES{...}`` tag;
    the fingerprint of that SMILES is used as the embedding.
    """

    def __call__(self, input):
        """Embed a single document or a batch of documents.

        Args:
            input: One document string, or a list of document strings (the
                form Chroma hands to an embedding function).

        Returns:
            For a string: the fingerprint as a list, or None when the string
            has no (non-empty) ``SMILES{...}`` tag. For a list: one
            fingerprint per document, in input order.

        Raises:
            ValueError: If a document of a batch has no ``SMILES{...}`` tag,
                or if a SMILES cannot be parsed.
        """
        # Single-string input keeps the previous behaviour.
        if isinstance(input, str):
            return self.embed_query(input)
        # Batch input previously failed inside re.search with a TypeError.
        embeddings = []
        for position, document in enumerate(input):
            embedding = self.embed_query(document)
            if embedding is None:
                raise ValueError(
                    f"Document {position} has no SMILES{{...}} tag: {document!r}"
                )
            embeddings.append(embedding)
        return embeddings

    def embed_query(self, query):
        """Embed one query string; returns None when it has no SMILES tag."""
        smiles = self.find_smiles(query)
        if smiles:
            return self.get_morgan_fingerprint(smiles)
        return None

    def find_smiles(self, text):
        """Return the content of the first ``SMILES{...}`` tag, or None."""
        pattern = r'SMILES{([^}]*)}'
        match = re.search(pattern, text)
        if match:
            return match.group(1)
        return None

    def get_morgan_fingerprint(self, smiles, radius=radius, nBits=nBits):
        """Return the Morgan bit-vector fingerprint of ``smiles`` as a list.

        The defaults are the notebook-level ``radius`` and ``nBits`` values at
        the time the class is defined.

        Raises:
            ValueError: If RDKit cannot parse ``smiles``.
        """
        mol = Chem.MolFromSmiles(smiles)
        if mol is None:
            raise ValueError(f"Could not parse SMILES: {smiles!r}")
        morgan_fp = AllChem.GetMorganFingerprintAsBitVect(mol, radius, nBits=nBits)
        # Placeholder array; ConvertToNumpyArray is expected to resize it to
        # the fingerprint length.
        arr = np.zeros((1,), dtype=np.int8)
        ConvertToNumpyArray(morgan_fp, arr)
        return arr.tolist()

In [6]:
def convert_aldehydes_to_acids(smiles_list):
    """Run an aldehyde-to-acid reaction on every SMILES of a list.

    Each input is reacted with water using the reaction SMARTS below. When the
    reaction yields products, the first product is sanitized and returned as
    canonical isomeric SMILES; otherwise a message is printed and the input
    SMILES is kept unchanged.

    Args:
        smiles_list: Iterable of SMILES strings.

    Returns:
        list[str]: One SMILES per input, in input order.
    """
    reaction = AllChem.ReactionFromSmarts('[CX3H1:1](=O)[H].[OH2:2]>>[CX3:1](=O)[O:2]')

    converted = []
    for smiles in smiles_list:
        # Hydrogens are made explicit before the reaction is applied.
        reactant = Chem.AddHs(Chem.MolFromSmiles(smiles))
        product_sets = reaction.RunReactants((reactant, Chem.MolFromSmiles('O')))
        if not product_sets:
            print('Issue with converting C=O into -COOH')
            converted.append(smiles)
            continue

        product = product_sets[0][0]
        Chem.SanitizeMol(product)
        converted.append(
            Chem.MolToSmiles(Chem.RemoveHs(product), isomericSmiles=True, canonical=True)
        )

    return converted

def tokenize_peptides(smiles):
    """Fragment a molecule by cutting the amide bonds of its largest ring.

    For every ``NC=O`` substructure match whose N and C atoms both belong to
    the largest ring, the N-C bond is removed. The resulting fragments are
    converted to SMILES and passed through ``convert_aldehydes_to_acids``.

    Args:
        smiles: SMILES string of the molecule to fragment.

    Returns:
        list[str]: The fragment SMILES returned by
        ``convert_aldehydes_to_acids``.
    """
    mol = Chem.MolFromSmiles(smiles)
    amide_pattern = Chem.MolFromSmarts('NC=O')

    # Atom indices of the largest ring in the molecule.
    ring_atoms = set(max(Chem.GetSymmSSSR(mol), key=len))

    # (N, C) index pairs of the matches that lie entirely inside that ring.
    ring_amide_bonds = [
        (n_idx, c_idx)
        for n_idx, c_idx, _o_idx in mol.GetSubstructMatches(amide_pattern)
        if n_idx in ring_atoms and c_idx in ring_atoms
    ]

    editable = Chem.EditableMol(mol)
    for n_idx, c_idx in sorted(ring_amide_bonds, reverse=True):
        editable.RemoveBond(n_idx, c_idx)

    pieces = Chem.GetMolFrags(editable.GetMol(), asMols=True, sanitizeFrags=True)
    return convert_aldehydes_to_acids([Chem.MolToSmiles(piece) for piece in pieces])

# Read the CSV data splits

In [7]:
# Load the three data splits from CSV files located under ../data.
# train_set is later indexed into the Chroma collection; test_set supplies the
# query row and its measured PAMPA value.
train_set = pd.read_csv('../data/train_set.csv')
test_set = pd.read_csv('../data/test_set.csv')
val_set = pd.read_csv('../data/val_set.csv')

# Chemical vector database (VDB) generation

In [8]:

class ChemicalEmbeddingGenerator:
    """Builds Morgan-fingerprint embeddings for text chunks tagged with ``SMILES{...}``."""

    @staticmethod
    def generate_embedding(chunk, radius=4, nBits=1024):
        """Return the fingerprint embedding of a text chunk, or None on failure.

        Args:
            chunk: Text expected to contain a ``SMILES{...}`` tag.
            radius: Morgan radius passed to RDKit (default matches the
                previously hard-coded value).
            nBits: Fingerprint length (default matches the previously
                hard-coded value).

        Returns:
            list[int] | None: The fingerprint as a list, or None when the chunk
            is not a string, has no SMILES tag, the SMILES cannot be parsed, or
            RDKit raises. A message is printed in every failure case.
        """
        # Non-string chunks (for example a missing DataFrame cell, which pandas
        # represents as NaN) used to raise a TypeError here and abort the whole
        # ingestion loop; treat them like any other row without a structure.
        if not isinstance(chunk, str):
            print(f"Cannot generate embedding: expected text, got {chunk!r}.")
            return None

        tag = re.search(r'SMILES{([^}]*)}', chunk)
        if tag is None:
            print("No SMILES{...} tag found in chunk.")
            return None

        try:
            mol = Chem.MolFromSmiles(tag.group(1))
            if mol is None:
                print("Invalid SMILES string.")
                return None
            morgan_fp = AllChem.GetMorganFingerprintAsBitVect(mol, radius=radius, nBits=nBits)
            # Placeholder array; ConvertToNumpyArray is expected to resize it
            # to the fingerprint length.
            arr = np.zeros((1,), dtype=np.int8)
            ConvertToNumpyArray(morgan_fp, arr)
            return arr.tolist()
        except Exception as e:
            # Best effort: one bad row must not stop the ingestion of the rest.
            print(f"An error occurred while generating embedding: {e}")
            return None
        


class DataFrameManager:
    """Stores DataFrame rows in a Chroma collection using chemical embeddings."""

    def __init__(self):
        """Create the Chroma client and open (or create) the cosine-space collection."""
        self.embedding_generator = ChemicalEmbeddingGenerator()
        # Use the public entry point; the previous `chromadb.chromadb.Client()`
        # depended on the package exposing itself as an attribute.
        self.chroma_client = chromadb.Client()
        self.collection_name = "chemical_data_collection"
        self.collection = self.chroma_client.get_or_create_collection(
            name=self.collection_name,
            metadata={"hnsw:space": "cosine"},
        )

    def add_texts_to_collection(self, dataframe, text_column, columns_to_keep):
        """Embed every row of ``dataframe`` and add it to the collection.

        Rows for which no embedding can be generated are skipped.

        Args:
            dataframe: DataFrame holding the rows to store.
            text_column: Column whose text is embedded and stored as the document.
            columns_to_keep: Columns copied into each row's metadata.

        Returns:
            The Chroma collection held by this manager.
        """
        embeddings_list = []
        documents_list = []
        metadatas_list = []
        ids_list = []

        for index, row in dataframe.iterrows():
            embedding = self.embedding_generator.generate_embedding(row[text_column])
            if embedding is None:
                continue

            embeddings_list.append(embedding)
            documents_list.append(row[text_column])
            # NOTE(review): missing values read by pandas are NaN rather than
            # None, so they pass this filter - confirm whether that is intended.
            metadatas_list.append(
                {column: row[column] for column in columns_to_keep if row[column] is not None}
            )
            # Ids derive from the DataFrame index, so adding the same frame
            # twice reuses the same ids.
            ids_list.append(f"row_{index}")

        # Nothing to store: skip the call instead of handing Chroma empty lists.
        if not ids_list:
            print("No rows produced an embedding; nothing was added to the collection.")
            return self.collection

        self.collection.add(
            embeddings=embeddings_list,
            documents=documents_list,
            metadatas=metadatas_list,
            ids=ids_list
        )
        print("Data added to collection successfully.")
        return self.collection


In [9]:
def get_context(query_dict_output):
    """Concatenate the documents of the first query result.

    Every document is followed by ``'. '``; an empty result gives ``''``.
    """
    return ''.join(document + '. ' for document in query_dict_output['documents'][0])

def get_distances(query_dict_output):
    """Return the ``'distances'`` entry of the first query in a query result."""
    first_query_distances = query_dict_output['distances'][0]
    return first_query_distances

In [None]:
def extract_value(input_string):
    """Parse the number reported as ``PERMEABILITY(<value>)`` in a model reply.

    Args:
        input_string: Text expected to contain ``PERMEABILITY(...)``.

    Returns:
        float: The value found inside the first ``PERMEABILITY(...)``.

    Raises:
        ValueError: If the text has no ``PERMEABILITY(...)`` part or its
            content is not a number.
    """
    match = re.search(r'PERMEABILITY\(([^)]+)\)', input_string)
    if match is None:
        # Previously this path ended in float(None), an unhelpful TypeError.
        raise ValueError(f"No PERMEABILITY(...) value found in: {input_string!r}")

    # The prompt shows the format as PERMEABILITY(%value), so a reply may echo
    # the '%'; drop it together with surrounding whitespace before parsing.
    raw_value = match.group(1).strip().strip('%').strip()
    return float(raw_value)

In [10]:
# Start from an empty collection so that re-running this cell does not add the
# same row ids a second time.
try:
    manager.chroma_client.delete_collection("chemical_data_collection")
except NameError:
    # Fresh kernel: no manager has been created yet, so there is nothing to delete.
    pass
except Exception as exc:
    # Best effort: report the problem instead of hiding it, then rebuild anyway.
    print(f"Could not delete the existing collection: {exc}")

manager = DataFrameManager()
manager.add_texts_to_collection(train_set, 'Formatted_String', ['Formatted_String'])


Data added to collection successfully.


Collection(name=chemical_data_collection)

In [100]:
def get_values_from_context(input_string):
    """Extract the number that follows ``'permeability value is '`` in a text.

    Args:
        input_string: Context document containing the phrase
            ``permeability value is <number>``.

    Returns:
        float: The parsed value.

    Raises:
        ValueError: If the phrase is absent or what follows is not a number.
    """
    match = re.search(r'permeability value is ([\-\d\.]+)', input_string)
    if match is None:
        # Previously this path failed with AttributeError on None.
        raise ValueError(f"No permeability value found in: {input_string!r}")

    # The character class also captures a sentence-ending period
    # ("... value is -5.81."), which float() rejects; drop trailing periods.
    return float(match.group(1).rstrip('.'))

def context_values(context_dict):
    """Parse a permeability value from every document of the first query result.

    Returns the values as a list, in the order of the documents.
    """
    return [get_values_from_context(document) for document in context_dict['documents'][0]]

def context_stats(context_permeability_list):
    """Return ``(mean, first)`` for a list of permeability values.

    The first item is the arithmetic mean of the list; the second is the
    list's first entry (the top-ranked neighbour when the list follows the
    order of a query result).
    """
    mean_value = statistics.fmean(context_permeability_list)
    first_value = context_permeability_list[0]
    return mean_value, first_value

In [108]:
def find_closest(target, numbers):
    """Return the element of ``numbers`` nearest to ``target``.

    Nearness is the absolute difference; on a tie the element that appears
    first is returned.
    """
    def distance_to_target(candidate):
        return abs(candidate - target)

    return min(numbers, key=distance_to_target)

In [101]:
# Query row: the first test sample and its measured PAMPA value. `.iloc` is
# positional, so this does not depend on the labels of the DataFrame index.
text = test_set['Formatted_String'].iloc[0]
ground_truth = test_set['PAMPA'].iloc[0]

# Retrieve the nearest stored rows for the query fingerprint.
sample_from_chroma = manager.collection.query(
    query_embeddings=[chemical_embedding(text)],
    n_results=context_window,
)

# Baselines taken from the retrieved rows alone: (mean, first neighbour).
context_metrics = context_stats(context_values(sample_from_chroma))

prompt = template.format(context=get_context(sample_from_chroma), query=text)
openai_llm = ChatOpenAI(model_name=llm_model)

# Ask the model three times with the same prompt and keep every parsed estimate.
predicted_values = []
for _ in range(3):
    # `invoke` replaces calling the model object directly, which is deprecated.
    response = openai_llm.invoke([HumanMessage(content=prompt)])
    token_stats = response.response_metadata['token_usage']
    predicted_value = extract_value(response.content)
    predicted_values.append(predicted_value)

mean_llm_response = statistics.fmean(predicted_values)
closest_guess = find_closest(ground_truth, predicted_values)
predicted_values

[-5.84, -5.8, -5.77]

In [107]:
# Measured PAMPA value of the queried test row, shown for comparison with the
# model estimates.
ground_truth

-5.85

-5.84

In [102]:
# Tuple returned by context_stats for the retrieved rows:
# (mean of their permeability values, value of the first retrieved row).
context_metrics

(-5.8525, -5.81)