# **Task** **2**

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from transformers import AutoModel, AutoTokenizer
import pandas as pd
import numpy as np
from tqdm import tqdm
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from datasets import load_dataset
from sklearn.metrics import mean_absolute_error, r2_score

## Loading the Datsets, Models and tokenizing the input

In [3]:
DATASET_PATH = "scikit-fingerprints/MoleculeNet_Lipophilicity"
MODEL_NAME = "ibm/MoLFormer-XL-both-10pct"
EXTERNAL_DATA_PATH = "External-Dataset_for_Task2.csv"

model = AutoModel.from_pretrained(MODEL_NAME, trust_remote_code=True)
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True)

ext_data = pd.read_csv(EXTERNAL_DATA_PATH)
smiles_strings = ext_data['SMILES'].tolist()
true_lipophilicity = ext_data['Label'].tolist()

inputs = tokenizer(smiles_strings, padding=True, truncation=True, return_tensors="pt")

loss_fn = nn.MSELoss()

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


## Define compute gradient function

In [4]:
def compute_gradient(model, inputs, labels):
    model.zero_grad()
    outputs = model(**inputs).last_hidden_state[:, 0, :]
    predictions = outputs.squeeze()
    loss = loss_fn(predictions, torch.tensor(labels, dtype=torch.float32))
    loss.backward()
    gradients = [param.grad.clone().detach() for param in model.parameters() if param.grad is not None]
    return gradients

## Computing Hessian vector product by using LiSSA approximation

In [5]:
def lissa_approximation(model, inputs, labels, num_iter=10, damp=0.01, scale=25):
    v = compute_gradient(model, inputs, labels)
    h_estimate = v.copy()

    for _ in range(num_iter):
        model.zero_grad()
        outputs = model(**inputs).last_hidden_state[:, 0, :]
        predictions = outputs.squeeze()
        loss = loss_fn(predictions, torch.tensor(labels, dtype=torch.float32))
        grads = torch.autograd.grad(loss, model.parameters(), create_graph=True)
        # Compute second-order gradient (Hessian-vector product)
        hessian_vector_product = torch.autograd.grad(grads, model.parameters(), grad_outputs=h_estimate, retain_graph=True)

        with torch.no_grad():
            h_estimate = [v_i + (1 - damp) * h_est - hvp / scale for v_i, h_est, hvp in zip(v, h_estimate, hessian_vector_product)] # Update approximation

    return h_estimate

## Define influence scores function

In [6]:
def compute_influence_scores(model, ext_data, tokenizer):
    influence_scores = []
    for i in tqdm(range(len(ext_data))):
        smiles = ext_data.iloc[i]['SMILES']
        label = ext_data.iloc[i]['Label']
        inputs = tokenizer(smiles, padding=True, truncation=True, return_tensors="pt")
        h_estimate = lissa_approximation(model, inputs, [label])
        influence_score = sum(torch.norm(h).item() for h in h_estimate)
        influence_scores.append(influence_score)

    ext_data['Influence_Score'] = influence_scores
    return ext_data

## Compute influence scores and save them

In [7]:
influential_data = compute_influence_scores(model, ext_data, tokenizer)
top_k = int(0.5 * len(influential_data))  # Selecting top 50%
selected_data = influential_data.nlargest(top_k, 'Influence_Score')
selected_data.to_csv("Selected_High_Impact_Samples.csv", index=False)

  return F.mse_loss(input, target, reduction=self.reduction)
100%|██████████| 300/300 [37:35<00:00,  7.52s/it]


## Define Training parameters

In [8]:
EXTERNAL_DATA_PATH = "Selected_High_Impact_Samples.csv"
BATCH_SIZE = 16
EPOCHS = 8
LEARNING_RATE = 5e-5
model = AutoModel.from_pretrained(MODEL_NAME, trust_remote_code=True)
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True)
dataset = load_dataset(DATASET_PATH)

## Class for handling SMILES strings and targets

In [9]:
class SMILESDataset(Dataset):
    def __init__(self, dataset, tokenizer, max_length=128):
        self.smiles = dataset['SMILES']
        self.labels = dataset['label']
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.smiles)

    def __getitem__(self, i):
        encoding = self.tokenizer(
            self.smiles[i],
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt"
        )
        label = torch.tensor(self.labels[i], dtype=torch.float)
        return {key: val.squeeze(0) for key, val in encoding.items()}, label


## Creating Dataloaders

