In [4]:
import fitz  # PyMuPDF
import re
from pathlib import Path
from tqdm import tqdm # Pour une belle barre de progression

# --- Fonctions de nettoyage (celles que nous avons définies précédemment) ---
def clean_text(text):
    text = re.sub(r'(\w+)-\n(\w+)', r'\1\2', text)
    text = re.sub(r'Page\s\d+\s(sur|of)\s\d+', '', text, flags=re.IGNORECASE)
    text = re.sub(r'\s+', ' ', text).strip()
    return text

# --- La nouvelle fonction principale ---

def process_all_pdfs_in_directory(directory_path: str) -> str:
    """
    Scanne un répertoire, trouve tous les fichiers .pdf, en extrait le texte,
    le nettoie et retourne une seule chaîne de caractères contenant tout le texte.
    
    Args:
        directory_path: Le chemin vers le répertoire contenant les PDF.

    Returns:
        Une chaîne de caractères unique avec le contenu de tous les PDF.
    """
    
    # Utiliser pathlib pour une manipulation de chemin robuste
    path = Path(directory_path)
    
    # 1. Lister tous les fichiers et ne garder que les .pdf
    # path.glob('*.pdf') est un générateur qui trouve tous les fichiers correspondant au motif
    pdf_files = list(path.glob('*.pdf'))
    
    if not pdf_files:
        print(f"⚠️ Aucun fichier PDF trouvé dans le répertoire : {directory_path}")
        return ""
        
    print(f"✅ Trouvé {len(pdf_files)} fichier(s) PDF à traiter.")
    
    all_cleaned_text = []
    
    # 2. Parcourir chaque fichier PDF trouvé avec une barre de progression
    # tqdm rend le processus beaucoup plus agréable à suivre
    for pdf_path in tqdm(pdf_files, desc="Traitement des PDF"):
        try:
            doc = fitz.open(pdf_path)
            # 3. Extraire le texte de chaque page
            file_text = ""
            for page in doc:
                file_text += page.get_text("text") + "\n"
            doc.close()
            
            # 4. Nettoyer le texte extrait
            cleaned_file_text = clean_text(file_text)
            all_cleaned_text.append(cleaned_file_text)
            
        except Exception as e:
            # Gérer les erreurs si un PDF est corrompu ou illisible
            print(f"❌ Erreur lors du traitement du fichier {pdf_path.name}: {e}")
            
    # 5. Combiner le texte de tous les fichiers en un seul grand texte
    # On ajoute un séparateur clair pour marquer la fin d'un document
    return "\n\n<|END_OF_DOCUMENT|>\n\n".join(all_cleaned_text)


# --- Comment l'utiliser dans votre script principal ---

# Définir le chemin vers votre répertoire de données
RAW_DATA_PATH = "$HOME/stage/pdf-rag-project/data/raw" 

# Appeler la fonction pour obtenir tout le texte
full_corpus_text = process_all_pdfs_in_directory(RAW_DATA_PATH)

if full_corpus_text:
    print(f"\nTraitement terminé. Longueur totale du texte : {len(full_corpus_text)} caractères.")
    
    # Maintenant, vous pouvez passer `full_corpus_text` à votre
    # `RecursiveCharacterTextSplitter` pour créer les chunks.
    # ... la suite de votre pipeline RAG ...

⚠️ Aucun fichier PDF trouvé dans le répertoire : $HOME/stage/pdf-rag-project/data/raw


In [6]:
import fitz
import re
from pathlib import Path
from tqdm import tqdm


current_path = Path.cwd()
if current_path.name == "notebooks":
    PROJECT_ROOT = current_path.parent
else:
    PROJECT_ROOT = current_path
    
print(f"Racine du projet déterminée : {PROJECT_ROOT}")


RAW_DATA_PATH = PROJECT_ROOT / "data" / "raw"
PROCESSED_DATA_PATH = PROJECT_ROOT / "data" / "processed"

PROCESSED_DATA_PATH.mkdir(parents=True, exist_ok=True)



def clean_text(text):
    patterns_to_remove = [
        r"IMPACTDEV\s*Développement de solutions",  # Logo et slogan
        r"\w+-\w+-\d{2}-\d{2}",                    # ID de document comme DMGP-PR-03-01
        r"Date\s*:\s*\d{2}/\d{2}/\d{4}",          # Date comme Date : 04/03/2025
        r"Page\s+\d+\s+sur\s+\d+"                 # Numéro de page comme Page 1 sur 3
    ]
    
    for pattern in patterns_to_remove:
        text = re.sub(pattern, "", text, flags=re.IGNORECASE)
    text = re.sub(r'(\w+)-\n(\w+)', r'\1\2', text)
    text = re.sub(r'Page\s\d+\s(sur|of)\s\d+', '', text, flags=re.IGNORECASE)
    text = re.sub(r'\s+', ' ', text).strip()
    
    return text

