# Chargement des fichiers sources

In [20]:
import os
from PyPDF2 import PdfReader

# Import Files 
def file_loader(folder):
    files = []
    for i, file_name in enumerate(os.listdir(folder)):
        file_path = os.path.join(folder, file_name)
        if os.path.isfile(file_path):
            extension = os.path.splitext(file_name)[1].lower()
            with open(file_path, 'r', encoding='latin-1') as f:
                files.append({
                    'name': file_name,
                    'path': file_path,
                    'extension': extension,
                    'content': f.read(),
                })
            print(f"Chargé : {files[i]['name']} ({len(files[i]['content'])} caractères) {files[i]['extension']})")
    return files

# Convert PDF to TXT
def pdf_to_txt(file, output_folder_path):
    """
    Convert a PDF file (already loaded as a dict from file_loader) to TXT and save it in output_folder.
    Returns the path to the TXT file.
    """
    txt_name = os.path.splitext(file['name'])[0] + '.txt'
    txt_path = os.path.join(output_folder_path, txt_name)
    if file['extension'] == '.pdf':
        if not os.path.isfile(txt_path):
            print(f"Conversion du PDF {file['name']} en TXT...")
            with open(file['path'], 'rb') as f:
                reader = PdfReader(f)
                text = ""
                for page in reader.pages:
                    text += page.extract_text() or ""
            with open(txt_path, 'w', encoding='utf-8') as f_txt:
                f_txt.write(text)
            print(f"Fichier TXT créé : {os.path.splitext(file['name'])[0]}.txt")
        else:
            print(f"Le fichier TXT {txt_name} existe déjà, pas de conversion nécessaire.")
        return txt_path
    print(f"Le fichier {file['name']} n'est pas un PDF, pas de conversion effectuée.")
    return None

sources_path = os.path.join(os.getcwd(), "sources")
transformed_sources_path = os.path.join(os.getcwd(), "transformed_sources")
files = file_loader(sources_path)
for file in files:
    if(file['extension'] == '.pdf'):
        pdf_to_txt(file, transformed_sources_path)

Chargé : Warhammer 4 - Livre de base.pdf (67641992 caractères) .pdf)
Conversion du PDF Warhammer 4 - Livre de base.pdf en TXT...
Fichier TXT créé : Warhammer 4 - Livre de base.txt


# Découpage des fichiers txt en chunks

In [None]:
import os
from langchain.text_splitter import TokenTextSplitter
import tiktoken
import json

# Paramètres des chunks
SOURCES_DIR = "transformed_sources"                             # Dossier contenant les fichiers texte à découper
CHUNK_SIZE = 300                                                # Nombre de tokens par chunk
CHUNK_OVERLAP = 30                                              # Nombre de tokens de chevauchement
OUTPUT_DIR = f"chunks/CS_{CHUNK_SIZE}_CO_{CHUNK_OVERLAP}"       # Dossier de sortie pour les chunks

encoding = tiktoken.get_encoding("cl100k_base")     # Utiliser l'encodeur de tokens de OpenAI (compatible Mistral)

# Initialiser le découpeur de texte
splitter = TokenTextSplitter(
    chunk_size=CHUNK_SIZE,
    chunk_overlap=CHUNK_OVERLAP,
    encoding_name="cl100k_base",
)

def read_json(path):
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)

def write_json(path, config):
    with open(path, "w", encoding="utf-8") as f:
        json.dump(config, f, indent=4, ensure_ascii=False)

def chunk_files(new_files, old_files):
    i = 0
    for i, new_file in enumerate(new_files, start=1):
        print(f"Découpage de {new_file} en chunk de {CHUNK_SIZE} tokens...")
        chunk_file(os.path.join(SOURCES_DIR, new_file))

    print(f"{i} fichiers ont été découpés en chunks (dans '{OUTPUT_DIR}/').")
    new_param = {
        "CHUNK_SIZE": CHUNK_SIZE,
        "CHUNK_OVERLAP": CHUNK_OVERLAP,
        "SOURCE_DIR": SOURCES_DIR,
        "files": old_files + new_files,
    }
    write_json(os.path.join(OUTPUT_DIR, "param.json"), new_param)


def chunk_file(filepath):
    with open(filepath, "r", encoding="utf-8") as f:
        text = f.read()
    chunks = splitter.split_text(text)

    filename = os.path.splitext(os.path.basename(filepath))[0]
    for i, chunk in enumerate(chunks):
        with open(os.path.join(OUTPUT_DIR, f"{filename}_chunk_{i}.txt"), "w", encoding="utf-8") as out:
            out.write(chunk)

def chunk_cleaner(files):
    param = read_json(os.path.join(OUTPUT_DIR, "param.json"))

    p_chunk_size = param.get("CHUNK_SIZE")
    p_chunk_overlap = param.get("CHUNK_OVERLAP")
    p_sources_dir = param.get("SOURCE_DIR")
    p_files = param.get("files")

    if p_chunk_size == CHUNK_SIZE and p_chunk_overlap == CHUNK_OVERLAP and p_sources_dir == SOURCES_DIR:
        removed_files = list(set(p_files) - set(files))
        new_files = list(set(files) - set(p_files))
        if removed_files:
            print(f"Les fichiers suivants ne sont plus présents : {removed_files}")
            nb_cleaned = chunk_eraser(removed_files)
            print(f"{nb_cleaned} fichiers de chunks ont été supprimés (dans '{OUTPUT_DIR}'/).")
            param["files"] = [f for f in p_files if f not in removed_files]
            write_json(os.path.join(OUTPUT_DIR, "param.json"), param)
        if not new_files:
            print(f"Aucun nouveaux fichier n'a été détecté (dans '{SOURCES_DIR}/').")
        else:
            print(f"{len(new_files)} nouveaux fichiers ont été détectés : {new_files} (dans '{SOURCES_DIR}/').")
        return new_files, param["files"]

