<a href="https://colab.research.google.com/github/gatescn/Smiles_HIV_data_BertFineTuning/blob/main/SMILES_transformer_notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pytorch_lightning
!pip install tensorboard



In [None]:
# Mount Google Drive at /content/gdrive; the log output path configured later
# in this notebook lives under that mount point.
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [None]:
import pandas as pd
import numpy as np
import pytorch_lightning as pl
import torch
import torchmetrics
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
from sklearn.base import BaseEstimator, ClassifierMixin
from torch import nn
from transformers import BertModel, BertTokenizer, BertConfig, AdamW, get_linear_schedule_with_warmup
from torch.utils.data import Dataset, DataLoader
from torchmetrics.classification import BinaryAUROC
from pytorch_lightning.loggers import TensorBoardLogger
from sklearn.model_selection import GridSearchCV
from pytorch_lightning.callbacks import ModelCheckpoint
import random
import warnings
import gc

import seaborn as sns
from pylab import rcParams
import matplotlib.pyplot as plt
from matplotlib import rc



import shutil
import os
from datetime import datetime
import random

# ---- Run identifiers -------------------------------------------------------
# Date stamp plus a random/seconds suffix so every run gets its own output
# folder under the historic-logs directory.
current_datetime = datetime.now().strftime("%m%d%Y")

unique_identifier = f"{random.randint(1, 9999)}_{datetime.now().strftime('%S')}"

# ---- Global configuration --------------------------------------------------
LABEL_COLUMNS = ["HIV_active"]
warnings.filterwarnings("ignore")
MODEL_NAME = 'bert-base-uncased'
LOG_OUTPUT_PATH = '/content/gdrive/MyDrive/SMILETransformerHistoricLogs/tb_logs_data_'+current_datetime+'_'+unique_identifier
# CHECKPOINT_PATH was referenced below without ever being defined, which made
# this cell fail with NameError on a fresh kernel.
# NOTE(review): keeping checkpoints next to this run's logs is an assumption;
# point this elsewhere if a different location is wanted.
CHECKPOINT_PATH = os.path.join(LOG_OUTPUT_PATH, 'checkpoints')
batch_size=30
n_epochs=1
# NOTE(review): 1e-3 is much larger than the classifier's own default of 2e-5;
# confirm this value is intentional.
learning_rate=1e-3
source_dir = "/tb_logs"

# Keep only the single best checkpoint, ranked by the lowest logged `val_loss`.
checkpoint_callback = ModelCheckpoint(
      dirpath= CHECKPOINT_PATH,
      filename='{epoch}-{val_loss:.2f}',
      save_top_k=1,
      monitor='val_loss',
      mode='min'
)


In [None]:
#Google colab commands
#!nvidia-smi
#torch.cuda.empty_cache()

Fri Dec  6 21:01:14 2024       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 535.104.05             Driver Version: 535.104.05   CUDA Version: 12.2     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  Tesla T4                       Off | 00000000:00:04.0 Off |                    0 |
| N/A   34C    P8               9W /  70W |      0MiB / 15360MiB |      0%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                                                    

In [None]:

# memory_summary() returns a pre-formatted multi-line table. Print it so the
# line breaks render; a bare expression would show the escaped string repr.
if torch.cuda.is_available():
    print(torch.cuda.memory_summary())
else:
    print("CUDA is not available; no GPU memory summary to show.")



