# Objectif du notebook :
intéragir avec Albert pour effectuer différentes opérations basiques : 
- ajouter/supprimer/lire collection
- ajouter/supprimer/lire documents indexés
- poser des questions à Albert
- effectuer une recherche via /search

In [None]:
import os
import requests
from openai import OpenAI
import time
import glob
from pathlib import Path

In [None]:
# OpenAI client configuration
base_url = "https://albert.api.etalab.gouv.fr/v1"
api_key = os.getenv("ALBERT_API_KEY")
client = OpenAI(base_url=base_url, api_key=api_key)
session = requests.session()
session.headers = {"Authorization": f"Bearer {api_key}"}

In [None]:
# Download a file
file_path = "/home/pleroy/Downloads/anssi-guide-authentification_multifacteur_et_mots_de_passe.pdf"

In [None]:
collection_id = "ANSSI_test"

In [None]:
language_model, embeddings_model = None, None

for model in client.models.list().data:
    if model.type == "text-generation" and language_model is None:
        language_model = model.id
    if model.type == "text-embeddings-inference" and embeddings_model is None:
        embeddings_model = model.id

print(f"language model: {language_model}\nembeddings model: {embeddings_model}")

# Get Collections

In [None]:
def get_collections():
    response = session.get(f"{base_url}/collections")
    response = response.json()
    return response


response = get_collections()
response

In [None]:
session.get(f"{base_url}/collections").json()

# Delete colelction

In [None]:
collections_id = [2137]

In [None]:
for collection_id in collections_id:
    print(collection_id)
    response = session.delete(f"{base_url}/collections/{collection_id}")
    print(response)

# Create collections

In [None]:
collection = "ANSSI_test"

response = session.post(
    f"{base_url}/collections", json={"name": collection, "model": embeddings_model}
)
response = response.json()
collection_id = response["id"]
print(f"Collection ID: {collection_id}")

# Get Collection

In [None]:
def get_collection(collection_id=collection_id):
    response = session.get(f"{base_url}/collections/{collection_id}")
    response = response.json()
    return response


get_collection(collection_id)

# Lister documents indexés

In [None]:
def get_documents(session, base_url, collection_id, limit=10, offset=0):
    params = {
        "collection": collection_id,
        "limit": limit,
        "offset": offset,
    }
    response = session.get(f"{base_url}/documents", params=params)
    response.raise_for_status()
    return response.json()


def get_distinct_names(session, base_url, collection_id, limit=100, offset=0):
    docs = get_documents(session, base_url, collection_id, limit, offset)
    # Utilisation d’un set pour dédupliquer
    names = {doc["name"] for doc in docs.get("data", []) if "name" in doc}
    return sorted(names)


# Exemple d’appel
distinct_names = get_distinct_names(session, base_url, collection_id, limit=100)
distinct_names

### Lister les documents

In [None]:
def liste_dcouments_dans_collection(collection_id):
    params = {
        "collection": collection_id,
        "limit": 100,
        "offset": 0,
    }
    response = session.get(f"{base_url}/documents", params=params)
    return response.json()


liste_dcouments_dans_collection(collection_id)

In [None]:
collection_id

### Lister les Chunks d'un document

In [None]:
document_id = 963413
response = session.get(f"{base_url}/chunks/{document_id}")
response.json()

# Ajouter pdf => /files

In [None]:
def add_file_deprecated(file_path, distinct_names):
    files = {
        "file": (os.path.basename(file_path), open(file_path, "rb"), "application/pdf")
    }
    data = {"request": '{"collection": "%s"}' % collection_id}

    if Path(file_path).name not in distinct_names:
        print(f"File to add : {file_path}")
        response = session.post(f"{base_url}/files", data=data, files=files)
        assert response.status_code == 201

In [None]:
files = glob.glob("/home/pleroy/Downloads/ANSSI/*.pdf")
for file_path in files:
    add_file_deprecated(file_path, distinct_names)

# Ajouter pdf => /documents

In [None]:
get_collection()

In [None]:
from typing import Dict, Optional
import json


