# Auteurs : BAUDET Quentin & LARMAILLARD-NOIREN Joris

## Prototype RAG – Recommandateur de Smartphones  
Ce notebook illustre pas à pas :  
1. Chargement des données et configuration  
2. Construction du jeu de passages + embeddings DPR  
3. Création de l’index FAISS  
4. Chargement du pipeline RAG Hugging-Face  
5. Tests d’interrogation  


### Chargement des données

In [None]:
### Importation des modules
import os
import re
import torch
import faiss
import numpy as np
import pandas as pd
from datasets import Dataset, load_from_disk
from transformers import (
    DPRQuestionEncoder, DPRQuestionEncoderTokenizerFast,
    AutoTokenizer, AutoModelForSeq2SeqLM
)
from sentence_transformers import SentenceTransformer

In [None]:
### Chargement des variables d'environnement
TRANSFORMER = os.getenv('TRANSFORMER')
MODEL_NAME = os.getenv("MODEL_NAME")
GEN_MODEL = os.getenv("GEN_MODEL")
PASSAGES_PATH = os.getenv("PASSAGES_PATH")
DPR_DATASET = os.getenv("DPR_DATASET")
INDEX_PATH = os.getenv("INDEX_PATH")
N_DOCS = int(os.getenv("N_DOCS", 150))
DEVICE = os.getenv("DEVICE")

In [None]:
### Chargement des données CSV
data = pd.read_csv("../data/processed/Smartphones_cleaned_dataset_processed.csv")

### 1. Création des passages

Ici, nous allons procéder à la création des passages. On lit le CSV nettoyé et on génère un fichier JSONL de `{"title","text"}`, où `title` est le nom de la marque, et le modèle du téléphone, et `text` correspond à toutes les caractéristiques d'un téléphone : c'est-à-dire le prix, la taille d'écran, etc.

In [None]:
records = []
for _, phone in data.iterrows():
    title = f"{phone['brand_name']} {phone['model']}"
    text = (
        f"Prix : {phone['price (€)']} €, Segment du prix : {phone['price_segment']}, "
        f"Taille d'écran : {phone['screen_size']}”, Taux de rafraîchissement : {phone['refresh_rate']} Hz, "
        f"Note : {phone['rating']}, Rapport qualité-prix : {phone['quality-price_ratio']}, "
        f"5G : {'Disponible' if phone['has_5g'] else 'Non disponible'}, "
        f"Stockage : {phone['internal_memory']} Go, RAM : {phone['ram_capacity']} Go, "
        f"CPU : {phone['processor_brand']} {round(phone['num_cores'] * phone['processor_speed'], 2)} GHz, "
        f"Batterie : {phone['battery_capacity']} mAh, Autonomie : {phone['quality_battery_autonomy']}, "
        f"Charge rapide : {'Non disponible' if phone['fast_charging_available'] == 0 else 'Disponible'}, "
        f"App. AR : {phone['num_rear_cameras']} ×, App. AV : {phone['num_front_cameras']} ×, "
        f"Caméra prin. AR : {phone['primary_camera_rear']} Mpx, Caméra prin. AV : {phone['primary_camera_front']} Mpx, "
        f"OS : {phone['os']}, "
        f"Niveau de performance : {phone['performance category']}"
    )
    records.append({"title": title, "text": text})

### Enregistrement des passages dans un dataset
ds = Dataset.from_list(records)

### Transformation du contenu du dataset (Les passages) en fichier JSONL
ds.to_json(PASSAGES_PATH, orient="records", lines=True)

print("Passages enregistrés dans", PASSAGES_PATH)

### 2. Création du dataset DPR et embeddings

Ici, il s'agira d'encoder chaque `text` avec Sentence-Transformer et d'ajouter la colonne `embeddings`.

In [None]:
### Chargement des passages dans un dataset
ds = Dataset.from_json(PASSAGES_PATH, split="train")

"""
Création d'un embedder pour le calcul des embeddings
DEVICE définit quel matériel (Hardware ici) sur lequel nous faisons les calculs des embeddings : CPU ou GPU
Transformer : all-MiniLM-L6-v2
"""
embedder = SentenceTransformer(TRANSFORMER, device=DEVICE)

### Calcul des embeddings
embs = embedder.encode(ds["text"], convert_to_numpy=True, show_progress_bar=True)

### Ajout de la colonne `embeddings`
ds = ds.add_column("embeddings", embs.tolist())

### Sauvegarde du dataset
ds.save_to_disk(DPR_DATASET)
print("Dataset DPR sauvé dans", DPR_DATASET)

**Affichage embeddings**

In [None]:
### Chargement des embeddings
embeddings = np.array(ds["embeddings"], dtype=np.float32)

### On vérifie ici la dimension des embeddings
print("Dimension des embeddings:", embeddings.shape)

### Affichage des 5 premiers embeddings
print("5 premiers embeddings :", embeddings[:5])

### 3. Création de l’index FAISS

Ici, nous chargeons le dataset DPR, puis extraction des embeddings et on bâtit un `IndexFlatIP`.

In [None]:
### Chargement des embeddings
ds2 = load_from_disk(DPR_DATASET)

emb_np = np.array(ds2["embeddings"], dtype=np.float32)

### Création de l'index
index = faiss.IndexFlatIP(emb_np.shape[1])

