<span style="color:#8B949E;">
<b>Note de lecture</b> ‚Äî Notebook issu de tests it√©ratifs (‚Äúspeed-tests‚Äù).  
Le corpus utilis√© ici est un <b>dataset de substitution</b> (non align√© client) uniquement pour valider la m√©canique (retrieval + √©valuation) et d√©rouler la roadmap.  
Pour le chemin complet, suivre l‚Äôordre 01 ‚Üí 10 et lire en priorit√© les sections Markdown.
</span>


# üß± STAGE 8 ‚Äî Extraction + chunking ‚Äúpassage-level‚Äù (XML juridiques)

Ce stage introduit la **brique manquante** observ√©e dans les stages pr√©c√©dents (BM25 / dense / hybride) :  
‚û°Ô∏è **la granularit√© documentaire**.

Jusqu‚Äôici, on benchmarkait des retrievers sur un corpus o√π la structure ‚ÄúXML concat√©n√©‚Äù pouvait :
- injecter beaucoup de **bruit** (liens, historiques, m√©tadonn√©es, en-t√™tes techniques),
- diluer le **signal juridique**,
- et faire √©chouer des requ√™tes pourtant simples (top-k rempli de non-pertinents).

Ce script construit un corpus **chunk√© au niveau ‚Äúpassage‚Äù** (paragraphes regroup√©s + d√©coupe phrase-aware), pr√™t √† √™tre index√© et benchmark√© √† l‚Äôidentique dans les stages suivants.

---

SCRIPT 8 ‚Äî EXTRACTION + CHUNKING "PASSAGE-LEVEL" POUR XML JURIDIQUES

But du script
-------------
Ce script transforme un corpus de documents XML juridiques (structure type L√©gifrance,
ou toute structure proche) en une liste de "chunks" (passages) propres, pr√™ts √† √™tre index√©s
(BM25, embeddings, hybride) et √©valu√©s via les scripts de benchmark existants.

Pourquoi (probl√®me observ√©)
---------------------------
Dans les XML juridiques, une grande partie du contenu peut √™tre compos√©e de :
- liens (r√©f√©rences, historiques, citations) : balises LIEN / ... et structures associ√©es
- m√©tadonn√©es (dates, identifiants, hi√©rarchie, titres multiples)
- en-t√™tes techniques concat√©n√©s (ID, URL, statut, etc.) sur certains exports

Si on concat√®ne "tout le texte du XML" et qu'on chunk en "1 document = 1 chunk",
on indexe beaucoup de bruit et on dilue le signal juridique.
Le passage-level am√©liore la pr√©cision du retrieval (question ‚Üî passage pertinent)
et facilite la citation/sour√ßage.

Ce script applique donc des principes simples :
1) Extraire prioritairement le "texte utile" (paragraphes/blocs textuels) en √©vitant les zones bruyantes.
2) Chunker au niveau passage (regroupement de paragraphes) avec d√©coupe "phrase-aware" si n√©cessaire,
   pour √©viter de couper au milieu d'une phrase.
3) Ajouter une tra√ßabilit√© permettant de reconstituer l'ordre des passages d'un m√™me document.

Comment (r√©sum√©)
----------------
- Parsing XML via xml.etree.ElementTree
- Suppression des namespaces (robustesse des find/findall sur des XML h√©t√©rog√®nes)
- Extraction de m√©tadonn√©es minimales (id, num, url, dates, titre si trouv√©)
- Extraction de paragraphes :
    * priorit√© √† BLOC_TEXTUEL/CONTENU//p (sinon strat√©gies de fallback contr√¥l√©es)
- Chunking passage-level :
    * regroupement de paragraphes
    * d√©coupe "phrase-aware" (ponctuation) si un paragraphe est trop long
- Nettoyage l√©ger :
    * suppression d'en-t√™tes techniques en pr√©fixe (ID/URL/stats concat√©n√©s) sur certains documents
- Sauvegarde au format JSONL (1 chunk par ligne)

Sorties attendues
-----------------
Chaque chunk est un dict contenant :
- doc_id        : chemin du fichier XML source
- doc_type      : type simple d√©duit du chemin (article / section_ta / other) pour filtrer rapidement
- chunk_index   : index (0..n) pour reconstituer l'ordre et r√©cup√©rer des chunks voisins (i-1, i, i+1)
- chunk_id      : identifiant stable (hash) calcul√© sur le texte r√©ellement index√©
- text          : texte du passage (nettoy√©, faible bruit)
- meta          : m√©tadonn√©es (id article, num, url, dates, titre...)
- links_count   : nombre de liens extraits (m√©tadonn√©es)
- links_sample  : √©chantillon de liens pour audit (m√©tadonn√©es), non index√©

