In [None]:
# @title Downloads RDkit, Deepchem & Transformers
!pip install rdkit-pypi
!pip install --pre deepchem
!pip install transformers
!pip install -U accelerate
!pip install -U transformers

In [None]:
# @title Imports
import codecs
import deepchem
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import os
import warnings

from collections import Counter
from deepchem.feat.smiles_tokenizer import SmilesTokenizer
from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Chem import Descriptors
from sklearn.cluster import KMeans
from sklearn.ensemble import RandomForestRegressor
from sklearn.manifold import TSNE
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.preprocessing import OneHotEncoder
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from tensorflow import keras
import tensorflow as tf
from transformers import BertConfig, BertModel

In [None]:
# @title Check if GPU is available
# TensorFlow's view of the available GPUs.
print("Num GPUs Available:", len(tf.config.list_physical_devices('GPU')))
# The models in this notebook are trained with PyTorch on 'cuda', so report
# PyTorch's view as well; the two frameworks can disagree.
print("PyTorch CUDA available:", torch.cuda.is_available())

In [None]:
# @title Canonical Smiles Function
def get_canonical_smiles(smiles):
    """Return the canonical SMILES string for `smiles`.

    Returns None when RDKit cannot parse the input into a molecule.
    """
    parsed = Chem.MolFromSmiles(smiles)
    return None if parsed is None else Chem.MolToSmiles(parsed, canonical=True)

In [None]:
# @title Read in the data and preprocess
# Download the ZINC 250k CSV (get_file skips the download when the file is
# already present at the target path).
csv_path = keras.utils.get_file(
    "/content/250k_rndm_zinc_drugs_clean_3.csv",
    "https://raw.githubusercontent.com/aspuru-guzik-group/chemical_vae/master/models/zinc_properties/250k_rndm_zinc_drugs_clean_3.csv",
)

# Normalise the SMILES column name without mutating a frame in place.
data = pd.read_csv(csv_path).rename(columns={'SMILES': 'smiles'})

# Keep only rows whose SMILES entry is a string. The explicit .copy() makes
# `data` an independent frame, so the column assignment below does not write
# into a slice of the raw frame (SettingWithCopyWarning).
data = data[data["smiles"].apply(lambda x: isinstance(x, str))].copy()

data['smiles'] = data['smiles'].apply(get_canonical_smiles)

# get_canonical_smiles returns None for unparsable strings; drop those rows so
# that later cells (tokenizer, descriptor computation) only ever see valid SMILES.
data = data.dropna(subset=['smiles'])

In [None]:
# @title Tokenizer
if not os.path.exists('vocab.txt'):
    !wget https://deepchemdata.s3-us-west-1.amazonaws.com/datasets/vocab.txt

tokenizer = SmilesTokenizer('vocab.txt')
data['tokenized_smiles'] = data['smiles'].apply(tokenizer.encode)
data = data[['smiles', 'tokenized_smiles', 'logP', 'qed', 'SAS']]
data = data[data['tokenized_smiles'].apply(len) < 50]

In [None]:
# @title Padding
def pad_sequence(seq, max_len=50, pad_id=0):
    """Right-pad a list of token ids to a fixed length.

    NOTE: this definition shadows the `pad_sequence` imported from
    `torch.nn.utils.rnn` in the imports cell.

    Args:
        seq: list of token ids.
        max_len: target length (defaults to 50, the sequence length used
            throughout the notebook).
        pad_id: id appended as padding (defaults to 0).

    Returns:
        A new list of length `max_len`. A sequence that is already at least
        `max_len` long is returned as an unpadded copy; it is NOT truncated.
    """
    missing = max_len - len(seq)
    return seq + [pad_id] * max(missing, 0)

# Pad every encoded molecule to the common fixed length.
data['tokenized_smiles'] = data['tokenized_smiles'].map(pad_sequence)

