In [None]:
!pip install datasets


In [None]:
# import dependencies
import wandb                     # For experiment tracking. Used for experiment tracking, which helps log metrics and visualizations during training.
import torch                   # For tensor computations and model training
import datasets                # For working with HuggingFace datasets
import argparse                # For parsing command line arguments (if needed)
import numpy as np             # For numerical operations
import transformers            # HuggingFace Transformers library

# Additional imports for our implementation
from datasets import load_dataset, DatasetDict
from transformers import AutoTokenizer, AutoModel, default_data_collator
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim
import math

In [None]:
# Define our model and dataset to use.
# Pre-trained checkpoint: the same model that was used in the first task.
MODEL_NAME = "ibm/MoLFormer-XL-both-10pct"
# Dataset: the same one that was used in the first task (without the external dataset).
DATASET_PATH = "scikit-fingerprints/MoleculeNet_Lipophilicity"

# Define the regression model with a regression head

This section adapts the pre-trained transformer model to a regression task by attaching a linear regression head: the head takes the hidden representation of the first token (treated here as the [CLS] token) and maps it to a single scalar output.

In [None]:
class MoLFormerWithRegressionHead(nn.Module):
    """Pre-trained encoder followed by a single-output linear regression layer.

    The encoder is obtained from ``AutoModel.from_pretrained`` and the hidden
    state at sequence position 0 is projected to one scalar per sequence.

    Args:
        model_name: Identifier forwarded to ``AutoModel.from_pretrained``.
    """

    def __init__(self, model_name):
        super().__init__()
        # Backbone: loaded with deterministic_eval=True and trust_remote_code=True.
        self.base_model = AutoModel.from_pretrained(
            model_name,
            deterministic_eval=True,
            trust_remote_code=True,
        )
        # The head's input width is read from the backbone configuration.
        self.regression_head = nn.Linear(self.base_model.config.hidden_size, 1)

    def forward(self, input_ids, attention_mask=None):
        """Run the encoder and regress one value per input sequence.

        Args:
            input_ids: Token ids handed to the encoder.
            attention_mask: Optional mask handed to the encoder unchanged.

        Returns:
            The head output with its trailing size-1 dimension squeezed away.
        """
        encoder_out = self.base_model(input_ids=input_ids, attention_mask=attention_mask)
        # Position 0 of the last hidden state is used as the sequence summary
        # (assumed to correspond to the [CLS] token).
        first_token_state = encoder_out.last_hidden_state[:, 0, :]
        return self.regression_head(first_token_state).squeeze(-1)

# Tokenization function for SMILES strings

The function `tokenize_function` processes SMILES strings so that they can be fed into the model. Tokenization converts raw SMILES strings into a numerical format (token IDs) that the model can work with.

In [None]:
def tokenize_function(examples):
    """Tokenize the ``"SMILES"`` entries of ``examples``.

    Relies on a module-level ``tokenizer`` being defined before the call.

    Args:
        examples: Mapping that holds the SMILES strings under the key ``"SMILES"``.

    Returns:
        Whatever the tokenizer returns for the given SMILES strings.
    """
    tokenizer_options = {
        "padding": "max_length",        # Pad every sequence up to max_length.
        "truncation": True,             # Cut sequences longer than max_length.
        "max_length": 128,              # Fixed sequence length.
        "return_attention_mask": True,  # Also return attention masks.
    }
    smiles_batch = examples["SMILES"]
    return tokenizer(smiles_batch, **tokenizer_options)

# Entry point

