https://huggingface.co/blog/not-lain/rag-chatbot-using-llama3

In [None]:
!pip install datasets sentence-transformers faiss-cpu accelerate

In [62]:
from sentence_transformers import SentenceTransformer
from datasets import load_dataset

# Sentence-embedding model used for both the documents and the queries;
# downloaded weights are cached in the local "models" folder.
ST = SentenceTransformer("all-MiniLM-L6-v2", cache_folder="models")
# Train split of the FAQ dataset, cached in the local "datasets" folder.
ds = load_dataset("JLK-ptbk/faq", cache_dir="datasets", split="train")

In [None]:
import ast
import re
from typing import List
from bs4 import BeautifulSoup


def parse_data(data_str: str) -> List[str]:
    """
    Parses a string representation of a list into an actual list of strings.

    Args:
        data_str: Text that should look like a Python list literal,
            e.g. "['question', 'answer']".

    Returns:
        The string items of the parsed list. Non-string items are dropped so
        the result always honours the List[str] contract. If the literal
        evaluates to something other than a list, or the input is not a string
        at all (e.g. None), an empty list is returned. If the text cannot be
        parsed as a literal, every single- or double-quoted substring found in
        it is returned instead.
    """
    # Missing values would otherwise crash the regex fallback below.
    if not isinstance(data_str, str):
        return []
    try:
        # Safely evaluate the string to a Python object (no code execution)
        data_list = ast.literal_eval(data_str)
    except (SyntaxError, ValueError, TypeError, MemoryError, RecursionError):
        # Malformed literal: salvage whatever quoted strings are present.
        # Group 1 captures single-quoted text, group 2 double-quoted text.
        matches = re.finditer(r"'(.*?)'|\"(.*?)\"", data_str)
        return [
            m.group(1) if m.group(1) is not None else m.group(2) for m in matches
        ]
    if not isinstance(data_list, list):
        return []
    return [item for item in data_list if isinstance(item, str)]


def remove_html(text: str) -> str:
    """
    Returns the visible text of an HTML fragment.

    The fragment is parsed with the "html.parser" backend; the extracted text
    pieces are stripped and joined with a single space.
    """
    return BeautifulSoup(text, "html.parser").get_text(separator=" ", strip=True)


def clean_entry(data_str: str) -> List[str]:
    """
    Turns one raw 'data' field into a list of cleaned text snippets.

    Each parsed item has its HTML removed and its whitespace collapsed. Items
    of 10 characters or fewer, and items containing the standalone word
    "Startin" (case-insensitive), are discarded.
    """
    kept = []
    for raw_item in parse_data(data_str):
        # Strip markup, then squeeze every whitespace run down to one space
        text = " ".join(remove_html(raw_item).split())
        # Too short to be useful
        if len(text) <= 10:
            continue
        # Contains the "Startin" marker
        if re.search(r"\bStartin\b", text, re.IGNORECASE):
            continue
        kept.append(text)
    return kept


# Apply the cleaning function
def apply_cleaning(example) -> dict:
    """
    Builds the "text" column for one dataset row.

    The row's "data" field is cleaned into snippets, which are joined with
    " A: " and prefixed with "Q: ".
    """
    snippets = clean_entry(example["data"])
    return {"text": f"Q: {' A: '.join(snippets)}"}


# Run apply_cleaning over every row to add the "text" column, dropping the
# listed source columns from the result.
cleaned_ds = ds.map(
    apply_cleaning, remove_columns=["Unnamed: 0", "index", "data"]
)

# Preview the row at index 1 as a sanity check of the cleaning step.
print(cleaned_ds[1])

In [None]:
# Add an "embeddings" column by encoding each row's "text" with the
# sentence-transformer model (one encode call per row).
# NOTE(review): passing batched=True to map would likely speed this up - confirm
# the resulting embeddings are equivalent before switching.
ds_with_embeddings = cleaned_ds.map(
    lambda example: {"embeddings": ST.encode(example["text"])},
)

# Build a FAISS index over the "embeddings" column; the index is attached to
# ds_with_embeddings itself (the return value is not used).
ds_with_embeddings.add_faiss_index(column="embeddings")

In [None]:
def search(query: str, k: int = 3):
    """Embed `query` and fetch its `k` nearest rows from the indexed dataset.

    Returns:
        A `(scores, retrieved_examples)` pair as produced by the dataset's
        nearest-examples lookup on the "embeddings" index.
    """
    # Encode the query with the same model that embedded the dataset
    query_vector = ST.encode(query)
    # Look the vector up in the "embeddings" index, keeping the top k hits
    hit_scores, hits = ds_with_embeddings.get_nearest_examples(
        "embeddings", query_vector, k=k
    )
    return hit_scores, hits
