# Install

In [None]:
%pip install transformers plotly tqdm numpy torch torchvision torchaudio pandas nbformat scipy imageio kaleido

# Load model & dependencies

In [None]:
from transformers import BertModel, BertTokenizer
import plotly.express as px
from tqdm import tqdm
import glob
import numpy as np
import scipy
import torch
import random
import imageio
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import os
import pandas as pd
torch.no_grad()

model = BertModel.from_pretrained('bert-base-uncased')
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Load texts and get all tokens from them

In [None]:
text = ""
with open("./little_prince_data/little_prince.txt", 'r') as f:
    text = f.read()

ids = tokenizer.encode(text, add_special_tokens=False)
tokens = tokenizer.convert_ids_to_tokens(ids)
unique_tokens = list(set(tokens))

# Save the unique tokens
with open("./little_prince_data/unique_tokens.txt", "w") as f:
    f.write("\n".join(unique_tokens))

# Get "raw" embeddings for each token in our texts

In [None]:
# If you have trouble to compute everything once, you can go step by step with begin & end

# Load vocabulary
with open("./little_prince_data/unique_tokens.txt") as f:
    vocab = f.read().split("\n")

# Compute embeddings
raw_emb = []
for token in tqdm(vocab):
    id = tokenizer.convert_tokens_to_ids(token)
    tensor = torch.tensor([id]).unsqueeze(0)
    emb = model(tensor).last_hidden_state[0]
    raw_emb.append(emb)

# Save embeddings
raw_emb = torch.stack(raw_emb).reshape(-1, 768).detach().numpy()
np.save(f"./little_prince_data/raw_emb.npy", raw_emb)
#np.save(f"raw_emb_{begin}-{end}.npy", vocab_emb)

# In case you want to concatenate the matrices from different runs
# file_list = glob.glob("*.npy")
# matrices = [np.load(file) for file in file_list]
# raw_emb = np.concatenate(matrices, axis=0)
# np.save('raw_emb.npy', raw_emb)

# Stats
print("Moyennes:", np.mean(raw_emb, axis=0)[:5])
print("Écarts-types:", np.std(raw_emb, axis=0)[:5])

# Display one dimension of the embedding matrix
fig = px.histogram(raw_emb[:, 100])
fig.show()

# Compute correlation and display it

In [None]:
# Load std embeddings
raw_emb = np.load('./little_prince_data/raw_emb.npy')

# Calculer la matrice de corrélation
corr = np.corrcoef(raw_emb, rowvar=False)
np.save('./little_prince_data/corr.npy', corr)

# Display heatmap
fig = px.imshow(corr)
fig.show()

In [None]:
def matrix_stats(matrix):
    """Retourne les statistiques de la matrice en excluant la diagonale."""
    values = matrix[~np.eye(matrix.shape[0],dtype=bool)]
    return {
        'min': np.min(values),
        'max': np.max(values),
        'mean': np.mean(values),
        'median': np.median(values)
    }

print("Statistics:", matrix_stats(corr))

# Histogramme des valeurs de corrélation
fig = go.Figure(data=go.Histogram(x=corr[~np.eye(corr.shape[0], dtype=bool)], nbinsx=50))
fig.update_layout(title='Distribution des Valeurs de Corrélation', xaxis_title='Valeur de Corrélation', yaxis_title='Compte')
fig.show()

# Compute absolute correlation and display it

In [None]:
# Charger la matrice de corrélation
corr = np.load('./little_prince_data/corr.npy')

# Calculer la matrice de corrélation absolue
abs_corr = np.abs(corr)
np.save('./little_prince_data/abs_corr.npy', corr)

# Display heatmap
fig = px.imshow(abs_corr)
fig.show()

# Histogramme des valeurs de corrélation absolue
fig2 = go.Figure(data=go.Histogram(x=abs_corr[~np.eye(abs_corr.shape[0], dtype=bool)], nbinsx=50))
fig2.update_layout(title='Distribution des Valeurs de Corrélation Absolue', xaxis_title='Valeur de Corrélation Absolue', yaxis_title='Compte')
fig2.show()

# Sort dimensions (using absolute correlation as a metric)

In [None]:
MAX_CORR = 0.40

def fitness(path, abs_corr_matrix):
    N = len(path)
    distance = 0
    for ia, dim_a in enumerate(path):
        for ib, dim_b in enumerate(path):
            ang_a = ia / N * 2 * np.pi
            ang_b = ib / N * 2 * np.pi
            ang_dist = np.pi - abs(np.pi - abs(ang_a - ang_b)) # Distance angulaire bornée par Pi
            expected_dist = max(np.pi - (1/MAX_CORR)*np.pi*abs_corr_matrix[dim_a, dim_b], 0) # Distance angulaire attendue
            distance += (ang_dist - expected_dist) ** 2 # cout erreur quadratique
    return distance

