In [296]:
from sentence_transformers import SentenceTransformer
import pdfplumber
import os
import faiss
import matplotlib.pyplot as plt
import random
import time
import pandas as pd
import numpy as np
import re


import networkx as nx

from IPython.display import display, Markdown
from sklearn.metrics.pairwise import cosine_similarity, linear_kernel


import google.genai as genai
from dotenv import load_dotenv



from tqdm.notebook import trange, tqdm
from collections import Counter


import requests
import xml.etree.ElementTree as ET
import re
from tqdm import tqdm

load_dotenv()


True

In [278]:
import os
import requests
from typing import List, TypedDict, Optional

from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from pypdf import PdfReader

import json

from dotenv import load_dotenv
# api key
load_dotenv()
# Mirror CLAUDE_API_KEY (loaded from .env above) into ANTHROPIC_API_KEY.
# os.environ only accepts strings, so assigning the None returned by
# os.getenv() for a missing variable would raise TypeError; copy only when set
# and otherwise leave any pre-existing ANTHROPIC_API_KEY untouched.
_claude_api_key = os.getenv("CLAUDE_API_KEY")
if _claude_api_key:
    os.environ["ANTHROPIC_API_KEY"] = _claude_api_key


In [279]:
class AgentState(TypedDict):
    """State dictionary handed from node to node in the LangGraph workflow."""
    # Raw user query supplied by the caller of the graph.
    input: str
    # Written by is_french_law: True when the model answered "OUI".
    is_legal: bool
    # Written by detect_codes: code names parsed from the model's JSON reply.
    codes: List[str]
    # Maps each code name to a {"type", "source", "url"} dict, or to None
    # while the code has not been located.
    results: dict

# Chat model shared by both prompt chains below.
# temperature=0 is presumably chosen to keep the OUI/NON and JSON answers as
# repeatable as possible — confirm if sampling is ever wanted.
llm = ChatAnthropic(
    model="claude-sonnet-4-5-20250929",
    temperature=0
)


### Juridique?

In [None]:
# Prompt asking the model (in French) whether the user text falls under
# French law; the system message requires a bare OUI / NON answer.
legal_prompt = ChatPromptTemplate.from_messages([
    ("system",
     "Tu es juriste expert en droit français.\n"
     "est ce que le texte suivant relève du droit français ?\n"
     "Réponds STRICTEMENT par OUI ou NON."),
    ("human", "{text}")
])

# prompt -> model -> plain string, so callers receive the raw answer text.
legal_chain = legal_prompt | llm | StrOutputParser()

def is_french_law(state: AgentState):
    """Ask the model whether the user input concerns French law.

    Args:
        state: Current graph state; only ``state["input"]`` is read.

    Returns:
        A copy of ``state`` with ``"is_legal"`` set to True when the first
        word of the model's reply is "OUI" (case-insensitive), else False.
    """
    answer = legal_chain.invoke({"text": state["input"]})
    # The prompt asks for a bare OUI/NON, but a reply such as "Oui." or
    # "OUI, car ..." must not be read as a refusal: compare only the first
    # word, stripped of surrounding punctuation / Markdown emphasis.
    tokens = answer.strip().upper().split()
    first_word = tokens[0].strip(".,;:!?\"'`*«»()[]") if tokens else ""
    return {**state, "is_legal": first_word == "OUI"}


### Code?

In [None]:
# Prompt asking the model to list the French legal codes relevant to the
# user text as a bare JSON array of names.
# Each example line ends with "\n": without it two adjacent literals are
# concatenated and the model is shown `["CodedelaRoute"]["CodeCivil", ...]`,
# which is not a valid example of the requested format.
code_prompt = ChatPromptTemplate.from_messages([
    ("system",
     "Retourne STRICTEMENT du JSON valide.\n"
     "tu dois extraire les noms des codes juridiques français qui pourrais etre utile dans le texte donné.\n"
     "Exemples :\n"
     '["CodeCivil"]\n'
     '["CodedelaRoute"]\n'
     '["CodeCivil", "CodePenal"]\n'
     '[]\n'
     "AUCUN texte autour."),
    ("human", "{text}")
])

# prompt -> model -> plain string; the JSON is parsed by detect_codes.
code_chain = code_prompt | llm | StrOutputParser()