Compatibilit√©
-------------
Pour une compatibilit√© directe avec les scripts existants :
- les champs doc_id et text sont pr√©sents (chargement facile c√¥t√© BM25/dense)
- meta / liens / doc_type peuvent √™tre ignor√©s dans un POC, mais utiles pour filtrer et sourcer.

## ‚öôÔ∏è Code (identique au script, adapt√© notebook)
- Le code ci-dessous est celui du script 08.
- Seule adaptation notebook : le bloc `if __name__ == "__main__"` est retir√© pour √©viter de stopper le kernel.


In [1]:
from __future__ import annotations

import argparse
import hashlib
import json
import os
import re
import sys
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional


# ----------------------------
# Nettoyage texte (petit, robuste)
# ----------------------------

_WHITESPACE_RE = re.compile(r"\s+", flags=re.UNICODE)

# ----------------------------
# Nettoyage l√©ger des "en-t√™tes" techniques parfois pr√©sents au d√©but des articles
# ----------------------------

# 1) Pr√©fixes techniques fr√©quents dans certains exports : ID + code opaque + "LEGI"
_RE_PREFIX_ID = re.compile(
    r"^\s*(LEGI(?:ARTI|SCTA|TEXT)\d{6,})\s+[A-Z0-9X]{10,}\s+LEGI\s+",
    flags=re.IGNORECASE,
)

# 2) Pr√©fixe contenant un chemin relatif vers le XML (ex: article/LEGI/ARTI/...xml)
_RE_PREFIX_PATH = re.compile(
    r"^\s*(?:article|section_ta|texte|text)/LEGI/(?:ARTI|SCTA|TEXT)/[^ \n\r\t]+?\.xml\s+",
    flags=re.IGNORECASE,
)

# 3) Bloc "m√©tadonn√©es" souvent concat√©n√© : "Article Lxxx-xx MODIFIE 2000-.. 2003-.. AUTONOME ..."
_RE_PREFIX_ARTICLE_META = re.compile(
    r"^\s*Article\s+[A-Z]?\s*\d[\w\-\.]*\s+(?:VIGUEUR|MODIFIE|ABROGE|ABROG√â|PERIME|P√âRIM√â)?\s*"
    r"(?:\d{4}-\d{2}-\d{2}\s+)?(?:\d{4}-\d{2}-\d{2}\s+)?"
    r"(?:AUTONOME\s+)?",
    flags=re.IGNORECASE,
)


def normalize_text(text: str) -> str:
    """Normalise un texte : trim + espaces multiples."""
    return _WHITESPACE_RE.sub(" ", text.strip())


def strip_technical_header(text: str) -> str:
    """
    Retire les en-t√™tes techniques (ID/chemin/meta concat√©n√©s) observ√©s sur certains documents.

    Le nettoyage est volontairement conservateur :
    - il ne s'applique que sur le d√©but du texte
    - il fait au plus quelques passes
    """
    t = normalize_text(text)

    # Passe 1 : ID + code opaque + LEGI
    t = _RE_PREFIX_ID.sub("", t)

    # Passe 2 : chemin relatif XML
    t = _RE_PREFIX_PATH.sub("", t)

    # Passe 3 : mini-bloc "Article ... MODIFIE ... AUTONOME ..."
    t = _RE_PREFIX_ARTICLE_META.sub("", t)

    # Nettoyage final
    return normalize_text(t)


def safe_findtext(root: ET.Element, path: str) -> Optional[str]:
    """Retourne root.find(path).text normalis√© si pr√©sent, sinon None."""
    node = root.find(path)
    if node is not None and node.text:
        t = normalize_text(node.text)
        return t if t else None
    return None


def iter_text_nodes(elem: ET.Element) -> Iterable[str]:
    """
    It√®re sur les textes 'utiles' d'un √©l√©ment XML :
    - elem.text et elem.tail, en normalisant.
    """
    if elem.text:
        t = normalize_text(elem.text)
        if t:
            yield t
    if elem.tail:
        t = normalize_text(elem.tail)
        if t:
            yield t


# ----------------------------
# Robustesse XML : suppression des namespaces
# ----------------------------

def strip_namespaces(root: ET.Element) -> None:
    """
    Supprime les namespaces des tags XML pour permettre des findall(".//TAG") simples.

    Exemple:
      {http://...}BLOC_TEXTUEL -> BLOC_TEXTUEL
    """
    for elem in root.iter():
        if isinstance(elem.tag, str) and "}" in elem.tag:
            elem.tag = elem.tag.rsplit("}", 1)[-1]