def crossover(parent1, parent2):
    """Effectue un croisement en prenant un sous-chemin de parent1 et complète avec parent2."""
    # Sélectionne un sous-chemin aléatoire de parent1
    start, end = random.sample(range(len(parent1)), 2)
    
    # Crée un enfant en copiant le sous-chemin de parent1
    child = [-1]*len(parent1)
    if start <= end:
        for i in range(start, end + 1):
            child[i] = parent1[i]
    else:
        for i in range(start, len(parent1)):
            child[i] = parent1[i]
        for i in range(0, end + 1):
            child[i] = parent1[i]

    # Complète l'enfant avec les éléments de parent2
    pointer = 0
    for i in range(len(child)):
        if child[i] == -1:
            while parent2[pointer] in child:
                pointer += 1
            child[i] = parent2[pointer]
            pointer += 1

    return child

def mutate(path, mutation_rate, abs_corr_matrix, top_n):
    """Déplace quelques éléments du chemin à une autre position plus adaptée."""
    # Choose random elements to remove from the path
    selected_elems = [elem for elem in path if random.random() < mutation_rate]
    new_path = [elem for elem in path if elem not in selected_elems]

    # Insert the removed elements at random best position
    for elem in selected_elems:

        # Compute the score of each position
        scores = []
        for i in range(len(new_path)):
            prev = new_path[i]
            next = new_path[(i+1)%len(new_path)]
            scores += [abs_corr_matrix[prev, elem] + abs_corr_matrix[elem, next]]

        # Select the top_n best positions
        ranked_indices = np.argsort(scores)[-top_n:] # Index of the top_n positions
        ranked_scores = np.sort(scores)[-top_n:] # Score of the top_n positions

        # Normalize the scores to get a probability distribution
        proba = ranked_scores / np.sum(ranked_scores)

        # Choose a position according to the probability distribution
        new_pos = np.random.choice(ranked_indices, p=proba)
        new_path.insert(new_pos, elem)
    
    return new_path

def genetic_algorithm(abs_corr_matrix, population_size, generations, mutation_rate, top_n_mutate, top_n_parents):
    """Effectue l'algorithme génétique pour le TSP."""
    
    # Génère une population initiale de chemins aléatoires
    population = [list(np.random.permutation(len(abs_corr_matrix))) for _ in range(population_size)]
    best_distance = np.inf
    best_path = None

    for generation in range(generations):
        # Évalue la fitness de chaque individu
        distances = [fitness(path, abs_corr_matrix) for path in population]
        ranked_indices = np.argsort(distances)
        print(f"Gen {generation}: {min(distances)}")
        #print('scores', distances)

        # Sauvegarde le meilleur individu si meilleur que courant
        if min(distances) < best_distance:
            best_distance = min(distances)
            best_path = population[np.argmin(distances)]

        # Sélection des parents pour le croisement
        parents = [population[i] for i in ranked_indices[:top_n_parents]]
        #print('parents', parents)

        # Création de la nouvelle population
        new_population = [p for p in parents] # Copie les parents
        while len(new_population) < population_size:
            parent1, parent2 = random.sample(parents, 2)
            child = crossover(parent1, parent2)
            new_population.append(child)

        # Appliquer la mutation
        population = [mutate(path, mutation_rate, abs_corr_matrix, top_n_mutate) for path in new_population]

    # Renvoie le meilleur chemin trouvé
    return best_path, best_distance

# Charger la matrice de corrélation absolue
abs_corr = np.load('./little_prince_data/abs_corr.npy')

# Appliquer l'algorithme génétique
best_path, best_distance = genetic_algorithm(
    abs_corr_matrix=abs_corr, 
    population_size=100, 
    generations=300, 
    mutation_rate=0.01, 
    top_n_mutate=5, 
    top_n_parents=10,
)

print("Meilleur chemin:", best_path)
print("Meilleur distance:", best_distance)

with open('./little_prince_data/path.txt', 'w') as f:
    f.write(str(best_path) + '\n')
    f.write(str(best_distance))

# Radar Chart (absolute correlation)

##### Load and prepare data & filter

In [None]:
# Load path
with open('./little_prince_data/path.txt', 'r') as f:
    path = eval(f.readline())

# Load the correlation matrix
corr = np.load('./little_prince_data/corr.npy')

# Compute coef for each value in the path
coefs = [1] # List that contains values {1, -1} for each value in the path
current_coef = 1
for i in range(len(path)):
    current = path[i]
    next = path[(i+1)%len(path)]
    next_coef = int(np.sign(corr[current, next]))
    coefs += [current_coef * next_coef]
    current_coef = next_coef

# Define filter
def circular_convolution(signal, kernel):
    len_kernel = len(kernel)
    half_kernel = len_kernel // 2
    extended_signal = np.concatenate([signal[-half_kernel:], signal, signal[:half_kernel]]) # Étendre le signal aux deux extrémités pour gérer le cas circulaire
    result = np.convolve(extended_signal, kernel, mode='valid') # Appliquer la convolution standard
    return result