In [10]:
# Spliting the Train and Test Sets
smiles_list = dataset['train']['SMILES']
labels_list = dataset['train']['label']
train_smiles, test_smiles, train_labels, test_labels = train_test_split(
    smiles_list, labels_list, test_size=0.2, random_state=42
)

# Creating the SMILESDataset Objects
train_dataset = SMILESDataset({'SMILES': train_smiles, 'label': train_labels}, tokenizer)
test_dataset = SMILESDataset({'SMILES': test_smiles, 'label': test_labels}, tokenizer)

# Loading the csv of the Selected data
selected_data = pd.read_csv(EXTERNAL_DATA_PATH)
selected_smiles, selected_labels = selected_data['SMILES'].tolist(), selected_data['Label'].tolist()

# Combine the high-impact samples selected with the Lipophilicity training dataset
train_smiles.extend(selected_smiles)
train_labels.extend(selected_labels)
train_dataset = SMILESDataset({'SMILES': train_smiles, 'label': train_labels}, tokenizer)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

## Define the function to fine-tune the model

In [11]:
class FineTuneModel(nn.Module):
    def __init__(self, base_model):
        super(FineTuneModel, self).__init__()
        self.base_model = base_model
        self.regressor = nn.Linear(base_model.config.hidden_size, 1)

    def forward(self, input_ids, attention_mask):
        outputs = self.base_model(input_ids=input_ids, attention_mask=attention_mask)
        hidden_state = outputs.last_hidden_state[:, 0, :]
        return self.regressor(hidden_state).squeeze()

## Model Initialization

In [12]:
fine_tune_model = FineTuneModel(model)
fine_tune_model.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))
optimizer = optim.AdamW(fine_tune_model.parameters(), lr=LEARNING_RATE)
loss_fn = nn.MSELoss()

## Fine-Tune and Evaluate Model

In [13]:
def train_model(model, train_loader, optimizer, loss_fn, epochs):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for batch in train_loader:
            optimizer.zero_grad()
            inputs, labels = batch
            input_ids = inputs["input_ids"].to(device)
            attention_mask = inputs["attention_mask"].to(device)
            labels = labels.to(device)
            predictions = model(input_ids, attention_mask)
            loss = loss_fn(predictions, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        metrics = evaluate_model(model, test_loader)
        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_loader)}")
        print(f"MSE: {metrics['MSE']:.4f}")
        print(f"Mean Absolute Error (MAE): {metrics['MAE']:.4f}")
        print(f"R² Score: {metrics['R2']:.4f}")

def evaluate_model(model, test_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.eval()
    predictions, actuals = [], []
    with torch.no_grad():
        for batch in test_loader:
            inputs, labels = batch
            input_ids = inputs["input_ids"].to(device)
            attention_mask = inputs["attention_mask"].to(device)
            labels = labels.to(device)
            preds = model(input_ids, attention_mask).cpu().numpy()
            predictions.extend(preds)
            actuals.extend(labels.cpu().numpy())

    mse = np.mean((np.array(predictions) - np.array(actuals))**2)
    mae = mean_absolute_error(actuals, predictions)
    r2 = r2_score(actuals, predictions)
    return {"MSE": mse, "MAE": mae, "R2": r2}

train_model(fine_tune_model, train_loader, optimizer, loss_fn, EPOCHS)

Epoch 1/8, Loss: 0.9907912941141562
MSE: 0.6526
Mean Absolute Error (MAE): 0.6333
R² Score: 0.5583
Epoch 2/8, Loss: 0.48409354835748675
MSE: 0.5261
Mean Absolute Error (MAE): 0.5691
R² Score: 0.6439
Epoch 3/8, Loss: 0.3147531156851487
MSE: 0.4333
Mean Absolute Error (MAE): 0.5032
R² Score: 0.7067
Epoch 4/8, Loss: 0.22661856358701532
MSE: 0.4334
Mean Absolute Error (MAE): 0.5071
R² Score: 0.7067
Epoch 5/8, Loss: 0.17944144833494316
MSE: 0.3941
Mean Absolute Error (MAE): 0.4753
R² Score: 0.7332
Epoch 6/8, Loss: 0.14680061656981708
MSE: 0.4462
Mean Absolute Error (MAE): 0.5089
R² Score: 0.6980
Epoch 7/8, Loss: 0.1378116269341924
MSE: 0.4103
Mean Absolute Error (MAE): 0.4858
R² Score: 0.7223
Epoch 8/8, Loss: 0.10821927062828432
MSE: 0.3945
Mean Absolute Error (MAE): 0.4772
R² Score: 0.7330