def detect_codes(state: AgentState):
    """Extract the list of legal-code names the model proposes for the input.

    Args:
        state: Current graph state; only ``state["input"]`` is read.

    Returns:
        A copy of ``state`` with ``"codes"`` set to the de-duplicated list of
        non-empty string names found in the reply (empty when the reply holds
        no parsable JSON array) and ``"results"`` reset to an empty dict.
    """
    raw = code_chain.invoke({"text": state["input"]})

    # The reply may be wrapped in a Markdown fence or surrounding prose, so
    # only the outermost [...] span is handed to the JSON parser.
    start, end = raw.find("["), raw.rfind("]")
    parsed = []
    if 0 <= start < end:
        try:
            parsed = json.loads(raw[start:end + 1])
        except json.JSONDecodeError:
            parsed = []

    # Downstream nodes use each entry as a file stem and a fuzzy-search query,
    # so keep only usable strings and drop repeats while preserving order.
    codes = []
    for item in parsed if isinstance(parsed, list) else []:
        if not isinstance(item, str):
            continue
        name = item.strip()
        if name and name not in codes:
            codes.append(name)

    return {**state, "codes": codes, "results": {}}


## Recherche locale

In [282]:
def check_local_files(state: AgentState):
    """Look for each detected code under ``./code`` as an XML or PDF file.

    Args:
        state: Current graph state; ``state["codes"]`` lists the code names.

    Returns:
        A copy of ``state`` whose ``"results"`` maps every code name to a
        ``{"type", "source", "url"}`` dict describing the local file, or to
        None when neither ``<name>.xml`` nor ``<name>.pdf`` exists.
    """
    def locate(code):
        stem = f"./code/{code}"
        # XML is probed first, so it wins when both formats are present.
        for kind in ("xml", "pdf"):
            candidate = f"{stem}.{kind}"
            if os.path.exists(candidate):
                return {"type": kind, "source": "local", "url": candidate}
        return None

    found = {code: locate(code) for code in state["codes"]}
    return {**state, "results": found}


## Recherche online

In [None]:
import requests
from bs4 import BeautifulSoup

from rapidfuzz import process, fuzz

# Helpers that resolve a detected code name against the catalogue published on codes.droit.org


def get_all_codes(timeout=10):
    """List the code names published as XML payloads on codes.droit.org.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on an unresponsive
            host.

    Returns:
        The ``href`` of every ``.xml`` link on the index page with the
        ``payloads/`` prefix and ``.xml`` suffix removed. Names are returned
        exactly as they appear in the page (the notebook output shows them
        URL-encoded, e.g. ``Code%20de%20la%20route``).
    """
    url = "https://codes.droit.org/"
    response = requests.get(url, timeout=timeout)

    # Parse the index page and keep only the links that point at XML payloads.
    soup = BeautifulSoup(response.text, "html.parser")
    xml_links = [
        anchor["href"]
        for anchor in soup.find_all("a", href=True)
        if anchor["href"].endswith(".xml")
    ]

    return [link.replace("payloads/", "").replace(".xml", "") for link in xml_links]


from rapidfuzz import process, fuzz

def _normalize_code_name(name):
    """Reduce a code name to accent-free, lower-case letters and digits."""
    import re
    import unicodedata
    from urllib.parse import unquote

    # Catalogue names are URL-encoded ("Code%20p%C3%A9nal") while detected
    # names are compact ASCII ("CodePenal"); bring both to the same form.
    decoded = unicodedata.normalize("NFKD", unquote(str(name)))
    without_accents = "".join(ch for ch in decoded if not unicodedata.combining(ch))
    return re.sub(r"[\W_]+", "", without_accents).casefold()


def find_best_code(user_input, threshold=70):
    """Return the payload URL of the catalogue code closest to ``user_input``.

    Args:
        user_input: Code name to look up (e.g. ``"CodedelaRoute"``).
        threshold: Minimum ``fuzz.ratio`` score (0-100), computed on the
            normalized names, that a candidate must reach to be accepted.

    Returns:
        ``"https://codes.droit.org/payloads/<name>.xml"`` built from the
        catalogue name as published, or None when the catalogue is empty or
        no candidate reaches ``threshold``.
    """
    codes = get_all_codes()

    if not codes:
        return None

    # score_cutoff makes `threshold` effective: extractOne returns None when
    # nothing scores high enough, instead of an arbitrary "best" candidate.
    # The query is normalized up front as well; the normalizer is idempotent,
    # so it is harmless if the processor is also applied to the query.
    meilleur_match = process.extractOne(
        _normalize_code_name(user_input),
        codes,
        scorer=fuzz.ratio,
        processor=_normalize_code_name,
        score_cutoff=threshold,
    )
    if meilleur_match is None:
        return None

    # Element 0 is the original (un-normalized) catalogue name.
    return "https://codes.droit.org/payloads/" + meilleur_match[0] + ".xml"