def ajouter_fichier(
    chemin_fichier: str,
    id_collection: str,
    base_url: str,
    url_source: Optional[str] = None,
) -> requests.Response:
    """
    Téléverse un PDF dans Albert et ajoute l'URL d'origine en métadonnée.

    :param chemin_fichier: chemin local du PDF.
    :param id_collection: identifiant de la collection cible.
    :param url_source: URL publique du document.
    :return: réponse HTTP de l’API.
    """
    nom = Path(chemin_fichier).name
    with open(chemin_fichier, "rb") as flux:
        fichiers = {"file": (nom, flux, "application/pdf")}
        donnees = {
            "collection": str(id_collection),
            "metadata": json.dumps({"source_url": url_source}),
        }
        # autres options possibles :
        # "paginate_output": "false",
        # "force_ocr": "false",
        # "output_format": "markdown",
        # "chunk_size": "2048",
        reponse = session.post(f"{base_url}/documents", data=donnees, files=fichiers)
    return reponse


# Dictionnaire nom -> URL (faites correspondre à vos fichiers locaux)
URLS_PAR_NOM: Dict[str, str] = {
    "anssi-guide-authentification_multifacteur_et_mots_de_passe.pdf": "https://cyber.gouv.fr/sites/default/files/2021/10/anssi-guide-authentification_multifacteur_et_mots_de_passe.pdf",
    "anssi-guide-gestion_crise_cyber.pdf": "https://cyber.gouv.fr/sites/default/files/2021/12/anssi-guide-gestion_crise_cyber.pdf",
    "guide_hygiene_informatique_anssi.pdf": "https://cyber.gouv.fr/sites/default/files/2017/01/guide_hygiene_informatique_anssi.pdf",
    "guide_nomadisme_anssi_pa_054_v2.pdf": "https://cyber.gouv.fr/sites/default/files/document/guide_nomadisme_anssi_pa_054_v2.pdf",
    "anssi-guide-admin_securisee_si_v3-0.pdf": "https://cyber.gouv.fr/sites/default/files/2018/04/anssi-guide-admin_securisee_si_v3-0.pdf",
    "anssi-guide-passerelle_internet_securisee-v3.pdf": "https://cyber.gouv.fr/sites/default/files/2020/06/anssi-guide-passerelle_internet_securisee-v3.pdf",
    "anssi-fondamentaux-sauvegarde_systemes_dinformation_v1-0.pdf": "https://cyber.gouv.fr/sites/default/files/document/anssi-fondamentaux-sauvegarde_systemes_dinformation_v1-0.pdf",
    "guide_protection_des_systemes_essentiels.pdf": "https://cyber.gouv.fr/sites/default/files/2020/12/guide_protection_des_systemes_essentiels.pdf",
    "guide-homologation-securite-web-04-2025.pdf": "https://cyber.gouv.fr/sites/default/files/document/guide-homologation-securite-web-04-2025.pdf",
    "anssi-guide-recommandations_mise_en_oeuvre_site_web_maitriser_standards_securite_cote_navigateur-v2.0.pdf": "https://cyber.gouv.fr/sites/default/files/2013/05/anssi-guide-recommandations_mise_en_oeuvre_site_web_maitriser_standards_securite_cote_navigateur-v2.0.pdf",
    "secnumcloud-referentiel-exigences-v3.2.pdf": "https://cyber.gouv.fr/sites/default/files/document/secnumcloud-referentiel-exigences-v3.2.pdf",
    "LAB_Homologation_Simplifiee.pdf": "https://monservicesecurise-ressources.cellar-c2.services.clever-cloud.com/LAB_Homologation_Simplifiee.pdf",
}


def url_pour(chemin_fichier: str) -> Optional[str]:
    """Retourne l’URL mappée à partir du nom de fichier, sinon None."""
    nom = Path(chemin_fichier).name
    return URLS_PAR_NOM.get(nom)

In [None]:
fichiers = glob.glob("/home/pleroy/Downloads/ANSSI/*.pdf")

In [None]:
fichiers = [fichiers[-1]]

