# WOA + TabuSearch

In [None]:
import gc
import os
from math import exp
from collections import Counter
from typing import List, Optional, Union

import numpy as np
import pandas as pd
import transformers
import torch

from collections import deque
from typing import Tuple, List
from collections import deque
import random

## GEMMA

In [None]:
os.environ['OMP_NUM_THREADS'] = '1'
os.environ['TOKENIZERS_PARALLELISM'] = 'false'
PAD_TOKEN_LABEL_ID = torch.nn.CrossEntropyLoss().ignore_index
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class ParticipantVisibleError(Exception):
    pass

def score(
    solution: pd.DataFrame,
    submission: pd.DataFrame,
    row_id_column_name: str,
    model_path: str = '/kaggle/input/gemma-2/transformers/gemma-2-9b/2',
    load_in_8bit: bool = False,
    clear_mem: bool = False,
) -> float:
    # Check that each submitted string is a permutation of the solution string
    sol_counts = solution.loc[:, 'text'].str.split().apply(Counter)
    sub_counts = submission.loc[:, 'text'].str.split().apply(Counter)
    invalid_mask = sol_counts != sub_counts
    if invalid_mask.any():
        raise ParticipantVisibleError(
            'At least one submitted string is not a valid permutation of the solution string.'
        )

    # Calculate perplexity for the submitted strings
    sub_strings = [
        ' '.join(s.split()) for s in submission['text'].tolist()
    ]  # Split and rejoin to normalize whitespace
    scorer = PerplexityCalculator(
        model_path=model_path,
        load_in_8bit=load_in_8bit,
    )  # Initialize the perplexity calculator with a pre-trained model
    perplexities = scorer.get_perplexity(
        sub_strings
    )  # Calculate perplexity for each submitted string

    if clear_mem:
        # Just move on if it fails. Not essential if we have the score.
        try:
            scorer.clear_gpu_memory()
        except:
            print('GPU memory clearing failed.')

    return float(np.mean(perplexities))

class PerplexityCalculator:
    def __init__(
        self,
        model_path: str,
        load_in_8bit: bool = False,
        device_map: str = 'auto',
    ):
        self.tokenizer = transformers.AutoTokenizer.from_pretrained(model_path)
        # Configure model loading based on quantization setting and device availability
        if load_in_8bit:
            if DEVICE.type != 'cuda':
                raise ValueError('8-bit quantization requires CUDA device')
            quantization_config = transformers.BitsAndBytesConfig(load_in_8bit=True)
            self.model = transformers.AutoModelForCausalLM.from_pretrained(
                model_path,
                quantization_config=quantization_config,
                device_map=device_map,
            )
        else:
            self.model = transformers.AutoModelForCausalLM.from_pretrained(
                model_path,
                torch_dtype=torch.float16 if DEVICE.type == 'cuda' else torch.float32,
                device_map=device_map,
            )

        self.loss_fct = torch.nn.CrossEntropyLoss(reduction='none')

        self.model.eval()

    def get_perplexity(
        self, input_texts: Union[str, List[str]], debug=False
    ) -> Union[float, List[float]]:
        single_input = isinstance(input_texts, str)
        input_texts = [input_texts] if single_input else input_texts

        loss_list = []
        with torch.no_grad():
            # Process each sequence independently
            for text in input_texts:
                # Explicitly add sequence boundary tokens to the text
                text_with_special = f"{self.tokenizer.bos_token}{text}{self.tokenizer.eos_token}"

                # Tokenize
                model_inputs = self.tokenizer(
                    text_with_special,
                    return_tensors='pt',
                    add_special_tokens=False,
                )

                if 'token_type_ids' in model_inputs:
                    model_inputs.pop('token_type_ids')

                model_inputs = {k: v.to(DEVICE) for k, v in model_inputs.items()}

                # Get model output
                output = self.model(**model_inputs, use_cache=False)
                logits = output['logits']

                # Shift logits and labels for calculating loss
                shift_logits = logits[..., :-1, :].contiguous()  # Drop last prediction
                shift_labels = model_inputs['input_ids'][..., 1:].contiguous()  # Drop first input

                # Calculate token-wise loss
                loss = self.loss_fct(
                    shift_logits.view(-1, shift_logits.size(-1)),
                    shift_labels.view(-1)
                )

                # Calculate average loss
                sequence_loss = loss.sum() / len(loss)
                loss_list.append(sequence_loss.cpu().item())

                # Debug output
                if debug:
                    print(f"\nProcessing: '{text}'")
                    print(f"With special tokens: '{text_with_special}'")
                    print(f"Input tokens: {model_inputs['input_ids'][0].tolist()}")
                    print(f"Target tokens: {shift_labels[0].tolist()}")
                    print(f"Input decoded: {self.tokenizer.decode(model_inputs['input_ids'][0])}")
                    print(f"Target decoded: {self.tokenizer.decode(shift_labels[0])}")
                    print(f"Individual losses: {loss.tolist()}")
                    print(f"Average loss: {sequence_loss.item():.4f}")

        ppl = [exp(i) for i in loss_list]

        if debug:
            print("\nFinal perplexities:")
            for text, perp in zip(input_texts, ppl):
                print(f"Text: '{text}'")
                print(f"Perplexity: {perp:.2f}")

        return ppl[0] if single_input else ppl

    def clear_gpu_memory(self) -> None:
        if not torch.cuda.is_available():
            return

        # Delete model and tokenizer if they exist
        if hasattr(self, 'model'):
            del self.model
        if hasattr(self, 'tokenizer'):
            del self.tokenizer

        # Run garbage collection
        gc.collect()

        # Clear CUDA cache and reset memory stats
        with DEVICE:
            torch.cuda.empty_cache()
            torch.cuda.ipc_collect()
            torch.cuda.reset_peak_memory_stats()