In [None]:
class SmilesDataSet(Dataset):
    """Map-style dataset that tokenizes one row of SMILES text per item.

    Args:
        _data: DataFrame with a ``smiles_activity`` text column and the
            columns named in ``LABEL_COLUMNS``.
        _tokenizer: tokenizer used to encode ``smiles_activity``.
        max_token_length: every encoded sequence is padded or truncated to
            exactly this many tokens.
    """

    def __init__(self, _data, _tokenizer: BertTokenizer, max_token_length=512):
        self.tokenizer = _tokenizer
        self.n_samples = _data.shape[0]
        self.max_length = max_token_length
        self.data = _data

    def __getitem__(self, index: int):
        """Return the raw text, encoded inputs and float labels of row ``index``.

        Returns:
            dict with keys ``smiles_activity`` (str), ``input_ids`` and
            ``attention_mask`` (1-D tensors of length ``max_token_length``)
            and ``labels`` (float tensor, one entry per label column).
        """
        data_row = self.data.iloc[index]

        smiles_text = data_row.smiles_activity
        # Convert the label Series to plain floats before building the tensor.
        # Handing the label-indexed Series straight to torch makes it fetch
        # elements as series[0], a positional fallback that pandas has
        # deprecated for non-integer indexes.
        label_values = [float(value) for value in data_row[LABEL_COLUMNS]]

        encoding = self.tokenizer.encode_plus(
            smiles_text,
            add_special_tokens=True,
            max_length=self.max_length,
            return_token_type_ids=False,
            padding="max_length",
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )

        return dict(
            smiles_activity=smiles_text,
            # The tokenizer returns (1, max_length) tensors; flatten drops the
            # leading batch axis so the DataLoader can stack items itself.
            input_ids=encoding["input_ids"].flatten(),
            attention_mask=encoding["attention_mask"].flatten(),
            labels=torch.FloatTensor(label_values)
        )

    def __len__(self):
        """Number of rows in the wrapped DataFrame."""
        return self.n_samples

In [None]:
# To evaluate on an external test set, pass test_data_dir=<path to the test CSV> and extTest=True.
class SmilesDataModule(pl.LightningDataModule):
    """LightningDataModule serving tokenized SMILES data for train/val/test.

    Args:
        data_dir: path of the CSV used for the train/validation split. It must
            provide the ``smiles``, ``activity`` and ``HIV_active`` columns.
        tokenizer: tokenizer handed to every ``SmilesDataSet``.
        batch_size: batch size of all three dataloaders.
        test_data_dir: path of an external test CSV; required when
            ``extTest`` is true.
        extTest: when true, the test set is sampled from ``test_data_dir``
            instead of from the validation split.
        num_workers: worker count of the training dataloader.
        max_token_length: token length every sample is padded/truncated to.
            Rows whose ``smiles_activity`` text has more characters than this
            are dropped.
    """

    # Seed for sampling the external test set. Lightning runs ``setup`` again
    # for each fit/test call, so unseeded sampling would evaluate on a
    # different test set every time.
    _TEST_SAMPLE_SEED = 42

    def __init__(self, data_dir,  tokenizer, batch_size, test_data_dir= None, extTest=False, num_workers=7, max_token_length=512):
        super().__init__()
        self.data_dir = data_dir
        self.test_dir = test_data_dir  # only read when extTest is true
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.max_token_length = max_token_length
        self.tokenizer = tokenizer
        self.extTest = extTest

    def create_new_feature(self, df):
        """Add a ``smiles_activity`` column (``smiles`` + ``activity``) to ``df`` in place and return it."""
        df['smiles_activity'] = df['smiles'] + df['activity']
        return df

    def split_data(self, df, test_size=0.2, random_state=42):
        """Split ``df`` into train, validation and default-test frames.

        The train/validation split is stratified on ``HIV_active``. The default
        test frame is a seeded 50% sample of the validation frame.

        NOTE(review): the default test frame is therefore a subset of the
        validation frame, not an independent hold-out.

        Returns:
            ``(train_df, val_df, default_test_df)``
        """
        train_df, val_df = train_test_split(
            df,
            test_size=test_size,
            random_state=random_state,
            stratify=df['HIV_active'],
        )
        # Seeded so repeated setup() calls produce the same test rows.
        default_test_df = val_df.sample(frac=0.5, replace=False, random_state=random_state)
        return train_df, val_df, default_test_df

    def _load_frame(self, csv_path):
        """Read a CSV, add ``smiles_activity`` and drop rows whose text is too long."""
        frame = self.create_new_feature(pd.read_csv(csv_path))
        short_enough = frame['smiles_activity'].str.len() <= self.max_token_length
        # copy() detaches the result from the unfiltered frame so later column
        # assignments cannot trigger SettingWithCopy issues.
        return frame[short_enough].copy()

    def _make_dataset(self, frame):
        """Wrap ``frame`` in a ``SmilesDataSet`` that honours ``max_token_length``."""
        return SmilesDataSet(
            _data=frame,
            _tokenizer=self.tokenizer,
            max_token_length=self.max_token_length,
        )

    def setup(self, stage=None):
        """Load the CSV file(s) and build the train/val/test datasets.

        Raises:
            ValueError: if ``extTest`` is true but no ``test_data_dir`` was given.
        """
        if self.extTest and self.test_dir is None:
            raise ValueError("extTest=True requires test_data_dir to point to the external test CSV")

        self.updated_data = self._load_frame(self.data_dir)
        train_d, val_d, def_test_df = self.split_data(self.updated_data)
        self.train_dataset = self._make_dataset(train_d)
        self.val_dataset = self._make_dataset(val_d)

        if self.extTest:
            self.updated_test_data = self._load_frame(self.test_dir)
            test_frame = self.updated_test_data.sample(
                frac=0.5, replace=False, random_state=self._TEST_SAMPLE_SEED
            )
        else:
            test_frame = def_test_df
        self.test_dataset = self._make_dataset(test_frame)

    def train_dataloader(self):
        """Shuffled dataloader over the training dataset."""
        return DataLoader(
            self.train_dataset,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            shuffle=True
        )

    def val_dataloader(self):
        """Ordered dataloader over the validation dataset."""
        return DataLoader(
            self.val_dataset,
            batch_size=self.batch_size,
            num_workers=2,
            shuffle=False
        )

    def test_dataloader(self):
        """Ordered dataloader over the test dataset."""
        return DataLoader(
            self.test_dataset,
            batch_size=self.batch_size,
            num_workers=2,
            shuffle=False
        )

    def get_training_data(self):
        """Return the training dataset built by ``setup``."""
        return self.train_dataset

    def get_validation_data(self):
        """Return the validation dataset built by ``setup``."""
        return self.val_dataset

    def get_training_dataloader(self):
        """Return a training ``DataLoader``.

        This previously returned the bound ``train_dataloader`` method itself
        rather than a dataloader.
        """
        return self.train_dataloader()