def chunk_eraser(diff):
    nb_cleaned = 0
    for file in diff:
        prefix = os.path.splitext(file)[0]
        for f in os.listdir(OUTPUT_DIR):
            if f.startswith(prefix) and f.endswith(".txt"):
                file_path = os.path.join(OUTPUT_DIR, f)
                if os.path.isfile(file_path):
                    os.remove(file_path)
                    nb_cleaned += 1
    return nb_cleaned

files = [f for f in os.listdir(SOURCES_DIR) if f.endswith(".txt")]
old_files = []
if os.path.isfile(os.path.join(OUTPUT_DIR, "param.json")):
    files, old_files = chunk_cleaner(files)
else:
    os.makedirs(OUTPUT_DIR)
chunk_files(files, old_files)







Les fichiers suivants ne sont plus présents : ['truc.txt', 'chose.txt']
2 fichiers de chunks ont été supprimés (dans 'chunks/CS_500_CO_50'/).
Aucun nouveaux fichier n'a été détecté (dans 'transformed_sources/').
0 fichiers ont été découpés en chunks (dans 'chunks/CS_500_CO_50/').


# Création de la base de données vectorielle


In [2]:
import os
import uuid
from sentence_transformers import SentenceTransformer
import chromadb

CHUNKS_DIR = "chunks/CS_300_CO_30"  # Dossier contenant les chunks
VECTOR_DB_DIR = "vector_db"
EMBEDDING_MODEL = "all-MiniLM-L6-v2"

# Init Chroma
client = chromadb.PersistentClient(path="vector_db")
collection = client.get_or_create_collection("warhammer_collection")

# Embedding model
model = SentenceTransformer(EMBEDDING_MODEL)

# Lecture des chunks
def load_chunks():
    chunks = []
    metadatas = []
    ids = []
    for file in os.listdir(CHUNKS_DIR):
        if file.endswith(".txt"):
            path = os.path.join(CHUNKS_DIR, file)
            with open(path, "r", encoding="utf-8") as f:
                content = f.read()
                chunks.append(content)
                metadatas.append({"filename": file})
                ids.append(str(uuid.uuid4()))  # identifiant unique
    return chunks, metadatas, ids

# Génération + insertion
def embed_and_store():
    print("Lecture des chunks...")
    texts, metadatas, ids = load_chunks()

    print("Génération des embeddings...")
    embeddings = model.encode(texts, show_progress_bar=True).tolist()

    print("Insertion dans la base vectorielle...")
    collection.add(
        documents=texts,
        embeddings=embeddings,
        metadatas=metadatas,
        ids=ids
    )

    print(f"{len(texts)} chunks ajoutés à la base vectorielle dans '{VECTOR_DB_DIR}'.")

embed_and_store()


Lecture des chunks...
Génération des embeddings...


Batches: 100%|██████████| 48/48 [00:49<00:00,  1.03s/it]


Insertion dans la base vectorielle...
1506 chunks ajoutés à la base vectorielle dans 'vector_db'.


# Test du modèle

In [None]:
import chromadb
from sentence_transformers import SentenceTransformer

# Configuration
VECTOR_DB_DIR = "vector_db"
EMBEDDING_MODEL = "all-MiniLM-L6-v2"
TOP_K = 5  # nombre de résultats à retourner

# Initialisation du client et du modèle
client = chromadb.PersistentClient(path=VECTOR_DB_DIR)
collection = client.get_collection("warhammer_collection")
model = SentenceTransformer(EMBEDDING_MODEL)

def ask_question():
    # question = input("Pose ta question : ").strip()
    # if not question:
    #     print("Tu dois poser une question.")
    #     return
    question = "Quel est la capitale de l'Empire ?"

    print("Recherche des passages les plus pertinents...\n")

    # Embedding de la question
    query_embedding = model.encode([question])[0].tolist()

    # Recherche vectorielle
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=TOP_K,
        include=["documents", "metadatas", "distances"]
    )

    # Affichage des résultats       
    for i, (doc, meta, dist) in enumerate(zip(results["documents"][0], results["metadatas"][0], results["distances"][0])):
        print(f"\n--- Résultat {i+1} ({meta['filename']}) - Similarité : {round((1 - dist) * 100, 2)}% ---")
        print(doc.strip()[:1000])
        
ask_question()


Recherche des passages les plus pertinents...


--- Résultat 1 (Warhammer 4 - Livre de base_chunk_139.txt) - Similarité : 11.17% ---
• Pourchasser le tueur d'un membre décédé du groupe.
• Impressionner votre commanditaire en réussissant
complètement une tâche qui vous a été confiée.
Ambitions à L ong Terme du Groupe 
Tout comme les Ambitions de groupe à court terme, 
les Ambitions  de groupe à long terme fonctionnent comme 
vos Ambitions personnelles, et peuvent avoir une portée tout 
aussi large, mais s'adressent à votre groupe tout entier.
Voici des exemples d'ambitions à long terme pour le Groupe :
• Éradiquer un culte du chaos à l'échelle de l'Empire.
• Construisez un château.
• Devenez des Héros de l'Empire, chacun d'eux recevant
une Croix Impériale pour bravoure, épinglée sur vos
poitrines par l'Empereur lui-même !
Réaliser les  Ambitions de  Votre Groupe
Si votre groupe atteint son Ambition à court terme, 
chaque membre reçoit +50 XP, et vous pouvez tous choisir une 
nouvelle Am