Данный ноутбук подбирает оптимальные веса для взвешенной комбинации эмбеддингов, полученных с разных слоёв модели, с помощью Optuna.

В ячейке ниже нужно указать название задачи MTEB, название BERT-подобной модели из библиотеки transformers, а также список слоёв, эмбеддинги которых участвуют в комбинации. По умолчанию оптимизация выполняет 100 итераций, поэтому может занять довольно много времени.

In [None]:
# Experiment configuration -- edit these values before running the notebook.
task_name = "STSBenchmark"  # MTEB task the weights are tuned on
model_name = "bert-base-uncased"  # model identifier passed to transformers
layers_to_use = [*range(12)]  # indices of the layers mixed into the embedding

In [None]:
from IPython.display import clear_output

# Install the third-party packages used by the cells below (notebook shell magics).
!pip install mteb
!pip install optuna
# Wipe the pip installation log from the cell output.
clear_output()

In [8]:
import mteb

# Load the same English task twice: once restricted to the "test" split and
# once restricted to the "train" split (created in that order).
test_task, train_task = (
    mteb.get_task(task_name, eval_splits=[split], languages=["eng"])
    for split in ("test", "train")
)

# `evaluation` scores on the train split and is what the weight search calls;
# `evaluation_final` wraps the test split.
evaluation = mteb.MTEB(tasks=[train_task])
evaluation_final = mteb.MTEB(tasks=[test_task])

In [9]:
from mteb.encoder_interface import PromptType
import numpy as np
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModel
import torch
import pandas as pd
import optuna

class CustomModel:
    def __init__(self, model_name="bert-base-uncased", device="cuda" if torch.cuda.is_available() else "cpu", layers_to_use=[-1], layers_weights=[1.0], batch_size=256):
        self.model_name = model_name
        self.device = device
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModel.from_pretrained(self.model_name, output_hidden_states=True).to(self.device)
        self.layers_to_use = layers_to_use
        self.layers_weights = layers_weights
        self.batch_size = batch_size

    def get_layer_embedding(self, batch: list[str], layers: list[int], weights: list[float]) -> np.ndarray:
        encoded_inputs = self.tokenizer(
            batch, padding=True, truncation=True, return_tensors="pt"
        ).to(self.device)
        model_output = self.model(**encoded_inputs)
        hidden_states = model_output.hidden_states[1:]
        layers_output = [hidden_states[i] for i in layers]

        pooled_layers = []
        for i, layer_output in enumerate(layers_output):
            input_mask_expanded = encoded_inputs['attention_mask'].unsqueeze(-1).expand(layer_output.size()).float()
            sum_embeddings = torch.sum(layer_output * input_mask_expanded, 1)
            sum_mask = input_mask_expanded.sum(1)
            mean_pooled = sum_embeddings / sum_mask
            pooled_layers.append(weights[i] * mean_pooled)

        return torch.sum(torch.stack(pooled_layers), dim=0)

    def encode(
        self,
        sentences: list[str],
        task_name: str,
        prompt_type: PromptType | None = None,
        **kwargs,
    ) -> np.ndarray:
        """Encodes the given sentences using the encoder.

        Args:
            sentences: The sentences to encode.
            task_name: The name of the task.  (Not directly used in this example, but kept for MTEB compatibility)
            prompt_type: The prompt type to use. (Not directly used in this example, but kept for MTEB compatibility)
            **kwargs: Additional arguments to pass to the encoder.

        Returns:
            The encoded sentences as a numpy array.
        """
        self.model.eval()
        with torch.no_grad():
            all_embeddings = []
            for i in range(0, len(sentences), self.batch_size):
                batch = sentences[i:i + self.batch_size]
                combination = self.get_layer_embedding(batch, self.layers_to_use, self.layers_weights)
                combination = combination.cpu().numpy()
                all_embeddings.append(combination)
        return np.concatenate(all_embeddings, axis=0)

In [10]:
# Number of evaluations run so far; gives every trial its own results folder.
counter = 0
# Parameters of the best trial of the most recent search (raw, un-normalized weights).
best_params = {}


def run_experiment_all(layers, n_trials=100):
    """Search for layer-combination weights that maximize the train-split score.

    One weight in [0, 1] is sampled per entry of ``layers`` (parameters
    ``w_1`` ... ``w_N``). The weights are normalized to sum to 1, assigned to
    the model, and the model is scored with the module-level ``evaluation``.

    Args:
        layers: Layer indices handed to ``CustomModel`` as ``layers_to_use``.
        n_trials: Number of Optuna trials to run.

    Returns:
        The parameter dict of the best trial, which is also stored in the
        module-level ``best_params``. The values are the raw sampled weights;
        divide them by their sum to obtain the weights that were evaluated.

    Raises:
        ValueError: If ``layers`` is empty.
    """
    global best_params

    layers_to_use = list(layers)
    n_layers = len(layers_to_use)
    if n_layers == 0:
        raise ValueError("layers must contain at least one layer index")

    # Load the tokenizer and model once; only the weights change between
    # trials, so reloading them for every trial is wasted work.
    model = CustomModel(
        model_name=model_name,
        layers_to_use=layers_to_use,
        layers_weights=[1.0 / n_layers] * n_layers,
    )

    def objective(trial):
        global counter
        weights = np.array(
            [trial.suggest_float(f"w_{i}", 0, 1) for i in range(1, n_layers + 1)]
        )
        total = np.sum(weights)
        if total > 0:
            normalized_weights = weights / total
        else:
            # Every weight was sampled as exactly 0: use a uniform mix rather
            # than dividing by zero.
            normalized_weights = np.full(n_layers, 1.0 / n_layers)
        model.layers_weights = normalized_weights

        quality = evaluation.run(
            model,
            output_folder=f"results/{model_name}/{layers_to_use}/{counter}",
            # NOTE(review): `counter` restarts at 0 in every new session, so a
            # folder from an earlier run may already exist; force a fresh
            # evaluation instead of reusing what is stored there -- confirm
            # against the installed mteb version.
            overwrite_results=True,
        )
        counter += 1
        return quality[0].scores['train'][0]['main_score']

    study = optuna.create_study(
        directions=["maximize"],
    )
    study.optimize(objective, n_trials=n_trials)
    print("Number of finished trials: ", len(study.trials))
    trials = sorted(study.best_trials, key=lambda t: t.values)

    for trial in trials:
        print("  Trial#{}".format(trial.number))
        print(
            "    Values: Values={}".format(
                trial.values
            )
        )
        print("    Params: {}".format(trial.params))
        # Trials are sorted ascending, so the last assignment keeps the best one.
        best_params = trial.params

    return best_params

In [None]:
# Start the weight search over the layers chosen in the configuration cell.
run_experiment_all(layers_to_use)