**Segmenting text into thematic units:**

This notebook automatically segments a text into thematic units. Each sentence is turned into an embedding with CamemBERT. Passage embeddings are then built by averaging the embeddings of neighboring sentences. The cosine distance between consecutive passages is computed in order to separate dissimilar passages and to join similar ones into paragraphs.

*Note: this notebook was created in Google Colab.*

We start with the necessary imports:

In [None]:
import os
import re
import torch
import numpy as np
from transformers import AutoTokenizer, AutoModel, pipeline
from sklearn.metrics.pairwise import cosine_similarity
from scipy.signal import find_peaks
import pandas as pd

We load the CamemBERT model used to compute the embeddings:

In [None]:
# Load models
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [None]:
# Embedding model
embedding_model_name = "almanach/camembertv2-base"
tokenizer = AutoTokenizer.from_pretrained(embedding_model_name)
model = AutoModel.from_pretrained(embedding_model_name).to(device)


Some weights of RobertaModel were not initialized from the model checkpoint at almanach/camembertv2-base and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


We use regular expressions to split the text into passages of 8 sentences each:

In [None]:
# 1. Split text into small segments (sentences)
def split_text_into_segments(text, segment_size=8):
    sentences = re.split(r'(?<=[.!?])\s+', text)
    segments = [
        " ".join(sentences[i:i + segment_size])
        for i in range(0, len(sentences), segment_size)
    ]
    return segments
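
As a quick check of the splitting step, the sketch below runs the same regular expression on a made-up string. The sample text and the value `segment_size=2` are assumptions chosen only to keep the output short.

```python
import re

def split_text_into_segments(text, segment_size=8):
    # Split after '.', '!' or '?' when followed by whitespace
    sentences = re.split(r'(?<=[.!?])\s+', text)
    # Group consecutive sentences into chunks of `segment_size`
    return [
        " ".join(sentences[i:i + segment_size])
        for i in range(0, len(sentences), segment_size)
    ]

sample = "One. Two! Three? Four. Five."  # hypothetical input
demo_segments = split_text_into_segments(sample, segment_size=2)
print(demo_segments)  # ['One. Two!', 'Three? Four.', 'Five.']
```

The last segment may hold fewer sentences than `segment_size`. Note also that this pattern splits after any period followed by a space, so an abbreviation such as "M. Dupont" would be cut in two.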

Within a passage, each sentence is turned into an embedding. The function then returns the mean of these embeddings, which serves as the embedding of the passage.

In [None]:
# 2. Aggregate sentence embeddings into segment embedding
def get_segment_embedding(segment):
    sentences = re.split(r'(?<=[.!?])\s+', segment)
    embeddings = []
    for sentence in sentences:
        # Tokenize one sentence, truncating to at most 512 tokens
        inputs = tokenizer(sentence, return_tensors="pt", truncation=True, max_length=512).to(device)
        with torch.no_grad():
            outputs = model(**inputs)
        # Keep the first-token vector as the sentence embedding
        embeddings.append(outputs.last_hidden_state[:, 0, :].squeeze().cpu().numpy())
    return np.mean(embeddings, axis=0)  # Average the sentence embeddings
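
To make the aggregation concrete without loading the model, here is a minimal sketch in which made-up 3-dimensional vectors stand in for the per-sentence vectors. The values and the dimension are assumptions for illustration only.

```python
import numpy as np

# Hypothetical sentence vectors (stand-ins for the model's first-token vectors)
sentence_vectors = [
    np.array([1.0, 0.0, 2.0]),
    np.array([3.0, 2.0, 0.0]),
]

# Element-wise mean over the sentences gives one vector for the passage
segment_vector = np.mean(sentence_vectors, axis=0)
print(segment_vector)  # [2. 1. 1.]
```

Every sentence contributes equally to the passage vector, whatever its length.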


Here, we compute the semantic distance (one minus the cosine similarity) between the embeddings of consecutive text segments, then smooth this series with a moving average.

In [None]:

# 3. Compute and smooth cosine similarities
def compute_smoothed_similarities(embeddings, window_size=3):
    similarities = [
        cosine_similarity([embeddings[i]], [embeddings[i + 1]])[0][0]
        for i in range(len(embeddings) - 1)
    ]
    similarities = 1 - np.array(similarities)  # Invert similarity scores
    return np.convolve(similarities, np.ones(window_size)/window_size, mode='valid')


Here, we redefine the function that computes the smoothed similarities between passages, adding a guard for texts that yield fewer than two segments:

In [None]:
def compute_smoothed_similarities(embeddings, window_size=3):
    # If embeddings contain less than 2 elements, return an empty array or handle it appropriately
    if len(embeddings) < 2:
        return np.array([])  # Return empty array to avoid error

    similarities = [
        cosine_similarity([embeddings[i]], [embeddings[i + 1]])[0][0]
        for i in range(len(embeddings) - 1)
    ]
    similarities = 1 - np.array(similarities)  # Invert similarity scores
    return np.convolve(similarities, np.ones(window_size)/window_size, mode='valid')
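
The effect of the inversion and of the moving average can be checked on toy data. The sketch below uses five made-up 2-dimensional vectors and a small hypothetical helper, `cosine_distance`, written with NumPy only; neither is part of the notebook.

```python
import numpy as np

def cosine_distance(u, v):
    # 1 - cosine similarity; 0 means same direction, 1 means orthogonal
    return 1.0 - np.dot(u, v) / (np.linalg.norm(u) * np.linalg.norm(v))

# Hypothetical segment embeddings: the "topic" flips at positions 2 and 4
toy_embeddings = [np.array(v, dtype=float)
                  for v in [[1, 0], [1, 0], [0, 1], [0, 1], [1, 0]]]

# One distance per pair of consecutive segments
distances = np.array([
    cosine_distance(toy_embeddings[i], toy_embeddings[i + 1])
    for i in range(len(toy_embeddings) - 1)
])

# Moving average over 3 values; 'valid' keeps only full windows
window_size = 3
smoothed = np.convolve(distances, np.ones(window_size) / window_size, mode='valid')

print(distances)      # four values, one per gap
print(len(smoothed))  # 2
```

With 5 segments there are 4 gaps, and a window of 3 leaves 4 - 3 + 1 = 2 smoothed values, so the smoothed series no longer has one entry per gap.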

In [None]:
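# 4. Detect boundaries using peaks
def detect_boundaries(similarities, threshold_factor=1.5):
    # No scores (text too short): nothing to detect, and np.mean/np.std
    # would emit warnings on an empty array
    if len(similarities) == 0:
        return np.array([], dtype=int)
    mean, std = np.mean(similarities), np.std(similarities)
    # Keep only peaks that stand well above the average distance
    threshold = mean + threshold_factor * std
    peaks, _ = find_peaks(similarities, height=threshold, distance=1)
    return peaks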
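
As an illustration of the thresholding, the sketch below applies the same rule (mean plus 1.5 standard deviations) to a made-up series of distances containing a single clear spike. The values are assumptions, not results from the notebook.

```python
import numpy as np
from scipy.signal import find_peaks

# Hypothetical smoothed distances with one obvious jump at index 3
toy_distances = np.array([0.10, 0.12, 0.11, 0.60, 0.10, 0.13, 0.12, 0.11])

# Same rule as detect_boundaries: mean + threshold_factor * std
threshold = toy_distances.mean() + 1.5 * toy_distances.std()

# Local maxima whose height is at least the threshold
peaks, _ = find_peaks(toy_distances, height=threshold, distance=1)
print(peaks)  # [3]
```

The small local maxima at indices 1 and 5 stay below the threshold, so only the spike is reported as a candidate boundary.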

Here, we merge passages that are semantically close:

In [None]:
# 5. Merge similar segments
def merge_similar_segments(segments, similarities, threshold=0.8):
    """
    Merge adjacent segments based on their similarity scores.

    Args:
        segments (list): List of text segments.
        similarities (list): List of similarity scores (length = len(segments) - 1).
        threshold (float): Similarity threshold for merging.

    Returns:
        list: Merged list of text passages.
    """
    merged_segments = []
    temp_segment = segments[0]

    for i in range(1, len(segments)):
        # Ensure index is within bounds for similarities
        if i - 1 < len(similarities) and similarities[i - 1] < threshold:
            # Low similarity = finalize current segment
            merged_segments.append(temp_segment)
            temp_segment = segments[i]
        else:
            # High similarity = merge segments
            temp_segment += " " + segments[i]

    # Append the final merged segment
    merged_segments.append(temp_segment)
    return merged_segments
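
The merging rule can be traced on a small example. The sketch below uses a compact copy of the function with made-up segments and scores, and assumes the scores are genuine similarities (higher means closer).

```python
def merge_similar_segments(segments, similarities, threshold=0.8):
    merged, current = [], segments[0]
    for i in range(1, len(segments)):
        # similarities[i - 1] compares segments[i - 1] with segments[i]
        if i - 1 < len(similarities) and similarities[i - 1] < threshold:
            merged.append(current)   # low similarity: close the passage
            current = segments[i]
        else:
            current += " " + segments[i]  # high similarity: keep growing
    merged.append(current)
    return merged

toy_segments = ["A.", "B.", "C.", "D."]   # hypothetical segments
toy_similarities = [0.9, 0.3, 0.95]       # hypothetical scores, one per gap
merged_demo = merge_similar_segments(toy_segments, toy_similarities)
print(merged_demo)  # ['A. B.', 'C. D.']
```

When `similarities` has fewer entries than there are gaps, the bounds check fails for the trailing segments and they are all appended to the last passage.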

Now we apply these functions to our data.

We start by loading the data:

In [None]:
import pandas as pd

# Load the CSV file into a DataFrame
df = pd.read_csv('/content/csv_intermediaire.csv', encoding='utf-8')

We run everything, step by step:

We create `titres` and `segments` lists to make the input and output easier to handle. At the end of the process we obtain `df_segmented`, which is our output.

In [None]:
titres = []
segments = []

for _, row in df.iterrows():
    titre = row['title']
    texte = row['content']

    # Step 1: Initial split into segments
    segments_for_text = split_text_into_segments(texte, segment_size=8)  # Segments for this text

    # Step 2: Generate embeddings
    embeddings = [get_segment_embedding(segment) for segment in segments_for_text]

    # Step 3: Compute smoothed similarities
    smoothed_similarities = compute_smoothed_similarities(embeddings)

    # Step 4: Detect boundaries
    boundaries = detect_boundaries(smoothed_similarities)

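    # Step 5: Merge similar segments
    # compute_smoothed_similarities returns distances (1 - cosine similarity),
    # so convert back to similarities, which merge_similar_segments expects
    merged_segments_for_text = merge_similar_segments(
        segments_for_text, 1 - smoothed_similarities, threshold=0.8
    )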

    # Append title and segments for this text
    for segment in merged_segments_for_text:
        titres.append(titre)
        segments.append(segment)

df_segmented = pd.DataFrame({
    'title': titres,
    'content': segments
})

We save these results (works in Colab):

In [None]:
df_segmented.to_csv('dataframe_corrige_segmente_vfinale.csv', index=False)