---
# CACHING

In [None]:
class Caching:
    def __init__(self, max_size: int = 1000):
        self.stores: List[dict] = []
        self.idx = -1
        self.max_size = max_size
        self._state = {'len': 0, 'hit': 0, 'miss': 0, 'total': 0}

    def create_store(self):
        self.stores.append({})
        self.idx += 1
        self._state['len'] += 1

    def add(self, key, value):
        if self.idx == -1 or len(self.stores[self.idx]) >= self.max_size:
            self.create_store()
        self.stores[self.idx][key] = value

    def get(self, key):
        if self.idx == -1:
            return None
        self._state['total'] += 1
        store_idx = random.choices(range(self.idx+1), k=self.idx+1)
        for i in store_idx:
            v = self.stores[i].get(key, None)
            if v is not None:
                self._state['hit'] += 1
                return v
        self._state['miss'] += 1
        return v

    def state(self):
        return f"total: {self._state['total']}, len: {self._state['len']}, stores: {len(self.stores)} "\
              f"hit: {self._state['hit']} [{self._state['hit']/self._state['total']*100}%], "\
              f"miss: {self._state['miss']} [{self._state['miss']/self._state['total']*100}%]"


---
# TabuSearch

In [None]:
class TabuSearch:
    '''
    Algorithme de recherche tabou

    Cette classe implémente l'algorithme d'optimisation de recherche tabou, utilisé pour résoudre
    des problèmes d'optimisation combinatoire. Il améliore itérativement les solutions en explorant
    des candidats voisins et en utilisant une liste tabou pour éviter de revisiter des solutions récemment explorées.

    Paramètres :
        words (list[str]): La séquence de mots à optimiser.
        calculate_perplexity (callable): Une méthode pour calculer la perplexité d'une phrase.
        tabu_tenure (int, optionnel): La taille de la liste tabou. Par défaut, 5.
        stop_on_non_improvement (int, optionnel): Le nombre d'itérations autorisées sans amélioration avant d'arrêter. Par défaut, 20.
        max_iter (int, optionnel): Le nombre maximal d'itérations à exécuter. Par défaut, 100.
        max_candidates (int, optionnel): Le nombre maximal de solutions candidates à générer par itération. Par défaut, 100.
        debug (bool, optionnel): Si True, active la journalisation de débogage. Par défaut, False.
        logger (callable, optionnel): Une fonction de journalisation. Si None, imprime dans la console. Par défaut, None.
    '''

    def __init__(self, words: list, calculate_perplexity, tabu_tenure=5,
                 stop_on_non_improvement=20, max_iter=100,
                 max_condidates=100,
                 debug=False, logger = None):
        '''
        Initialise l'instance de recherche tabou.

        Paramètres :
            words (list[str]): La séquence de mots à optimiser.
            calculate_perplexity (callable): Une méthode pour calculer la perplexité d'une phrase.
            tabu_tenure (int, optionnel): La taille de la liste tabou. Par défaut, 5.
            stop_on_non_improvement (int, optionnel): Le nombre d'itérations autorisées sans amélioration. Par défaut, 20.
            max_iter (int, optionnel): Le nombre maximal d'itérations. Par défaut, 100.
            max_condidates (int, optionnel): Le nombre maximal de solutions candidates par itération. Par défaut, 100.
            debug (bool, optionnel): Si True, active la journalisation de débogage. Par défaut, False.
            logger (callable, optionnel): Une fonction de journalisation. Par défaut, None.
        '''
        self.words = words  # Liste des mots à optimiser
        self.n = len(self.words)  # Nombre total de mots
        self.tabu_tenure = tabu_tenure  # Taille de la liste tabou
        self.stop_on_non_improvement = stop_on_non_improvement  # Condition d'arrêt en cas de non-amélioration
        self.max_iter = max_iter  # Maximum d'itérations
        self.calculate_perplexity = calculate_perplexity  # Méthode pour calculer la perplexité
        self.tabu_list = deque(maxlen=self.tabu_tenure)  # Liste tabou pour suivre les mouvements récents
        self.max_condidates = max_condidates  # Nombre maximal de solutions candidates à générer
        self.debug = debug  # Indicateur pour activer le débogage
        self.logger = logger  # Fonction de journalisation

    def log(self, msg):
        '''
        Journalise un message si le débogage est activé.

        Paramètres :
            msg (str): Le message à journaliser.
        '''
        if self.debug:  # Vérifie si le débogage est activé
            if self.logger is None:  # Si aucun logger fourni
                print(f"[LOCAL SEARCH] {msg}")  # Imprime le message
            else:
                self.logger(f"[LOCAL SEARCH] {msg}")  # Utilise le logger fourni

    def is_move_used(self, move: Tuple[int, int]):
        '''
        Vérifie si un mouvement est présent dans la liste tabou.
        '''
        return move in self.tabu_list  # Vérifie si le mouvement est dans la liste tabou

    def __to_sentence_text(self, solution):
        '''
        Convertit une solution en phrase en mappant les indices aux mots.

          '''
        return " ".join([self.words[i] for i in solution])  # Retourne la phrase correspondante

    def to_tabulist(self, move):
        '''
        Ajoute un mouvement à la liste tabou.

        Paramètres :
            move (Tuple[int, int]): Le mouvement à ajouter à la liste tabou.
        '''
        self.tabu_list.append(tuple(move))  # Ajoute le mouvement à la liste tabou

    def get_candidates(self, solution: np.ndarray) -> List[Tuple[list, Tuple[int, int]]]:
        '''
        Génère des solutions candidates en échangeant aléatoirement deux éléments.

        Paramètres :
            solution (np.ndarray): La solution actuelle sous forme d'un tableau d'indices.

        Retourne :
            list[Tuple[list, Tuple[int, int]]]: Une liste de solutions candidates et leurs mouvements correspondants.
        '''
        candidates = []  # Liste pour stocker les candidats

        while len(candidates) < self.max_condidates:  # Tant que le nombre de candidats est inférieur au maximum
            i, j = random.choices(solution, k=2)  # Sélectionne deux indices aléatoires
            candidate = solution.copy()  # Copie de la solution actuelle
            candidate[i], candidate[j] = candidate[j], candidate[i]  # Échange les deux éléments
            candidates.append((candidate, (i, j)))  # Ajoute le candidat et le mouvement à la liste
        return candidates  # Retourne la liste des candidats

    def optimize(self, current_sentence: np.ndarray) -> Tuple[np.ndarray, float]:
        '''
        Exécute l'algorithme de recherche tabou.

        Paramètres :
            current_sentence (np.ndarray): La solution initiale sous forme d'un tableau d'indices.

        Retourne :
            Tuple[np.ndarray, float]: La meilleure solution et sa perplexité correspondante.
        '''
        self.log(f"start {current_sentence}")  # Journalise le début de l'optimisation

        # Calcule la perplexité de la solution initiale
        current_perplexity = self.calculate_perplexity(current_sentence)

        # Initialise la meilleure solution et sa perplexité
        best_sentence = current_sentence
        best_perplexity = current_perplexity

        # Suivre le nombre d'itérations sans amélioration et le nombre total d'itérations
        no_improvement = 0  # Compteur d'itérations sans amélioration
        itr = 1  # Compteur d'itérations

        # Itérer jusqu'à ce que la condition d'arrêt soit atteinte
        while no_improvement < self.stop_on_non_improvement and itr <= self.max_iter:
            self.log(f"[ITER] [{itr}] --- {best_perplexity}")  # Journalise l'itération en cours

            # Génère des solutions candidates
            candidates = self.get_candidates(current_sentence)

            # Initialise les variables pour la meilleure solution candidate
            best_candidate = None
            best_candidate_perplexity = None
            used_move = None

            # Évalue chaque solution candidate
            for candidate, swap in candidates:
                candidate_perplexity = self.calculate_perplexity(candidate)  # Calcule la perplexité de la candidate
                # Met à jour la meilleure candidate si elle est meilleure et respecte les conditions tabou
                if best_candidate_perplexity is None \
                    or (((not self.is_move_used(swap)) or (candidate_perplexity < best_perplexity)) \
                        and (candidate_perplexity < best_candidate_perplexity)):
                        best_candidate = candidate  # Met à jour la meilleure candidate
                        best_candidate_perplexity = candidate_perplexity  # Met à jour la perplexité de la meilleure candidate
                        used_move = swap  # Enregistre le mouvement utilisé

            self.log(f"[{best_candidate_perplexity}] {best_candidate}")

            # Vérifie si la meilleure candidate est une amélioration
            if best_candidate_perplexity < best_perplexity:
                # Met à jour la meilleure solution
                best_sentence = best_candidate
                best_perplexity = best_candidate_perplexity
                no_improvement = 0
                self.log(f"[IM] [{itr}/{self.max_iter}] [{best_perplexity}]")
            else:
                # Incrémente le compteur d'itérations sans amélioration
                no_improvement += 1
                self.log(f"[NO IM] __{no_improvement}__")

            # Considère la meilleure candidate comme la solution actuelle
            current_sentence = best_candidate
            current_perplexity = best_candidate_perplexity


            self.to_tabulist(used_move)
            itr += 1

            # Sortie si la limite de non-amélioration est atteinte
            if(no_improvement == self.stop_on_non_improvement):
                self.log(f"[NO IM] break; {best_sentence} | {best_perplexity}")
                break

        # Retourne la meilleure solution et sa perplexité
        return best_sentence, best_perplexity