# --- Fonction de traitement (inchangée, elle prend un chemin en argument) ---
def process_all_pdfs_in_directory(directory_path: Path) -> str:
    pdf_files = list(directory_path.glob('*.pdf'))
    
    if not pdf_files:
        print(f"⚠️ Aucun fichier PDF trouvé dans le répertoire : {directory_path}")
        return ""
        
    print(f"✅ Trouvé {len(pdf_files)} fichier(s) PDF à traiter dans '{directory_path}'.")
    
    all_cleaned_text = []
    
    for pdf_path in tqdm(pdf_files, desc="Traitement des PDF"):
        try:
            with fitz.open(pdf_path) as doc:
                file_text = ""
                for page in doc:
                    file_text += page.get_text("text") + "\n"
            
            cleaned_file_text = clean_text(file_text)
            all_cleaned_text.append(cleaned_file_text)
            
        except Exception as e:
            print(f"❌ Erreur lors du traitement du fichier {pdf_path.name}: {e}")
            
    return "\n\n<|END_OF_DOCUMENT|>\n\n".join(all_cleaned_text)


# --- Point d'entrée principal du script ---
if __name__ == "__main__":
    print(f"Racine du projet détectée : {PROJECT_ROOT}")
    
    # Appeler la fonction principale avec le chemin que nous avons construit
    full_corpus_text = process_all_pdfs_in_directory(RAW_DATA_PATH)

    if full_corpus_text:
        print(f"\nTraitement terminé. Longueur totale du texte : {len(full_corpus_text)} caractères.")
        
        # Ici, vous continueriez avec le reste de votre pipeline :
        # 1. Découper `full_corpus_text` en chunks.
        # 2. Créer les embeddings pour chaque chunk.
        # 3. Construire l'index FAISS.
        # 4. Sauvegarder l'index et les chunks dans PROCESSED_DATA_PATH.
        
        # Exemple de sauvegarde :
        # faiss_index_path = PROCESSED_DATA_PATH / "my_faiss.index"
        # faiss.write_index(index, str(faiss_index_path))
        # print(f"Index FAISS sauvegardé dans : {faiss_index_path}")
        
    else:
        print("Aucun texte n'a été traité. Fin du script.")

Racine du projet déterminée : /home/elyes/stage/pdf-rag-project
Racine du projet détectée : /home/elyes/stage/pdf-rag-project
✅ Trouvé 1 fichier(s) PDF à traiter dans '/home/elyes/stage/pdf-rag-project/data/raw'.


Traitement des PDF: 100%|█████████████████████████| 1/1 [00:00<00:00, 50.16it/s]


Traitement terminé. Longueur totale du texte : 1522 caractères.





In [2]:
import fitz
import re
from pathlib import Path
from tqdm import tqdm
import difflib

# --- Définition des chemins (inchangé) ---
current_path = Path.cwd()
if current_path.name == "notebooks":
    PROJECT_ROOT = current_path.parent
else:
    PROJECT_ROOT = current_path
print(f"Racine du projet déterminée : {PROJECT_ROOT}")

RAW_DATA_PATH = PROJECT_ROOT / "data" / "raw"

# --- NOUVELLE STRATÉGIE DE NETTOYAGE PAGE PAR PAGE ---

def smart_clean_pdf_text(doc: fitz.Document) -> str:
    """
    Nettoie le texte d'un document PDF en traitant chaque page individuellement
    pour supprimer les en-têtes et pieds de page, tout en préservant la page de garde.
    """
    
    full_cleaned_text = ""
        
    # 1. Définir les motifs d'en-tête et de pied de page à supprimer
    # On utilise des ancres (^ pour début de ligne, $ pour fin de ligne) pour être plus précis
    header_patterns = [
        re.compile(r"^Procédure\s*\w+-\w+-\d{2}-\d{2}", re.IGNORECASE),
        re.compile(r"^IMPACTDEV Développement de solutions", re.IGNORECASE)
    ]
    footer_patterns = [
        re.compile(r"Page\s*\d+\s*sur\s*\d+$", re.IGNORECASE),
        re.compile(r"Date\s*:\s*\d{2}/\d{2}/\d{4}$", re.IGNORECASE) # Date semble être dans le footer
    ]

    # 2. Parcourir chaque page du document
    for page_num, page in enumerate(doc):
        page_text = page.get_text("text")
        
        # 3. Laisser la première page (page_num == 0) presque intacte
        # Elle contient l'historique des versions que nous voulons garder.
        if page_num == 0:
            cleaned_page_text = page_text
        else:
            # Pour les autres pages, on nettoie les en-têtes et pieds de page
            lines = page_text.split('\n')
            cleaned_lines = []
            for line in lines:
                is_header_or_footer = False
                # Vérifier si la ligne correspond à un motif d'en-tête ou de pied de page
                for pattern in header_patterns + footer_patterns:
                    if pattern.search(line.strip()):
                        is_header_or_footer = True
                        break
                
                if not is_header_or_footer:
                    cleaned_lines.append(line)
            
            cleaned_page_text = "\n".join(cleaned_lines)

        # 4. Nettoyage final pour les espaces, etc.
        cleaned_page_text = re.sub(r'(\w+)-\n(\w+)', r'\1\2', cleaned_page_text)
        cleaned_page_text = re.sub(r'(\n\s*){2,}', '\n\n', cleaned_page_text) # Garde les doubles sauts de ligne
        
        full_cleaned_text += cleaned_page_text + "\n"
        
    return full_cleaned_text.strip()


