In [None]:
# %pip install -r requirements.txt

In [1]:
import torch
import time
import os
import random

import pandas as pd
import numpy as np
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from datasets import load_dataset
import datasets
from sklearn.cluster import KMeans
from sklearn.model_selection import train_test_split, StratifiedShuffleSplit
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from torch.utils.data import DataLoader, Dataset
from transformers import (
    AutoTokenizer,
    AutoModel,
    TrainingArguments,
    Trainer,
    AutoModelForMaskedLM,
    DataCollatorForLanguageModeling,
)
from scipy.stats import pearsonr, spearmanr

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Hugging Face Hub id of the lipophilicity dataset passed to `load_dataset`.
DATASET_PATH = "scikit-fingerprints/MoleculeNet_Lipophilicity"
# Hugging Face Hub id used to load the pretrained MoLFormer tokenizer and weights.
MODEL_NAME = "ibm/MoLFormer-XL-both-10pct"
# Local directory the MLM-fine-tuned model is saved to and later reloaded from.
SAVED_MODEL_PATH = "./models/molformer_finetuned"
# CSV holding the external samples; read with `pd.read_csv` in task3a/task3b.
EXTERNAL_DATA_PATH = "./data/External-Dataset_for_Task2.csv"

In [3]:
class SMILESDataset(Dataset):
    """Map-style dataset pairing SMILES strings with their targets."""

    def __init__(self, strings, labels):
        """Keep two parallel, index-aligned collections.

        Args:
            strings: Indexable collection of SMILES strings.
            labels: Indexable collection of targets aligned with ``strings``.
        """
        self.strings, self.labels = strings, labels

    def __len__(self):
        """Return the number of samples (length of the SMILES collection)."""
        return len(self.strings)

    def __getitem__(self, idx):
        """Return the sample at ``idx`` as the two-element list ``[smiles, target]``."""
        return [self.strings[idx], self.labels[idx]]

In [None]:
class MoLFormerWithRegressionHead(nn.Module):
    """Encoder followed by a small MLP that maps the pooled output to one scalar."""

    def __init__(self, model, hidden_size=768):
        """Attach a two-layer regression head to ``model``.

        Args:
            model: Encoder module; its output must expose ``pooler_output``.
            hidden_size: Width of the pooled representation fed to the head.
        """
        super().__init__()
        self.encoder = model

        bottleneck = hidden_size // 2
        # Head layout: Linear -> ReLU -> Dropout(0.1) -> Linear(bottleneck, 1)
        self.regressor = nn.Sequential(
            nn.Linear(hidden_size, bottleneck),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(bottleneck, 1),
        )

    def forward(self, inputs):
        """Run the encoder on the keyword inputs and regress from its pooled output.

        Args:
            inputs: Mapping that is unpacked as keyword arguments into the encoder.

        Returns:
            The output of the regression head applied to ``pooler_output``.
        """
        pooled = self.encoder(**inputs).pooler_output
        return self.regressor(pooled)