In [None]:
class SmilesTransformerClassifier(pl.LightningModule):
    """BERT encoder with a one-unit sigmoid head for binary classification.

    Args:
        model_name: name of the pretrained BERT checkpoint to load.
        learning_rate: peak learning rate of the AdamW optimizer.
        steps_per_epoch: optimizer steps in one epoch; needed to build the
            learning-rate schedule.
        n_epochs: number of training epochs; needed to build the schedule.
    """

    def __init__(self, model_name = MODEL_NAME, learning_rate=2e-5, steps_per_epoch=None, n_epochs=None):
        super().__init__()
        config = BertConfig.from_pretrained(model_name)
        config.return_dict = True
        #config.max_position_embeddings = 600 - if we want to go above the 512 limit
        self.bert = BertModel.from_pretrained(model_name, config=config, ignore_mismatched_sizes=True)
        print("using model:"+model_name)
        # Linear head that converts BERT's pooled output into the single value
        # we predict.
        self.classifier = nn.Linear(self.bert.config.hidden_size, 1)
        self.steps_per_epoch = steps_per_epoch
        self.n_epochs = n_epochs
        self.criterion = nn.BCELoss()
        # Per-step labels/predictions, consumed and cleared at the end of every
        # training epoch.
        self.training_steps_outputs = []
        self.learning_rate = learning_rate

    def forward(self, input_ids, attention_mask, labels=None):
        """Run the encoder and head; compute the loss when labels are given.

        Returns:
            ``(loss, output)`` where ``output`` is the sigmoid of the head's
            output and ``loss`` is the BCE loss, or ``0`` when ``labels`` is
            ``None`` (inference).
        """
        output = self.bert(input_ids, attention_mask=attention_mask)
        output = self.classifier(output.pooler_output)  # linear layer on the pooled BERT output
        output = torch.sigmoid(output)
        loss = 0
        if labels is not None:
            loss = self.criterion(output, labels)
        return loss, output

    def _shared_step(self, batch, loss_name):
        """Forward one batch from ``SmilesDataSet`` and log its loss under ``loss_name``."""
        loss, outputs = self(batch["input_ids"], batch["attention_mask"], batch["labels"])
        self.log(loss_name, loss, prog_bar=True, logger=True)
        return loss, outputs

    def training_step(self, batch, batch_idx):
        """Compute and log the training loss; cache outputs for the epoch-end AUROC."""
        loss, outputs = self._shared_step(batch, "train_loss")
        labels = batch["labels"]
        # Store detached CPU copies only. Keeping the live tensors would hold
        # every step's autograd graph (and GPU memory) until the epoch ends.
        self.training_steps_outputs.append(
            {"labels": labels.detach().cpu(), "predictions": outputs.detach().cpu()}
        )
        return {"loss": loss, "predictions": outputs, "labels": labels}

    def validation_step(self, batch, batch_idx):
        """Compute and log the validation loss."""
        loss, _ = self._shared_step(batch, "val_loss")
        return loss

    def test_step(self, batch, batch_idx):
        """Compute and log the test loss."""
        loss, _ = self._shared_step(batch, "test_loss")
        return loss

    def on_train_epoch_end(self):
        """Log the training ROC AUC of the finished epoch and release the cached outputs."""
        if not self.training_steps_outputs:
            return

        labels = torch.cat([step["labels"] for step in self.training_steps_outputs])
        predictions = torch.cat([step["predictions"] for step in self.training_steps_outputs])
        self.training_steps_outputs.clear()

        auroc = BinaryAUROC()
        # The labels are float tensors (needed by BCELoss), but BinaryAUROC
        # expects integer targets, so cast them here.
        roc_score = auroc(predictions[:, 0], labels[:, 0].long())
        if self.logger is not None:
            self.logger.experiment.add_scalar("HIV_active_roc_auc/Train", roc_score, self.current_epoch)
        gc.collect()

    def configure_optimizers(self):
        """Build AdamW with a linear warmup/decay schedule that advances every step.

        Raises:
            TypeError: if ``steps_per_epoch`` or ``n_epochs`` was not provided.
        """
        if self.steps_per_epoch is None or self.n_epochs is None:
            raise TypeError(
                "steps_per_epoch and n_epochs must be set to build the learning-rate schedule"
            )

        optimizer = AdamW(self.parameters(), lr=self.learning_rate)
        warmup_steps = self.steps_per_epoch // 3
        # num_training_steps is the TOTAL step count, warmup included.
        # Subtracting the warmup made the rate reach zero before training ended.
        total_steps = self.steps_per_epoch * self.n_epochs

        scheduler = get_linear_schedule_with_warmup(
            optimizer,
            warmup_steps,
            total_steps
        )
        # The schedule is defined in optimizer steps. Lightning's default
        # interval is "epoch", which would keep the rate at the warmup's
        # starting value (zero) for the whole first epoch.
        return [optimizer], [{"scheduler": scheduler, "interval": "step", "frequency": 1}]

In [None]:
tokenizer = BertTokenizer.from_pretrained(MODEL_NAME)
dm = SmilesDataModule(data_dir='./HIV.csv',
                      batch_size= batch_size,
                      tokenizer=tokenizer,
                      test_data_dir= './Shuffled_HIV.csv',
                      extTest=True)
dm.prepare_data()
dm.setup()
data_size = len(dm.train_dataset)
steps_per_epoch = data_size // batch_size

model = SmilesTransformerClassifier(steps_per_epoch=steps_per_epoch, n_epochs=n_epochs, learning_rate=learning_rate)



# Initialize the TensorBoard logger
logger = TensorBoardLogger("tb_logs", name="SMILES Transformer Model")
trainer = pl.Trainer(max_epochs=n_epochs, callbacks=[checkpoint_callback], logger=logger, accelerator="gpu")
%reload_ext tensorboard
%tensorboard --logdir tb_logs
trainer.fit(model,dm)

In [None]:

# Evaluate the trained model on the data module's test dataloader and keep the
# returned metrics for later inspection.
result = trainer.test(model=model, datamodule=dm)


INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Testing: |          | 0/? [00:00<?, ?it/s]

TypeError: list indices must be integers or slices, not str