In [None]:
# @title Add descriptors to data and normalize them
# Candidate reduced descriptor set. Try training with both this selection and
# the 124-descriptor set to see which one works better.
# Entries are grouped by name prefix; the order of the list is significant.
selected_descriptors = [
    # EState_VSA*
    'EState_VSA1', 'EState_VSA10', 'EState_VSA11', 'EState_VSA2',
    'EState_VSA3', 'EState_VSA4', 'EState_VSA5', 'EState_VSA6',
    'EState_VSA7', 'EState_VSA8', 'EState_VSA9',
    # LabuteASA
    'LabuteASA',
    # PEOE_VSA*
    'PEOE_VSA1', 'PEOE_VSA10', 'PEOE_VSA11', 'PEOE_VSA12', 'PEOE_VSA13',
    'PEOE_VSA14', 'PEOE_VSA2', 'PEOE_VSA3', 'PEOE_VSA4', 'PEOE_VSA5',
    'PEOE_VSA6', 'PEOE_VSA7', 'PEOE_VSA8', 'PEOE_VSA9',
    # SMR_VSA*
    'SMR_VSA1', 'SMR_VSA10', 'SMR_VSA2', 'SMR_VSA3', 'SMR_VSA4',
    'SMR_VSA5', 'SMR_VSA6', 'SMR_VSA7', 'SMR_VSA8', 'SMR_VSA9',
    # SlogP_VSA*
    'SlogP_VSA1', 'SlogP_VSA10', 'SlogP_VSA11', 'SlogP_VSA12', 'SlogP_VSA2',
    'SlogP_VSA3', 'SlogP_VSA4', 'SlogP_VSA5', 'SlogP_VSA6', 'SlogP_VSA7',
    'SlogP_VSA8', 'SlogP_VSA9',
    # TPSA
    'TPSA',
]

def compute_all_descriptors(smiles, names=None):
    """Compute RDKit descriptors for one molecule.

    Args:
        smiles: SMILES string of the molecule.
        names: optional iterable of descriptor names (attributes of
            `rdkit.Chem.Descriptors`) to compute, e.g. `selected_descriptors`.
            When None, the first 124 entries of `Descriptors._descList` are
            used, which is the original behaviour.

    Returns:
        Dict mapping descriptor name -> value, or None when `smiles` is not a
        string or RDKit cannot parse it.
    """
    # Missing values (None / NaN) would make MolFromSmiles raise; treat them
    # like an unparsable molecule instead.
    if not isinstance(smiles, str):
        return None
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        return None
    if names is None:
        names = [entry[0] for entry in Descriptors._descList[:124]]
    return {name: getattr(Descriptors, name)(mol) for name in names}

# One dict of descriptor values per molecule (None when a molecule could not be parsed).
descriptor_records = data['smiles'].apply(compute_all_descriptors)

# Build the descriptor table in a single constructor call. Expanding the dicts
# with .apply(pd.Series) creates one Series per row and is very slow on a
# dataset of this size. Unparsable molecules become all-NaN rows.
descriptors_df = pd.DataFrame(
    [record if isinstance(record, dict) else {} for record in descriptor_records],
    index=data.index,
)

# Drop descriptors whose name already exists in `data` (RDKit's descriptor list
# may contain names such as `qed` that the dataset already provides), so column
# labels stay unique. This also keeps a re-run of the cell from appending the
# descriptor columns a second time.
descriptors_df = descriptors_df.drop(columns=descriptors_df.columns.intersection(data.columns))
data = pd.concat([data, descriptors_df], axis=1)

# Z-score every target column from 'logP' onwards.
targets = data.loc[:, 'logP':]
normalized_data = (targets - targets.mean()) / targets.std()

# A constant column has std == 0 and normalises to all-NaN; remove such columns
# first, otherwise the row-wise dropna below would discard every molecule.
normalized_data = normalized_data.dropna(axis=1, how='all')

normalized_data_merged = pd.merge(data[['smiles', 'tokenized_smiles']], normalized_data, right_index=True, left_index=True)
normalized_data_merged = normalized_data_merged.dropna(axis=0)

# Target column names in a stable, duplicate-free order. The previous
# list(set(...)) produced a different column order from session to session.
descriptor_names = normalized_data.columns.tolist()