# ----------------------------
# Typage simple des documents (utile pour filtrer)
# ----------------------------

def infer_doc_type_from_path(xml_path: str) -> str:
    """
    D√©duit un type de document √† partir du chemin.

    Sert √† filtrer rapidement (ex: exclure section_ta si trop bruit√©).
    """
    p = xml_path.replace("/", "\\").lower()
    if "\\article\\" in p or "\\arti\\" in p:
        return "article"
    if "\\section_ta\\" in p or "\\scta\\" in p:
        return "section_ta"
    return "other"


# ----------------------------
# D√©coupe phrase-aware (ponctuation)
# ----------------------------

_ABBREV_TOKENS = {
    # Abr√©viations fr√©quentes en juridique / admin
    "art", "al", "alin", "n", "no", "n¬∞", "m", "mme", "dr", "pr", "st", "ste",
    # Marques d'articles/sections (√©vite de couper apr√®s "L." "R." etc.)
    "l", "r", "d", "c",
}


def _looks_like_abbrev_before_dot(text: str, dot_index: int) -> bool:
    """
    D√©tecte si un '.' correspond probablement √† une abr√©viation (ex: 'L.' / 'art.').
    """
    left = text[: dot_index + 1].rstrip()
    m = re.search(r"([A-Za-z√Ä-√ø]{1,6})\.$", left)
    if not m:
        return False

    token = m.group(1).lower()

    # Lettre unique : L. R. D. etc.
    if len(token) == 1 and token.isalpha():
        return True

    return token in _ABBREV_TOKENS


def split_into_sentences(text: str) -> List[str]:
    """
    D√©coupe un texte en "phrases" via . ! ? avec une heuristique l√©g√®re.

    Objectif : √©viter de couper au milieu d'une phrase lors du chunking.
    """
    text = normalize_text(text)
    if not text:
        return []

    sentences: List[str] = []
    start = 0
    n = len(text)

    for i, ch in enumerate(text):
        if ch not in ".!?":
            continue

        # On ne coupe pas sur abr√©viations
        if ch == "." and _looks_like_abbrev_before_dot(text, i):
            continue

        # Fin de texte
        if i == n - 1:
            tail = text[start:].strip()
            if tail:
                sentences.append(tail)
            return sentences

        # On coupe si derri√®re on a (√©ventuellement) guillemets/parenth√®ses puis espace
        j = i + 1
        while j < n and text[j] in ['"', "¬ª", "‚Äô", "'", ")", "]"]:
            j += 1

        if j < n and text[j].isspace():
            k = j
            while k < n and text[k].isspace():
                k += 1

            # D√©but plausible de phrase
            if k < n and re.match(r"[A-Z√Ä-√ñ√ò-√ù0-9¬´(\[]", text[k]):
                sent = text[start:k].strip()
                if sent:
                    sentences.append(sent)
                start = k

    tail = text[start:].strip()
    if tail:
        sentences.append(tail)
    return sentences


def split_long_text_phrase_aware(text: str, max_chars: int, overlap_sentences: int = 1) -> List[str]:
    """
    D√©coupe un texte long en segments <= max_chars en privil√©giant les fins de phrase.

    overlap_sentences conserve un l√©ger chevauchement de phrases pour pr√©server le contexte.
    """
    text = normalize_text(text)
    if len(text) <= max_chars:
        return [text]

    sents = split_into_sentences(text)

    # Fallback si segmentation en phrases impossible : d√©coupe en caract√®res avec overlap
    if len(sents) <= 1:
        parts: List[str] = []
        overlap_chars = 120
        start = 0
        while start < len(text):
            end = min(len(text), start + max_chars)
            parts.append(text[start:end].strip())
            if end >= len(text):
                break
            start = max(0, end - overlap_chars)
            if start >= end:
                start = end
        return [p for p in parts if len(p) >= 80]

    segments: List[str] = []
    cur: List[str] = []
    cur_len = 0

    for s in sents:
        s = normalize_text(s)
        if not s:
            continue

        # Cas rare : une phrase seule d√©passe max_chars -> fallback caract√®re
        if len(s) > max_chars:
            if cur:
                segments.append(normalize_text(" ".join(cur)))
                cur, cur_len = [], 0
            segments.extend(split_long_text_phrase_aware(s, max_chars=max_chars, overlap_sentences=0))
            continue

        add_len = len(s) + (1 if cur_len else 0)
        if cur and (cur_len + add_len > max_chars):
            segments.append(normalize_text(" ".join(cur)))

            if overlap_sentences > 0:
                cur = cur[-overlap_sentences:]
                cur_len = sum(len(x) for x in cur) + max(0, len(cur) - 1)
            else:
                cur, cur_len = [], 0

        cur.append(s)
        cur_len += add_len

    if cur:
        segments.append(normalize_text(" ".join(cur)))

    return [seg for seg in segments if len(seg) >= 80]