In [None]:
def est_deja_indexe(collection_id: str, nom_fichier: str) -> bool:
    """
    Vérifie si un document avec ce nom existe déjà dans la collection.
    """
    documents = liste_dcouments_dans_collection(collection_id)["data"]
    noms = {doc["name"] for doc in documents}
    return nom_fichier in noms


nombre_documents_indexes = get_collection(collection_id)["documents"]

for chemin in fichiers:
    nom_fichier = Path(chemin).name
    # Vérification préalable
    if est_deja_indexe(collection_id, nom_fichier):
        print(f"[SKIP] Déjà indexé : {nom_fichier}")
        continue

    url = url_pour(chemin)
    succes = False
    tentative = 0

    while tentative < 3 and not succes:
        tentative += 1
        print(f"Tentative {tentative} pour {nom_fichier} ...")

        r = ajouter_fichier(
            chemin,
            collection_id,
            base_url=base_url,
            url_source=url,
        )

        nombre_documents_indexes_actuellement = get_collection(collection_id)[
            "documents"
        ]

        if nombre_documents_indexes_actuellement - nombre_documents_indexes == 1:
            print(f"Le document a été indexé : {chemin}")
            succes = True
            nombre_documents_indexes = nombre_documents_indexes_actuellement
        else:
            print(f"Le document n'a pas été indexé : {chemin}")
            if tentative < 3:
                print("Nouvel essai dans 5 secondes...")
                time.sleep(5)
            else:
                print("Échec après 3 tentatives.")

        print(r.status_code)
        print("*" * 10)
        try:
            print(r.json())
        except Exception:
            print(f"ERROR : {r.text}")

In [None]:
get_collection(collection_id)["documents"]

# Supprimer document

In [None]:
session.delete(f"{base_url}/documents/971024")

In [None]:
get_collection()

# Poser questions

### pour cette question: deux docs semblent dire la même chose :
- Quelles mesures doivent être prises pour garantir la pérennité et la sécurité d’une cartographie ?


In [None]:
import re
import textwrap
from tabulate import tabulate