In [5]:
def get_model_from_task1():
    """Fine-tune MoLFormer with masked-language-modelling and save it.

    Seeds the Python, NumPy and torch RNGs, loads the dataset at
    ``DATASET_PATH``, takes the training part of an 80/20 split of its SMILES
    strings, trains ``MODEL_NAME`` on them with a 15% masking probability for
    3 epochs via ``Trainer``, and writes the result to ``SAVED_MODEL_PATH``.

    Returns:
        None. The function is called for its side effect of saving the model.
    """
    seed = 1

    # Seed every RNG in use and force deterministic cuDNN behaviour.
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

    # `[:]` materialises the whole "train" split before building the DataFrame.
    dataset = load_dataset(DATASET_PATH)["train"][:]
    lipophilicity_df = pd.DataFrame(dataset)

    lipophilicity_strings = lipophilicity_df["SMILES"].values
    lipophilicity_targets = lipophilicity_df["label"].values

    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True)

    # NOTE(review): the scaled targets, X_test, y_train and y_test are not used
    # anywhere below; only X_train feeds the MLM training set.
    scaler = MinMaxScaler()
    lipophilicity_targets = scaler.fit_transform(lipophilicity_targets.reshape(-1, 1))
    X_train, X_test, y_train, y_test = train_test_split(
        lipophilicity_strings, lipophilicity_targets, test_size=0.2, random_state=seed
    )

    BATCH_SIZE = 16

    train_dataset = SMILESDataset(X_train, y_train)
    # No shuffle here, so iterating yields the SMILES in X_train order.
    train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE)

    model = AutoModelForMaskedLM.from_pretrained(
        MODEL_NAME, deterministic_eval=True, trust_remote_code=True
    )
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    # Collect the training SMILES batch by batch into one flat list.
    training_data = []
    for index, data in enumerate(train_dataloader):
        smile_strings = list(data[0])
        training_data.extend(smile_strings)

    # Rebinds `dataset`: from here on it is the SMILES-only training set.
    dataset = datasets.Dataset.from_list([{"smiles": s} for s in training_data])

    def tokenize_function(examples):
        # Pad/truncate every SMILES string to exactly 64 tokens.
        tokenized = tokenizer(
            examples["smiles"], padding="max_length", truncation=True, max_length=64
        )
        return tokenized

    tokenized_dataset = dataset.map(tokenize_function, batched=True)

    # Collator configured for MLM with a 15% masking probability.
    data_collator = DataCollatorForLanguageModeling(
        tokenizer=tokenizer, mlm=True, mlm_probability=0.15
    )

    # NOTE(review): output_dir is the same directory as the final save path
    # below, so per-epoch checkpoints are presumably written there as well.
    training_args = TrainingArguments(
        output_dir=SAVED_MODEL_PATH,
        evaluation_strategy="no",
        save_strategy="epoch",
        per_device_train_batch_size=8,
        per_device_eval_batch_size=8,
        weight_decay=0.01,
        num_train_epochs=3,
        logging_dir="./logs",
        logging_steps=500,
        report_to="none",
        # no_cuda=True
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_dataset,
        tokenizer=tokenizer,
        data_collator=data_collator,
    )

    trainer.train()

    model.save_pretrained(SAVED_MODEL_PATH)

In [None]:
def visualize_tuning_results(
    techniques, csv_filename, plots_folder, graph_type="Tuning"
):
    """Persist a results table and draw a grouped bar chart of its metrics.

    Args:
        techniques: Mapping ``method name -> {metric name: value}``. Every inner
            mapping must contain a ``"training_time"`` entry; it is written to
            the CSV but excluded from the chart.
        csv_filename: Path of the CSV file to write (one row per method).
        plots_folder: Directory for the figure; created if missing.
        graph_type: Label inserted into the plot title.

    Returns:
        None. Writes ``csv_filename`` and
        ``<plots_folder>/all_metrics_comparison.png`` and shows the figure.

    Raises:
        KeyError: If the results have no ``"training_time"`` column.
    """
    os.makedirs(plots_folder, exist_ok=True)

    # Build the table without in-place mutation so the steps read top-down.
    results_df = (
        pd.DataFrame.from_dict(techniques, orient="index")
        .reset_index()
        .rename(columns={"index": "Tuning Method"})
    )
    results_df.to_csv(csv_filename, index=False)

    # Training time is on a different scale than the metrics, so keep it out
    # of the comparison chart.
    metrics_df = results_df.drop(columns=["training_time"]).set_index("Tuning Method")

    n_methods = len(metrics_df.index)
    # 0.2 per bar as before, but shrink when many methods would otherwise make
    # neighbouring metric groups overlap.
    bar_width = min(0.2, 0.8 / max(n_methods, 1))
    x = np.arange(len(metrics_df.columns))  # one group per metric

    fig, ax = plt.subplots(figsize=(12, 6))
    for i, (method, values) in enumerate(metrics_df.iterrows()):
        ax.bar(x + i * bar_width, values, width=bar_width, label=method)

    ax.set_xlabel("Metrics")
    ax.set_ylabel("Values")
    ax.set_title(f"Comparison of Different Metrics for {graph_type} Techniques")
    # Bars are centred at x, x + w, ..., x + (n - 1) * w, so the middle of a
    # group sits at x + (n - 1) * w / 2.
    ax.set_xticks(x + bar_width * (n_methods - 1) / 2)
    ax.set_xticklabels(metrics_df.columns, rotation=45)
    ax.legend(title="Tuning Methods")
    ax.grid(axis="y", linestyle="--", alpha=0.7)

    plot_path = os.path.join(plots_folder, "all_metrics_comparison.png")
    fig.savefig(plot_path, bbox_inches="tight")
    plt.show()
    # Release the figure so repeated calls do not accumulate open figures.
    plt.close(fig)

