In [8]:
import argparse
import math
import os
import re
from pathlib import Path
import numpy as np
import faiss
from pypdf import PdfReader
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import torch
from huggingface_hub import login
import csv

In [23]:
# =========================
# 5) Load LLaMA-3 8B on GPU (CUDA) — 4-bit to fit in 8GB
# =========================
# Perf goal: reduce inference latency. We call `model.generate()` directly (lighter than pipeline).
#
# This cell defines the module-level names `HF_TOKEN`, `MODEL_NAME`, `quantization_config`,
# `tokenizer` and `model`, which the prompt/generation helpers below read as globals.

import time
import gc

# Only sets the allocator option when the variable is not already defined in the environment.
# NOTE(review): presumably this has to run before the first CUDA allocation to take effect — confirm.
os.environ.setdefault("PYTORCH_CUDA_ALLOC_CONF", "expandable_segments:True")
try:
    # New API (PyTorch 2.9+)
    torch.backends.cuda.matmul.fp32_precision = "tf32"
    torch.backends.cudnn.conv.fp32_precision = "tf32"
except Exception:
    # Backward-compatible API
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True
try:
    torch.set_float32_matmul_precision("high")
except Exception:
    # Best effort: any failure to set the precision hint is ignored.
    pass

# The rest of the cell pins everything to GPU 0, so fail early without CUDA.
if not torch.cuda.is_available():
    raise RuntimeError(
        "CUDA non détecté. Installe un PyTorch CUDA et/ou vérifie le driver GPU."
    )

# Hugging Face auth (avoids hard-coding tokens in the notebook)
HF_TOKEN = os.environ.get("HF_TOKEN")
if HF_TOKEN:
    login(token=HF_TOKEN)
else:
    print("HF_TOKEN non défini. Si le modèle est 'gated', définis l'env var HF_TOKEN.")

MODEL_NAME = "meta-llama/Meta-Llama-3-8B-Instruct"
print("Model:", MODEL_NAME)
print("CUDA:", torch.cuda.get_device_name(0))

# Cleanly release any previous model/pipeline (left over from an earlier run of this cell)
# to avoid OOM: drop the global names, then collect garbage and empty the CUDA cache.
for _name in ("gen", "model", "tokenizer"):
    if _name in globals():
        try:
            del globals()[_name]
        except Exception:
            pass
gc.collect()
torch.cuda.empty_cache()

from transformers import BitsAndBytesConfig

# 4-bit NF4 quantization with double quantization; compute is done in float16.
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
 )

# HF_TOKEN may be None here (see the else-branch above); it is passed through as-is.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True, token=HF_TOKEN)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    device_map={"": 0},  # everything on GPU:0
    quantization_config=quantization_config,
    low_cpu_mem_usage=True,
    token=HF_TOKEN,
)
# Switch to evaluation mode: this notebook only runs inference.
model.eval()


def format_prompt(question: str, contexts: list[str]) -> str:
    """Build the text prompt sent to the LLM for a retrieval-augmented question.

    Args:
        question: The user question.
        contexts: Retrieved context snippets; they are joined with a ``---`` separator.

    Returns:
        The prompt as a string. When the module-level ``tokenizer`` exposes
        ``apply_chat_template``, the prompt is rendered through it (system + user
        message, generation prompt appended, not tokenized). Otherwise a plain
        sectioned text layout is used.
    """
    system_msg = "You are an assistant who only responds based on the provided CONTEXT."
    joined_contexts = "\n\n---\n\n".join(contexts)

    # Fallback for tokenizers without chat-template support: plain labelled sections.
    if not hasattr(tokenizer, "apply_chat_template"):
        sections = [
            f"SYSTEM:\n{system_msg}",
            f"CONTEXTE:\n{joined_contexts}",
            f"QUESTION:\n{question}",
            "RÉPONSE:\n",
        ]
        return "\n\n".join(sections)

    user_msg = f"CONTEXTE:\n{joined_contexts}\n\nQUESTION:\n{question}"
    conversation = [
        {"role": "system", "content": system_msg},
        {"role": "user", "content": user_msg},
    ]
    return tokenizer.apply_chat_template(
        conversation, tokenize=False, add_generation_prompt=True
    )