def ask_question(
    question: str,
    collection_id=None,
    base_url=None,
    session=None,
    client=None,
    k: int = 6,
    show_table: str = "itables",  # "itables" | "rich" | "tabulate" | None
):
    """
    Pose une question (RAG + LLM) et affiche un tableau des chunks.
    - show_table="itables": DataFrame interactif avec itables (aligné à gauche, chunk complet)
      "rich": tableau terminal riche
      "tabulate": tableau terminal compact
      None: n'affiche pas le tableau
    """

    # --- utilitaire pour versions "rich/tabulate" uniquement (aperçu court) ---
    def _short(s: str, n: int) -> str:
        s = re.sub(r"\s+", " ", (s or "").replace("\n", " ").strip())
        return textwrap.shorten(s, width=n, placeholder="…")

    # --- 1) Search ---
    payload = {
        "collections": [collection_id],
        "k": k,
        "prompt": question,
        "method": "semantic",
    }
    resp = session.post(f"{base_url}/search", json=payload)
    resp.raise_for_status()
    search_data = resp.json().get("data", [])
    if not search_data:
        raise RuntimeError(
            "Aucun chunk retourné par /search — vérifie la collection/documents."
        )

    # --- 2) Préparer les chunks + méta ---
    chunks_for_prompt = []
    sources_set = set()

    # Deux jeux de lignes :
    # - rows_short : pour l'affichage terminal (aperçu tronqué lisible)
    # - rows_full  : pour itables/pandas (chunk complet)
    rows_short = []
    rows_full = []

    for idx, res in enumerate(search_data, start=1):
        chunk = res.get("chunk") or {}
        meta = chunk.get("metadata") or {}
        content = chunk.get("content", "")

        doc_name = (
            meta.get("document_name") or meta.get("source") or meta.get("file") or "—"
        )
        page = meta.get("page") or meta.get("page_number")
        score = res.get("score")
        source_url = meta.get("source_url") or meta.get("url") or meta.get("path")

        chunks_for_prompt.append(f"[{idx}] {content}")

        rows_short.append(
            {
                "#": idx,
                "document": _short(doc_name, 44),
                "page": page if page is not None else "",
                "score": f"{score:.3f}" if isinstance(score, (int, float)) else "",
                "source": _short(source_url or "", 44),
                "aperçu": _short(content, 120),
            }
        )

        rows_full.append(
            {
                "#": idx,
                "document": doc_name,
                "page": page if page is not None else "",
                "score": score,
                "source": source_url or "",
                "chunk complet": content,  # 🔥 texte intégral
            }
        )

        if doc_name and doc_name != "—":
            sources_set.add(doc_name)

    # --- 3) Prompt template ---
    prompt_template = """
Vous êtes un assistant spécialisé dans la cybersécurité et la conformité, qui doit répondre uniquement à partir des documents officiels fournis (issus de l’ANSSI).

🎯 Objectif : Fournir une réponse claire, précise, synthétique et factuelle à la question de l’utilisateur, en vous basant exclusivement sur le contenu des documents donnés dans la section "Documents".

⚠️ Contraintes :
- N’inventez aucune information qui n’apparaît pas dans les documents fournis.
- Si la réponse ne figure pas dans les documents, répondez simplement : "Les documents fournis ne permettent pas de répondre directement à cette question."
- Soyez concis, mais complet : privilégiez la reformulation claire plutôt que la citation brute, sauf si une phrase exacte est nécessaire.
- Intégrez systématiquement les références aux documents (par leur nom ou source) pour justifier vos affirmations. Utilisez les indices [n] des morceaux comme renvois si utile.
- Utilisez un style factuel, neutre et professionnel.

Format attendu :
- ✅ Une réponse directe et synthétique à la question.

Question :
{user_question}

Documents (morceaux) :
{chunks_block}
""".strip()

    chunks_block = "\n\n".join(chunks_for_prompt)
    final_prompt = prompt_template.format(
        user_question=question, chunks_block=chunks_block
    )

    # --- 4) Appel modèle ---
    completion = client.chat.completions.create(
        messages=[{"role": "user", "content": final_prompt}],
        model="albert-large",
        stream=False,
        n=1,
    )
    answer = completion.choices[0].message.content

    # --- 5) Affichages lisibles ---
    print("\n===== RÉPONSE =====\n")
    print(textwrap.fill(answer, width=100))

    print("\n===== SOURCES (documents distincts) =====")
    if sources_set:
        for s in sorted(sources_set):
            print("-", s)
    else:
        print("Aucune source disponible dans les métadonnées.")

    # --- 6) Tableau des chunks ---
    if show_table == "itables":
        try:
            from itables import show, options
            import pandas as pd

            # alignement à gauche partout
            options.columnDefs = [{"targets": "_all", "className": "dt-left"}]
            df = pd.DataFrame(rows_full)
            print("\n===== CHUNKS (itables) =====")
            show(df, maxBytes=0)  # pas de troncature
        except Exception as e:
            print(f"[itables indisponible] Fallback tabulate. Détail: {e}")
            show_table = "tabulate"  # bascule vers tabulate

    if show_table == "rich":
        try:
            from rich.console import Console
            from rich.table import Table

            console = Console()
            table = Table(show_header=True, header_style="bold")
            table.add_column("#", justify="right", width=3, no_wrap=True)
            table.add_column("document", overflow="ellipsis", max_width=44)
            table.add_column("page", justify="right", width=4, no_wrap=True)
            table.add_column("score", justify="right", width=6, no_wrap=True)
            table.add_column("source", overflow="ellipsis", max_width=44)
            table.add_column("aperçu", overflow="fold", max_width=120)
            for r in rows_short:
                table.add_row(
                    str(r["#"]),
                    r["document"],
                    str(r["page"]),
                    r["score"],
                    r["source"],
                    r["aperçu"],
                )
            print("\n===== CHUNKS (rich) =====")
            console.print(table)
        except Exception as e:
            print(f"[rich indisponible] Fallback tabulate. Détail: {e}")
            show_table = "tabulate"

    if show_table == "tabulate":
        headers = ["#", "document", "page", "score", "source", "aperçu"]
        print("\n===== CHUNKS (tabulate) =====")
        print(
            tabulate(
                [[r[h] for h in headers] for r in rows_short],
                headers=headers,
                tablefmt="github",
                stralign="left",
                maxcolwidths=[3, 44, 4, 6, 44, 120],
            )
        )

    return answer