def fetch_online_xml(state: AgentState):
    """Try to locate online every code that was not found locally.

    Args:
        state: Current graph state; ``state["results"]`` maps code names to a
            location dict or to None when still unresolved.

    Returns:
        A copy of ``state`` whose ``"results"`` has each None entry replaced
        by ``{"type": "xml", "source": "online", "url": ...}`` when a matching
        payload URL answers with HTTP 200. Entries that cannot be resolved
        stay None. The incoming ``state`` is not modified.
    """
    # Work on a copy so the node returns a new state like the other nodes do,
    # instead of mutating the dict it was handed.
    resolved = dict(state["results"])

    for code, result in state["results"].items():
        if result is not None:
            continue

        url = find_best_code(code)
        if url is None:
            # No catalogue entry matched; requests.get(None) would raise.
            continue

        # A timeout keeps an unresponsive host from blocking the whole graph.
        r = requests.get(url, timeout=30)
        if r.status_code == 200:
            resolved[code] = {
                "type": "xml",
                "source": "online",
                "url": url
            }

    return {**state, "results": resolved}


## CONSTRUCTION DU GRAPHE 

In [None]:
def route_legal(state: AgentState):
    """Pick the node that follows the legality check.

    Returns "detect" when ``state["is_legal"]`` is truthy, otherwise the
    graph's END marker so the run stops there.
    """
    if state["is_legal"]:
        return "detect"
    return END

# Assemble the workflow: legality check -> code detection -> local lookup
# -> online lookup. All nodes read and return an AgentState.
graph = StateGraph(AgentState)

# Register one node per processing step.
graph.add_node("is_legal", is_french_law)
graph.add_node("detect", detect_codes)
graph.add_node("local", check_local_files)
graph.add_node("online", fetch_online_xml)

# Every run starts with the legality check.
graph.set_entry_point("is_legal")

# route_legal returns either "detect" or END; the mapping turns that return
# value into the next node.
graph.add_conditional_edges(
    "is_legal",
    route_legal,
    {"detect": "detect", END: END}
)

# Linear tail of the pipeline.
graph.add_edge("detect", "local")
graph.add_edge("local", "online")
graph.add_edge("online", END)

# Compiled, invokable graph used by the cells below.
app = graph.compile()


In [288]:

# Example run: a French question that mentions two codes.
query = "j'ai des questions sur le code de la route et le code civil"
# Only "input" is provided; the other AgentState keys are filled in by the
# nodes as the graph runs.
result = app.invoke({
    "input": query
})

# Pretty-print the final state returned by the graph.
from pprint import pprint
pprint(result)


Raw codes detected: ```json
["CodedelaRoute", "CodeCivil"]
```
Parsed codes: ['CodedelaRoute', 'CodeCivil']
{'codes': ['CodedelaRoute', 'CodeCivil'],
 'input': "j'ai des questions sur le code de la route et le code civil",
 'is_legal': True,
 'results': {'CodeCivil': {'source': 'local',
                           'type': 'pdf',
                           'url': './code/CodeCivil.pdf'},
             'CodedelaRoute': {'source': 'online',
                               'type': 'xml',
                               'url': 'https://codes.droit.org/payloads/Code%20de%20la%20route.xml'}}}


In [None]:
# Show where each detected code was found. A code that was located neither
# locally nor online keeps a None entry in result["results"], so guard before
# reading its "source" key instead of raising TypeError.
for code_name, location in result["results"].items():
    print(location["source"] if location is not None else "not found")

CodedelaRoute {'type': 'xml', 'source': 'online', 'url': 'https://codes.droit.org/payloads/Code%20de%20la%20route.xml'}
online
CodeCivil {'type': 'pdf', 'source': 'local', 'url': './code/CodeCivil.pdf'}
local