def _generate(prompt: str, max_new_tokens: int) -> str:
    """Greedily decode a completion for ``prompt`` and return only the new text.

    Uses the module-level ``tokenizer`` and ``model``.

    Args:
        prompt: Fully formatted prompt text (for example the output of
            ``format_prompt``).
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        The decoded continuation (prompt tokens removed, special tokens
        skipped), stripped of surrounding whitespace.
    """
    # A prompt rendered by a chat template can already start with the BOS token as
    # text. Letting the tokenizer insert its own special tokens on top of that would
    # feed the model a duplicated BOS, so only add them when the prompt lacks one.
    bos_token = getattr(tokenizer, "bos_token", None)
    already_has_bos = bool(bos_token) and prompt.startswith(bos_token)
    inputs = tokenizer(
        prompt, return_tensors="pt", add_special_tokens=not already_has_bos
    )
    # Follow the model's actual device instead of assuming a device string.
    inputs = {k: v.to(model.device) for k, v in inputs.items()}
    prompt_len = inputs["input_ids"].shape[1]

    with torch.inference_mode():
        out_ids = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=False,  # greedy decoding
            num_beams=1,
            use_cache=True,
            pad_token_id=tokenizer.eos_token_id,
        )

    # generate() returns prompt + continuation; keep only the continuation.
    gen_ids = out_ids[0, prompt_len:]
    return tokenizer.decode(gen_ids, skip_special_tokens=True).strip()


def ask_pdf(
    question: str,
    k: int = 2,
    max_new_tokens: int = 1024,
    max_context_chars: int = 650,
):
    """Answer ``question`` from the indexed PDF: retrieve, build a prompt, generate.

    Args:
        question: The question to answer.
        k: Number of chunks requested from ``retrieve``.
        max_new_tokens: Generation budget forwarded to ``_generate``.
        max_context_chars: Each retrieved chunk is truncated to this many
            characters before being placed in the prompt.

    Returns:
        A ``(answer, hits)`` tuple, where ``hits`` is the untruncated list
        returned by ``retrieve``.
    """
    hits = retrieve(question, k=k)

    contexts = []
    for chunk_id, similarity, chunk in hits:
        # Each context carries its chunk id and score as a header line.
        contexts.append(
            f"[chunk {chunk_id} | score={similarity:.3f}]\n{chunk[:max_context_chars]}"
        )

    answer = _generate(format_prompt(question, contexts), max_new_tokens=max_new_tokens)
    return answer, hits

Note: Environment variable`HF_TOKEN` is set and is the current active token independently from the token you've just configured.


Model: meta-llama/Meta-Llama-3-8B-Instruct
CUDA: NVIDIA GeForce RTX 4060 Laptop GPU


Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

In [24]:
# =========================
# 1) Select PDF (local)
# =========================
def _pick_pdf_with_tkinter() -> str | None:
    """Ask the user for a PDF through a Tk file dialog.

    Returns:
        The selected path, or ``None`` when tkinter cannot be imported, the
        dialog fails for any reason, or nothing was selected.
    """
    try:
        import tkinter as tk
        from tkinter import filedialog
    except Exception:
        return None

    window = None
    selected = None
    try:
        window = tk.Tk()
        # Hide the root window and keep the dialog above other windows.
        window.withdraw()
        window.attributes("-topmost", True)
        selected = filedialog.askopenfilename(
            title="Select a PDF",
            filetypes=[("PDF files", "*.pdf"), ("All files", "*")],
        )
    except Exception:
        selected = None
    finally:
        # Always try to tear the Tk root down; a failure here is ignored.
        if window is not None:
            try:
                window.destroy()
            except Exception:
                pass

    return selected or None


