In [1]:
# ============================================================
# CELL 1) Install
# ============================================================
!pip -q install faiss-cpu sentence-transformers pypdf transformers accelerate bitsandbytes tqdm gradio


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m23.8/23.8 MB[0m [31m51.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m329.0/329.0 kB[0m [31m6.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m59.1/59.1 MB[0m [31m16.9 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
# ============================================================
# CELL 2) Upload PDFs into /content/pdfs
# ============================================================
from google.colab import files
import os, shutil, glob

PDF_DIR = "/content/pdfs"
os.makedirs(PDF_DIR, exist_ok=True)

uploaded = files.upload()
for name in uploaded.keys():
    shutil.move(name, os.path.join(PDF_DIR, name))

pdf_files = sorted(glob.glob(os.path.join(PDF_DIR, "*.pdf")))
print("PDFs:", len(pdf_files))
print("Example:", os.path.basename(pdf_files[0]) if pdf_files else "None")


Saving alg1008.pdf to alg1008.pdf
Saving alg2005.pdf to alg2005.pdf
Saving alg2012.pdf to alg2012.pdf
Saving alg2038.pdf to alg2038.pdf
Saving alg2040.pdf to alg2040.pdf
Saving alg3579.pdf to alg3579.pdf
Saving alg3603.pdf to alg3603.pdf
Saving alg4047.pdf to alg4047.pdf
Saving alg4100.pdf to alg4100.pdf
Saving alg4101.pdf to alg4101.pdf
PDFs: 10
Example: alg1008.pdf


In [None]:
# ============================================================
# CELL 3) PDF text extraction + cleaning + chunking
# ============================================================
import re
import os
from pypdf import PdfReader

def _clean_text(s: str) -> str:
    s = re.sub(r"\s+", " ", (s or "").strip())
    # remove leading bullets produced by model
    s = re.sub(r"^\s*[-•]\s*", "", s)
    # Aggressively filter out non-French/non-standard characters
    # Keep alphanumeric, common punctuation, and French accented characters
    s = re.sub(r'[^a-zA-Z0-9.,;:\'"?!\-àéèçùôîüœ\s]', '', s)
    return s.strip()

def extract_pages_from_pdf(pdf_path: str):
    r = PdfReader(pdf_path)
    for i, page in enumerate(r.pages, start=1):
        txt = (page.extract_text() or "")
        txt = " ".join(txt.split())
        txt = _clean_text(txt)
        if txt.strip():
            yield {"source": os.path.basename(pdf_path), "page": i, "text": txt}

def chunk_text(text: str, chunk_size=1200, overlap=200):
    if overlap >= chunk_size:
        raise ValueError("overlap must be < chunk_size")
    chunks = []
    start = 0
    n = len(text)
    while start < n:
        end = min(n, start + chunk_size)
        ch = text[start:end].strip()
        if ch:
            chunks.append(ch)
        next_start = end - overlap
        if next_start <= start:
            break
        start = next_start
    return chunks

print("PDF preprocessing functions ready.")


PDF preprocessing functions ready.


In [None]:
# ============================================================
# CELL 4) Build FAISS index (demo محدود به 10 فایل)
# ============================================================
import glob
import numpy as np
import faiss
from tqdm import tqdm
from sentence_transformers import SentenceTransformer

EMB_MODEL = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
embedder = SentenceTransformer(EMB_MODEL)

# تنظیمات برای جلوگیری از کرش
MAX_PDFS_FOR_DEMO = 10          # اول 10 تا
EMBED_BATCH = 64                # اگر RAM کم بود 32 کن
CHUNK_SIZE = 1200
OVERLAP = 200

metas = []   # metadata
index = None

pdf_files = sorted(glob.glob("/content/pdfs/*.pdf"))[:MAX_PDFS_FOR_DEMO]
print("Using PDFs:", len(pdf_files))

def add_texts_to_faiss(texts, meta_batch):
    global index, metas
    vecs = embedder.encode(
        texts,
        normalize_embeddings=True,
        batch_size=EMBED_BATCH,
        show_progress_bar=False
    ).astype("float32")

    if index is None:
        dim = vecs.shape[1]
        index = faiss.IndexFlatIP(dim)

    index.add(vecs)
    metas.extend(meta_batch)

tmp_texts = []
tmp_meta  = []

for pdf in tqdm(pdf_files):
    for pg in extract_pages_from_pdf(pdf):
        for ch in chunk_text(pg["text"], chunk_size=CHUNK_SIZE, overlap=OVERLAP):
            tmp_texts.append(ch)
            tmp_meta.append({"source": pg["source"], "page": pg["page"], "text": ch})  # اگر RAM کم شد text را حذف کن

            if len(tmp_texts) >= EMBED_BATCH:
                add_texts_to_faiss(tmp_texts, tmp_meta)
                tmp_texts, tmp_meta = [], []

if tmp_texts:
    add_texts_to_faiss(tmp_texts, tmp_meta)

print("FAISS size:", index.ntotal if index else 0)

print("PDF files used for FAISS index:")
for pdf_file in pdf_files:
    print(f"- {os.path.basename(pdf_file)}")


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/229 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/122 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/645 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/471M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/480 [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/9.08M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Using PDFs: 10


100%|██████████| 10/10 [00:50<00:00,  5.01s/it]

FAISS size: 385
PDF files used for FAISS index:
- alg1008.pdf
- alg2005.pdf
- alg2012.pdf
- alg2038.pdf
- alg2040.pdf
- alg3579.pdf
- alg3603.pdf
- alg4047.pdf
- alg4100.pdf
- alg4101.pdf





In [None]:
# ============================================================
# CELL 5) Quick inspection of extracted text (first PDF, first 5 pages)
# ============================================================
print("Examining raw text from the first PDF in the list:")
chosen_pdf = pdf_files[0]
print(f"PDF file chosen: {os.path.basename(chosen_pdf)}\n")

page_count = 0
for page_data in extract_pages_from_pdf(chosen_pdf):
    if page_count >= 5:
        break
    print(f"--- Page {page_data['page']} from {page_data['source']} ---")
    print(page_data['text'])
    print("\n")
    page_count += 1


Examining raw text from the first PDF in the list:
PDF file chosen: alg1008.pdf

--- Page 1 from alg1008.pdf ---
Ordonnance n 74-86 du 17 septembre 1974 portant création de linstitut supérieur maritime. AU NOM DU PEUPLE. Le chef du gouvernement, Président du Conseil des ministres, Sur le rapport du ministre dtat chargé des transports, Vu les ordonnances n 65-182 du 10 juillet 1965 et 70-53 du 18 djoumada I 1390 correspondant au 21 juillet 1970 portant constitution du Gouvernement ; Vu lordonnance n65-320 du 31 décembre 1965 portant loi de finances pour 1966 et notamment ses articles 5 bis et 5 ter ; Vu lordonnance 67-290 du 30 décembre 1967 portant loi de finances pour 1968 et notamment son article 9 bis ; Vu lordonnance 69-107 du 31 décembre 1969 portant loi de finances pour 1970 ; Vu lordonnance 71-73 du 3 décembre 1971 fixant les conditions dattribution de bourses, de présalaires et de traitements de stage et les textes subséquents ; Vu le décret 65-259 du 14 octobre 1965 fixant les

In [None]:
# ============================================================
# CELL 6) Retriever
# ============================================================
def retrieve(query: str, top_k: int = 6, min_score: float = 0.30):
    xq = embedder.encode([query], normalize_embeddings=True).astype("float32")
    scores, idxs = index.search(xq, k=top_k)
    hits = []
    for i, score in zip(idxs[0], scores[0]):
        if i == -1 or score < min_score:
            continue
        hit = metas[i].copy()
        hit["score"] = score
        hits.append(hit)
    return hits


In [None]:
# ============================================================
# CELL 7) Decoder model (FLAN-T5) + RAG answering logic
# ============================================================
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import torch
from collections import defaultdict

CPU_MODEL = "google/flan-t5-base"
tokenizer = AutoTokenizer.from_pretrained(CPU_MODEL)
model = AutoModelForSeq2SeqLM.from_pretrained(CPU_MODEL)
model.eval()

def _gen(prompt: str, max_new_tokens: int = 160) -> str:
    inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=512)
    with torch.no_grad():
        out = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            num_beams=2,
            do_sample=False,
            no_repeat_ngram_size=4,
            repetition_penalty=1.15,
        )
    return tokenizer.decode(out[0], skip_special_tokens=True).strip()

_BAD_ECHO_PATTERNS = [
    "réponds uniquement", "uniquement à partir", "faits:", "question:", "réponse:",
    "si la réponse n’est pas", "si la reponse n'est pas",
    "tu es un assistant", "ne copie pas", "donne pas d’instructions", "donne pas d'instructions",
    "en réponse à votre question", "voici la réponse", "il est à noter que", "d'après les faits",
    "selon le document", "information fournie",
    "vous êtes un avocat francophone", "vous doivent répondre en français",
    "vous doivent répondre en français seulement",
    "répondez uniquement sur les faits ci-dessous", "écris en phrases complètes, sans puces ou tirts",
    "no copies", "no give instructions", "réponde en 1 to 3 phrases, factuelle",
    "le juge ord",
    "répondez uniquement sur les faits"
]

def _to_int(x, default=9999) -> int:
    try:
        return int(x)
    except Exception:
        return default

def _normalize_ocr(s: str) -> str:
    s = (s or "").lower()
    s = re.sub(r"\s+", " ", s)
    return s.strip()

def _detect_intent(question: str) -> str:
    q = _normalize_ocr(question)

    if any(k in q for k in ["établissement public", "etablissement public", "il est créé", "il est cree",
                             "création", "creation", "institué", "institue", "créé", "cree", "créee", "creee"]):
        return "creation"
    if "objet" in q:
        return "objet"
    if any(k in q for k in ["définition", "definition", "définit", "definit", "entend par", "signifie", "désigne", "designe"]):
        return "definitions"
    if any(k in q for k in ["sanction", "infraction", "amende", "peine", "poursuite", "saisie"]):
        return "sanctions"
    if any(k in q for k in ["pouvoir", "compétence", "competence", "autorité", "autorite", "habilité", "habilite"]):
        return "powers"
    if any(k in q for k in ["domaine privé", "domaine prive", "biens", "immeubles", "terrains", "constructions", "locaux"]):
        return "biens"
    if any(k in q for k in ["procédure", "procedure", "modalités", "modalites", "conditions", "délai", "delai", "notification"]):
        return "procedure"

    return "general"

_INTENT_KEYWORDS = {
    "creation": [
        "il est créé", "il est cree", "est créé", "est cree",
        "il est institué", "il est institue", "est institué", "est institue",
        "établissement public", "etablissement public"
    ],
    "objet": ["a pour objet", "vise à", "vise a", "a pour but", "a pour finalité", "a pour finalite", "objet de la présente", "objet de la presente"],
    "definitions": ["est défini", "est defini", "désigne", "designe", "entend par", "signifie", "on entend par"],
    "sanctions": ["sanction", "amende", "peine", "poursuites", "infraction", "saisie"],
    "powers": ["pouvoir", "compétence", "competence", "autorité", "autorite", "habilité", "habilite", "chargé de", "charge de", "est chargé", "est charge"],
    "biens": ["domaine privé", "domaine prive", "terrains", "constructions", "immeubles", "locaux"],
    "procedure": ["procédure", "procedure", "modalités", "modalites", "conditions", "délai", "delai", "notification"],
    "general": []
}

_CREATION_REGEX = re.compile(
    r"(établissement\s+public|etablissement\s+public|"
    r"il\s+est\s+cr[eé]é|est\s+cr[eé]é|"
    r"il\s+est\s+institu[eé]|est\s+institu[eé])",
    re.IGNORECASE
)

def _is_probably_french(text: str) -> bool:
    t = f" { (text or '').lower() } "
    french_markers = [" le ", " la ", " les ", " des ", " une ", " un ", " et ", " est ", " sont ", " dans ", " pour ", " avec "]

    has_accents = any(ch in t for ch in "àâçéèêëîïôùûüœ")
    french_marker_count = sum(m in t for m in french_markers)

    if not has_accents and french_marker_count < 3:
        return False

    total_chars = len(text)
    if total_chars == 0:
        return False

    alpha_chars = sum(c.isalpha() for c in text)
    alpha_ratio = alpha_chars / total_chars
    if alpha_ratio < 0.70:
        return False

    return True

def _pick_main_source_by_score(norm_hits):
    score_sum = defaultdict(float)
    count = defaultdict(int)
    for h in norm_hits:
        score_sum[h["source"]] += float(h.get("score", 0.0))
        count[h["source"]] += 1
    if max(score_sum.values()) <= 1e-9:
        return max(count.items(), key=lambda x: x[1])[0]
    return max(score_sum.items(), key=lambda x: x[1])[0]

def answer_french(question: str, top_k: int = 6, min_score: float = 0.30) -> str:
    question = (question or "").strip()
    if not question:
        return "Je ne sais pas d’après ce document."

    intent = _detect_intent(question)
    keywords = _INTENT_KEYWORDS.get(intent, [])

    try:
        hits = retrieve(question, top_k=top_k, min_score=min_score)
    except TypeError:
        hits = retrieve(question, top_k=top_k)

    if not hits:
        return "Je ne sais pas d’après ce document."

    norm = []
    for h in hits:
        src = h.get("source")
        page = _to_int(h.get("page"), 9999)
        txt = _clean_text(h.get("text", ""))
        score = float(h.get("score", 0.0)) if h.get("score") is not None else 0.0
        if not src or not txt:
            continue
        norm.append({"source": src, "page": page, "text": txt, "score": score})

    if not norm:
        return "Je ne sais pas d’après ce document."

    main_source = _pick_main_source_by_score(norm)
    norm = [h for h in norm if h["source"] == main_source]
    if not norm:
        return "Je ne sais pas d’après ce document."

    if intent in ("objet", "definitions"):
        early = [h for h in norm if h["page"] <= 5]
        norm = early or norm
    elif intent == "sanctions":
        late = [h for h in norm if h["page"] >= 6]
        norm = late or norm

    if keywords:
        kw = [h for h in norm if any(k in _normalize_ocr(h["text"]) for k in keywords)]
        if kw:
            norm = kw

    norm_sorted = sorted(norm, key=lambda x: x.get("score", 0.0), reverse=True)

    facts = []
    pages_used = []
    seen_pages = set()

    for h in norm_sorted:
        p = h["page"]
        if p in seen_pages:
            continue
        seen_pages.add(p)
        facts.append(h["text"][:420])
        pages_used.append(p)
        if len(facts) >= 5:
            break

    if not facts:
        return "Je ne sais pas d’après ce document."

    if intent == "creation":
        joined = _normalize_ocr(" ".join(facts))
        if not _CREATION_REGEX.search(joined):
            return "Je ne sais pas d’après ce document."

    facts_block = "\n".join([f"- {f}" for f in facts])

    prompt = (
        "Tu es un assistant juridique francophone.\n"
        "Tu dois répondre EN FRANÇAIS uniquement.\n"
        "Réponds uniquement à partir des FAITS ci-dessous.\n"
        "Écris en phrases complètes, sans puces ni tirets.\n"
        "Ne copie pas les FAITS. Ne donne pas d’instructions.\n"
        "Réponds en 1 à 3 phrases, de manière factuelle.\n"
        "Si la réponse n’est pas clairement indiquée dans les FAITS, réponds exactement : "
        "\"Je ne sais pas d’après ce document.\".\n\n"
        f"FAITS:\n{facts_block}\n\n"
        f"QUESTION:\n{question}\n\n"
        "RÉPONSE:"
    )

    ans = _clean_text(_gen(prompt, max_new_tokens=160))
    ans_low = ans.lower()

    if intent == "creation" and not _CREATION_REGEX.search(ans):
        return "Je ne sais pas d’après ce document."

    bad_echo_count = sum(pat in ans_low for pat in _BAD_ECHO_PATTERNS)
    if (
        not ans
        or ans_low.startswith("je ne sais pas")
        or len(ans.split()) < 10
        or bad_echo_count > 1
        or not _is_probably_french(ans)
    ):
        return "Je ne sais pas d’après ce document."

    pages_used = sorted(set(pages_used))
    pages_line = "Pages utilisées : " + ", ".join([f"p.{p}" for p in pages_used])
    src_line = f"- {main_source} " + ", ".join([f"p.{p}" for p in pages_used])

    return f"{ans}\n\n{pages_line}\n\nSources:\n{src_line}"

print("answer_french is ready.")


tokenizer_config.json: 0.00B [00:00, ?B/s]

spiece.model:   0%|          | 0.00/792k [00:00<?, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json: 0.00B [00:00, ?B/s]

config.json: 0.00B [00:00, ?B/s]

model.safetensors:   0%|          | 0.00/990M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/147 [00:00<?, ?B/s]

answer_french is ready.


This question is not related to pdfs so it is natural to say" Je ne sais pas d’après ce document".

In [None]:
# ============================================================
# CELL 8) Test queries
# ============================================================
question = "Quelles sont les règles de circulation dans cet arrêté ?"
print(f"Rerunning RAG query: '{question}'")
result = answer_french(question, top_k=6)
print("\n--- Answer ---\n")
print(result)


Rerunning RAG query: 'Quelles sont les règles de circulation dans cet arrêté ?'

--- Answer ---

Je ne sais pas d’après ce document.


This question is related to one of pdfs

In [None]:
question = "Quelles mesures administratives concernent la protection de l’environnement ?"
print(f"Rerunning RAG query: '{question}'")
result = answer_french(question, top_k=6)
print("\n--- Answer ---\n")
print(result)


Rerunning RAG query: 'Quelles mesures administratives concernent la protection de l’environnement ?'

--- Answer ---

locals constituent des institutions essentielles dapplication des mesures de protection de lenvironnement. Des textes législatifs ou réglementaires déterminent les modalités de leur participation. - Article 1er: La présente loi a pour objet la mise en oeuvre dun politique nationale de protection de la lenviron tendant à: -la protection, la restructuration et la valorisation des ressources naturelles, -la prévention et lutte contre tout form de pollution et nuisance, -lamélioration du cadre et de la qualité de vie. Chapitre I Principes généraux Article 2: La planification nationale prend

Pages utilisées : p.3, p.4, p.14

Sources:
- alg4047.pdf p.3, p.4, p.14