# ----------------------------
# M√©tadonn√©es & liens
# ----------------------------

@dataclass(frozen=True)
class ArticleMeta:
    """M√©tadonn√©es minimales d'un article/document XML."""
    article_id: Optional[str]
    num: Optional[str]
    url: Optional[str]
    etat: Optional[str]
    date_debut: Optional[str]
    date_fin: Optional[str]
    titre: Optional[str]


def extract_article_meta(root: ET.Element) -> ArticleMeta:
    """
    Extrait des m√©tadonn√©es fr√©quentes dans les XML juridiques.

    Notes :
    - Les chemins sont bas√©s sur des structures typiques ; extraction tol√©rante
      (beaucoup de fichiers ne contiennent pas tous les champs).
    """
    article_id = (
        safe_findtext(root, ".//META/META_COMMUN/ID")
        or safe_findtext(root, ".//ID")
    )
    num = safe_findtext(root, ".//META/META_SPEC/META_ARTICLE/NUM") or safe_findtext(root, ".//NUM")
    url = safe_findtext(root, ".//META/META_COMMUN/URL") or safe_findtext(root, ".//URL")
    etat = safe_findtext(root, ".//META/META_SPEC/META_ARTICLE/ETAT") or safe_findtext(root, ".//ETAT")
    date_debut = safe_findtext(root, ".//META/META_SPEC/META_ARTICLE/DATE_DEBUT") or safe_findtext(root, ".//DATE_DEBUT")
    date_fin = safe_findtext(root, ".//META/META_SPEC/META_ARTICLE/DATE_FIN") or safe_findtext(root, ".//DATE_FIN")

    # Tentative de titre (selon les structures disponibles)
    titre = (
        safe_findtext(root, ".//CONTEXTE//TITRE_TXT")
        or safe_findtext(root, ".//TITRE_TXT")
        or safe_findtext(root, ".//CONTEXTE//TITRE_TM")
        or safe_findtext(root, ".//TITRE_TM")
    )

    return ArticleMeta(
        article_id=article_id,
        num=num,
        url=url,
        etat=etat,
        date_debut=date_debut,
        date_fin=date_fin,
        titre=titre,
    )


def extract_links(root: ET.Element, max_links: int = 200) -> List[Dict[str, str]]:
    """
    Extrait les liens juridiques (citations, versions, renvois) en m√©tadonn√©es.

    On les conserve hors du texte index√© :
    - utiles pour naviguer / expliquer / auditer
    - mais bruit tr√®s fort pour le retrieval

    max_links : limite de s√©curit√© pour √©viter d'embarquer des milliers de liens.
    """
    links: List[Dict[str, str]] = []

    for lien in root.findall(".//LIEN"):
        payload: Dict[str, str] = {}
        payload.update({k: v for k, v in lien.attrib.items() if v})
        if lien.text:
            payload["label"] = normalize_text(lien.text)
        if payload:
            links.append(payload)
        if len(links) >= max_links:
            break

    return links


# ----------------------------
# Extraction paragraphes
# ----------------------------

def _walk_collect_paragraphs(root: ET.Element, excluded_tags: Optional[set] = None) -> List[str]:
    """
    Collecte r√©cursivement le texte des balises <p>, en excluant certaines sous-arborescences.

    ElementTree "standard" ne fournit pas directement l'acc√®s au parent d'un n≈ìud.
    Pour √©viter de r√©cup√©rer des paragraphes situ√©s dans des zones de liens ou d'historique,
    on parcourt r√©cursivement l'arbre en maintenant un √©tat "dans une zone exclue".

    excluded_tags : ensemble de noms de balises dont toute la sous-arborescence est ignor√©e
                    (ex: LIENS, VERSIONS, META, CONTEXTE).
    """
    if excluded_tags is None:
        excluded_tags = {"LIENS", "VERSIONS"}

    paragraphs: List[str] = []

    def _dfs(node: ET.Element, in_excluded: bool) -> None:
        now_excluded = in_excluded or (node.tag in excluded_tags)

        if (not now_excluded) and node.tag == "p":
            txt = " ".join(iter_text_nodes(node))
            txt = normalize_text(txt)
            if txt:
                paragraphs.append(txt)

        for child in list(node):
            _dfs(child, now_excluded)

    _dfs(root, False)
    return paragraphs