---
# WAHLE OPT

In [None]:
import numpy as np
from itertools import permutations

class WhaleOptimization:
    """
    Implementation de l'algorithme d'optimisation des baleines (WOA) pour optimiser l'ordre des mots
    dans une phrase en fonction du score de perplexité.
    """

    def __init__(self, sentence: str, scorer,
                 n_whales: int = 20, max_iter: int = 80,
                 tabu_tenure=10, tabu_no_imp=10, tabu_max_iter=50,
                 cache: Caching = Caching(), logger=None, debug=False):
        """
        Initialise l'algorithme d'optimisation des baleines.

        Args:
            sentence (str): Phrase d'entrée à optimiser.
            scorer (object): Scorer avec une méthode `get_perplexity` pour évaluer la qualité de la phrase.
            n_whales (int, optionnel): Nombre de baleines dans la population. Par défaut, 20.
            max_iter (int, optionnel): Itérations maximales pour WOA. Par défaut, 80.
            tabu_tenure (int, optionnel): Taille de la liste tabou dans la recherche tabou. Par défaut, 10.
            tabu_no_imp (int, optionnel): Itérations sans amélioration pour arrêter la recherche tabou. Par défaut, 10.
            tabu_max_iter (int, optionnel): Itérations maximales dans la recherche tabou. Par défaut, 50.
            logger (callable, optionnel): Fonction de journalisation. Par défaut, None.
            debug (bool, optionnel): Activer les messages de débogage. Par défaut, False.
        """
        self.words = sentence.split(' ')  # Sépare la phrase en mots
        self.n = len(self.words)  # Nombre total de mots
        self.scorer = scorer  # Instance du scoreur pour évaluer la perplexité
        self.n_whales = n_whales  # Nombre de baleines
        self.max_iter = max_iter  # Nombre maximal d'itérations
        self.logger = logger  # Fonction de journalisation
        self.debug = debug  # Indicateur pour activer le débogage

        self.cwi = None  # Index de la baleine actuelle
        self.itr = ''  # Itération actuelle

        # Initialise la recherche tabou pour des améliorations locales
        self.TabuSearch = TabuSearch(
            self.words,
            self.__calculate_perplexity,
            tabu_tenure=tabu_tenure,
            stop_on_non_improvement=tabu_no_imp,
            max_iter=tabu_max_iter,
            debug=debug,
            logger=self.log
        )

        # Cache pour les calculs de perplexité
        self.cache = cache

    def __create_initial_sols(self) -> np.ndarray:
        """Génère une solution initiale aléatoire (permutation de mots)."""
        return np.random.permutation(self.n)  # Retourne une permutation aléatoire des indices des mots

    def log(self, msg):
        """
        Journalise les messages pour le débogage et le suivi de l'exécution.

        Args:
            msg (str): Le message à journaliser.
        """
        if self.debug:  # Vérifie si le débogage est activé
            if self.logger is None:  # Si aucun logger fourni
                if self.cwi is None:
                    print(f"[WOA] [{self.itr}] {msg}")  # Imprime le message sans baleine actuelle
                else:
                    print(f"[WOA] [{self.itr}] [WHALE {self.cwi + 1}] {msg}")  # Imprime avec l'index de la baleine
            else:
                if self.cwi is None:
                    self.logger(f"[{self.itr}] {msg}")  # Utilise le logger fourni
                else:
                    self.logger(f"[{self.itr}] [WHALE {self.cwi + 1}] {msg}")  # Journalise avec l'index de la baleine

    def _compute_A(self, a: float):
        """
        Calcule le coefficient A pour les équations de mouvement du WOA.

        """
        r = np.random.uniform(0.0, 1.0, size=1)  # Génère un nombre aléatoire entre 0 et 1
        return (2.0*np.multiply(a, r)) - a  # Calcule et retourne le coefficient A

    def _compute_C(self):
        """
        Calcule le coefficient C pour les équations de mouvement du WOA.


        """
        r = np.random.uniform(0.0, 1.0, size=1)  # Génère un nombre aléatoire entre 0 et 1
        return 2.0 * r  # Calcule et retourne le coefficient C

    def __to_sentence_text(self, sol):
        """
        Convertit une solution (ordre des mots) en une phrase lisible.


        """
        return " ".join([self.words[i] for i in sol])  # Crée et retourne la phrase à partir des indices

    def __get_from_cache(self, solution: tuple) -> float:
        """Récupère la perplexité d'une solution à partir du cache."""
        return self.cache.get(solution)  # Retourne la valeur correspondante dans le cache

    def __calculate_perplexity(self, solution: np.ndarray) -> float:
        """
        Calcule la perplexité d'une solution donnée, avec mise en cache.

          """
        solt = tuple(solution)
        cached = self.__get_from_cache(solt)
        if cached is not None:
            return cached


        sentence = self.__to_sentence_text(solution)
        submission = pd.DataFrame({'id': [0], 'text': [sentence] })
        perplexities = self.scorer.get_perplexity(submission["text"].tolist())
        self.cache.add(solt, perplexities[0])
        return perplexities[0]

    def __caching_state(self):
        """Affiche l'état du cache."""
        self.log(self.cache.state())

    def __encircling_prey(self, current_pos: np.ndarray, best_pos: np.ndarray, A: float, C: float) -> np.ndarray:
        """
        Méthode pour encercler une proie ( sur meilleure solution).


        """
        D = abs(C * best_pos - current_pos)  # Calcule la distance
        new_pos = best_pos - A * D  # Calcule la nouvelle position
        return np.argsort(new_pos)  # Retourne l'ordre des indices de la nouvelle position

    def __search_for_prey(self, current_pos: np.ndarray, random_pos: np.ndarray, A: float, C: float) -> np.ndarray:
        """
        Méthode pour rechercher une proie aléatoire.

         """
        D = abs(C * random_pos - current_pos)  # Calcule la distance
        new_pos = random_pos - A * D  # Calcule la nouvelle position
        return np.argsort(new_pos)  # Retourne l'ordre des indices de la nouvelle position

    def __bubble_net_attack(self, current_pos: np.ndarray, best_pos: np.ndarray) -> np.ndarray:
        """
        Méthode d'attaque par filet à bulles.

          """
        D = abs(best_pos - current_pos)  # Calcule la distance
        b = 1  # Paramètre pour le calcul
        l = random.uniform(-1, 1)  # Génère un nombre aléatoire entre -1 et 1
        new_pos = D * np.exp(l * b) * np.cos(2 * np.pi * l) + best_pos  # Calcule la nouvelle position
        return np.argsort(new_pos)  # Retourne l'ordre des indices de la nouvelle position

    def __amend_position(self, position: np.ndarray) -> np.ndarray:
        """Assure que la position est une permutation valide."""
        return np.argsort(position)

    def __local_search(self, solution: np.ndarray) -> Tuple[np.ndarray, float]:
        """Applique la recherche locale pour améliorer la solution."""
        return self.TabuSearch.optimize(solution)
    def optimize(self) -> Tuple[np.ndarray, float, List[float], List[list]]:
        """
        Exécute l'algorithme d'optimisation des baleines pour trouver le meilleur ordre de mots.

        Returns:
            Tuple[np.ndarray, float]: La meilleure solution et son score de perplexité.
        """

        # Initialise la population de baleines avec des solutions aléatoires
        population = []
        perplexities_values = []
        for w in range(self.n_whales):
            self.cwi = w
            solution = self.__create_initial_sols()  # Crée une solution initiale
            self.log(f"sol {solution} --> to improve")
            improved_solution, improved_perplexity = self.__local_search(solution)  # Améliore la solution localement
            self.log(f"{improved_solution} | {improved_perplexity}")
            population.append(improved_solution)  # Ajoute la solution améliorée à la population
            perplexities_values.append(improved_perplexity)  # Ajoute la perplexité correspondante
        self.cwi = None

        self.log("------------------------------")
        self.log(f"[WHALES]")
        self.log(population)
        self.log(perplexities_values)
        self.log("------------------------------")

        # Identifie la meilleure baleine initiale
        best_idx = np.argmin(perplexities_values)
        prey = population[best_idx].copy()
        best_perplexity = perplexities_values[best_idx]  # Récupère la perplexité correspondante

        self.log(f"[PREY] : {prey} | {best_perplexity}")

        # Démarre la boucle principale de WOA
        self.itr = 1
        while self.itr <= self.max_iter:  # Boucle jusqu'à ce que le nombre maximal d'itérations soit atteint
            self.log(f"[START ITER] {prey} | {best_perplexity}")


            a = 2 - self.itr / self.max_iter * 2  # Calcule le paramètre a

            # Met à jour chaque baleine dans la population
            for self.cwi in range(self.n_whales):
                cwhale = population[self.cwi]  # Récupère la baleine actuelle
                self.log(f"{cwhale} | {perplexities_values[self.cwi]}")
                A = self._compute_A(a)  # Calcule le coefficient A
                C = self._compute_C()  # Calcule le coefficient C
                p = random.random()  # Génère un nombre aléatoire pour déterminer le mouvement
                self.log(f"a={a}, A={A}, C={C}, p={p}")

                if p < 0.5:
                    if abs(A) < 1:
                        # Encercle la mielleure sulution afin de le  rapprocher
                        new_pos = self.__encircling_prey(cwhale, prey, A, C)  # Calcule la nouvelle position
                        self.log(f"[MOVE] encircling prey --> {new_pos}")
                    else:
                        # Recherche une autre solution(explorer autre zone  de recherche)
                        rand_idx = random.randint(0, self.n_whales - 1)  # Choisit une whale aléatoire
                        random_pos = population[rand_idx]  # Récupère sa position
                        new_pos = self.__search_for_prey(cwhale, random_pos, A, C)  # Calcule la nouvelle position
                        self.log(f"[MOVE] search for prey --> {new_pos}")
                else:
                    new_pos = self.__bubble_net_attack(cwhale, prey)  # Effectue une attaque par filet à bulles(exploitation)
                    self.log(f"[MOVE] bubble net attack --> {new_pos}")


                # apres la recuperation de la nouvelle position
                # Applique une recherche locale pour améliorer la nouvelle position
                improved_pos, improved_perplexity = self.__local_search(new_pos)
                self.log(f"[IMPROVE POSITION] {new_pos}, [{improved_perplexity}]")


                population[self.cwi] = improved_pos  # Met à jour la population avec la solution améliorée
                perplexities_values[self.cwi] = improved_perplexity  # Met à jour la valeur de perplexité

            # Met à jour la mieulleur solution  si une autre meilleure solution est trouvée dans la  nouvelle population
            best_idx = np.argmin(perplexities_values)
            if perplexities_values[best_idx] < best_perplexity:  # Vérifie si une meilleure solution a été trouvée
                prey = population[best_idx]  # Met à jour la mieulleur solution
                best_perplexity = perplexities_values[best_idx]  # Met à jour la perplexité
                self.log(f"[RES] [IMP] Prey: {prey} with perplexity {best_perplexity}")
                self.log(f"[RES] [NO IMP] Prey: {prey} with perplexity {best_perplexity}")

            self.cwi = None
            self.itr += 1

        self.__caching_state()
        return prey, best_perplexity, self.__to_sentence_text(prey)  # Retourne la meilleure solution, sa perplexité et la phrase reconstruite