In [None]:
# @title Define Dataset
from torch.utils.data import DataLoader
class Dataset(torch.utils.data.Dataset):
    """Torch dataset pairing padded SMILES token ids with descriptor targets.

    NOTE: this class shadows the `Dataset` name imported from
    `torch.utils.data` in the imports cell.

    Args:
        data: DataFrame with a 'tokenized_smiles' column (equal-length lists of
            token ids) and one column per descriptor target.
        descriptor_columns: names of the target columns. When None, the
            notebook-level `descriptor_names` list is used (original behaviour).
        pad_token_id: token id treated as padding when building the attention
            mask (defaults to 0).
    """

    def __init__(self, data, descriptor_columns=None, pad_token_id=0):
        if descriptor_columns is None:
            # Fall back to the global list built in the normalisation cell.
            descriptor_columns = descriptor_names
        self.pad_token_id = pad_token_id
        self.data = torch.tensor(data['tokenized_smiles'].to_numpy().tolist(), dtype=torch.long)
        self.descriptors = torch.tensor(data[list(descriptor_columns)].to_numpy(), dtype=torch.float32)

    def __len__(self):
        """Number of molecules in the dataset."""
        return len(self.data)

    def __getitem__(self, index):
        """Return a dict with 'input_ids', 'attention_mask' and 'descriptors' for one molecule."""
        input_ids = self.data[index]
        # 1 for real tokens, 0 for padding positions.
        attention_mask = (input_ids != self.pad_token_id).long()
        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'descriptors': self.descriptors[index]
        }


# 80% train / 10% validation / 10% test. `test_size` is the share that goes to
# the SECOND returned frame; the former value of 0.8 left only 20% of the
# molecules for training and 40% each for validation and test.
train_data, temp_data = train_test_split(normalized_data_merged, test_size=0.2, random_state=42)
# Split the held-out 20% evenly into validation and test.
validation_data, test_data = train_test_split(temp_data, test_size=0.5, random_state=42)

BATCH_SIZE = 32

# Wrap each split in the tensor-backed Dataset defined above.
train_dataset = Dataset(train_data)
validation_dataset = Dataset(validation_data)
test_dataset = Dataset(test_data)

# Only the training loader shuffles; validation and test keep the split order.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=BATCH_SIZE)
validation_loader = DataLoader(validation_dataset, shuffle=False, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=BATCH_SIZE)


In [None]:
# @title Define BERT model
from transformers import BertConfig, BertModel

# Architecture hyper-parameters of the encoder; the vocabulary size is taken
# from the SMILES tokenizer so the embedding table matches the token ids.
bert_hparams = dict(
    vocab_size=len(tokenizer.vocab),
    hidden_size=768,
    num_hidden_layers=12,
    num_attention_heads=12,
    intermediate_size=3072,
)
config = BertConfig(**bert_hparams)

# Encoder instantiated directly from the config (no checkpoint is loaded here).
bert_model = BertModel(config)


In [None]:
# @title Train MLM
from transformers import (
    BertForMaskedLM,
    DataCollatorForLanguageModeling,
    Trainer,
    TrainingArguments,
)

# No explicit .to('cuda') here: Trainer moves the model to the device chosen by
# TrainingArguments, so the cell also runs (slowly) on a machine without a GPU
# instead of failing on the hard-coded device.
model = BertForMaskedLM(config=config)

# Masks 15% of the tokens of each batch for the masked-language-model objective.
data_collator = DataCollatorForLanguageModeling(
    tokenizer=tokenizer,
    mlm=True,
    mlm_probability=0.15
)

training_args = TrainingArguments(
    output_dir="./results",
    overwrite_output_dir=True,
    num_train_epochs=18,
    per_device_train_batch_size=32,
    save_steps=10_000,
    save_total_limit=2,
)

# NOTE(review): no evaluation strategy is set in TrainingArguments, so
# eval_dataset is presumably not used during train() - confirm.
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=train_dataset,
    eval_dataset=validation_dataset,
)
trainer.train()

# Persist the pretrained weights; BertForDescriptors loads its encoder from this path.
model.save_pretrained("./results/bert_base")


In [None]:
# @title Test on example prediction