def extract_paragraphs(root: ET.Element) -> List[str]:
    """
    Extrait une liste de paragraphes "utiles" en ignorant les zones bruyantes.

    Strat√©gie (du plus fiable au plus permissif) :
    1) Priorit√© √† BLOC_TEXTUEL/CONTENU//p.
    2) Autres zones de contenu (ex: NOTA/CONTENU//p) si pr√©sentes.
    3) Parcours r√©cursif : tous les <p> en excluant LIENS/VERSIONS/META/CONTEXTE.
    4) Fallback ultime : texte global (sans LIEN).
    """
    paragraphs: List[str] = []

    # 1) Zone textuelle "canonique"
    for p in root.findall(".//BLOC_TEXTUEL//CONTENU//p"):
        txt = " ".join(iter_text_nodes(p))
        txt = normalize_text(txt)
        if txt:
            paragraphs.append(txt)

    if paragraphs:
        return paragraphs

    # 2) Contenu parfois pr√©sent dans NOTA/CONTENU
    for p in root.findall(".//NOTA//CONTENU//p"):
        txt = " ".join(iter_text_nodes(p))
        txt = normalize_text(txt)
        if txt:
            paragraphs.append(txt)

    if paragraphs:
        return paragraphs

    # 3) Parcours global en excluant les zones tr√®s bruyantes
    paragraphs = _walk_collect_paragraphs(root, excluded_tags={"LIENS", "VERSIONS", "META", "CONTEXTE"})
    if paragraphs:
        return paragraphs

    # 4) Fallback ultime : texte global en √©vitant la balise LIEN
    texts: List[str] = []
    for elem in root.iter():
        if elem.tag == "LIEN":
            continue
        if elem.text:
            t = normalize_text(elem.text)
            if t:
                texts.append(t)

    blob = normalize_text(" ".join(texts))
    return [blob] if blob else []


# ----------------------------
# Chunking passage-level
# ----------------------------

def chunk_paragraphs(
    paragraphs: List[str],
    max_chars: int = 1200,
    overlap_paragraphs: int = 1,
) -> List[str]:
    """
    Regroupe des paragraphes en chunks "passage-level".

    - On conserve au maximum des fronti√®res naturelles (paragraphes).
    - Si un paragraphe d√©passe max_chars, on le d√©coupe en segments "phrase-aware"
      pour √©viter de couper au milieu d'une phrase.
    - Overlap l√©ger entre chunks pour pr√©server le contexte.
    """
    if max_chars < 200:
        raise ValueError("max_chars doit √™tre >= 200 pour rester utile.")

    flat_units: List[str] = []
    for para in paragraphs:
        para = normalize_text(para)
        if not para:
            continue

        if len(para) <= max_chars:
            flat_units.append(para)
        else:
            flat_units.extend(split_long_text_phrase_aware(para, max_chars=max_chars, overlap_sentences=1))

    chunks: List[str] = []
    cur: List[str] = []
    cur_len = 0

    for unit in flat_units:
        unit = normalize_text(unit)
        if not unit:
            continue

        add_len = len(unit) + (1 if cur_len else 0)
        if cur and (cur_len + add_len > max_chars):
            chunks.append(normalize_text(" ".join(cur)))

            if overlap_paragraphs > 0:
                cur = cur[-overlap_paragraphs:]
                cur_len = sum(len(x) for x in cur) + max(0, len(cur) - 1)
            else:
                cur, cur_len = [], 0

        cur.append(unit)
        cur_len += add_len

    if cur:
        chunks.append(normalize_text(" ".join(cur)))

    return [c for c in chunks if len(c) >= 80]





def stable_chunk_id(doc_id: str, chunk_index: int, text: str) -> str:
    """
    G√©n√®re un identifiant stable et compact pour un chunk.
    Le hash d√©pend :
    - doc_id (chemin)
    - index du chunk
    - texte du chunk
    """
    h = hashlib.sha1()
    h.update(doc_id.encode("utf-8", errors="ignore"))
    h.update(f"::{chunk_index}::".encode("utf-8"))
    h.update(text.encode("utf-8", errors="ignore"))
    return h.hexdigest()[:16]


# ----------------------------
# Parcours corpus
# ----------------------------

