In [2]:
import transformers
from transformers import pipeline, AutoModel, AutoModelForCausalLM, AutoTokenizer
from transformers import DebertaV2ForMaskedLM

from peft import PeftConfig, PeftModel, AutoPeftModelForCausalLM, LoraConfig, TaskType, PeftModelForCausalLM

from datasets import load_dataset

import numpy as np

import torch

from sklearn.model_selection import KFold, cross_val_predict, GridSearchCV
import time

from sklearn.decomposition import PCA
from sklearn.linear_model import Ridge

from sklearn.metrics import mean_squared_error, r2_score
from scipy.stats import pearsonr
from sklearn.metrics.pairwise import cosine_similarity




#### First of all load the PEFT models and get the sentence embeddings

In [22]:
# BASELINE MODELS

# repo_name = "bert-base-uncased"
# repo_name = "roberta-base"
# repo_name = "microsoft/deberta-base"
# repo_name = "microsoft/deberta-v3-base"
# repo_name = "google/electra-base-generator"
# repo_name = "facebook/bart-base"
# repo_name = "gpt2"

# MODELS OPTIMIZED WITH PEFT LIBRARY, USING LORA WITH CAUSAL LM AND RANK=8

# repo_name = "alitolga/bert-base-uncased-large-peft"
# repo_name = "alitolga/roberta-base-large-peft"
# repo_name = "alitolga/deberta-base-large-peft"
# repo_name = "alitolga/deberta-v3-base-large-peft"
# repo_name = "alitolga/electra-base-generator-large-peft"
# repo_name = "alitolga/bart-base-large-peft"
# repo_name = "alitolga/gpt2-large-peft"

# config = PeftConfig.from_pretrained(repo_name)

# base_model = AutoModelForCausalLM.from_pretrained(config.base_model_name_or_path)
# base_model = DebertaV2ForMaskedLM.from_pretrained(config.base_model_name_or_path)

# tokenizer = AutoTokenizer.from_pretrained(config.base_model_name_or_path)

# model = PeftModel.from_pretrained(base_model, repo_name, config=config)
# model = PeftModel.from_pretrained(base_model, repo_name)

# model = AutoModelForCausalLM.from_pretrained(repo_name)
model = DebertaV2ForMaskedLM.from_pretrained(repo_name)
tokenizer = AutoTokenizer.from_pretrained(repo_name)

# model.print_trainable_parameters()