In [None]:
# @title Multiheaded regression class
class DescriptorHead(nn.Module):
    """Small MLP head: Linear -> BatchNorm1d -> ReLU -> Dropout(0.5) -> Linear.

    Args:
        input_dim: size of the incoming feature vector.
        hidden_dim: width of the hidden layer.
        output_dim: number of values predicted per input row.
    """

    def __init__(self, input_dim=768, hidden_dim=64, output_dim=1):
        super().__init__()
        # Sub-modules are registered in the order in which forward() applies them.
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.bn1 = nn.BatchNorm1d(hidden_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Map features `x` to `output_dim` predictions per row.

        Assumes `x` is (batch, input_dim), as produced by the first-token slice
        in BertForDescriptors.
        """
        hidden = self.bn1(self.fc1(x))
        hidden = self.dropout(self.relu(hidden))
        return self.fc2(hidden)

class BertForDescriptors(nn.Module):
    """BERT encoder followed by one independent DescriptorHead per target.

    Args:
        num_descriptors: number of regression heads, i.e. number of columns in
            the returned tensor.
    """

    def __init__(self, num_descriptors=124):
        super().__init__()
        # Encoder weights come from the checkpoint written by the MLM training cell.
        self.bert = BertModel.from_pretrained("./results/bert_base")
        heads = []
        for _ in range(num_descriptors):
            heads.append(DescriptorHead())
        self.descriptor_heads = nn.ModuleList(heads)

    def forward(self, input_ids, attention_mask):
        """Return the head predictions concatenated along dim 1 (one column per head)."""
        encoded = self.bert(input_ids=input_ids, attention_mask=attention_mask)

        # Hidden state at the first token position (the `[CLS]` slot) is used as
        # the sequence-level feature vector for the regression heads.
        cls_state = encoded.last_hidden_state[:, 0, :]

        per_head = [head(cls_state) for head in self.descriptor_heads]
        return torch.cat(per_head, dim=1)

In [None]:
# @title Initialize model
from torch.optim import Adam

# One regression head per label column. The label tensor holds logP, qed and
# SAS in addition to the RDKit descriptors, so the class default of 124 heads
# does not match its width and the loss would fail on a shape mismatch.
model = BertForDescriptors(num_descriptors=train_dataset.descriptors.shape[1]).to('cuda')
optimizer = Adam(model.parameters(), lr=1e-4)
# reduction='sum': the loss of a batch is the sum of squared errors over all
# samples and all descriptors in that batch.
criterion = nn.MSELoss(reduction='sum')

In [None]:
# @title Train model
# Fine-tune the encoder and the descriptor heads, running one validation pass
# after every epoch. The loop variables (`labels`, `outputs`, ...) stay in the
# notebook namespace afterwards and are displayed by the following cells.
for epoch in range(12):  # Number of epochs
    # Training mode: dropout active, batch-norm uses per-batch statistics.
    model.train()
    train_loss = 0.0

    for batch in train_loader:
        # Clear gradients accumulated by the previous step.
        optimizer.zero_grad()

        input_ids = batch['input_ids'].to('cuda')
        attention_mask = batch['attention_mask'].to('cuda')
        labels = batch['descriptors'].to('cuda')

        outputs = model(input_ids, attention_mask)

        # Loss of this batch, accumulated as a Python float for reporting.
        loss = criterion(outputs, labels)
        train_loss += loss.item()

        # Backpropagation and parameter update
        loss.backward()
        optimizer.step()

    # Mean of the per-batch losses. With the sum-reduced criterion from the
    # initialisation cell this is a per-batch sum, not a per-sample mean.
    avg_train_loss = train_loss / len(train_loader)

    # Validation pass: evaluation mode and no gradient tracking.
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for batch in validation_loader:
            input_ids = batch['input_ids'].to('cuda')
            attention_mask = batch['attention_mask'].to('cuda')
            labels = batch['descriptors'].to('cuda')

            outputs = model(input_ids, attention_mask)

            loss = criterion(outputs, labels)
            val_loss += loss.item()

    # Mean of the per-batch validation losses (same scaling as the training value).
    avg_val_loss = val_loss / len(validation_loader)

    print(f"Epoch {epoch+1} completed. Training Loss: {avg_train_loss}, Validation Loss: {avg_val_loss}")

# Save only the fine-tuned encoder (without the descriptor heads).
model.bert.save_pretrained("./results/bert_desc")

In [None]:
# Inspect the target tensor left over from the last validation batch of the training loop.
labels

In [None]:
# Inspect the model predictions left over from the last validation batch of the training loop.
outputs

In [None]:
# @title Preprocessing
def complete_preprocess(smiles, maxlen=50):
    """Canonicalise, tokenise and pad one SMILES string for the model.

    Args:
        smiles: SMILES string of the molecule.
        maxlen: maximum number of tokens; also the length of the padded result.

    Returns:
        A list of `maxlen` token ids (right-padded with 0), or None when the
        SMILES cannot be parsed, encodes to more than `maxlen` tokens, or any
        step raises (the error is printed, not propagated).
    """
    try:
        mol = Chem.MolFromSmiles(smiles)
        if mol is None:
            return None
        canonical_smiles = Chem.MolToSmiles(mol, canonical=True)

        tokenized = tokenizer.encode(canonical_smiles)

        if len(tokenized) > maxlen:
            return None

        # Pad inline up to `maxlen`, so the padded length always matches the
        # limit checked above and does not depend on the length used by the
        # notebook's pad_sequence helper.
        return tokenized + [0] * (maxlen - len(tokenized))

    except Exception as e:
        # Deliberate best-effort: report the problem and signal failure with None.
        print(f"An error occurred: {e}")
        return None

In [None]:
# Reload the fine-tuned encoder that the training cell saved to ./results/bert_desc.
loaded_bert = BertModel.from_pretrained("./results/bert_desc")

In [None]:
# @title Get Fingerprint
def get_representation(smiles, model):
    """Return the first-token ([CLS]) embedding of a molecule.

    Args:
        smiles: SMILES string of the molecule.
        model: encoder whose output exposes `last_hidden_state`
            (e.g. the reloaded BertModel).

    Returns:
        Tensor of shape (1, hidden_size) on the model's device, or None when
        preprocessing fails or any step raises (the error is printed).
    """
    try:
        preprocessed_data = complete_preprocess(smiles)
        if preprocessed_data is None:
            return None

        # Create the inputs on the same device as the model so the call also
        # works when the model has been moved to the GPU.
        device = next(model.parameters()).device
        input_ids = torch.tensor([preprocessed_data], dtype=torch.long, device=device)

        # 1 for real tokens, 0 for padding. `input_ids` is already a tensor, so
        # it is compared directly instead of being re-wrapped in torch.tensor().
        attention_mask = (input_ids != 0).long()

        model.eval()

        # Forward pass without gradient tracking
        with torch.no_grad():
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)

        # Hidden state at the first token position
        return outputs.last_hidden_state[:, 0, :]

    except Exception as e:
        # Deliberate best-effort: report the problem and signal failure with None.
        print(f"An error occurred: {e}")
        return None

In [None]:
# Smoke test: embedding of a small example molecule ('COC') from the reloaded encoder.
get_representation('COC', loaded_bert)

In [None]:
# Placeholder path: point this at the CSV to embed. The next cell reads the
# molecules from its 'SMILES' column.
csv_file_path = "your-data.csv"
data_toxic = pd.read_csv(csv_file_path)

In [None]:
# Width of the embedding, read from the encoder instead of a hard-coded 768.
embedding_dim = loaded_bert.config.hidden_size
embedding_columns = [f'dim_{i+1}' for i in range(embedding_dim)]

# Collect one embedding row per SMILES string. Molecules that cannot be
# embedded get a row of missing values.
embedding_rows = []
for smiles in data_toxic['SMILES']:
    representation = get_representation(smiles, loaded_bert)
    if representation is None:
        embedding_rows.append([None] * embedding_dim)
    else:
        embedding_rows.append(representation[0].tolist())

# Build all dim_* columns at once and attach them in a single concat. Inserting
# the columns one by one and filling them cell by cell with .at fragments the
# frame and leaves the columns with object dtype. Columns are now numeric, and
# missing embeddings show up as NaN where some values are present.
embeddings_df = pd.DataFrame(embedding_rows, columns=embedding_columns, index=data_toxic.index)

# Dropping existing dim_* columns first keeps a re-run of this cell idempotent.
data_toxic = pd.concat(
    [data_toxic.drop(columns=embedding_columns, errors='ignore'), embeddings_df],
    axis=1,
)