def iter_xml_files(data_root: str) -> Iterable[str]:
    """It√®re r√©cursivement sur tous les .xml d'un r√©pertoire."""
    for root_dir, _, files in os.walk(data_root):
        for fn in files:
            if fn.lower().endswith(".xml"):
                yield os.path.join(root_dir, fn)


def build_chunk_corpus_for_file(
    xml_path: str,
    max_chars: int = 1200,
    overlap_paragraphs: int = 1,
    keep_links: bool = True,
    links_sample_size: int = 20,
) -> List[Dict]:
    """
    Parse un fichier XML et retourne une liste de chunks indexables.

    Chaque chunk inclut :
    - doc_id / doc_type / chunk_index / chunk_id
    - text (nettoy√©)
    - meta (m√©tadonn√©es minimales)
    - links_count + links_sample (audit, non index√©)
    """
    try:
        tree = ET.parse(xml_path)
        root = tree.getroot()

        # Rend la recherche de balises robuste m√™me si le XML utilise des namespaces
        strip_namespaces(root)

        meta = extract_article_meta(root)
        paragraphs = extract_paragraphs(root)
        doc_type = infer_doc_type_from_path(xml_path)

    except Exception:
        return []

    if not paragraphs:
        return []

    chunks_text = chunk_paragraphs(
        paragraphs=paragraphs,
        max_chars=max_chars,
        overlap_paragraphs=overlap_paragraphs,
    )
    if not chunks_text:
        return []

    links_all = extract_links(root) if keep_links else []
    links_count = len(links_all)
    links_sample = links_all[:max(0, links_sample_size)] if keep_links else []

    out: List[Dict] = []
    for i, chunk_text in enumerate(chunks_text):
        # Nettoyage conservateur : supprime certains en-t√™tes techniques observ√©s
        cleaned = strip_technical_header(chunk_text)

        # S√©curit√© : si le nettoyage vide trop le texte, on garde l'original
        if not cleaned or len(cleaned) < 50:
            cleaned = chunk_text

        out.append({
            "doc_id": xml_path,
            "doc_type": doc_type,
            "chunk_index": i,

            # L'identifiant refl√®te exactement le texte r√©ellement index√©
            "chunk_id": stable_chunk_id(xml_path, i, cleaned),

            # Texte r√©ellement index√©
            "text": cleaned,

            "meta": {
                "article_id": meta.article_id,
                "num": meta.num,
                "url": meta.url,
                "etat": meta.etat,
                "date_debut": meta.date_debut,
                "date_fin": meta.date_fin,
                "titre": meta.titre,
            },
            "links_count": links_count,
            "links_sample": links_sample,  # √©chantillon pour audit (non index√©)
        })

    return out



def build_chunk_corpus(
    data_root: str,
    max_chars: int = 1200,
    overlap_paragraphs: int = 1,
    min_text_len: int = 200,
    keep_links: bool = True,
    links_sample_size: int = 20,
    limit_files: Optional[int] = None,
) -> List[Dict]:
    """
    Construit un corpus chunk√© √† partir d'un r√©pertoire XML.

    min_text_len : filtre "anti-documents pauvres" (appliqu√© sur la concat des chunks).
    limit_files  : utile pour un test rapide.
    """
    corpus: List[Dict] = []
    seen_files = 0

    for xml_path in iter_xml_files(data_root):
        chunks = build_chunk_corpus_for_file(
            xml_path=xml_path,
            max_chars=max_chars,
            overlap_paragraphs=overlap_paragraphs,
            keep_links=keep_links,
            links_sample_size=links_sample_size,
        )

        if chunks:
            total_len = sum(len(c["text"]) for c in chunks)
            if total_len >= min_text_len:
                corpus.extend(chunks)

        seen_files += 1
        if limit_files is not None and seen_files >= limit_files:
            break

    return corpus


def save_jsonl(records: List[Dict], out_path: str) -> None:
    """Sauvegarde une liste de dicts au format JSONL (1 dict par ligne)."""
    with open(out_path, "w", encoding="utf-8") as f:
        for rec in records:
            f.write(json.dumps(rec, ensure_ascii=False) + "\n")