# questions avec réponse satisfaisante :
- Les mises à jour logicielles sont-elles vraiment indispensables ?
- Quels pare-feux dois-je déployer pour sécuriser l’interconnexion entre mon SI et Internet ?"
- Quels pare-feux dois-je déployer pour sécuriser l’interconnexion entre mon SI et Internet ?"
  Est-ce que je peux utiliser mon ordinateur personnel pour administrer le SI de mon organisation ?

# Questions sans réponses :
- Qui est responsable en cas de crise cyber ?

reponse = ask_question("Les mises à jour logicielles sont-elles vraiment indispensables ?",
                       collection_id=collection_id, 
                       base_url=base_url, 
                       session=session, 
                       client=client)


# Debug search
objectif : voir les chunks

In [None]:
from collections import Counter


def _search_semantic(collection_id, q, k=40):
    payload = {
        "collections": [collection_id],
        "k": k,  # garde une valeur raisonnable (20–50)
        "method": "semantic",
        "prompt": q,
    }
    r = session.post(f"{base_url}/search", json=payload)
    r.raise_for_status()
    return r.json().get("data", [])


def distinct_document_names_semantic(collection_id, k=40, probes=None):
    # Probes : termes variés pour “accrocher” un max de chunks
    if probes is None:
        probes = [
            "cyber",
            "sécurité",
            "incident",
            "risque",
            "sauvegarde",
            "réponse",
            "attaque",
            "ANSSI",
            "gouvernance",
            "politique",
        ]
    names = set()
    for q in probes:
        for res in _search_semantic(collection_id, q, k=k):
            meta = (res.get("chunk", {}) or {}).get("metadata", {}) or {}
            name = meta.get("document_name") or meta.get("source") or meta.get("file")
            if name:
                names.add(name)
    return sorted(names)


def document_name_counts_semantic(collection_id, k=40, probes=None):
    if probes is None:
        probes = [
            "cyber",
            "sécurité",
            "incident",
            "risque",
            "sauvegarde",
            "réponse",
            "attaque",
            "ANSSI",
            "gouvernance",
            "politique",
        ]
    counter = Counter()
    for q in probes:
        for res in _search_semantic(collection_id, q, k=k):
            meta = (res.get("chunk", {}) or {}).get("metadata", {}) or {}
            name = meta.get("document_name") or meta.get("source") or meta.get("file")
            if name:
                counter[name] += 1
    return counter


# --- Utilisation ---
docs = distinct_document_names_semantic(collection_id, k=40)
print("Documents distincts :", len(docs))
for d in docs:
    print("-", d)

print("\nApprox. répartition des chunks vus :")
for name, cnt in document_name_counts_semantic(collection_id, k=40).most_common():
    print(f"{cnt:3d}  {name}")

# Recuperer les metaddata

In [None]:
from typing import Dict, Any, List, Optional
import requests


def liste_documents_avec_meta(
    collection_id: int, base_url: str, session: requests.Session
) -> List[Dict[str, Any]]:
    # 1) tentative "officielle"
    try:
        r = session.get(f"{base_url}/documents", params={"collection": collection_id})
        r.raise_for_status()
        data = r.json().get("data", [])
        enriched: List[Dict[str, Any]] = []
        for d in data:
            if "metadata" in d and isinstance(d["metadata"], dict):
                enriched.append(d)
            else:
                # complète par /documents/{id}
                rd = session.get(f"{base_url}/documents/{d['id']}")
                rd.raise_for_status()
                jd = rd.json()
                d["metadata"] = jd.get("metadata", {})
                enriched.append(d)
        return enriched
    except requests.HTTPError as e:
        # 404 => fallback search
        if e.response is None or e.response.status_code != 404:
            raise  # autre erreur -> remonter
    return enriched


liste_documents_avec_meta(collection_id, base_url, session)

In [None]:
collection_id