In [None]:
def extract_text_from_pdfs(file_path):
    """Read a PDF and return its file name together with its full text.

    Args:
        file_path: Path of the PDF file to open with pdfplumber.

    Returns:
        A ``(filename, text)`` tuple where ``filename`` is the base name of
        ``file_path`` and ``text`` is the text of every page, each page
        followed by a newline.
    """
    filename = os.path.basename(file_path)
    page_texts = []
    with pdfplumber.open(file_path) as pdf:
        for page in pdf.pages:
            # extract_text() can yield None for a page without extractable
            # text (behaviour depends on the pdfplumber version); treat that
            # as an empty page rather than failing on `None + "\n"`.
            page_texts.append((page.extract_text() or "") + "\n")
    # Join once instead of growing a string page by page.
    return (filename, "".join(page_texts))

def extract_articles(code):
    """Split the text of a code into one record per article.

    Args:
        code: ``(name, text)`` pair as returned by ``extract_text_from_pdfs``.

    Returns:
        A list of dicts with keys ``"code"`` (the name), ``"title"`` (the
        matched "Article <number>..." heading up to the end of its line),
        ``"content"`` (the text up to the next heading, stripped) and
        ``"structure"`` (always None here). Text before the first heading is
        discarded.
    """
    # Splitting on a capturing group keeps the headings: the pieces alternate
    # text / heading / text / ..., always starting and ending with text.
    pieces = re.split(r'(Article\s\d+[^\n]*)', code[1])
    headings = pieces[1::2]
    bodies = pieces[2::2]
    return [
        {
            "code": code[0],
            "title": heading.strip(),
            "content": body.strip(),
            "structure": None,
        }
        for heading, body in zip(headings, bodies)
    ]


# Load the local Code civil PDF as a (filename, text) pair.
papers = extract_text_from_pdfs("./code/CodeCivil.pdf")

# Split the extracted text into one record per article.
articles = extract_articles(papers)

# Preview the first five records.
print(articles[:5])