def print_quick_stats(records: List[Dict]) -> None:
    """Affiche quelques stats rapides pour v√©rifier l'effet du chunking."""
    if not records:
        print("Aucun chunk g√©n√©r√©.")
        return

    texts = [r["text"] for r in records]
    lengths = [len(t) for t in texts]
    n_chunks = len(records)
    n_docs = len({r["doc_id"] for r in records})

    avg_len = sum(lengths) / n_chunks
    min_len = min(lengths)
    max_len = max(lengths)

    # Information de contexte : volume de liens (m√©tadonn√©es) associ√© aux documents
    links_counts = [int(r.get("links_count", 0)) for r in records]
    avg_links = sum(links_counts) / n_chunks

    print(f"Documents sources : {n_docs}")
    print(f"Chunks g√©n√©r√©s     : {n_chunks}")
    print(f"Taille chunk (chars) ‚Äî min/avg/max : {min_len} / {avg_len:.1f} / {max_len}")
    print(f"Liens (m√©tadonn√©es) ‚Äî moyenne par chunk : {avg_links:.1f}")


# ----------------------------
# CLI + mode "IDE (Spyder)"
# ----------------------------

def default_run_config() -> Dict[str, object]:
    """
    D√©finit la configuration par d√©faut utilis√©e quand le script est lanc√© sans arguments
    (ex: bouton "Run" de Spyder).

    Remplace simplement les chemins et param√®tres ci-dessous selon notre machine.
    """
    return {
        # Chemin racine de notre corpus XML
        "data_root": r"D:\-- Projet RAG Avocats --\data_main\data",

        # Fichier de sortie JSONL
        "out_jsonl": r"D:\-- Projet RAG Avocats --\data_main\result_tests\corpus_chunks.jsonl",

        # Param√®tres de chunking
        "max_chars": 1200,
        "overlap_paragraphs": 1,

        # Filtre anti-documents pauvres (somme des longueurs des chunks par doc)
        "min_text_len": 200,

        # Liens en m√©tadonn√©es (audit) : True garde, False ignore
        "keep_links": True,
        "links_sample_size": 20,

        # Optionnel : limiter le nombre de fichiers pour un test rapide
        "limit_files": None,  # ex: 200
    }


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse les arguments CLI."""
    p = argparse.ArgumentParser(description="Chunker un corpus XML juridique en passages propres (JSONL).")
    p.add_argument("--data-root", required=True, help="R√©pertoire racine contenant les XML.")
    p.add_argument("--out-jsonl", required=True, help="Chemin de sortie JSONL.")
    p.add_argument("--max-chars", type=int, default=1200, help="Taille max (caract√®res) d'un chunk.")
    p.add_argument("--overlap-paragraphs", type=int, default=1, help="Nombre de paragraphes de chevauchement.")
    p.add_argument("--min-text-len", type=int, default=200, help="Filtre : longueur minimale totale de texte par document.")
    p.add_argument("--no-links", action="store_true", help="Ne pas extraire les liens en m√©tadonn√©es.")
    p.add_argument("--links-sample-size", type=int, default=20, help="Taille de l'√©chantillon de liens conserv√© par chunk (audit).")
    p.add_argument("--limit-files", type=int, default=None, help="Limiter le nombre de fichiers (test rapide).")
    return p.parse_args(argv)


def run_with_config(cfg: Dict[str, object]) -> int:
    """
    Ex√©cute la construction du corpus chunk√© √† partir d'un dictionnaire de configuration.

    Ce wrapper √©vite de dupliquer la logique entre le mode CLI et le mode IDE.
    """
    records = build_chunk_corpus(
        data_root=str(cfg["data_root"]),
        max_chars=int(cfg["max_chars"]),
        overlap_paragraphs=int(cfg["overlap_paragraphs"]),
        min_text_len=int(cfg["min_text_len"]),
        keep_links=bool(cfg["keep_links"]),
        links_sample_size=int(cfg["links_sample_size"]),
        limit_files=cfg["limit_files"],
    )

    out_path = str(cfg["out_jsonl"])
    save_jsonl(records, out_path)

    print(f"Corpus chunk√© √©crit : {out_path}")
    print_quick_stats(records)
    return 0


def main(argv: Optional[List[str]] = None) -> int:
    """
    Point d'entr√©e du script.

    - Si on fournit des arguments (mode terminal), on parse la CLI.
    - Si aucun argument n'est fourni (Spyder / Run), on utilise la config par d√©faut.
    """
    # Cas typique Spyder : sys.argv contient uniquement le nom du script
    no_cli_args = (argv is None and len(sys.argv) <= 1)

    if no_cli_args:
        cfg = default_run_config()

        # S√©curit√© minimale : √©viter de lancer un run sur un chemin vide par erreur
        if not cfg["data_root"] or not cfg["out_jsonl"]:
            raise ValueError("Configurer 'data_root' et 'out_jsonl' dans default_run_config().")

        return run_with_config(cfg)

    # Mode CLI explicite (terminal)
    args = parse_args(argv)
    cfg = {
        "data_root": args.data_root,
        "out_jsonl": args.out_jsonl,
        "max_chars": args.max_chars,
        "overlap_paragraphs": args.overlap_paragraphs,
        "min_text_len": args.min_text_len,
        "keep_links": (not args.no_links),
        "links_sample_size": args.links_sample_size,
        "limit_files": args.limit_files,
    }
    return run_with_config(cfg)


## ‚ñ∂Ô∏è Ex√©cuter (exemple)
1) Ajuste `data_root` et `out_jsonl` selon notre machine.
2) Lance `run_with_config(cfg)`.


In [2]:
# Exemple notebook : ex√©cuter avec la configuration par d√©faut, puis ajuster les chemins.
cfg = default_run_config()

# TODO: Adapter ces 2 chemins pour notre environnement
# cfg["data_root"] = r"D:\...\data"
# cfg["out_jsonl"] = r"D:\...\corpus_chunks.jsonl"

# Optionnel : test rapide
# cfg["limit_files"] = 200

run_with_config(cfg)


Corpus chunk√© √©crit : D:\-- Projet RAG Avocats --\data_main\result_tests\corpus_chunks.jsonl
Documents sources : 4091
Chunks g√©n√©r√©s     : 13180
Taille chunk (chars) ‚Äî min/avg/max : 86 / 942.8 / 3002
Liens (m√©tadonn√©es) ‚Äî moyenne par chunk : 26.8


0

## Analyses

### R√©sultats du run (corpus complet)
- **Documents sources** : 4091  
- **Chunks g√©n√©r√©s** : 13180  
- **Taille des chunks (chars)** : min 86 / **moy 942.8** / max 3002  
- **Liens d√©tect√©s (m√©tadonn√©es)** : **~26.8 liens/chunk** en moyenne

### V√©rifications qualitatives (inspection guid√©e)
Nous avons inspect√© 5 cas ‚Äúinformatifs‚Äù :
1) **Chunk le plus long** : texte juridique exploitable (bon signal).
2) **Chunk avec le plus de liens** : apr√®s nettoyage, suppression de l‚Äô**en-t√™te technique** (ID/URL/labels concat√©n√©s) ; le chunk d√©marre sur du contenu exploitable.
3) **Document produisant le plus de chunks** : contenus longs (annexes/fiches) correctement segment√©s.
4) **Chunk contenant un mot-cl√© (‚Äúcontrat‚Äù)** : contenu normatif bien r√©cup√©r√©.
5) **Chunk le plus court** : normal (alin√©as courts / articles abrog√©s). √Ä filtrer √©ventuellement selon impact sur les m√©triques.

### Points ‚ÄúOK‚Äù pour le retrieval et le RAG
- Chunking **passage-level** : meilleur alignement question ‚Üî passage.
- Champs de tra√ßabilit√© (`doc_id`, `chunk_index`, `chunk_id`) : permet de r√©cup√©rer les chunks voisins (i-1, i, i+1) et de reconstituer le contexte lors d‚Äôune r√©ponse LLM.
- `doc_type` (article / section_ta / other) : permet de filtrer rapidement des cat√©gories potentiellement bruit√©es (ex: tables des mati√®res).

### Pistes d‚Äôam√©lioration rapides (optionnelles)
1) **Filtrage des chunks tr√®s courts** (ex: < 120‚Äì150 chars) si cela d√©grade BM25 (√† valider via benchmark).
2) **Hybride** : BM25 ‚Üí candidats (top N) puis rerank dense (top K) : souvent un gain net en juridique.
3) **Filtrage par date/version** (en vigueur √† la date X) via `date_debut`/`date_fin`.

### Point important ‚Äúmission‚Äù
Ce POC est r√©alis√© sur un corpus type L√©gifrance (FR) pour valider la **m√©thode** (extraction, chunking, benchmark).  
Pour un RAG juridique sur la **l√©gislation marocaine**, l‚Äôindustrialisation n√©cessitera une adaptation aux formats/sources marocains (structure XML/HTML/PDF, m√©tadonn√©es, typologies, langues) et au versionning juridique.

### Roadmap (industrialisation)
Une vraie mise en production d‚Äôun RAG juridique n√©cessite une **√©tude syst√©matique du corpus** :
- inventaire des balises/structures (XML), typologies de documents, qualit√© des m√©tadonn√©es,
- gestion des versions/dates (‚Äúen vigueur √† la date X‚Äù),
- strat√©gie d‚Äôextraction/chunking robuste et maintenable dans le temps.