index.add(emb_np)

faiss.write_index(index, INDEX_PATH)

print("Index FAISS enregistré dans", INDEX_PATH)

### 4. Fonction de retrieval DPR  

Ici, nous allons encoder la question poser par l'utilisateur, et effectuer la recherche dans FAISS.

In [None]:
### Chargement DPR
dpr_tok = DPRQuestionEncoderTokenizerFast.from_pretrained("facebook/dpr-question_encoder-single-nq-base")

dpr_enc = DPRQuestionEncoder.from_pretrained("facebook/dpr-question_encoder-single-nq-base").to(DEVICE)

faiss_idx = faiss.read_index(INDEX_PATH)
passages = ds2["text"]

### Fonction retriever
def retrieve(q: str, top_k: int = N_DOCS):
    ### Encodage de la question - Utilisation du DEVICE
    entrees = dpr_tok(q, padding=True, truncation=True, return_tensors="pt").to(DEVICE)
    with torch.no_grad():
        rep = dpr_enc(**entrees)[0].cpu().numpy()

    ### Recherche FAISS
    scores, ids = faiss_idx.search(rep.astype(np.float32), k=top_k)
    
    ### Conversion explicite des indices en entier
    docs = [(passages[int(i)], float(scores[0, idx])) for idx, i in enumerate(ids[0])]

    return docs

### 5. Création du modèle avec Google flan-T5

Fonction utilitaire pour parser les différentes contraintes de l'utilisateur et charger les éléments directement depuis le dataset

In [None]:
def detect_criterion(input_q: str):
    question_p = input_q.lower()
    if re.search(r"autonomie|batterie|endurance", question_p):
        return "battery_capacity", True
    if re.search(r"photo|nuit|caméra", question_p):
        return "primary_camera_rear", True
    if re.search(r"performance|cpu|processeur", question_p):
        return "processor_speed", True
    if re.search(r"mémoire vive|ram|stockage", question_p):
        return "ram_capacity", True

    return None, True

**Partie génération Seq2Seq**

In [None]:
### Mise en place du modèle - Google flan-T5
gen_tok = AutoTokenizer.from_pretrained(GEN_MODEL)
gen_mod = AutoModelForSeq2SeqLM.from_pretrained(
    GEN_MODEL,
    torch_dtype=torch.float16,
    device_map="auto"
).to(DEVICE)

In [None]:

def ask(question: str) -> str:
    ### Extraction du contexte
    docs = retrieve(question)
    ### Concentration sur le `text`
    passages = [txt for txt,_ in docs]
    context = "\n\n".join(f"[Doc {i+1}] {txt}" for i,(txt,_) in enumerate(docs))
    
    ### Ajout d'exemples pour que le modèle puisse répondre par des phrases structurées et argumentées aux questions posées par l'utilisateur
    few_shot = """
Répondre de manière concise, claire et argumentée par rapport au contexte et à la question.

Exemple de réponse attendue :

Question : Je veux un smartphone qui tient bien, avec une bonne autonomie et un segment de prix milieu. Quel smartphone me proposerais-tu ?

Exemple de réponse attendue :
La question porte sur l'autonomie des téléphones. Voici une comparaison basée sur l'autonomie de la batterie de chaque téléphone :
- Le modèle [Doc 3] Motorola Moto G40 Fusion se distingue par sa batterie de 5000 mAh, qui lui confère une **autonomie haute**, bien supérieure à celle des autres modèles (3110 mAh et 3240 mAh). Ce modèle est une **meilleure option pour ceux qui cherchent un téléphone avec une bonne autonomie**, en plus de son **rapport qualité-prix intéressant** pour un téléphone dans le segment bas.
"""

    prompt = few_shot + "\n" + \
             f"Contexte :\n{context}\n\n" + \
             f"Question : {question}\nRéponse :"

    inputs = gen_tok(prompt, return_tensors="pt", truncation=True).to(DEVICE)
    
    inputs.pop("token_type_ids", None)
    out = gen_mod.generate(
        **inputs,
        max_new_tokens=650,
        num_beams=8,
        no_repeat_ngram_size=4,
        length_penalty=1.2,
        early_stopping=True
    )
    return gen_tok.decode(out[0], skip_special_tokens=True)


In [None]:
q = ("Je veux un smartphone qui tient bien, avec une bonne autonomie avec un segment de prix milieu.")
print("Q:", q)
print("A:", ask(q), "\n")

**Vérification du fonctionnement de FAISS**


In [None]:
### On teste la récupération de documents pour une question exemple
question = "Je veux un smartphone avec une excellent autonomie."

### Encodage de la question et calcul des embeddings
inputs = dpr_tok(question, padding=True, truncation=True, return_tensors="pt").to(DEVICE)
with torch.no_grad():
    question_embedding = dpr_enc(**inputs)[0].cpu().numpy()

### Recherche des passages les plus proches dans FAISS
scores, ids = faiss_idx.search(question_embedding.astype(np.float32), k=10)

### Conversion des indices en entier classique
for i, identifiant in enumerate(ids[0]):
    doc_id = int(identifiant)
    print(f"Doc {i+1}: {passages[doc_id]} (Score: {scores[0][i]})")

In [None]:
### Vérification des colonnes disponibles
print(ds.column_names)