[{'title': 'Article 1', 'content': "Les lois et, lorsqu'ils sont publiés au Journal officiel de la République française, les actes administratifs\nentrent en vigueur à la date qu'ils fixent ou, à défaut, le lendemain de leur publication. Toutefois, l'entrée en\nvigueur de celles de leurs dispositions dont l'exécution nécessite des mesures d'application est reportée à la\ndate d'entrée en vigueur de ces mesures.\nEn cas d'urgence, entrent en vigueur dès leur publication les lois dont le décret de promulgation le prescrit et\nles actes administratifs pour lesquels le Gouvernement l'ordonne par une disposition spéciale.\nLes dispositions du présent article ne sont pas applicables aux actes individuels."}, {'title': 'Article 2', 'content': "La loi ne dispose que pour l'avenir ; elle n'a point d'effet rétroactif."}, {'title': 'Article 3', 'content': "Les lois de police et de sûreté obligent tous ceux qui habitent le territoire.\nLes immeubles, même ceux possédés par des étrangers, sont régi

# Web

In [None]:
def load_code_xml(url, timeout=30):
    """Download an XML document and return its parsed root element.

    Args:
        url: Address of the XML payload.
        timeout: Seconds to wait for the server; without it ``requests`` may
            block indefinitely on an unresponsive host.

    Returns:
        The root ``xml.etree.ElementTree.Element`` of the document.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        xml.etree.ElementTree.ParseError: If the body is not well-formed XML.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    # Parse the raw bytes so the XML declaration decides the encoding.
    return ET.fromstring(response.content)

def extract_articles_from_xml(root):
    """Collect the articles of a parsed code XML with their position in it.

    The tree is walked in document order. ``<t niveau="N" title="...">``
    elements update the current heading at level N (and clear the deeper
    levels); each non-empty ``<article>`` is recorded with the headings in
    force at that point.

    Args:
        root: Root ``Element`` of the code document.

    Returns:
        A list of dicts with keys ``"code"`` (the ``nom`` attribute of the
        last ``<code>`` element seen, or None if there was none),
        ``"title"`` (``"Article <num>"``), ``"content"`` (the article text on
        one line) and ``"structure"`` (the headings of levels 0 to 3, None
        where a level is not set).
    """
    articles = []

    # Must exist before the loop: an article met before any <code> element
    # would otherwise hit an unbound local variable.
    code = None

    # Headings currently in force, by level.
    current_structure = {
        0: None,  # legislative part
        1: None,  # book ("Livre")
        2: None,  # title ("Titre")
        3: None   # chapter ("Chapitre")
    }

    for elem in root.iter():
        tag = elem.tag.lower()

        if tag == "code":
            code = elem.attrib.get("nom")

        # <t> elements carry a heading and its depth.
        elif tag == "t":
            try:
                niveau = int(elem.attrib.get("niveau", -1))
            except ValueError:
                # A non-numeric level cannot be placed in the hierarchy.
                niveau = -1
            title = elem.attrib.get("title")
            if niveau >= 0:
                current_structure[niveau] = title
                # A new heading invalidates everything nested below it.
                for lvl in range(niveau + 1, 4):
                    current_structure[lvl] = None

        elif tag == "article":
            num = elem.attrib.get("num")
            article_id = elem.attrib.get("id") or elem.attrib.get("num")
            # Flatten the article to a single line of text.
            # NOTE(review): the "<br/>" replacement only has an effect if the
            # source escapes the tag as text; a real <br/> child element
            # contributes nothing to itertext() — confirm against the payloads.
            texte = "".join(elem.itertext()).replace("\n", " ").replace("<br/>", "\n").strip()

            if not article_id or not texte:
                continue

            articles.append({
                "code": code,
                "title": f"Article {num}",
                "content": texte,
                "structure": [current_structure[lvl] for lvl in range(4)]
            })

    return articles


# Download the Code pénal payload and turn it into article records.
root=load_code_xml("https://codes.droit.org/payloads/Code%20p%C3%A9nal.xml")
new_articles = extract_articles_from_xml(root)

Extracting articles from code: Code pénal


In [None]:
# Sentence-embedding model used by embed_articles for article contents.
# NOTE(review): trust_remote_code=True presumably lets the model repository
# execute its own Python code at load time — keep it only for a repository
# you trust, ideally pinned to a known revision.
embedder = SentenceTransformer('dangvantuan/french-document-embedding',trust_remote_code=True)

def embed_articles(all_articles):
    """Encode the ``"content"`` of every article with the shared embedder.

    Args:
        all_articles: Iterable of article dicts holding a ``"content"`` key.

    Returns:
        Whatever ``embedder.encode`` returns for the list of contents, in the
        same order as ``all_articles``.
    """
    contents = []
    for article in all_articles:
        contents.append(article['content'])
    return embedder.encode(contents, show_progress_bar=True)


def build_faiss_index(embeddings):
    """Build an exact L2 FAISS index over the given vectors.

    Args:
        embeddings: Non-empty 2-D array-like, one vector per row.

    Returns:
        A ``faiss.IndexFlatL2`` containing every row of ``embeddings``.

    Raises:
        ValueError: If ``embeddings`` is empty or not two-dimensional.
    """
    # FAISS works on C-contiguous float32 matrices; coerce here so float64
    # arrays or plain lists of vectors are accepted too.
    vectors = np.ascontiguousarray(embeddings, dtype=np.float32)
    if vectors.ndim != 2 or vectors.shape[0] == 0:
        raise ValueError("embeddings must be a non-empty 2-D collection of vectors")

    index = faiss.IndexFlatL2(vectors.shape[1])
    index.add(vectors)
    return index


def build_graph(all_articles):
    """Create a directed graph holding one node per article.

    Each node is identified by ``"<code> <title>"`` and stores the article
    text in its ``content`` attribute.

    NOTE(review): the edge-building helpers in this notebook address nodes by
    the bare ``title``; confirm which identifier convention is intended
    before combining them with a graph built here.
    """
    graph = nx.DiGraph()
    for entry in all_articles:
        node_id = entry['code'] + " " + entry['title']
        graph.add_node(node_id, content=entry['content'])
    return graph

In [None]:
def enrich_with_cross_references(G, all_articles):
    """Add a "cite" edge from each article to the articles of the same code it mentions.

    NOTE(review): a second function with this exact name is defined further
    down in the same cell, so after the cell runs that later definition is the
    one bound to the name and this per-code variant is never called.

    Args:
        G: Graph exposing ``add_edge``; nodes are addressed by article title.
        all_articles: Article dicts with ``"code"``, ``"title"`` and
            ``"content"`` keys.
    """
    # Compile one case-insensitive pattern per title. The look-arounds reject
    # a match glued to a word character on either side, so "Article 1" is not
    # found inside "Article 12".
    patterns = {
        a["title"]: re.compile(
            r"(?<![\w\d])" + re.escape(a["title"]) + r"(?![\w\d])", 
            re.IGNORECASE
        )
        for a in all_articles
    }

    # Group the articles by code so references are only searched among the
    # articles of the same code.
    articles_by_code = {}
    for a in all_articles:
        articles_by_code.setdefault(a["code"], []).append(a)

    # Process one code at a time.
    for code, articles in articles_by_code.items():
        for a in tqdm(articles, desc=f"Cross references for code {code}"):
            source = a["title"]
            content = a["content"]

            # Every other article of the code is a candidate target.
            for target_article in articles:
                target = target_article["title"]
                if target == source:
                    continue
                if patterns[target].search(content):
                    G.add_edge(source, target, relation="cite")

def add_structure_nodes(G, all_articles):
    """Link every article to the structural headings it belongs to.

    Articles that carry an explicit ``"structure"`` list use it directly.
    Articles whose structure is None (e.g. those split out of a PDF) fall
    back to headings spotted in their own text with a regular expression.

    Args:
        G: Graph exposing ``has_node``, ``add_node`` and ``add_edge``.
        all_articles: Article dicts with ``"title"``, ``"content"`` and
            ``"structure"`` keys.
    """
    for art in all_articles:
        title = art["title"]
        if not G.has_node(title):
            G.add_node(
                title,
                type="article",
                text=art["content"]
            )

        explicit = art.get("structure")
        if explicit is None:
            # No structural metadata: look for headings inside the text.
            headings = re.findall(r"Livre [IVX]+|Titre [\w\s]+|Chapitre [\w\d]+", art["content"])
        else:
            # Skip the levels that are not set for this article.
            headings = [s for s in explicit if s]

        for heading in headings:
            if not G.has_node(heading):
                G.add_node(heading, type="structure")
            G.add_edge(heading, title, relation="contient")


def enrich_with_semantic_similarity(G, all_articles, embeddings, threshold=0.82):
    """Connect pairs of articles whose embeddings are similar enough.

    For every pair (i, j) with i < j whose cosine similarity is at least
    ``threshold``, an edge from article i to article j is added with
    ``relation="similaire"`` and the similarity as ``weight``.

    Args:
        G: Graph exposing ``add_edge``; nodes are addressed by article title.
        all_articles: Article dicts with a ``"title"`` key.
        embeddings: Vectors aligned with ``all_articles``.
        threshold: Minimum cosine similarity for an edge.
    """
    similarities = cosine_similarity(embeddings)
    total = len(all_articles)
    # Upper triangle only: each unordered pair is examined once.
    upper_pairs = ((i, j) for i in range(total) for j in range(i + 1, total))
    for i, j in upper_pairs:
        score = similarities[i][j]
        if score >= threshold:
            G.add_edge(
                all_articles[i]["title"],
                all_articles[j]["title"],
                relation="similaire",
                weight=score,
            )

def add_articles_to_graph(G, all_articles):
    """Add article nodes and link them to their structural headings.

    Args:
        G: Graph exposing ``has_node``, ``add_node`` and ``add_edge``.
        all_articles: Article dicts with ``"title"``, ``"content"`` and
            ``"structure"`` keys. ``"structure"`` may be None (as produced
            for PDF articles), in which case only the article node is added.
    """
    for art in all_articles:
        title = art["title"]

        # Article node, created once even if the title appears again.
        if not G.has_node(title):
            G.add_node(
                title,
                type="article",
                text=art["content"]
            )

        # Structure nodes. `or []` covers articles whose structure is None,
        # which would otherwise raise TypeError when iterated.
        for s in art.get("structure") or []:
            if not s:
                continue

            if not G.has_node(s):
                G.add_node(s, type="structure")

            G.add_edge(s, title, relation="contient")

def enrich_with_cross_references(G, all_articles):
    """Add a "cite" edge from each article to every other article it mentions.

    A mention is a case-insensitive, word-bounded occurrence of the target's
    title inside the source's content. Unlike the same-named function defined
    earlier in this cell (which this definition replaces), targets are not
    restricted to the source's own code.

    Args:
        G: Graph exposing ``add_edge``; nodes are addressed by article title.
        all_articles: Article dicts with ``"title"`` and ``"content"`` keys.
    """
    # One compiled pattern per distinct title, in first-seen order.
    compiled = {}
    for entry in all_articles:
        name = entry["title"]
        compiled[name] = re.compile(r"\b" + re.escape(name) + r"\b", re.IGNORECASE)

    for entry in tqdm(all_articles, desc="Cross references"):
        origin = entry["title"]
        body = entry["content"]

        cited = [
            name
            for name, matcher in compiled.items()
            if name != origin and matcher.search(body)
        ]
        for name in cited:
            G.add_edge(origin, name, relation="cite")