# --- Fonction de visualisation (inchangée) ---
def visualize_cleaning_effect(raw_text: str, cleaned_text: str, sample_size: int = 2000):
    print("-" * 50 + "\n🔬 VISUALISATION DE L'EFFET DU NETTOYAGE 🔬\n" + "-" * 50)
    raw_sample = raw_text[:sample_size]; cleaned_sample = cleaned_text[:sample_size]
    raw_lines = raw_sample.splitlines(); cleaned_lines = cleaned_sample.splitlines()
    diff = difflib.unified_diff(raw_lines, cleaned_lines, fromfile='Texte Brut', tofile='Texte Nettoyé', lineterm='')
    print("Légende : [- Ligne supprimée] [+ Ligne ajoutée]\n")
    for line in diff:
        print(line)
    print("-" * 50)


# --- Exécution principale pour le test ---

pdf_files = list(RAW_DATA_PATH.glob('*.pdf'))

if not pdf_files:
    print(f"⚠️ Aucun fichier PDF trouvé dans {RAW_DATA_PATH}.")
else:
    pdf_to_test = pdf_files[0]
    print(f"Analyse du fichier de test : {pdf_to_test.name}")
    
    raw_text = ""
    try:
        with fitz.open(pdf_to_test) as doc:
            # On passe l'objet 'doc' entier à notre nouvelle fonction
            cleaned_text = smart_clean_pdf_text(doc)
            
            # Pour la visualisation, on a besoin du texte brut complet
            for page in doc:
                raw_text += page.get_text("text") + "\n"
        
        visualize_cleaning_effect(raw_text, cleaned_text)

        print("\n Aperçu du début du texte NETTOYÉ :")
        print("'" + cleaned_text[:200] + "...'")

    except Exception as e:
        print(f"❌ Erreur lors du traitement du fichier {pdf_to_test.name}: {e}")

Racine du projet déterminée : /home/elyes/stage/pdf-rag-project
Analyse du fichier de test : DMGP-PR-03-01 Gestion de version de code.pdf
--------------------------------------------------
🔬 VISUALISATION DE L'EFFET DU NETTOYAGE 🔬
--------------------------------------------------
Légende : [- Ligne supprimée] [+ Ligne ajoutée]

--- Texte Brut
+++ Texte Nettoyé
@@ -1,11 +1,9 @@
- 
 Procédure 
 DMGP-PR-03-01 
 Gestion de version de code 
 Date : 04/03/2025 
 Page 1 sur3 
- 
- 
+
 Version actuelle 
 Propriétaire du 
 document 
@@ -16,7 +14,7 @@
 IMPACTDEV 
 Interne 
 Validé 
- 
+
 Établissement, vérification et approbation  
 Rôle 
 Nom & Prénom 
@@ -25,17 +23,16 @@
 Établi par  
 Faiez KTATA 
 DT 
- 
+
 Vérification 
 Hana GHRIBI 
 RMQ 
- 
+
 Approbation   
 Itebeddine GHORBEL 
 Nebras GHARBI 
 DG 
- 
- 
+
 Historique de versions  
 Version 
 Date 
@@ -58,23 +55,13 @@
 Nebras GHARBI 
 Validation 
 DG 
- 
- 
- 
- 
- 
- 
- 
- 
- 
+
 
  
 Procédure 
 DMGP-PR-03-01 
 Gestion de version de c

In [2]:
# Assurez-vous que ces bibliothèques sont bien importées
from langchain.text_splitter import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
import pickle # Pour sauvegarder nos chunks de texte

