In [1]:
#import required libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import torch
from collections import Counter
import re
from torch.utils.data import Dataset, DataLoader
from transformers import BertModel, BertTokenizer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from transformers import AutoModel
from transformers import PreTrainedTokenizer
from transformers import AdamW
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import copy
from rdkit import Chem
from SmilesPE.tokenizer import *
import codecs
from SmilesPE.learner import *
from typing import List, Optional
import os
import collections
from tokenizer_spe import *

In [2]:
df = pd.read_csv('FULL_VALID_CAN_SMILES_DATASET.csv')
print(df.shape)
df.head()

(94335, 5)


Unnamed: 0,E_homo,E_lumo,KS_gap,CAN_SMILES,MW
0,-5.17017,-2.28848,2.88169,C#Cc1[nH]ccc1c1csc2-c3c(C(=O)c12)ccs3,281.352
1,-5.29262,-2.47624,2.81638,O=C1c2c(-c3c1ccs3)scc2c1c2sccc2cc2c1ccs2,380.526
2,-5.30622,-2.40005,2.90617,Cc1sc2-c3sccc3C(=O)c2c1C1=CC(=C)c2c1csc2,338.466
3,-5.99195,-2.91162,3.08033,O=N(=O)c1cc2c(s1)c(sc2C(F)(F)F)c1csc2-c3c(C(=O...,443.463
4,-5.31983,-2.44086,2.87897,Cc1cc2c(-c3c(C2=O)c(cs3)c2cc(ccc2N(=O)=O)c2csc...,409.501


In [3]:
#Data driven Tokenization
#SPE Tokenization
spe_vob= codecs.open('SPE_data.txt')
spe = SPE_Tokenizer(spe_vob)
# some default tokens from huggingface
default_toks = ['[PAD]', 
                '[unused1]', '[unused2]', '[unused3]', '[unused4]','[unused5]', '[unused6]', '[unused7]', '[unused8]', '[unused9]', '[unused10]', 
                '[UNK]', '[CLS]', '[SEP]', '[MASK]']


# atom-level tokens 
atom_tokens = ['O', '=', 'C', '1', '2', '(', 'S', ')', '3', '4', '[NH]', '#', '5', '6', 'F',
                '[N]', '\\', '/', '[S]', 'N', '[Se]', '[CH2]', '[C]', '[CH]', '[SiH2]', '7', '8', 'Br', 'Cl', 
                '[P]', '[N+]', '[O-]', '[SH]', '9', '%10', '%11', '%12', '[N-]', 'P', ':', '[PH]', '[C-]', '[NH-]',
                'c', 's', 'n', 'o','[se]', '[nH]', '[GeH2]', '[C@@H]', '[C@@]', '[C@H]', '[S@]', '[S@@]', '[c]', '[C@]', '[n]']
# spe tokens
with open('SPE_data.txt', "r") as ins:
    spe_toks = []
    for line in ins:
        spe_toks.append(line.split('\n')[0])

spe_tokens = []
for s in spe_toks:
    spe_tokens.append(''.join(s.split(' ')))
    
spe_vocab = default_toks + atom_tokens + spe_tokens
len(spe_vocab)
with open('vocab_spe.txt', 'w') as f:
    for voc in spe_vocab:
        f.write(f'{voc}\n')
spe_tokenizer = SMILES_SPE_Tokenizer(vocab_file='vocab_spe.txt', spe_file= 'SPE_data.txt')

In [4]:
smi_1 = 'CC[N+](C)(C)Cc1ccccc1Br'
smi_2 = 'c1cccc1[invalid]'
encoded_input = spe_tokenizer(smi_1)
encoded_input

{'input_ids': [12, 75, 45, 123, 278, 155, 42, 13], 'token_type_ids': [0, 0, 0, 0, 0, 0, 0, 0], 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1]}

In [5]:
def spe_encode_smiles(smiles):
    try:
        encoded = spe_tokenizer(smiles)
        return encoded['input_ids']
    except Exception as e:
        print(f"Error encoding SMILES {smiles}: {str(e)}")
        return None
df['data_driven_encoded'] = df['CAN_SMILES'].apply(spe_encode_smiles)

In [6]:
class SmilesDataset(Dataset):
    def __init__(self, encodings, properties):
        self.encodings = encodings  
        self.properties = properties  

    def __len__(self):
        return len(self.properties)

    def __getitem__(self, idx):
        item = {
            'input_ids': torch.tensor(self.encodings[idx], dtype=torch.long),
            'attention_mask': torch.tensor([1]*len(self.encodings[idx]), dtype=torch.long)
        }
        item['properties'] = torch.tensor(self.properties[idx], dtype=torch.float)
        return item


# Get the input and output data
spe_X = list(df['data_driven_encoded'])  
y = df[['E_homo', 'E_lumo']].values  

X_train, X_test, y_train, y_test = train_test_split(spe_X, y, test_size=0.2, random_state=42)

#Normalize properties
scaler = StandardScaler()
y_train_scaled = scaler.fit_transform(y_train)  
y_test_scaled = scaler.transform(y_test) 


# Convert the lists of integers to tensors
train_encodings = torch.nn.utils.rnn.pad_sequence([torch.tensor(seq) for seq in X_train], batch_first=True)
test_encodings = torch.nn.utils.rnn.pad_sequence([torch.tensor(seq) for seq in X_test], batch_first=True)


# Create datasets
train_dataset = SmilesDataset(train_encodings, y_train_scaled)
test_dataset = SmilesDataset(test_encodings, y_test_scaled)


# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=True)

In [7]:
class ChemBERTaForPropertyPrediction(torch.nn.Module):
    def __init__(self, model_checkpoint, num_properties):
        super().__init__()
        self.chemberta = AutoModel.from_pretrained(model_checkpoint)
        self.regressor = torch.nn.Linear(self.chemberta.config.hidden_size, num_properties)

    def forward(self, input_ids, attention_mask):
        outputs = self.chemberta(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs.pooler_output
        return self.regressor(pooled_output)
    
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
spe_model = ChemBERTaForPropertyPrediction('seyonec/ChemBERTa-zinc-base-v1', num_properties=2).to(device)
spe_optimizer = AdamW(spe_model.parameters(), lr=1e-4)



In [8]:
def train(model, data_loader, optimizer, device):
    model.train()
    total_loss = 0
    for batch in data_loader:
        optimizer.zero_grad()
        input_ids, attention_mask, properties = batch['input_ids'].to(device), batch['attention_mask'].to(device), batch['properties'].to(device)
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        loss = torch.nn.functional.mse_loss(outputs, properties)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(data_loader)

def evaluate(model, data_loader, device):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for batch in data_loader:
            input_ids, attention_mask, properties = batch['input_ids'].to(device), batch['attention_mask'].to(device), batch['properties'].to(device)
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            loss = torch.nn.functional.mse_loss(outputs, properties)
            total_loss += loss.item()
    return total_loss / len(data_loader)

In [None]:
num_epochs = 10
train_losses = []
val_losses = []
for epoch in range(num_epochs):
    train_loss = train(spe_model, train_loader,spe_optimizer,device)
    val_loss = evaluate(spe_model, test_loader,device)
    train_losses.append(train_loss)
    val_losses.append(val_loss)
    print(f"Epoch {epoch}, Train loss: {train_loss}, Validation loss: {val_loss}")

  'input_ids': torch.tensor(self.encodings[idx], dtype=torch.long),


Epoch 0, Train loss: 0.1934343444032657, Validation loss: 0.1930637525299848
Epoch 1, Train loss: 0.15708855459022036, Validation loss: 0.1484851081745099
Epoch 2, Train loss: 0.13373051101958514, Validation loss: 0.15044662804674294
Epoch 3, Train loss: 0.11906345276104388, Validation loss: 0.13252606624263827
Epoch 4, Train loss: 0.10461725329963673, Validation loss: 0.1272754873007031
Epoch 5, Train loss: 0.09587433754816657, Validation loss: 0.12596180329762272
Epoch 6, Train loss: 0.08839500935265779, Validation loss: 0.13341902753938054


In [None]:
torch.save({
            'spe_model_state_dict': spe_model.state_dict(),
            'spe_optimizer_state_dict': spe_optimizer.state_dict(),
            }, 'ChemBERTa_spe_Level.pth')


In [None]:
def evaluate_and_save(model, data_loader, scaler, device, filename):
    model.eval()
    predictions = []
    actuals = []
    
    with torch.no_grad():
        for batch in data_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            properties = batch['properties'].to(device)
            
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            predictions.append(outputs.cpu().numpy())
            actuals.append(properties.cpu().numpy())
    
    # Concatenate all batches
    predictions = np.concatenate(predictions, axis=0)
    actuals = np.concatenate(actuals, axis=0)
    
    # Inverse transform of the scaled properties
    y_pred = scaler.inverse_transform(predictions)
    y_actual = scaler.inverse_transform(actuals)
    
    # Calculate metrics
    mse = mean_squared_error(y_actual, y_pred)
    r2 = r2_score(y_actual, y_pred)
    
    # Save test predictions and actual values to CSV
    test_results = pd.DataFrame({
        'E_homo_actual': y_actual[:, 0],
        'E_lumo_actual': y_actual[:, 1],
        'E_homo_predicted': y_pred[:, 0],
        'E_lumo_predicted': y_pred[:, 1]
    })
    test_results.to_csv(filename, index=False)
    
    return (y_actual, y_pred, mse, r2)

spe_model.to(device)  
evaluate_and_save(char_model, char_test_loader, scaler, device, 'spe_level_chemberta_results.csv')