N = 32
filter = [scipy.stats.norm.pdf(3*x/(N+1), 0, 1) for x in range(-N, N+1)]
filter_norm = [x / sum(filter) for x in filter]

##### Display words

In [None]:
# Re-order the dimensions with coefs and apply the filter
def compute_word_shape(emb, path, coefs, filter):
    sorted = [emb[dim] * coefs[i] for i, dim in enumerate(path)]
    filtered = circular_convolution(np.array(sorted), filter)
    return filtered

# Compute the shape of some words
words = ['kill', 'death']
ids = [tokenizer.encode(word, add_special_tokens=False)[0] for word in words]
embs = [model(torch.tensor([id]).unsqueeze(0)).last_hidden_state[0].detach().numpy() for id in ids]
shapes = [compute_word_shape(emb[0], path, coefs, filter_norm) for emb in embs]

# Display the shape of the words
fig = go.Figure()
for shape, word in zip(shapes, words):
    fig.add_trace(go.Scatterpolar(r=shape, name=word, fill='toself', opacity=0.60))
fig.update_layout({'polar': {
    "radialaxis": {"range": [-0.8, 0.8], "showticklabels": False}, 
    "angularaxis": {"showticklabels": False},
}})
fig.show()

##### Display word representation depending on context

In [None]:
# Re-order the dimensions with coefs and apply the filter
def compute_word_shape(emb, path, coefs, filter):
    sorted = [emb[dim] * coefs[i] for i, dim in enumerate(path)]
    filtered = circular_convolution(np.array(sorted), filter)
    return filtered

# Define sentence and word of interest
sentence = "It would have been better to come back at the same hour"
interest = "better"

# Define context before & after
words = sentence.lower().split()
pos_interest = words.index(interest.lower())
context_before = list(reversed([' '.join(words[i:pos_interest+1]) for i in range(pos_interest+1)]))
context_after = [' '.join(words[pos_interest:i+1]) for i in range(pos_interest, len(words))]

# Compute word interest's embedding for each context
embs_before = []
embs_after = []
for cb in context_before:
    ids = tokenizer.encode(cb)
    tensor = torch.tensor(ids).unsqueeze(0)
    emb = model(tensor).last_hidden_state[0][-2].tolist()
    embs_before.append(emb)
for ca in context_after:
    ids = tokenizer.encode(ca)
    tensor = torch.tensor(ids).unsqueeze(0)
    emb = model(tensor).last_hidden_state[0][1].tolist()
    embs_after.append(emb)

# Create the figure
max_len = max(len(embs_before), len(embs_after))
specs = [[{"type": "scatterpolar"} for _ in range(max_len)] for _ in range(2)]
fig = make_subplots(
    rows=2, cols=max_len, start_cell="top-left", specs=specs, 
)

# Display context before
for i, eb in enumerate(embs_before):
    shape = compute_word_shape(eb, path, coefs, filter_norm)
    polar = go.Scatterpolar(r=shape, name=context_before[i], fill='toself', opacity=0.60, line=dict(color='blue'))
    fig.add_trace(polar, row=1, col=i+1)
for i, ea in enumerate(embs_after):
    shape = compute_word_shape(ea, path, coefs, filter_norm)
    polar = go.Scatterpolar(r=shape, name=context_after[i], fill='toself', opacity=0.60, line=dict(color='red'))
    fig.add_trace(polar, row=2, col=i+1)


# Ajout des annotations pour les titres en diagonale
for i in range(max_len):
    # Position pour les titres de la rangée "Before"
    if i < len(context_before):
        fig.add_annotation(
            x=(i + 0.4) / max_len,  # Calculate the center position
            y=0.95,                # Slightly above the top of the subplot
            text=context_before[i],
            showarrow=False,
            xref="paper",
            yref="paper",
            xanchor="center",
            yanchor="bottom",
            align="left",
            font=dict(size=25),
            textangle=-30
        )

    # Position pour les titres de la rangée "After"
    if i < len(context_after):
        fig.add_annotation(
            x=(i + 0.4) / max_len,  # Calculate the center position
            y=0.40,               # Slightly below the bottom of the subplot
            text=context_after[i],
            showarrow=False,
            xref="paper",
            yref="paper",
            xanchor="center",
            yanchor="bottom",
            align="left",
            font=dict(size=25),
            textangle=-30
        )

# Update layout to remove axis and show figure
layout = {
    "showlegend": False, 
    "title_text": f"Evolution of the word \"{interest}\" in function of its context (before and after)",
    "title_x": 0.5, 
    "title_font": {"size": 40},
    "width": 3000,
    "height": 1600,
    "margin": {'t': 250},
}
for i in range(1, 2*max_len+1):
    layout[f"polar{i}"] = {
        "radialaxis": {"range": [-0.3, 0.3], "showticklabels": False}, 
        "angularaxis": {"showticklabels": False},
    } 
fig.update_layout(layout)
fig.show()