# Quick retrieval check: display the "text" entries of the hits for a sample query.
search("rebook")[1]['text']

In [88]:
# System message sent with every chat request: instructs the model to answer
# from the supplied document excerpts and to admit when it does not know.
SYS_PROMPT = """You are an assistant for answering questions.
You are given the extracted parts of a long document and a question. Provide the answer to the question based on the document.
If you don't know the answer, just say "I do not know." Don't make up an answer."""

In [89]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

# Tokenizer and causal language model used to generate answers; both are cached
# in the local "models" folder.
tokenizer = AutoTokenizer.from_pretrained(
    "HuggingFaceTB/SmolLM-135M-Instruct", cache_dir="models"
)
# Use the GPU when one is available and fall back to the CPU otherwise, so the
# notebook also runs on machines without CUDA.
model = AutoModelForCausalLM.from_pretrained(
    "HuggingFaceTB/SmolLM-135M-Instruct", cache_dir="models"
).to("cuda" if torch.cuda.is_available() else "cpu")

# Token ids that end generation. "<|eot_id|>" is only added when the tokenizer
# actually defines it: looking up a token that is not in the vocabulary yields
# the unknown-token id (or None), which must not be used as a stop token.
terminators = [tokenizer.eos_token_id]
if "<|eot_id|>" in tokenizer.get_vocab():
    terminators.append(tokenizer.convert_tokens_to_ids("<|eot_id|>"))

In [90]:
def format_prompt(prompt, retrieved_documents, k):
    """Build the user message from the question and the retrieved passages.

    Args:
        prompt: The user's question.
        retrieved_documents: Mapping whose "text" entry is the sequence of
            retrieved passages.
        k: Maximum number of passages to include. If fewer passages were
            retrieved, all of them are used instead of raising IndexError.

    Returns:
        The question line, followed by "Context:" and each included passage
        terminated by a newline.
    """
    # Slice rather than index so a short result list cannot overflow;
    # max() keeps a negative k from slicing from the end of the list.
    passages = retrieved_documents["text"][: max(k, 0)]
    return f"Q:{prompt}\nContext:" + "".join(f"{passage}\n" for passage in passages)


def generate(formatted_prompt):
    """Generate the model's answer for an already formatted prompt.

    Args:
        formatted_prompt: Question plus retrieved context; only its first
            2000 characters are sent to the model.

    Returns:
        The decoded text of the newly generated tokens, with special tokens
        removed.
    """
    formatted_prompt = formatted_prompt[:2000]  # to avoid GPU OOM
    messages = [
        {"role": "system", "content": SYS_PROMPT},
        {"role": "user", "content": formatted_prompt},
    ]
    # add_generation_prompt=True appends the tokens that open the assistant
    # turn, so the model writes a reply instead of continuing the user turn.
    input_text = tokenizer.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True
    )
    # The rendered template already contains the special tokens, so they must
    # not be added a second time. Tensors go to whatever device the model is on.
    encoded = tokenizer(
        input_text, return_tensors="pt", add_special_tokens=False
    ).to(model.device)
    input_ids = encoded["input_ids"]
    # More tokens = longer responses
    outputs = model.generate(
        input_ids=input_ids,
        attention_mask=encoded["attention_mask"],
        max_new_tokens=200,
        temperature=0.5,
        top_p=0.9,
        do_sample=True,
        eos_token_id=terminators,
    )
    # Drop the prompt tokens; keep only what the model generated
    response = outputs[0][input_ids.shape[-1] :]
    return tokenizer.decode(response, skip_special_tokens=True)

def rag_chatbot(prompt: str, k: int = 2):
    """Answer `prompt` using the `k` best-matching passages as context.

    Retrieves the passages, formats them together with the question, and
    returns the generated response text.
    """
    # The similarity scores are not needed for prompting
    _, hits = search(prompt, k)
    return generate(format_prompt(prompt, hits, k))

In [None]:
# Test the RAG pipeline end to end with a sample question
query = "What methods of payment does PetBacker accept?"

# Retrieve the 2 closest passages and generate an answer from them
response = rag_chatbot(query, k=2)
print("Response:", response)

In [None]:
## Step 4: Evaluate the RAG Model
from rouge_score import rouge_scorer

# Reference response for evaluation
reference = """
The methods of payment vary by region. Some of the payment methods PetBacker accepts are direct bank in, and major credit cards, debit cards via Paypal.
"""

# Evaluation with ROUGE (unigram overlap and longest common subsequence)
scorer = rouge_scorer.RougeScorer(["rouge1", "rougeL"], use_stemmer=True)
# score() takes (target, prediction): the reference must come first, otherwise
# the reported precision and recall are swapped.
scores = scorer.score(reference, response)
print("ROUGE scores:", scores)