In [None]:
model_path = "/kaggle/input/gemma-2/transformers/gemma-2-9b/2"
scorer = PerplexityCalculator(model_path=model_path)

Loading checkpoint shards:   0%|          | 0/8 [00:00<?, ?it/s]

In [None]:
# text = 'advent chimney elf family fireplace gingerbread mistletoe ornament reindeer scrooge'

# woa = WhaleOptimization(
#     text, score,
#     n_whales=3, max_iter=1, debug=True,
#     tabu_tenure=5, tabu_no_imp=1, tabu_max_iter=4
# )
# woa.optimize()

In [None]:
import pandas as pd
problem = pd.read_csv("/kaggle/input/santa-2024/sample_submission.csv")

res = []

for idx, row in problem.iterrows():
    print("----------------------------------------------")
    print("----------------------------------------------")
    print("----------------------------------------------")
    def logger(msg):
        print(f"[WOA {idx}] {msg}")
    woa = WhaleOptimization(
        row.text, score,
        n_whales=3, max_iter=10, debug=True, 
        tabu_tenure=10, tabu_no_imp=3, tabu_max_iter=20,
        logger = logger
    )
    sol, p, sen = woa.optimize()
    res.append({
        "id": len(res),
        "or": row.text,
        "sol": sol,
        "text": sen,
        "p": p
    })

submission_ = pd.DataFrame(res)
submission_

In [None]:
submission = submission_[['id', 'text']]
submission.to_csv("submission.csv", index=False)
submission