def get_pdf_path(pdf_path: str | None = None) -> str:
    """Resolve the PDF path from (1) parameter, (2) CLI args, (3) env, (4) auto COLREG, (5) GUI picker, (6) input()."""
    if pdf_path:
        pdf = Path(pdf_path).expanduser()
        if not pdf.is_file():
            raise FileNotFoundError(f"PDF not found: {pdf}")
        return str(pdf)

    parser = argparse.ArgumentParser(description="RAG over a local PDF", add_help=False)
    parser.add_argument("--pdf", dest="pdf_path", help="Path to the PDF to ingest")
    args, _unknown = parser.parse_known_args()
    if args.pdf_path:
        pdf = Path(args.pdf_path).expanduser()
        if not pdf.is_file():
            raise FileNotFoundError(f"PDF not found: {pdf}")
        return str(pdf)

    env_pdf = os.environ.get("PDF_PATH")
    if env_pdf:
        pdf = Path(env_pdf).expanduser()
        if not pdf.is_file():
            raise FileNotFoundError(f"PDF not found (PDF_PATH): {pdf}")
        return str(pdf)

    # Auto-pick COLREG PDF if present (avoids slow dialogs)
    candidates = [
        Path("COLREG-Consolidated-2018.pdf"),
        Path("..") / "COLREG-Consolidated-2018.pdf",
        Path.cwd() / "COLREG-Consolidated-2018.pdf",
    ]
    for cand in candidates:
        try:
            if cand.is_file():
                return str(cand.resolve())
        except Exception:
            pass

    picked = _pick_pdf_with_tkinter()
    if picked:
        return picked

    entered = input("Enter path to PDF: ").strip().strip('"')
    pdf = Path(entered).expanduser()
    if not pdf.is_file():
        raise FileNotFoundError(f"PDF not found: {pdf}")
    return str(pdf)


# Resolve the PDF once for the whole notebook; `pdf_path` is read by the extraction cell.
pdf_path = get_pdf_path()
print("PDF:", pdf_path)

PDF: C:\Users\celli\Documents\.PIP2026\PIP-2026-LOTUSim_G5_Fatigue\COLREG-Consolidated-2018.pdf


In [25]:

# =========================
# 2) PDF -> texte
# =========================
def pdf_to_text(path: str) -> str:
    """Extract the text of every page of a PDF and join it into one string.

    Args:
        path: Path to the PDF file, handed to ``PdfReader``.

    Returns:
        The cleaned text of all non-empty pages, separated by blank lines.
    """

    def _clean(page_text: str) -> str:
        # Light cleanup: drop soft hyphens, collapse runs of spaces/tabs, trim.
        without_soft_hyphen = page_text.replace("\u00ad", "")
        return re.sub(r"[ \t]+", " ", without_soft_hyphen).strip()

    # extract_text() may return nothing for a page; treat that as an empty string.
    cleaned_pages = (_clean(page.extract_text() or "") for page in PdfReader(path).pages)
    return "\n\n".join(text for text in cleaned_pages if text)


# Full document text; `raw_text` is the input of the chunking step below.
raw_text = pdf_to_text(pdf_path)
print("Chars:", len(raw_text))