Some weights of DebertaV2ForMaskedLM were not initialized from the model checkpoint at microsoft/deberta-v3-base and are newly initialized: ['cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.decoder.bias', 'cls.predictions.transform.dense.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


##### Try inference just for fun

In [None]:
text_generator = pipeline("text-generation", model=model, tokenizer=tokenizer)

# Generate text
generated_text = text_generator("Welcome to the Hugging Face course! I am pleased to inform you", 
                                max_length = 50, 
                                # num_return_sequences=2
                                # max_new_tokens = 30
                                )

print(generated_text)

#### Load the Dataset

In [6]:
# Specify the dataset name
dataset_name = "helena-balabin/pereira_fMRI_sentences"

# Specify the path to save or load the dataset
save_path = "./data"

# Load the dataset, use the cache if available
dataset = load_dataset(dataset_name, cache_dir=save_path)

In [7]:
sentences = dataset["train"]["sentences"]
sentences = sentences[0] # 0th subject
print(len(sentences))

384


In [8]:
print(sentences[0])

An accordion is a portable musical instrument with two keyboards.


#### Get the sentence embeddings from the Peft model

In [9]:
def get_embeddings(sentence):
    inputs = tokenizer(sentence, return_tensors='pt', truncation=True, padding=True)
    outputs = model(**inputs, output_hidden_states=True)
    
    hidden_states = outputs.hidden_states

    embeddings = torch.mean(hidden_states[0], dim=1)

    return embeddings

In [10]:
# tokenizer.pad_token = tokenizer.eos_token
# tokenizer.pad_token

In [23]:
embeddings = get_embeddings(sentences)
print(embeddings.shape)

Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


torch.Size([384, 768])


#### Do the Brain Decoding Part

In [13]:
# Get the voxels. For simplicity we start with all the brain regions
fmri_data = dataset["train"]["all"]

# fMRI data of the first subject out of 8
voxels = np.array(fmri_data[0])
print(voxels.shape)

(384, 195127)


In [14]:
# Normalize the embeddings
# embeddings_normed = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)

In [24]:
embeddings = embeddings.detach().numpy()

In [25]:
# Prepare nested CV.
# Inner CV is responsible for hyperparameter optimization;
# outer CV is responsible for prediction.

n_folds = 5

state = int(time.time())
inner_cv = KFold(n_splits=n_folds, shuffle=True, random_state=state)
outer_cv = KFold(n_splits=n_folds, shuffle=True, random_state=state)

# Final data prep: normalize.
X = voxels - voxels.mean(axis=0)
X = X / np.linalg.norm(X, axis=1, keepdims=True)
Y = embeddings - embeddings.mean(axis=0)
Y = Y / np.linalg.norm(Y, axis=1, keepdims=True)

In [17]:
######## Run learning.

n_jobs = 4

# Candidate ridge regression regularization parameters.
ALPHAS = [1, 1e-1, 1e-2, 1e-3, 1e-4, 1e-5, 1e-6, 1e1]

# Run inner CV.
gs = GridSearchCV(Ridge(fit_intercept=False),
                {"alpha": ALPHAS}, cv=inner_cv, n_jobs=n_jobs, verbose=10)

"""
Purpose of This Line

Nested Cross-Validation:

The use of cross_val_predict with GridSearchCV (gs in this context) as the estimator 
is a part of a nested cross-validation strategy. 
The key purpose here is to evaluate the model's performance in a way that is as unbiased as possible.

Independent Data Splits:

The outer cross-validation (cv=outer_cv) splits the dataset into training and test sets multiple times 
(based on the number of folds in outer_cv). For each of these splits, 
the inner cross-validation (within GridSearchCV) finds the best alpha value. 

This process ensures that the choice of hyperparameters (alpha in this case) is not biased by the 
particular split of data used for model training and evaluation.

Generating Unbiased Predictions:

cross_val_predict does not simply fit the model but generates predictions for each point 
when it is in the test set of the outer cross-validation. 
These predictions are made by a model that has never seen the data point during training, 
thereby providing an unbiased estimate of the model's performance on unseen data.
"""

"\nPurpose of This Line\n\nNested Cross-Validation:\n\nThe use of cross_val_predict with GridSearchCV (gs in this context) as the estimator \nis a part of a nested cross-validation strategy. \nThe key purpose here is to evaluate the model's performance in a way that is as unbiased as possible.\n\nIndependent Data Splits:\n\nThe outer cross-validation (cv=outer_cv) splits the dataset into training and test sets multiple times \n(based on the number of folds in outer_cv). For each of these splits, \nthe inner cross-validation (within GridSearchCV) finds the best alpha value. \n\nThis process ensures that the choice of hyperparameters (alpha in this case) is not biased by the \nparticular split of data used for model training and evaluation.\n\nGenerating Unbiased Predictions:\n\ncross_val_predict does not simply fit the model but generates predictions for each point \nwhen it is in the test set of the outer cross-validation. \nThese predictions are made by a model that has never seen the

In [26]:
# Run outer CV.
decoder_predictions = cross_val_predict(gs, X, Y, cv=outer_cv)

Fitting 5 folds for each of 8 candidates, totalling 40 fits
Fitting 5 folds for each of 8 candidates, totalling 40 fits
Fitting 5 folds for each of 8 candidates, totalling 40 fits
Fitting 5 folds for each of 8 candidates, totalling 40 fits
Fitting 5 folds for each of 8 candidates, totalling 40 fits


In [17]:
print(decoder_predictions.shape)
print(Y.shape)

(384, 768)
(384, 768)


In [None]:
# def pairwise_accuracy(y_true, y_pred):
#     n = len(y_true)
#     correct_pairs = 0
#     total_pairs = 0

#     for i in range(n):
#         for j in range(i + 1, n):
#             if (y_true[i] > y_true[j]) == (y_pred[i] > y_pred[j]):
#                 correct_pairs += 1
#             total_pairs += 1

#     return correct_pairs / total_pairs if total_pairs > 0 else 0

# # Example usage:
# # y_true = [actual values]
# # y_pred = [predicted values]
# # accuracy = pairwise_accuracy(y_true, y_pred)
# # print("Pairwise Accuracy:", accuracy)

# def pairwise_accuracy_efficient(y_true, y_pred):
#     y_true = np.array(y_true)
#     y_pred = np.array(y_pred)

#     # Create a matrix of differences for true labels
#     diff_true = np.subtract.outer(y_true, y_true) > 0

#     # Create a matrix of differences for predictions
#     diff_pred = np.subtract.outer(y_pred, y_pred) > 0

#     # Count the number of pairs that agree in order
#     correct_pairs = np.sum(diff_true == diff_pred)

#     # Total number of pairs
#     total_pairs = len(y_true) * (len(y_true) - 1) / 2

#     print(correct_pairs)
#     print(total_pairs)

#     return correct_pairs / total_pairs if total_pairs > 0 else 0

In [20]:
from sklearn.base import BaseEstimator, clone
from scipy.spatial.distance import cosine

def pairwise_accuracy(
    estimator: BaseEstimator = None,
    X: torch.Tensor = None,  # noqa
    y: torch.Tensor = None,
    topic_ids: torch.Tensor = None,
    scoring_variation: str = None,  # type: ignore
) -> float:
    """Calculate the average pairwise accuracy of all pairs of true and predicted vectors.

    Based on the pairwise accuracy as defined in Oota et al. 2022, Sun et al. 2021, Pereira et al. 2018.

    :param estimator: Estimator object (e.g., a Ridge regression)
    :type estimator: BaseEstimator
    :param X: Sentence embeddings used as a basis to predict MRI vectors with the estimator
    :type X: torch.Tensor
    :param y: True MRI vectors
    :type y: torch.Tensor
    :param topic_ids: Topic IDs for each paragraph
    :type topic_ids: torch.Tensor
    :param scoring_variation: Variation of the scoring function, defaults to None
    :type scoring_variation: str
    :return: Average pairwise accuracy from all possible sentence pairs
    :rtype: float
    """
    pred = estimator.predict(X)  # noqa

    if scoring_variation == "same-topic":
        # Calculate pairwise accuracy for same-topic sentences
        res = [
            cosine(pred[i], y[i]) + cosine(pred[j], y[j]) < cosine(pred[i], y[j]) + cosine(pred[j], y[i])
            for i in range(len(X))
            for j in range(i + 1, len(X)) if topic_ids[i] == topic_ids[j]
        ]
    elif scoring_variation == "different-topic":
        # Calculate pairwise accuracy for different-topic sentences
        res = [
            cosine(pred[i], y[i]) + cosine(pred[j], y[j]) < cosine(pred[i], y[j]) + cosine(pred[j], y[i])
            for i in range(len(X))
            for j in range(i + 1, len(X)) if topic_ids[i] != topic_ids[j]
        ]
    else:
        # See for all possible sentence pairings: Is the distance between the correct matches of predicted and X
        # sentences smaller than the distance between pairings of X and predicted vectors from different sentences?
        res = [
            cosine(pred[i], y[i]) + cosine(pred[j], y[j]) < cosine(pred[i], y[j]) + cosine(pred[j], y[i])
            for i in range(len(X))
            for j in range(i + 1, len(X))
        ]

    # Return the fraction of instances for which the condition holds versus all possible pairs
    return sum(res) / len(res)


def pearson_scoring(
    estimator: BaseEstimator = None,
    X: torch.Tensor = None,  # noqa
    y: torch.Tensor = None,
) -> float:
    """Calculate the average pearson correlation for the given set of true and predicted MRI vectors.

    :param estimator: Estimator object (e.g., a Ridge regression)
    :type estimator: BaseEstimator
    :param X: Sentence embeddings used as a basis to predict MRI vectors with the estimator
    :type X: torch.Tensor
    :param y: True MRI vectors
    :type y: torch.Tensor
    :return: Average pearson correlation from all pairs of predicted and true MRI vectors
    :rtype: float
    """
    pred = estimator.predict(X)  # noqa

    # See for all possible sentence pairings: Is the distance between the correct matches of predicted and X
    # sentences smaller than the distance between pairings of X and predicted vectors from different sentences?
    res = [pearsonr(t, p).statistic for t, p in zip(y, pred)]

    # Return the fraction of instances for which the condition holds versus all possible pairs
    return np.mean(res)  # noqa

In [None]:
len(X)

In [None]:
Y[0].shape

In [27]:
######### Evaluate.

Y_flatten = Y.flatten()
pred_flatten = decoder_predictions.flatten()

# Evaluate the performance (e.g., using mean squared error)
mse = mean_squared_error(Y, decoder_predictions)
print(f"Mean Squared Error: {mse}")

r2 = r2_score(Y, decoder_predictions)
print(f"R-squared (R2) Score: {r2}")

# Pearson Correlation Coefficient
res = [pearsonr(t, p).statistic for t, p in zip(Y, decoder_predictions)]
pearson_corr = np.mean(res)
print(f"Pearson Correlation Coefficient: {pearson_corr}")

# Cosine Similarity
cosine_sim = np.mean(cosine_similarity(decoder_predictions, Y))
print(f"Cosine Similarity: {cosine_sim}")

# Pairwise Accuracy
res = [ cosine(decoder_predictions[i], Y[i]) + cosine(decoder_predictions[j], Y[j]) < cosine(decoder_predictions[i], Y[j]) + cosine(decoder_predictions[j], Y[i])
        for i in range(len(X))
        for j in range(i + 1, len(X))
    ]
pairwise_acc = sum(res) / len(res)
print(f"Pairwise Accuracy: {pairwise_acc}")

Mean Squared Error: 0.0012917273580361974
R-squared (R2) Score: 0.007150983612804379
Pearson Correlation Coefficient: 0.09380054628315744
Cosine Similarity: -3.174795188503638e-05
Pairwise Accuracy: 0.7945088120104439