In [None]:
def diversity_based_selection(
    dataset, embeddings, num_samples=200, n_clusters=10, random_state=42
):
    """Pick samples spread across K-Means clusters of the embedding space.

    The embeddings are clustered and an equal quota of points is drawn without
    replacement from every non-empty cluster.

    Args:
        dataset: Mapping with ``"SMILES"`` and ``"label"`` columns that are
            indexable by position and aligned with the rows of ``embeddings``.
        embeddings: 2-D array-like passed to ``KMeans.fit``, one row per sample.
        num_samples: Upper bound on the number of samples returned.
        n_clusters: Number of K-Means clusters.
        random_state: Seed for both K-Means and the per-cluster sampling, so
            that repeated calls select the same samples. ``None`` disables
            seeding.

    Returns:
        Tuple ``(selected_smiles, selected_labels)`` of equally long lists.
        Fewer than ``num_samples`` entries are returned when clusters hold
        fewer points than their quota.
    """
    kmeans = KMeans(n_clusters=n_clusters, random_state=random_state).fit(embeddings)
    cluster_labels = kmeans.labels_

    # Dedicated generator: the draw no longer depends on (or disturbs) the
    # global NumPy RNG state, which made the selection irreproducible.
    rng = np.random.default_rng(random_state)

    # At least one per cluster so a request smaller than n_clusters does not
    # silently come back empty; the slice below enforces the upper bound.
    quota = max(1, num_samples // n_clusters)

    selected_indices = []
    for cluster_id in range(n_clusters):
        members = np.flatnonzero(cluster_labels == cluster_id)
        if members.size == 0:
            continue
        picked = rng.choice(members, size=min(quota, members.size), replace=False)
        selected_indices.extend(int(i) for i in picked)

    selected_indices = selected_indices[:num_samples]

    # Look the columns up once instead of once per selected index.
    smiles_column = dataset["SMILES"]
    label_column = dataset["label"]
    selected_smiles = [smiles_column[i] for i in selected_indices]
    selected_labels = [label_column[i] for i in selected_indices]

    return selected_smiles, selected_labels

In [53]:
def pre_processing(dataset):
    """Split a SMILES/label dataset 80/20 and min-max scale the targets.

    The scaler is fitted on the training targets only and then applied to the
    test targets, so the test range never influences the scaling. Scaled test
    targets may therefore fall slightly outside ``[0, 1]``.

    Args:
        dataset: Mapping with ``"SMILES"`` and ``"label"`` columns.

    Returns:
        Tuple ``(X_train, X_test, y_train, y_test, scaler)`` where the ``X``
        arrays hold SMILES strings, the ``y`` arrays are scaled targets of
        shape ``(n, 1)`` and ``scaler`` is the fitted ``MinMaxScaler``.
    """
    lipophilicity_strings = np.array(dataset["SMILES"])
    lipophilicity_targets = np.array(dataset["label"]).reshape(-1, 1)

    # Split first: the partition depends only on the sample count and seed,
    # so it is the same one obtained when splitting already-scaled targets.
    X_train, X_test, y_train_raw, y_test_raw = train_test_split(
        lipophilicity_strings, lipophilicity_targets, test_size=0.2, random_state=42
    )

    scaler = MinMaxScaler()
    y_train = scaler.fit_transform(y_train_raw)
    y_test = scaler.transform(y_test_raw)

    return X_train, X_test, y_train, y_test, scaler

In [27]:
def prepare_dataloaders(dataset, batch_size=32):
    """Build train and test ``DataLoader`` objects from a SMILES/label dataset.

    Args:
        dataset: Mapping with ``"SMILES"`` and ``"label"`` columns, forwarded to
            ``pre_processing``.
        batch_size: Batch size used for both loaders.

    Returns:
        Tuple ``(train_dataloader, test_dataloader, scaler)``; ``scaler`` is the
        target scaler returned by ``pre_processing``.
    """
    X_train, X_test, y_train, y_test, scaler = pre_processing(dataset)

    train_dataloader = DataLoader(
        SMILESDataset(X_train, y_train), batch_size=batch_size, shuffle=True
    )
    # Evaluation gains nothing from shuffling; a fixed order keeps the
    # collected predictions reproducible between runs.
    test_dataloader = DataLoader(
        SMILESDataset(X_test, y_test), batch_size=batch_size, shuffle=False
    )
    return train_dataloader, test_dataloader, scaler

In [28]:
@torch.no_grad()
def generate_embeddings(dataset, model, tokenizer, device):
    """Embed every sample of ``dataset`` with ``model``, in dataset order.

    The model is switched to eval mode. Samples are processed in unshuffled
    batches of 32; for each batch the first position of ``last_hidden_state``
    is used when the model output has that attribute, otherwise
    ``pooler_output``.

    Args:
        dataset: Dataset yielding ``(smiles, label)`` pairs; labels are ignored.
        model: Callable accepting the tokenizer output as keyword arguments.
        tokenizer: Callable turning a batch of SMILES into model inputs that
            support ``.to(device)``.
        device: Device the tokenized inputs are moved to.

    Returns:
        NumPy array with the per-batch embeddings stacked row-wise.

    Raises:
        ValueError: If the model output has neither ``last_hidden_state`` nor
            ``pooler_output``.
    """
    model.eval()
    loader = DataLoader(dataset, batch_size=32, shuffle=False)

    stacked_batches = []
    for smiles_batch, _unused_labels in loader:
        encoded = tokenizer(smiles_batch, padding=True, return_tensors="pt").to(device)
        model_output = model(**encoded)

        if hasattr(model_output, "last_hidden_state"):
            # First token position of every sequence
            batch_vectors = model_output.last_hidden_state[:, 0, :]
        elif hasattr(model_output, "pooler_output"):
            batch_vectors = model_output.pooler_output
        else:
            raise ValueError(
                "Unexpected model output format. Check if your model provides embeddings."
            )

        stacked_batches.append(batch_vectors.cpu().numpy())

    return np.vstack(stacked_batches)

In [29]:
def get_diversity_based_dataset(external_dataset, dataset, model, tokenizer, device):
    """Prepend 100 diversity-selected external samples to ``dataset``.

    The external SMILES are embedded with ``model``, clustered into 5 groups by
    ``diversity_based_selection`` and the selected samples are concatenated in
    front of the corresponding columns of ``dataset``.

    Args:
        external_dataset: Mapping with ``"SMILES"`` and ``"label"`` columns.
        dataset: Mapping of column name to list for the base dataset.
        model: Embedding model forwarded to ``generate_embeddings``.
        tokenizer: Tokenizer forwarded to ``generate_embeddings``.
        device: Device forwarded to ``generate_embeddings``.

    Returns:
        Dict with one list per column found in either input; a column missing
        on one side contributes an empty list.
    """
    external_samples = SMILESDataset(
        external_dataset["SMILES"], external_dataset["label"]
    )
    embeddings = generate_embeddings(external_samples, model, tokenizer, device)

    picked_smiles, picked_labels = diversity_based_selection(
        external_dataset, embeddings, num_samples=100, n_clusters=5
    )
    picked = {"SMILES": picked_smiles, "label": picked_labels}

    combined_data = {}
    for column in set(picked) | set(dataset):
        combined_data[column] = picked.get(column, []) + dataset.get(column, [])
    return combined_data

In [30]:
def stratified_sampling(df, num_samples=100):
    """Draw a sample of ``df`` stratified on its ``"logD_bin"`` column.

    Args:
        df: DataFrame containing a ``"logD_bin"`` column used as strata.
        num_samples: Number of rows to draw. When it is at least ``len(df)``
            the whole frame is returned.

    Returns:
        Dict mapping each column name (except ``"logD_bin"``) to the list of
        sampled values.
    """
    if num_samples >= len(df):
        # Nothing to sub-sample; a split with an empty remainder would raise.
        sampled_df = df
    else:
        # An integer test_size is an absolute row count, which avoids the
        # float round-trip num_samples / len(df) * len(df) drifting by a row.
        splitter = StratifiedShuffleSplit(
            n_splits=1, test_size=int(num_samples), random_state=42
        )
        _, sample_idx = next(splitter.split(df, df["logD_bin"]))
        sampled_df = df.iloc[sample_idx]

    stratified_sample = sampled_df.to_dict(orient="list")
    stratified_sample.pop("logD_bin", None)
    return stratified_sample

In [31]:
def get_stratified_dataset(external_dataset, dataset):
    """Prepend a label-stratified sample of the external data to ``dataset``.

    External labels are cut into three quantile bins, ``stratified_sampling``
    draws from those bins with its default sample size, and the sampled
    columns are concatenated in front of the matching columns of ``dataset``.

    Args:
        external_dataset: Data convertible to a DataFrame with a ``"label"``
            column.
        dataset: Mapping of column name to list for the base dataset.

    Returns:
        Dict with one list per column found in either input; a column missing
        on one side contributes an empty list.
    """
    num_bins = 3  # Low, Medium, High
    external_df = pd.DataFrame(external_dataset)
    external_df["logD_bin"] = pd.qcut(external_df["label"], q=num_bins, labels=False)

    sampled = stratified_sampling(external_df)

    all_columns = set(sampled) | set(dataset)
    return {
        column: sampled.get(column, []) + dataset.get(column, [])
        for column in all_columns
    }

In [32]:
def train_model(
    model, train_dataloader, tokenizer, device, epochs=6, learning_rate=0.0001
):
    """Train ``model`` with Adam on an MSE objective.

    Args:
        model: Module called as ``model(inputs)`` with the tokenizer output.
        train_dataloader: Iterable of batches whose first element is a batch of
            SMILES strings and whose second element is a tensor of targets.
        tokenizer: Callable turning a batch of SMILES into model inputs that
            support ``.to(device)``.
        device: Device that inputs and targets are moved to.
        epochs: Number of passes over ``train_dataloader``.
        learning_rate: Adam learning rate.

    Returns:
        The same ``model`` instance, left in training mode.
    """
    mse_loss = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    model.train()
    for _ in range(epochs):
        for data in train_dataloader:
            smile_strings = data[0]
            smile_targets = data[1].to(device).float()

            inputs = tokenizer(smile_strings, padding=True, return_tensors="pt").to(
                device
            )
            outputs = model(inputs)

            # Align targets with the predictions: MSELoss would otherwise
            # broadcast (B, 1) against (B,) into a (B, B) comparison.
            smile_targets = smile_targets.reshape(outputs.shape)
            loss = mse_loss(outputs, smile_targets)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    return model

In [33]:
def evaluate_model(model, test_dataloader, scaler, tokenizer, device):
    """Evaluate a regression model and report metrics in the original units.

    The model is switched to eval mode, predictions and targets are gathered
    without gradient tracking, both are mapped back through
    ``scaler.inverse_transform`` and six metrics are printed and returned.

    Args:
        model: Module called as ``model(inputs)`` with the tokenizer output.
        test_dataloader: Iterable of batches whose first element is a batch of
            SMILES strings and whose second element is a tensor of targets.
        scaler: Fitted scaler providing ``inverse_transform``.
        tokenizer: Callable turning a batch of SMILES into model inputs that
            support ``.to(device)``.
        device: Device that inputs and targets are moved to.

    Returns:
        Tuple ``(mse, rmse, mae, r2, pearson_corr, spearman_corr)``.
    """
    model.eval()
    predicted_rows, target_rows = [], []

    with torch.no_grad():
        for batch in test_dataloader:
            batch_smiles = batch[0]
            batch_targets = batch[1].to(device).float()

            encoded = tokenizer(batch_smiles, padding=True, return_tensors="pt").to(
                device
            )
            batch_outputs = model(encoded)

            predicted_rows.extend(batch_outputs.cpu().numpy())
            target_rows.extend(batch_targets.cpu().numpy())

    def _to_original_scale(rows):
        # 1-D float64 values -> single column -> undo the scaling -> flat list
        column = np.array(rows).flatten().astype(np.float64).reshape(-1, 1)
        return np.array(scaler.inverse_transform(column)).flatten().tolist()

    predictions = _to_original_scale(predicted_rows)
    actuals = _to_original_scale(target_rows)

    mse = mean_squared_error(actuals, predictions)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(actuals, predictions)
    r2 = r2_score(actuals, predictions)
    pearson_corr = pearsonr(actuals, predictions)[0]
    spearman_corr = spearmanr(actuals, predictions)[0]

    print(f"Mean Squared Error (MSE): {mse:.4f}")
    print(f"Root Mean Squared Error (RMSE): {rmse:.4f}")
    print(f"Mean Absolute Error (MAE): {mae:.4f}")
    print(f"R-squared (R²): {r2:.4f}")
    print(f"Pearson Correlation: {pearson_corr:.4f}")
    print(f"Spearman Correlation: {spearman_corr:.4f}")
    return mse, rmse, mae, r2, pearson_corr, spearman_corr

In [56]:
def task3a():
    """Compare two strategies for selecting external training samples.

    Builds a stratified and a diversity-based augmented dataset, trains a
    fresh regression model on each, evaluates it on that dataset's held-out
    split and hands the metrics plus training time to
    ``visualize_tuning_results`` (``task3a.csv`` / ``plots3a``).

    Returns:
        None.
    """
    dataset = load_dataset(DATASET_PATH)["train"][:]
    external_dataset = pd.read_csv(EXTERNAL_DATA_PATH).to_dict("list")
    # The CSV names its target column "Label"; align it with the base dataset.
    external_dataset["label"] = external_dataset.pop("Label")

    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # The MLM-fine-tuned encoder is only used to embed the external samples.
    if not os.path.exists(SAVED_MODEL_PATH):
        get_model_from_task1()
    embedding_model = AutoModel.from_pretrained(
        SAVED_MODEL_PATH, deterministic_eval=True, trust_remote_code=True
    ).to(device)

    processed_datasets = {
        "stratified_dataset": get_stratified_dataset(external_dataset, dataset),
        "diversity_based_dataset": get_diversity_based_dataset(
            external_dataset, dataset, embedding_model, tokenizer, device
        ),
    }

    # Free the embedding model before training so it does not occupy device
    # memory alongside the models trained below.
    del embedding_model
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    dataloaders = {}
    for key, value in processed_datasets.items():
        train_loader, test_loader, scaler = prepare_dataloaders(value)
        dataloaders[key] = {
            "train_loader": train_loader,
            "test_loader": test_loader,
            "scaler": scaler,
        }

    techniques = {}
    for key, value in dataloaders.items():
        # NOTE(review): regression starts from the base MODEL_NAME weights, not
        # from the fine-tuned checkpoint at SAVED_MODEL_PATH - confirm intent.
        encoder = AutoModel.from_pretrained(
            MODEL_NAME, deterministic_eval=True, trust_remote_code=True
        )
        model = MoLFormerWithRegressionHead(encoder).to(device)

        start_time = time.time()
        model = train_model(model, value["train_loader"], tokenizer, device)
        training_time = time.time() - start_time

        mse, rmse, mae, r2, pearson_corr, spearman_corr = evaluate_model(
            model, value["test_loader"], value["scaler"], tokenizer, device
        )

        techniques[key] = {
            "MSE": mse,
            "RMSE": rmse,
            "MAE": mae,
            "R2": r2,
            "pearson_corr": pearson_corr,
            "spearman_corr": spearman_corr,
            "training_time": training_time,
        }

    visualize_tuning_results(
        techniques,
        csv_filename="task3a.csv",
        plots_folder="plots3a",
        graph_type="Data Selection",
    )

In [None]:
# Run the data-selection comparison; results go to task3a.csv and plots3a/.
task3a()

In [37]:
class lora_matrix(nn.Module):
    """Low-rank update ``alpha * (x @ A @ B)`` used as a LoRA adapter."""

    def __init__(self, input_dimension, output_dimension, rank, alpha):
        """Create the two low-rank factors.

        Args:
            input_dimension: Size of the last dimension of the input.
            output_dimension: Size of the last dimension of the output.
            rank: Inner dimension shared by the two factors.
            alpha: Scalar multiplier applied to the low-rank product.
        """
        super().__init__()
        self.lora_matrix_A = torch.nn.Parameter(
            torch.randn(input_dimension, rank) * 0.01
        )
        # B starts at zero so the adapter contributes nothing before training
        # and the wrapped layer initially reproduces its pretrained output.
        # Gradients still reach B on the first step because A is non-zero.
        self.lora_matrix_B = torch.nn.Parameter(torch.zeros(rank, output_dimension))
        self.alpha = alpha

    def forward(self, x):
        """Return the scaled low-rank projection of ``x``."""
        low_rank_update = x @ self.lora_matrix_A @ self.lora_matrix_B
        return self.alpha * low_rank_update

In [38]:
class linear_lora(nn.Module):
    """Wrap a linear layer and add a ``lora_matrix`` adapter to its output."""

    def __init__(self, layer, rank, alpha):
        """
        Args:
            layer: Layer exposing ``in_features`` and ``out_features``.
            rank: Rank passed to the adapter.
            alpha: Scaling factor passed to the adapter.
        """
        super().__init__()
        self.layer = layer
        self.lora = lora_matrix(layer.in_features, layer.out_features, rank, alpha)

    def forward(self, x):
        """Return the wrapped layer's output plus the adapter's output."""
        base_output = self.layer(x)
        adapter_output = self.lora(x)
        return base_output + adapter_output

In [39]:
class IA3MolformerSelfAttention(nn.Module):
    """Self-attention wrapper that rescales key and value projections (IA)^3-style.

    Two learned vectors are applied element-wise: ``lk`` to the output of the
    key projection and ``lv`` to the output of the value projection. Both are
    initialised to ones, so the wrapper initially behaves exactly like the
    wrapped attention module. The query projection is left untouched.
    """

    def __init__(self, original_attention):
        """
        Args:
            original_attention: Attention module exposing ``query``, ``key``,
                ``value``, ``rotary_embeddings`` and ``feature_map``.
        """
        super().__init__()
        self.original_attention = original_attention  # Store original layer

        self.query = original_attention.query
        self.key = original_attention.key
        self.value = original_attention.value
        self.rotary_embeddings = original_attention.rotary_embeddings
        self.feature_map = original_attention.feature_map

        self.lk = nn.Parameter(torch.ones(original_attention.key.out_features))
        self.lv = nn.Parameter(torch.ones(original_attention.value.out_features))

        # Rescale the projection outputs via forward hooks, so the wrapped
        # attention's own forward is reused unchanged.
        # NOTE(review): assumes the wrapped forward invokes these projections
        # as modules (``self.key(...)`` / ``self.value(...)``) - confirm.
        self.key.register_forward_hook(self._rescale_key)
        self.value.register_forward_hook(self._rescale_value)

    def _rescale_key(self, module, inputs, output):
        # Forward hook: replaces the key projection output with lk * output.
        return output * self.lk

    def _rescale_value(self, module, inputs, output):
        # Forward hook: replaces the value projection output with lv * output.
        return output * self.lv

    def forward(self, hidden_states, *args, **kwargs):
        """Delegate to the wrapped attention; the hooks apply ``lk`` and ``lv``."""
        return self.original_attention(hidden_states, *args, **kwargs)

In [40]:
class IA3MolformerIntermediate(nn.Module):
    """Feed-forward intermediate layer with a learned per-feature scale.

    Applies the wrapped dense layer and activation, then multiplies the result
    element-wise by ``lff``, which starts as a vector of ones.
    """

    def __init__(self, original_intermediate):
        """
        Args:
            original_intermediate: Object exposing ``dense`` (with
                ``out_features``) and ``intermediate_act_fn``.
        """
        super().__init__()
        dense = original_intermediate.dense
        self.dense = dense
        self.intermediate_act_fn = original_intermediate.intermediate_act_fn
        self.lff = nn.Parameter(torch.ones(dense.out_features))

    def forward(self, hidden_states):
        """Return ``lff * act(dense(hidden_states))``."""
        activated = self.intermediate_act_fn(self.dense(hidden_states))
        return activated * self.lff

In [41]:
def print_trainable_parameters(model):
    """Print how many of the model's parameters are trainable.

    Args:
        model: Module whose ``named_parameters()`` are counted.

    Returns:
        None. Prints the trainable count, the total count and the trainable
        share in percent.
    """
    sizes = [(param.numel(), param.requires_grad) for _, param in model.named_parameters()]
    all_param = sum(count for count, _ in sizes)
    trainable_params = sum(count for count, is_trainable in sizes if is_trainable)
    print(
        f"Trainable Parameters: {trainable_params} || All Parameters: {all_param} || Trainable Parameters (%): {100 * trainable_params / all_param:.8f}"
    )

In [42]:
def bitfit(model):
    """Freeze everything except bias parameters (BitFit).

    A parameter stays trainable only when the last component of its qualified
    name contains ``"bias"``; all other parameters are frozen, including ones
    whose name mentions neither "weight" nor "bias".

    Args:
        model: Module whose parameters are updated in place.

    Returns:
        The same ``model`` instance.
    """
    for name, param in model.named_parameters():
        # Decide on the parameter's own name, not on the module path leading
        # to it, so e.g. "weight_net.bias" is still treated as a bias.
        leaf_name = name.rsplit(".", 1)[-1]
        param.requires_grad = "bias" in leaf_name
    return model

In [43]:
def lora(model):
    """Freeze ``model`` and add LoRA adapters to every attention block.

    In each layer of ``model.encoder.layer`` the query, key and value
    projections and the attention output dense layer are wrapped in
    ``linear_lora`` (rank 8, alpha 16). Only parameters whose name contains
    ``"lora_matrix"`` are left trainable.

    Args:
        model: Model exposing ``encoder.layer``; modified in place.

    Returns:
        The same ``model`` instance.
    """
    rank, alpha = 8, 16

    for frozen in model.parameters():
        frozen.requires_grad = False

    for block in model.encoder.layer:
        attention = block.attention
        self_attention = attention.self
        # Same wrapping order as the projections are listed: query, key, value
        for projection in ("query", "key", "value"):
            wrapped = linear_lora(
                getattr(self_attention, projection), rank=rank, alpha=alpha
            )
            setattr(self_attention, projection, wrapped)
        attention.output.dense = linear_lora(
            attention.output.dense, rank=rank, alpha=alpha
        )

    for name, param in model.named_parameters():
        if "lora_matrix" in name:
            param.requires_grad = True
    return model

In [44]:
def ia3(model):
    """Freeze ``model`` and insert trainable (IA)^3 scaling vectors.

    Every layer of ``model.encoder.layer`` gets its self-attention wrapped in
    ``IA3MolformerSelfAttention`` and its intermediate layer wrapped in
    ``IA3MolformerIntermediate``. Afterwards only the scaling vectors added by
    those wrappers (``lk``, ``lv``, ``lff``) remain trainable.

    Args:
        model: Model exposing ``encoder.layer``; modified in place.

    Returns:
        The same ``model`` instance.
    """
    for layer in model.encoder.layer:
        layer.attention.self = IA3MolformerSelfAttention(layer.attention.self)
        layer.intermediate = IA3MolformerIntermediate(layer.intermediate)

    # The wrappers register their vectors as "lk", "lv" and "lff"; matching on
    # the substring "ia3" alone never selects them and leaves all frozen.
    ia3_vector_names = {"lk", "lv", "lff"}
    for name, param in model.named_parameters():
        leaf_name = name.rsplit(".", 1)[-1]
        param.requires_grad = leaf_name in ia3_vector_names or "ia3" in name

    return model

In [None]:
def task3b():
    """Compare full fine-tuning with BitFit, LoRA and iA3.

    The external data is concatenated in front of the base dataset, split once,
    and a fresh ``MODEL_NAME`` encoder is adapted, trained and evaluated per
    technique. Metrics and training time are handed to
    ``visualize_tuning_results`` (``task3b.csv`` / ``plots3b``).

    Returns:
        None.
    """
    dataset = load_dataset(DATASET_PATH)["train"][:]
    external_dataset = pd.read_csv(EXTERNAL_DATA_PATH).to_dict("list")
    # The CSV names its target column "Label"; align it with the base dataset.
    external_dataset["label"] = external_dataset.pop("Label")
    combined_data = {
        key: (external_dataset.get(key, []) + dataset.get(key, []))
        for key in set(external_dataset) | set(dataset)
    }

    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True)
    # Report the size only; printing the full mapping floods the cell output.
    print(f"Combined dataset size: {len(combined_data['SMILES'])}")
    train_loader, test_loader, scaler = prepare_dataloaders(combined_data)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Technique name -> function adapting the encoder; None means all encoder
    # parameters stay trainable (plain fine-tuning).
    adapters = {
        "fine_tuning": None,
        "BitFit": bitfit,
        "LoRA": lora,
        "iA3": ia3,
    }

    techniques = {}
    for technique, adapt in adapters.items():
        model = AutoModel.from_pretrained(
            MODEL_NAME, deterministic_eval=True, trust_remote_code=True
        )
        if adapt is not None:
            model = adapt(model)

        print(technique)
        # The head is attached after adapting, so its parameters are always
        # trainable regardless of the technique.
        model = MoLFormerWithRegressionHead(model)

        print_trainable_parameters(model)
        model = model.to(device)

        start_time = time.time()
        model = train_model(model, train_loader, tokenizer, device)
        training_time = time.time() - start_time

        mse, rmse, mae, r2, pearson_corr, spearman_corr = evaluate_model(
            model, test_loader, scaler, tokenizer, device
        )

        techniques[technique] = {
            "MSE": mse,
            "RMSE": rmse,
            "MAE": mae,
            "R2": r2,
            "pearson_corr": pearson_corr,
            "spearman_corr": spearman_corr,
            "training_time": training_time,
        }

    visualize_tuning_results(
        techniques, csv_filename="task3b.csv", plots_folder="plots3b"
    )

In [None]:
# Run the tuning-technique comparison; results go to task3b.csv and plots3b/.
task3b()