# =========================
# 3) Chunking (découpage)
# =========================
def chunk_text(text: str, chunk_size=900, overlap=150):
    """Split ``text`` into fixed-size character windows that overlap.

    Window starts advance by ``chunk_size - overlap`` characters. Each window
    is stripped, and windows that are empty after stripping are dropped. The
    final window can be shorter than ``chunk_size``.

    Args:
        text: The text to split; it is stripped first.
        chunk_size: Maximum number of characters per chunk. Must be positive.
        overlap: Number of characters shared by consecutive windows. Must be
            smaller than ``chunk_size``.

    Returns:
        The list of chunks, in document order (empty for empty/blank text).

    Raises:
        ValueError: if ``chunk_size`` is not positive or ``overlap`` is not
            smaller than ``chunk_size``.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    if overlap >= chunk_size:
        # A zero or negative step would never advance past the start of the text.
        raise ValueError(
            f"overlap ({overlap}) must be smaller than chunk_size ({chunk_size})"
        )

    text = text.strip()
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        # Slicing past the end of the string is safe and yields the shorter tail.
        chunk = text[start:start + chunk_size].strip()
        if chunk:
            chunks.append(chunk)
    return chunks


# Chunk the whole document; `chunks` is the corpus that gets embedded and indexed.
chunks = chunk_text(raw_text, chunk_size=900, overlap=150)
print("Nb chunks:", len(chunks))
if not chunks:
    # Fail with a clear message instead of an IndexError on chunks[0] below.
    raise ValueError(
        "No text chunks were produced: the PDF yielded no extractable text."
    )
print("Ex chunk:\n", chunks[0][:400], "...")

Chars: 104192
Nb chunks: 139
Ex chunk:
 Convention on the International Regulations 
for Preventing Collisions at Sea, 1972 
Consolidated edition, 2018 
 
ARTICLE I 
General Obligations 
The Parties to the present Convention undertake to give effect to the Rules and other 
Annexes constituting the International Regulations for Preventing Collisions at Sea, 1972, 
 
ARTICLE II 
Signature, Ratification, Acceptance, Approval and Accession  ...


In [26]:

# =========================
# 4) Embeddings + FAISS index (GPU if available)
# =========================
# Defines the module-level `embedder` and `index` that `retrieve()` reads as globals.
EMB_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
embedder = SentenceTransformer(EMB_MODEL)

# Encode every chunk, then convert to float32 for FAISS.
emb = embedder.encode(
    chunks, batch_size=64, show_progress_bar=True, normalize_embeddings=True
)
emb = np.array(emb, dtype="float32")

# Embedding dimensionality, taken from the second axis of the embedding matrix.
dim = emb.shape[1]

# Prefer FAISS GPU if available (keeps search fast and off CPU)
use_faiss_gpu = False
try:
    n_gpus = faiss.get_num_gpus()
    use_faiss_gpu = n_gpus > 0
except Exception:
    # Any failure while probing for GPUs is treated as "no GPU".
    n_gpus = 0
    use_faiss_gpu = False

if use_faiss_gpu:
    print(f"FAISS GPUs detected: {n_gpus}. Using GPU index.")
    res = faiss.StandardGpuResources()
    index = faiss.GpuIndexFlatIP(res, dim)  # Inner Product (fine because normalize_embeddings=True)
else:
    print("FAISS GPU not available, using CPU index.")
    index = faiss.IndexFlatIP(dim)  # Inner Product (fine because normalize_embeddings=True)

# Row i of `emb` corresponds to chunks[i], so index ids can be used to look chunks up.
index.add(emb)


def retrieve(query: str, k=3):
    """Return the chunks most similar to ``query`` according to the FAISS index.

    Uses the module-level ``embedder``, ``index`` and ``chunks``.

    Args:
        query: The search text.
        k: Maximum number of results. It is capped at the number of vectors
            stored in the index.

    Returns:
        A list of ``(chunk_index, score, chunk_text)`` tuples in the order
        returned by the index. The list is empty when ``k`` is not positive or
        the index holds no vectors.
    """
    k = min(int(k), int(index.ntotal))
    if k <= 0:
        return []

    q_emb = embedder.encode([query], normalize_embeddings=True).astype("float32")
    scores, ids = index.search(q_emb, k)

    results = []
    for score, idx in zip(scores[0], ids[0]):
        idx = int(idx)
        # FAISS pads missing neighbours with id -1; indexing chunks[-1] would
        # silently return the last chunk as if it were a real hit.
        if idx < 0:
            continue
        results.append((idx, float(score), chunks[idx]))
    return results

Batches:   0%|          | 0/3 [00:00<?, ?it/s]

FAISS GPU not available, using CPU index.


In [None]:
# (Optional) Hugging Face login here if needed (otherwise handled in the model cell).
# Does nothing when the HF_TOKEN environment variable is unset or empty.
hf_token = os.environ.get("HF_TOKEN")
if hf_token:
    login(token=hf_token)

Note: Environment variable`HF_TOKEN` is set and is the current active token independently from the token you've just configured.


In [28]:
# =========================
# 6) Batch Q/A from CSV -> minimal CSV (id, question, answer)
# =========================
# Pré-requis:
# - Cellule 3: pdf_path pointe sur COLREG
# - Cellule 4: chunks/raw_text créés
# - Cellule 5: index + retrieve() OK
# - Cellule 2: modèle + ask_pdf() OK

import csv
import time
from pathlib import Path

def _resolve_csv_path(name: str) -> str:
    candidates = [
        Path(name),
        Path.cwd() / name,
        Path("..") / name,
        Path("../..") / name,
    ]
    for p in candidates:
        if p.is_file():
            return str(p)
    raise FileNotFoundError(f"CSV not found: {name} (tried: {[str(c) for c in candidates]})")


def _pick_row_id(row: dict, fallback: int) -> str:
    for key in ("id", "Id", "ID", "question_id", "QuestionID", "questionId"):
        if key in row and str(row[key]).strip():
            return str(row[key]).strip()
    return str(fallback)


csv_path = _resolve_csv_path("Set évaluation RAG.csv")
output_path = "Set évaluation RAG_QA.csv"

print("Input CSV:", csv_path)
print("Output CSV:", output_path)

# Read and validate the input first, so a bad CSV fails before any GPU work.
# utf-8-sig transparently drops a leading BOM if the file has one.
with open(csv_path, "r", encoding="utf-8-sig", newline="") as f:
    reader = csv.DictReader(f)
    if not reader.fieldnames:
        raise ValueError("CSV has no header / fieldnames")
    rows = list(reader)

if not rows:
    raise ValueError("CSV contains no rows")

# Warmup to stabilize first-run latency
_ = ask_pdf("Warmup question", k=1, max_new_tokens=8)

with open(output_path, "w", encoding="utf-8", newline="") as f_out:
    writer = csv.DictWriter(f_out, fieldnames=["id", "question", "answer"])
    writer.writeheader()

    for i, row in enumerate(rows, start=1):
        # Prefer a question-named column. Fall back to the first column only when
        # the CSV has no such column at all; otherwise an empty question cell
        # would make the first column (typically the id) be asked as the question.
        question_keys = [key for key in ("question", "Question", "QUESTION") if key in row]
        if question_keys:
            q = next((row[key] for key in question_keys if row[key]), "")
        else:
            q = next(iter(row.values()), "")
        q = q.strip() if isinstance(q, str) else ""
        qid = _pick_row_id(row, fallback=i)

        if not q:
            # Keep the row in the output so ids stay aligned with the input.
            writer.writerow({"id": qid, "question": "", "answer": ""})
            f_out.flush()
            continue

        t0 = time.perf_counter()
        try:
            answer, _sources = ask_pdf(q, k=5, max_new_tokens=512)
            elapsed = time.perf_counter() - t0
            print(f"[{i}/{len(rows)}] {elapsed:.2f}s | id={qid} | {q[:80]}")
            writer.writerow({"id": qid, "question": q, "answer": answer})
        except Exception as e:
            # One failing question must not abort the batch; record the error instead.
            elapsed = time.perf_counter() - t0
            print(f"[{i}/{len(rows)}] {elapsed:.2f}s | id={qid} | ERROR | {q[:80]} -> {e}")
            writer.writerow({"id": qid, "question": q, "answer": f"Error: {e}"})
        # Persist each answer immediately so an interrupted run keeps its results.
        f_out.flush()

print("Done. Wrote:", output_path)

Input CSV: ..\Set évaluation RAG.csv
Output CSV: Set évaluation RAG_QA.csv
[1/12] 3.18s | id=1 | According to Rule 32, what are the precise definitions of a "whistle," a "short 
[2/12] 3.12s | id=2 | What are the minimum visibility ranges for the masthead light, sidelight, and st
[3/12] 4.51s | id=3 | Under Rule 10 regarding Traffic Separation Schemes, when is a vessel allowed to 
[4/12] 2.64s | id=4 | What precise sound signal must a vessel aground give if she is 100 meters or mor
[5/12] 10.49s | id=5 | Summarize the actions a vessel must take to determine if a "risk of collision" e
[6/12] 8.26s | id=6 | Describe the lighting configuration required for a vessel "not under command" (N
[7/12] 13.24s | id=7 | Explain the hierarchy of responsibilities between different vessel types (Power-
[8/12] 5.19s | id=8 | Based on the "IMO Recommendation on Navigational Watchkeeping," what are the key
[9/12] 5.97s | id=9 | A sailing vessel is overtaking a power-driven vessel. According to the intera