# --- Point de départ : la variable 'full_corpus_text' issue du nettoyage ---
# On s'assure qu'elle n'est pas vide avant de continuer.
if 'full_corpus_text' in locals() and full_corpus_text:
    print("Prêt à passer aux étapes de chunking, embedding et indexation.")
else:
    print("⚠️ La variable 'full_corpus_text' est vide ou n'existe pas. Veuillez exécuter la cellule de nettoyage d'abord.")
    # On arrête l'exécution de la cellule si le texte n'est pas prêt
    # (dans un notebook, vous pouvez simplement ne pas exécuter la suite)


# === ÉTAPE 2 : DÉCOUPAGE DU TEXTE (CHUNKING) ===
print("\n--- Étape 2 : Découpage du texte en chunks ---")

# On utilise un découpeur récursif qui essaie de respecter les paragraphes et les phrases.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=700,      # Taille cible de chaque chunk en caractères.
    chunk_overlap=250,    # Nombre de caractères de chevauchement entre les chunks.
    length_function=len,
    is_separator_regex=False,
)

chunks = text_splitter.split_text(full_corpus_text)
print(f"✅ Le texte a été découpé en {len(chunks)} chunks.")


# === ÉTAPE 3 : VECTORISATION (EMBEDDING) ===
print("\n--- Étape 3 : Création des embeddings pour chaque chunk ---")

# On charge le modèle d'embedding.
# 'all-MiniLM-L6-v2' est un excellent choix pour démarrer : rapide et performant.
# Le modèle sera téléchargé lors de la première utilisation.
embedding_model = SentenceTransformer('BAAI/bge-small-en-v1.5', device='cuda')

# On convertit les chunks en vecteurs. Cette opération peut prendre du temps.
# show_progress_bar=True est très utile pour suivre l'avancement.
chunk_embeddings = embedding_model.encode(chunks, show_progress_bar=True)

print(f"✅ Embeddings créés. La forme de la matrice de vecteurs est : {chunk_embeddings.shape}")


# === ÉTAPE 4 : INDEXATION DANS FAISS ===
print("\n--- Étape 4 : Création et remplissage de l'index vectoriel FAISS ---")

# Obtenir la dimension des vecteurs (ex: 384 pour 'all-MiniLM-L6-v2')
d = chunk_embeddings.shape[1]

# On crée un index FAISS simple mais très précis.
# IndexFlatL2 calcule la distance exacte entre les vecteurs.
index = faiss.IndexFlatL2(d)

# On ajoute nos embeddings à l'index.
# Ils doivent être convertis en float32 pour FAISS.
index.add(np.array(chunk_embeddings).astype('float32'))

print(f"✅ Index FAISS créé avec succès. Il contient {index.ntotal} vecteurs.")


# === ÉTAPE 5 : SAUVEGARDE DES ARTEFACTS (TRÈS IMPORTANT !) ===
# L'embedding est lent. On sauvegarde les résultats pour ne pas avoir à le refaire.
print("\n--- Étape 5 : Sauvegarde de l'index et des chunks ---")

# Définir les chemins de sauvegarde dans notre dossier de données traitées
FAISS_INDEX_PATH = PROCESSED_DATA_PATH / "my_documents.index"
CHUNKS_PATH = PROCESSED_DATA_PATH / "my_documents_chunks.pkl"

# Sauvegarder l'index FAISS
faiss.write_index(index, str(FAISS_INDEX_PATH))

# Sauvegarder la liste des chunks de texte avec pickle
with open(CHUNKS_PATH, "wb") as f:
    pickle.dump(chunks, f)

print(f"✅ Index sauvegardé dans : {FAISS_INDEX_PATH}")
print(f"✅ Chunks sauvegardés dans : {CHUNKS_PATH}")
print("\n🎉 Le 'cerveau' de votre RAG est prêt et sauvegardé ! 🎉")

Prêt à passer aux étapes de chunking, embedding et indexation.

--- Étape 2 : Découpage du texte en chunks ---
✅ Le texte a été découpé en 3 chunks.

--- Étape 3 : Création des embeddings pour chaque chunk ---




Batches:   0%|          | 0/1 [00:00<?, ?it/s]

✅ Embeddings créés. La forme de la matrice de vecteurs est : (3, 384)

--- Étape 4 : Création et remplissage de l'index vectoriel FAISS ---
✅ Index FAISS créé avec succès. Il contient 3 vecteurs.

--- Étape 5 : Sauvegarde de l'index et des chunks ---
✅ Index sauvegardé dans : /home/elyes/stage/pdf-rag-project/data/processed/my_documents.index
✅ Chunks sauvegardés dans : /home/elyes/stage/pdf-rag-project/data/processed/my_documents_chunks.pkl

🎉 Le 'cerveau' de votre RAG est prêt et sauvegardé ! 🎉