In [None]:
if __name__ == "__main__":
    # Set-up stage of the experiment: seed the RNG, start experiment tracking,
    # pick the device, build the train/test DataLoaders and create the model.

    # Seed the torch RNG so that the regression-head initialisation and the
    # shuffling order of the training DataLoader are repeatable between runs.
    # The same value is reused for the train/test split below.
    # NOTE: some GPU kernels may still introduce small run-to-run differences.
    SEED = 42
    torch.manual_seed(SEED)

    # Initialize wandb for experiment tracking.
    wandb.init(project="bitfit_finetuning", name="bitfit_molformer_regression")

    # Set the device: GPU if available, else CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # -----------------------------
    # Load and Prepare the Dataset
    # -----------------------------

    # Load the Lipophilicity dataset from HuggingFace.
    raw_dataset = load_dataset(DATASET_PATH)

    # Check if a 'test' split exists; if not, create one from the 'train' split.
    if "test" not in raw_dataset:
        # Create an 80/20 train/test split from the original training data.
        split_dataset = raw_dataset["train"].train_test_split(test_size=0.2, seed=SEED)
        # Construct a new DatasetDict with 'train' and 'test' splits.
        dataset = DatasetDict({
            "train": split_dataset["train"],
            "test": split_dataset["test"]
        })
    else:
        dataset = raw_dataset

    # Load the tokenizer associated with our MoLFormer model.
    # tokenize_function reads this module-level name when it is called.
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True)

    # Tokenize the dataset using our tokenization function.
    tokenized_datasets = dataset.map(tokenize_function, batched=True)

    # Rename the label column to "labels" for consistency with our training loop.
    tokenized_datasets = tokenized_datasets.rename_column("label", "labels")

    # Expose only the model inputs and the target, as PyTorch tensors.
    tokenized_datasets.set_format("torch", columns=["input_ids", "attention_mask", "labels"])

    # Create DataLoaders for the training and test sets.
    # Only the training data is shuffled; the test order stays fixed.
    BATCH_SIZE = 16  # Adjust based on your memory constraints.
    train_loader = DataLoader(tokenized_datasets["train"], batch_size=BATCH_SIZE, shuffle=True, collate_fn=default_data_collator)
    test_loader = DataLoader(tokenized_datasets["test"], batch_size=BATCH_SIZE, shuffle=False, collate_fn=default_data_collator)

    # -----------------------------
    # Initialize the Regression Model
    # -----------------------------

    # Create an instance of the regression model and move it to the chosen device.
    model = MoLFormerWithRegressionHead(MODEL_NAME).to(device)

# Apply BitFit Fine-Tuning Strategy

In [None]:
    # BitFit: inside the pre-trained backbone only bias terms stay trainable.
    # Every backbone parameter whose name does not contain "bias" is frozen;
    # parameters outside model.base_model are not touched by this loop.
    for param_name, param in model.base_model.named_parameters():
        if "bias" in param_name:
            continue  # Bias term: leave its requires_grad flag as it is.
        param.requires_grad = False

    # Collect and report the names of all parameters that will still be updated.
    trainable_params = []
    for param_name, param in model.named_parameters():
        if param.requires_grad:
            trainable_params.append(param_name)
    print("Trainable parameters:", trainable_params)

# Fine-Tune on the Regression Task

In [None]:
    # Regression objective: mean squared error between predictions and labels.
    criterion = nn.MSELoss()
    # Hand Adam only the parameters that still require gradients.
    optimizer = optim.Adam(
        [p for p in model.parameters() if p.requires_grad],
        lr=1e-4,
    )
    num_epochs = 10  # Adjust the number of epochs as needed.

    model.train()  # Training mode for the whole fine-tuning run.
    for epoch in range(num_epochs):
        batch_losses = []
        for batch in train_loader:
            # Put the inputs and the float targets on the training device.
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            targets = batch["labels"].to(device).float()

            # Standard step: clear gradients, forward, loss, backward, update.
            optimizer.zero_grad()
            preds = model(input_ids=input_ids, attention_mask=attention_mask)
            batch_loss = criterion(preds, targets)
            batch_loss.backward()
            optimizer.step()

            batch_losses.append(batch_loss.item())

        # Epoch loss is the mean of the per-batch losses.
        avg_loss = sum(batch_losses) / len(train_loader)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")
        wandb.log({"epoch": epoch+1, "loss": avg_loss})

# Save the Fine-Tuned Model

In [None]:
# Write the model's state_dict (backbone and regression head) to a local file.
BITFIT_MODEL_SAVE_PATH = "bitfit_molformer_regression.pth"
bitfit_state_dict = model.state_dict()
torch.save(bitfit_state_dict, BITFIT_MODEL_SAVE_PATH)
print(f"BitFit fine-tuned model saved to {BITFIT_MODEL_SAVE_PATH}")

# Evaluate the Model on the Test Set

In [None]:
    # Evaluation on the held-out split: gather predictions and targets batch by
    # batch, then report the mean squared error.
    model.eval()
    predictions, actuals = [], []

    with torch.no_grad():  # Gradients are not needed for inference.
        for batch in test_loader:
            features = {
                key: batch[key].to(device)
                for key in ("input_ids", "attention_mask")
            }
            targets = batch["labels"].to(device).float()

            # The fine-tuned model outputs one predicted value per molecule.
            batch_preds = model(**features)
            predictions.extend(batch_preds.cpu().numpy())
            actuals.extend(targets.cpu().numpy())

    predictions = np.array(predictions)
    actuals = np.array(actuals)
    squared_errors = np.square(predictions - actuals)
    mse = squared_errors.mean()  # Mean Squared Error over the test set.
    print(f"Test Mean Squared Error (MSE): {mse:.4f}")
    wandb.log({"test_mse": mse})

    # Close the wandb run now that the final metric has been logged.
    wandb.finish()